pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。
pmq的源码里面很多套路还是值得学习的,说实话,这些都是可以用到项目里面的。下面的代码来源于pmq。
首先安装好maven、mysql,对下载下拉的包进行打包:
如果遇到时区问题,则可以调整时区问题。
1.MqBootstrapListener 观察者模式的使用
/**
* mqBootstrap监听 实现 上下文监听 观察者模式
*/
@Component
public class MqBootstrapListener implements ApplicationListener<ContextRefreshedEvent>, Ordered {
private static final Logger log = LoggerFactory.getLogger(MqBootstrapListener.class);
private static boolean isInit = false;
@Autowired
private ReportService reportService;
@Override
public int getOrder() {
// TODO Auto-generated method stub
return Ordered.LOWEST_PRECEDENCE;
}
//使用观察者模式重写onApplicationEvent方法
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//判断是否初始化了,如果初始化,则启动定时任务
if (!isInit) {
try {
startTimer();
startPortalTimer();
//注册report
reportService.registerReport();
//将其进行初始化
isInit = true;
log.info("mq初始化成功!");
} catch (Exception e) {
log.error("mq初始化异常", e);
throw e;
}
}
}
//启动portal定时任务
private void startPortalTimer() {
Map<String, PortalTimerService> startedServices = SpringUtil.getBeans(PortalTimerService.class);
if (startedServices != null) {
startedServices.entrySet().forEach(t1 -> {
try {
t1.getValue().startPortal();
log.info(t1.getKey() + "启动完成!");
} catch (Exception e) {
log.error(t1.getKey() + "启动异常!", e);
}
});
}
}
//启动定时任务
private void startTimer() {
Map<String, TimerService> startedServices = SpringUtil.getBeans(TimerService.class);
if (startedServices != null) {
startedServices.entrySet().forEach(t1 -> {
try {
t1.getValue().start();
log.info(t1.getKey() + "启动完成!");
} catch (Exception e) {
log.error(t1.getKey() + "启动异常!", e);
}
});
}
}
}
观察者模式,通常我们进行配置或者将token放入到配置里面的时候,可以使用,而这样可以实时更新。
2.CAS的使用和volitale的使用,使用volitale的时候会考虑到读写屏障。使用@PostConstruct后置构造,这个类似于实现实现InitializingBean,重写AfterPropertiesSet。当然也可以基于ApplicationEvent实现,观察者模式。或者使用后置处理器。
/**
* 数据report服务
*/
@Component
public class DbReportService {
//消息服务
@Autowired
private Message01Service message01Service;
//数据节点服务
@Autowired
private DbNodeService dbNodeService;
//原子对象 map
private AtomicReference<Map<String, Integer>> conMapRef = new AtomicReference<Map<String, Integer>>(
new HashMap<>());
//线程池
private ExecutorService executorService = Executors
.newSingleThreadExecutor(SoaThreadFactory.create("DbReportService", true));
//运行为true
private volatile boolean isRunning = true;
//度量指标
private volatile Map<String, Boolean> metricMap = new ConcurrentHashMap<>();
//启动标识
private AtomicBoolean startFlag = new AtomicBoolean(false);
//后置构造注解
@PostConstruct
public void report() {
//使用cas进行比较
if (startFlag.compareAndSet(false, true)) {
//线程池服务提交服务
executorService.submit(new Runnable() {
@Override
public void run() {
while (isRunning) {
Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
Map<String, DbNodeEntity> dataSourceMap = new HashMap<>();
Map<String, Integer> conMap = new HashMap<>();
try {
for (long dbId : dbNodeMap.keySet()) {
if (!dataSourceMap.containsKey(dbNodeMap.get(dbId).getIp())) {
dataSourceMap.put(dbNodeMap.get(dbId).getIp(), dbNodeMap.get(dbId));
}
}
for (String ip : dataSourceMap.keySet()) {
message01Service.setDbId(dataSourceMap.get(ip).getId());
//连接计数
int conCount = message01Service.getConnectionsCount();
conMap.put(ip, conCount);
if (!metricMap.containsKey(ip)) {
//System.out.println(ip);
metricMap.put(ip, true);
//度量单例 获取度量注册
MetricSingleton.getMetricRegistry().register("mq.ip.con.count?ip=" + ip,
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (conMapRef.get().containsKey(ip)) {
return conMapRef.get().get(ip);
} else {
return 0;
}
}
});
}
}
conMapRef.set(conMap);
} catch (Exception e) {
// TODO: handle exception
}
Util.sleep(10000);
}
}
});
}
}
//销毁前将运行状态变成false,同时将线程池关闭
@PreDestroy
private void close() {
isRunning = false;
executorService.shutdown();
}
}
3.使用过滤器:实现filer接口,同时重写init方法、doFilter方法、destroy。通常会在doFilter进行逻辑处理。以前我写过一个url的拼接,基于CAS的重定向url拼接,当时也是基于filter实现的,当时也是在doFilter中实现的。
/**
* 权限过滤器
*/
@Order(1)
@WebFilter(filterName = "WebAuthFilter", urlPatterns = "/*")
public class AuthFilter implements Filter {
Logger log = LoggerFactory.getLogger(this.getClass().getName());
@Autowired
private Message01Service message01Service;
@Autowired
UserInfoHolder userInfoHolder;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) req;
HttpServletResponse response = (HttpServletResponse) resp;
String uri = request.getRequestURI();
if (skipUri(uri)) {
chain.doFilter(request, response);
} else {
try {
Cookie cookie = CookieUtil.getCookie(request, "userSessionId");
if (cookie == null) {
response.sendRedirect("/login");
} else {
String userId = DesUtil.decrypt(cookie.getValue());
userInfoHolder.setUserId(userId);
chain.doFilter(request, response);
}
} catch (Exception e) {
log.error("login fail", e);
response.sendRedirect("/login");
} finally {
message01Service.clearDbId();
userInfoHolder.clear();
}
}
}
private List<String> skipUrlLst = new ArrayList<>();
public AuthFilter() {
skipUrlLst=Arrays.asList("/login", ".js", ".css", ".jpg", ".woff", ".png", "/auth" ,"/cat","/hs","/message/getByTopic");
}
private boolean skipUri(String uri) {
for(String t : skipUrlLst){
if(uri.indexOf(t)!=-1){
return true;
}
}
return false;
}
@Override
public void destroy() {
// TODO Auto-generated method stub
}
}
4.获取用户信息,使用threadLocal
/**
* 默认用户信息holder
*/
@Service
public class DefaultUserInfoHolder implements UserInfoHolder {
@Autowired
private UserProviderService userProviderService;
//使用threadLocal,用户idLocal
private ThreadLocal<String> userIdLocal = new ThreadLocal<>();
//获取用户信息
@Override
public UserInfo getUser() {
String userId = userIdLocal.get();
Map<String, UserInfo> mapUser = userProviderService.getUsers();
if (mapUser.containsKey(userId)) {
return mapUser.get(userId);
}
return null;
}
//获取用户id
@Override
public String getUserId() {
return userIdLocal.get();
}
//设置用户id
@Override
public void setUserId(String userId) {
userIdLocal.set(userId);
}
//执行清理操作Thread
@Override
public void clear() {
userIdLocal.remove();
}
}
这个在我们项目里面也是这样使用的,这里可以进行改进,可以将token放入到用户信息中去,使用观察者模式。
5.cookie工具类:
/**
* cookie工具类
*/
public class CookieUtil {
//获取用户名称
public static String getUserName(HttpServletRequest request){
//获取一个cookie数组
Cookie[] cookies = request.getCookies();
String userName="";
if (cookies!=null) {
//获取cookie
Cookie cookie = getCookie(request, "userSessionId");
if (cookie == null) {
return "";
}
try {
//返回对数据进行DES加密的数据
return DesUtil.decrypt(cookie.getValue());
} catch (Exception e) {
return "";
}
}
return userName;
}
//获取cookie
public static Cookie getCookie(HttpServletRequest request, String key) {
Cookie[] cks = request.getCookies();
if (cks != null) {
for (Cookie temp : cks) {
if (temp.getName().equals(key))
return temp;
}
}
return null;
}
}
常用的获取cooike的工具类,够实用。当然里面还有很多值得学习的地方,慢慢学习吧!