前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pmq学习一-一些典型的使用和套路

pmq学习一-一些典型的使用和套路

作者头像
路行的亚洲
发布2021-01-05 12:34:12
7960
发布2021-01-05 12:34:12
举报
文章被收录于专栏:后端技术学习后端技术学习

pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。

pmq的源码里面很多套路还是值得学习的,说实话,这些都是可以用到项目里面的。下面的代码来源于pmq。

首先安装好maven、mysql,对下载下拉的包进行打包:

如果遇到时区问题,则可以调整时区问题。

1.MqBootstrapListener 观察者模式的使用

代码语言:javascript
复制
/**
 * mqBootstrap监听 实现 上下文监听 观察者模式
 */
@Component
public class MqBootstrapListener implements ApplicationListener<ContextRefreshedEvent>, Ordered {
   private static final Logger log = LoggerFactory.getLogger(MqBootstrapListener.class);
   private static boolean isInit = false;
   @Autowired
   private ReportService reportService;
   @Override
   public int getOrder() {
      // TODO Auto-generated method stub
      return Ordered.LOWEST_PRECEDENCE;
   }

   //使用观察者模式重写onApplicationEvent方法
   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
      //判断是否初始化了,如果初始化,则启动定时任务
      if (!isInit) {
         try {
            startTimer();
            startPortalTimer();
            //注册report
            reportService.registerReport();
            //将其进行初始化
            isInit = true;
            log.info("mq初始化成功!");
         } catch (Exception e) {
            log.error("mq初始化异常", e);
            throw e;
         }
      }

   }


    //启动portal定时任务
   private void startPortalTimer() {
      Map<String, PortalTimerService> startedServices = SpringUtil.getBeans(PortalTimerService.class);
      if (startedServices != null) {
         startedServices.entrySet().forEach(t1 -> {
            try {
               t1.getValue().startPortal();
               log.info(t1.getKey() + "启动完成!");
            } catch (Exception e) {
               log.error(t1.getKey() + "启动异常!", e);
            }
         });
      }

   }

   //启动定时任务
   private void startTimer() {
      Map<String, TimerService> startedServices = SpringUtil.getBeans(TimerService.class);
      if (startedServices != null) {
         startedServices.entrySet().forEach(t1 -> {
            try {
               t1.getValue().start();
               log.info(t1.getKey() + "启动完成!");
            } catch (Exception e) {
               log.error(t1.getKey() + "启动异常!", e);
            }
         });
      }
   }

}

观察者模式,通常我们进行配置或者将token放入到配置里面的时候,可以使用,而这样可以实时更新。

2.CAS的使用和volitale的使用,使用volitale的时候会考虑到读写屏障。使用@PostConstruct后置构造,这个类似于实现实现InitializingBean,重写AfterPropertiesSet。当然也可以基于ApplicationEvent实现,观察者模式。或者使用后置处理器。

代码语言:javascript
复制
/**
 * 数据report服务
 */
@Component
public class DbReportService {
   //消息服务
   @Autowired
   private Message01Service message01Service;
   //数据节点服务
   @Autowired
   private DbNodeService dbNodeService;

   //原子对象 map
   private AtomicReference<Map<String, Integer>> conMapRef = new AtomicReference<Map<String, Integer>>(
         new HashMap<>());
   //线程池
   private ExecutorService executorService = Executors
         .newSingleThreadExecutor(SoaThreadFactory.create("DbReportService", true));

   //运行为true
   private volatile boolean isRunning = true;
   //度量指标
   private volatile Map<String, Boolean> metricMap = new ConcurrentHashMap<>();

   //启动标识
   private AtomicBoolean startFlag = new AtomicBoolean(false);

   //后置构造注解
   @PostConstruct
   public void report() {
      //使用cas进行比较
      if (startFlag.compareAndSet(false, true)) {
         //线程池服务提交服务
         executorService.submit(new Runnable() {
            @Override
            public void run() {
               while (isRunning) {
                  Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
                  Map<String, DbNodeEntity> dataSourceMap = new HashMap<>();
                  Map<String, Integer> conMap = new HashMap<>();
                  try {
                     for (long dbId : dbNodeMap.keySet()) {
                        if (!dataSourceMap.containsKey(dbNodeMap.get(dbId).getIp())) {
                           dataSourceMap.put(dbNodeMap.get(dbId).getIp(), dbNodeMap.get(dbId));
                        }
                     }
                     for (String ip : dataSourceMap.keySet()) {
                        message01Service.setDbId(dataSourceMap.get(ip).getId());
                        //连接计数
                        int conCount = message01Service.getConnectionsCount();
                        conMap.put(ip, conCount);
                        if (!metricMap.containsKey(ip)) {
                           //System.out.println(ip);
                           metricMap.put(ip, true);
                           //度量单例 获取度量注册
                           MetricSingleton.getMetricRegistry().register("mq.ip.con.count?ip=" + ip,
                                 new Gauge<Integer>() {
                                    @Override
                                    public Integer getValue() {
                                       if (conMapRef.get().containsKey(ip)) {
                                          return conMapRef.get().get(ip);
                                       } else {
                                          return 0;
                                       }
                                    }
                                 });
                        }

                     }
                     conMapRef.set(conMap);
                  } catch (Exception e) {
                     // TODO: handle exception
                  }
                  Util.sleep(10000);
               }
            }
         });
      }
   }

   //销毁前将运行状态变成false,同时将线程池关闭
   @PreDestroy
   private void close() {
      isRunning = false;
      executorService.shutdown();
   }
}

3.使用过滤器:实现filer接口,同时重写init方法、doFilter方法、destroy。通常会在doFilter进行逻辑处理。以前我写过一个url的拼接,基于CAS的重定向url拼接,当时也是基于filter实现的,当时也是在doFilter中实现的。

代码语言:javascript
复制
/**
 * 权限过滤器
 */
@Order(1)
@WebFilter(filterName = "WebAuthFilter", urlPatterns = "/*")
public class AuthFilter implements Filter {
   Logger log = LoggerFactory.getLogger(this.getClass().getName());

   @Autowired
   private Message01Service message01Service;

   @Autowired
   UserInfoHolder userInfoHolder;

   @Override
   public void init(FilterConfig filterConfig) throws ServletException {
   }

   @Override
   public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
         throws IOException, ServletException {
      HttpServletRequest request = (HttpServletRequest) req;
      HttpServletResponse response = (HttpServletResponse) resp;
      String uri = request.getRequestURI();
      if (skipUri(uri)) {
         chain.doFilter(request, response);
      } else {
         try {
            Cookie cookie = CookieUtil.getCookie(request, "userSessionId");
            if (cookie == null) {
               response.sendRedirect("/login");
            } else {
               String userId = DesUtil.decrypt(cookie.getValue());
               userInfoHolder.setUserId(userId);
               chain.doFilter(request, response);
            }

         } catch (Exception e) {
            log.error("login fail", e);
            response.sendRedirect("/login");
         } finally {
            message01Service.clearDbId();
            userInfoHolder.clear();
         }
      }
   }

   private List<String> skipUrlLst = new ArrayList<>();

   public AuthFilter() {     
      skipUrlLst=Arrays.asList("/login", ".js", ".css", ".jpg", ".woff", ".png", "/auth" ,"/cat","/hs","/message/getByTopic");
   }

   private boolean skipUri(String uri) {
      for(String t : skipUrlLst){
         if(uri.indexOf(t)!=-1){
            return true;
         }
      }
      return false;
   }

   @Override
   public void destroy() {
      // TODO Auto-generated method stub

   }

}

4.获取用户信息,使用threadLocal

代码语言:javascript
复制
/**
 * 默认用户信息holder
 */
@Service
public class DefaultUserInfoHolder implements UserInfoHolder {
   @Autowired
   private UserProviderService userProviderService;
   //使用threadLocal,用户idLocal
   private ThreadLocal<String> userIdLocal = new ThreadLocal<>();

   //获取用户信息
   @Override
   public UserInfo getUser() {
      String userId = userIdLocal.get();
      Map<String, UserInfo> mapUser = userProviderService.getUsers();
      if (mapUser.containsKey(userId)) {
         return mapUser.get(userId);
      }
      return null;
   }

   //获取用户id
   @Override
   public String getUserId() {
      return userIdLocal.get();

   }

   //设置用户id
   @Override
   public void setUserId(String userId) {
      userIdLocal.set(userId);

   }

   //执行清理操作Thread
   @Override
   public void clear() {
      userIdLocal.remove();
   }
}

这个在我们项目里面也是这样使用的,这里可以进行改进,可以将token放入到用户信息中去,使用观察者模式。

5.cookie工具类:

代码语言:javascript
复制
/**
 * cookie工具类
 */
public class CookieUtil {
    //获取用户名称
    public static String getUserName(HttpServletRequest request){
        //获取一个cookie数组
        Cookie[] cookies = request.getCookies();
        String userName="";
        if (cookies!=null) {
            //获取cookie
            Cookie cookie = getCookie(request, "userSessionId");
            if (cookie == null) {
                return "";
            }
            try {
                //返回对数据进行DES加密的数据
                return DesUtil.decrypt(cookie.getValue());
            } catch (Exception e) {
                return "";
            }

        }
        return userName;
    }

    //获取cookie
    public static Cookie getCookie(HttpServletRequest request, String key) {
        Cookie[] cks = request.getCookies();
        if (cks != null) {
            for (Cookie temp : cks) {
                if (temp.getName().equals(key))
                    return temp;
            }
        }
        return null;
    }
}

常用的获取cooike的工具类,够实用。当然里面还有很多值得学习的地方,慢慢学习吧!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档