pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。
pmq的源码里面很多套路还是值得学习的,说实话,这些都是可以用到项目里面的。下面的代码来源于pmq。
首先安装好maven、mysql,对下载下拉的包进行打包:
如果遇到时区问题,则可以调整时区问题。
1.MqBootstrapListener 观察者模式的使用
/** * mqBootstrap监听 实现 上下文监听 观察者模式 */ @Component public class MqBootstrapListener implements ApplicationListener<ContextRefreshedEvent>, Ordered { private static final Logger log = LoggerFactory.getLogger(MqBootstrapListener.class); private static boolean isInit = false; @Autowired private ReportService reportService; @Override public int getOrder() { // TODO Auto-generated method stub return Ordered.LOWEST_PRECEDENCE; } //使用观察者模式重写onApplicationEvent方法 @Override public void onApplicationEvent(ContextRefreshedEvent event) { //判断是否初始化了,如果初始化,则启动定时任务 if (!isInit) { try { startTimer(); startPortalTimer(); //注册report reportService.registerReport(); //将其进行初始化 isInit = true; log.info("mq初始化成功!"); } catch (Exception e) { log.error("mq初始化异常", e); throw e; } } } //启动portal定时任务 private void startPortalTimer() { Map<String, PortalTimerService> startedServices = SpringUtil.getBeans(PortalTimerService.class); if (startedServices != null) { startedServices.entrySet().forEach(t1 -> { try { t1.getValue().startPortal(); log.info(t1.getKey() + "启动完成!"); } catch (Exception e) { log.error(t1.getKey() + "启动异常!", e); } }); } } //启动定时任务 private void startTimer() { Map<String, TimerService> startedServices = SpringUtil.getBeans(TimerService.class); if (startedServices != null) { startedServices.entrySet().forEach(t1 -> { try { t1.getValue().start(); log.info(t1.getKey() + "启动完成!"); } catch (Exception e) { log.error(t1.getKey() + "启动异常!", e); } }); } } }
观察者模式,通常我们进行配置或者将token放入到配置里面的时候,可以使用,而这样可以实时更新。
2.CAS的使用和volitale的使用,使用volitale的时候会考虑到读写屏障。使用@PostConstruct后置构造,这个类似于实现实现InitializingBean,重写AfterPropertiesSet。当然也可以基于ApplicationEvent实现,观察者模式。或者使用后置处理器。
/** * 数据report服务 */ @Component public class DbReportService { //消息服务 @Autowired private Message01Service message01Service; //数据节点服务 @Autowired private DbNodeService dbNodeService; //原子对象 map private AtomicReference<Map<String, Integer>> conMapRef = new AtomicReference<Map<String, Integer>>( new HashMap<>()); //线程池 private ExecutorService executorService = Executors .newSingleThreadExecutor(SoaThreadFactory.create("DbReportService", true)); //运行为true private volatile boolean isRunning = true; //度量指标 private volatile Map<String, Boolean> metricMap = new ConcurrentHashMap<>(); //启动标识 private AtomicBoolean startFlag = new AtomicBoolean(false); //后置构造注解 @PostConstruct public void report() { //使用cas进行比较 if (startFlag.compareAndSet(false, true)) { //线程池服务提交服务 executorService.submit(new Runnable() { @Override public void run() { while (isRunning) { Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache(); Map<String, DbNodeEntity> dataSourceMap = new HashMap<>(); Map<String, Integer> conMap = new HashMap<>(); try { for (long dbId : dbNodeMap.keySet()) { if (!dataSourceMap.containsKey(dbNodeMap.get(dbId).getIp())) { dataSourceMap.put(dbNodeMap.get(dbId).getIp(), dbNodeMap.get(dbId)); } } for (String ip : dataSourceMap.keySet()) { message01Service.setDbId(dataSourceMap.get(ip).getId()); //连接计数 int conCount = message01Service.getConnectionsCount(); conMap.put(ip, conCount); if (!metricMap.containsKey(ip)) { //System.out.println(ip); metricMap.put(ip, true); //度量单例 获取度量注册 MetricSingleton.getMetricRegistry().register("mq.ip.con.count?ip=" + ip, new Gauge<Integer>() { @Override public Integer getValue() { if (conMapRef.get().containsKey(ip)) { return conMapRef.get().get(ip); } else { return 0; } } }); } } conMapRef.set(conMap); } catch (Exception e) { // TODO: handle exception } Util.sleep(10000); } } }); } } //销毁前将运行状态变成false,同时将线程池关闭 @PreDestroy private void close() { isRunning = false; executorService.shutdown(); } }
3.使用过滤器:实现filer接口,同时重写init方法、doFilter方法、destroy。通常会在doFilter进行逻辑处理。以前我写过一个url的拼接,基于CAS的重定向url拼接,当时也是基于filter实现的,当时也是在doFilter中实现的。
/** * 权限过滤器 */ @Order(1) @WebFilter(filterName = "WebAuthFilter", urlPatterns = "/*") public class AuthFilter implements Filter { Logger log = LoggerFactory.getLogger(this.getClass().getName()); @Autowired private Message01Service message01Service; @Autowired UserInfoHolder userInfoHolder; @Override public void init(FilterConfig filterConfig) throws ServletException { } @Override public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException, ServletException { HttpServletRequest request = (HttpServletRequest) req; HttpServletResponse response = (HttpServletResponse) resp; String uri = request.getRequestURI(); if (skipUri(uri)) { chain.doFilter(request, response); } else { try { Cookie cookie = CookieUtil.getCookie(request, "userSessionId"); if (cookie == null) { response.sendRedirect("/login"); } else { String userId = DesUtil.decrypt(cookie.getValue()); userInfoHolder.setUserId(userId); chain.doFilter(request, response); } } catch (Exception e) { log.error("login fail", e); response.sendRedirect("/login"); } finally { message01Service.clearDbId(); userInfoHolder.clear(); } } } private List<String> skipUrlLst = new ArrayList<>(); public AuthFilter() { skipUrlLst=Arrays.asList("/login", ".js", ".css", ".jpg", ".woff", ".png", "/auth" ,"/cat","/hs","/message/getByTopic"); } private boolean skipUri(String uri) { for(String t : skipUrlLst){ if(uri.indexOf(t)!=-1){ return true; } } return false; } @Override public void destroy() { // TODO Auto-generated method stub } }
4.获取用户信息,使用threadLocal
/** * 默认用户信息holder */ @Service public class DefaultUserInfoHolder implements UserInfoHolder { @Autowired private UserProviderService userProviderService; //使用threadLocal,用户idLocal private ThreadLocal<String> userIdLocal = new ThreadLocal<>(); //获取用户信息 @Override public UserInfo getUser() { String userId = userIdLocal.get(); Map<String, UserInfo> mapUser = userProviderService.getUsers(); if (mapUser.containsKey(userId)) { return mapUser.get(userId); } return null; } //获取用户id @Override public String getUserId() { return userIdLocal.get(); } //设置用户id @Override public void setUserId(String userId) { userIdLocal.set(userId); } //执行清理操作Thread @Override public void clear() { userIdLocal.remove(); } }
这个在我们项目里面也是这样使用的,这里可以进行改进,可以将token放入到用户信息中去,使用观察者模式。
5.cookie工具类:
/** * cookie工具类 */ public class CookieUtil { //获取用户名称 public static String getUserName(HttpServletRequest request){ //获取一个cookie数组 Cookie[] cookies = request.getCookies(); String userName=""; if (cookies!=null) { //获取cookie Cookie cookie = getCookie(request, "userSessionId"); if (cookie == null) { return ""; } try { //返回对数据进行DES加密的数据 return DesUtil.decrypt(cookie.getValue()); } catch (Exception e) { return ""; } } return userName; } //获取cookie public static Cookie getCookie(HttpServletRequest request, String key) { Cookie[] cks = request.getCookies(); if (cks != null) { for (Cookie temp : cks) { if (temp.getName().equals(key)) return temp; } } return null; } }
常用的获取cooike的工具类,够实用。当然里面还有很多值得学习的地方,慢慢学习吧!
本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲
原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。
原始发表时间:2020-12-22
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句