专栏首页后端技术学习pmq学习一-一些典型的使用和套路

pmq学习一-一些典型的使用和套路

pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。

pmq的源码里面很多套路还是值得学习的,说实话,这些都是可以用到项目里面的。下面的代码来源于pmq。

首先安装好maven、mysql,对下载下拉的包进行打包:

如果遇到时区问题,则可以调整时区问题。

1.MqBootstrapListener 观察者模式的使用

/**
 * mqBootstrap监听 实现 上下文监听 观察者模式
 */
@Component
public class MqBootstrapListener implements ApplicationListener<ContextRefreshedEvent>, Ordered {
   private static final Logger log = LoggerFactory.getLogger(MqBootstrapListener.class);
   private static boolean isInit = false;
   @Autowired
   private ReportService reportService;
   @Override
   public int getOrder() {
      // TODO Auto-generated method stub
      return Ordered.LOWEST_PRECEDENCE;
   }

   //使用观察者模式重写onApplicationEvent方法
   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
      //判断是否初始化了,如果初始化,则启动定时任务
      if (!isInit) {
         try {
            startTimer();
            startPortalTimer();
            //注册report
            reportService.registerReport();
            //将其进行初始化
            isInit = true;
            log.info("mq初始化成功!");
         } catch (Exception e) {
            log.error("mq初始化异常", e);
            throw e;
         }
      }

   }


    //启动portal定时任务
   private void startPortalTimer() {
      Map<String, PortalTimerService> startedServices = SpringUtil.getBeans(PortalTimerService.class);
      if (startedServices != null) {
         startedServices.entrySet().forEach(t1 -> {
            try {
               t1.getValue().startPortal();
               log.info(t1.getKey() + "启动完成!");
            } catch (Exception e) {
               log.error(t1.getKey() + "启动异常!", e);
            }
         });
      }

   }

   //启动定时任务
   private void startTimer() {
      Map<String, TimerService> startedServices = SpringUtil.getBeans(TimerService.class);
      if (startedServices != null) {
         startedServices.entrySet().forEach(t1 -> {
            try {
               t1.getValue().start();
               log.info(t1.getKey() + "启动完成!");
            } catch (Exception e) {
               log.error(t1.getKey() + "启动异常!", e);
            }
         });
      }
   }

}

观察者模式,通常我们进行配置或者将token放入到配置里面的时候,可以使用,而这样可以实时更新。

2.CAS的使用和volitale的使用,使用volitale的时候会考虑到读写屏障。使用@PostConstruct后置构造,这个类似于实现实现InitializingBean,重写AfterPropertiesSet。当然也可以基于ApplicationEvent实现,观察者模式。或者使用后置处理器。

/**
 * 数据report服务
 */
@Component
public class DbReportService {
   //消息服务
   @Autowired
   private Message01Service message01Service;
   //数据节点服务
   @Autowired
   private DbNodeService dbNodeService;

   //原子对象 map
   private AtomicReference<Map<String, Integer>> conMapRef = new AtomicReference<Map<String, Integer>>(
         new HashMap<>());
   //线程池
   private ExecutorService executorService = Executors
         .newSingleThreadExecutor(SoaThreadFactory.create("DbReportService", true));

   //运行为true
   private volatile boolean isRunning = true;
   //度量指标
   private volatile Map<String, Boolean> metricMap = new ConcurrentHashMap<>();

   //启动标识
   private AtomicBoolean startFlag = new AtomicBoolean(false);

   //后置构造注解
   @PostConstruct
   public void report() {
      //使用cas进行比较
      if (startFlag.compareAndSet(false, true)) {
         //线程池服务提交服务
         executorService.submit(new Runnable() {
            @Override
            public void run() {
               while (isRunning) {
                  Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
                  Map<String, DbNodeEntity> dataSourceMap = new HashMap<>();
                  Map<String, Integer> conMap = new HashMap<>();
                  try {
                     for (long dbId : dbNodeMap.keySet()) {
                        if (!dataSourceMap.containsKey(dbNodeMap.get(dbId).getIp())) {
                           dataSourceMap.put(dbNodeMap.get(dbId).getIp(), dbNodeMap.get(dbId));
                        }
                     }
                     for (String ip : dataSourceMap.keySet()) {
                        message01Service.setDbId(dataSourceMap.get(ip).getId());
                        //连接计数
                        int conCount = message01Service.getConnectionsCount();
                        conMap.put(ip, conCount);
                        if (!metricMap.containsKey(ip)) {
                           //System.out.println(ip);
                           metricMap.put(ip, true);
                           //度量单例 获取度量注册
                           MetricSingleton.getMetricRegistry().register("mq.ip.con.count?ip=" + ip,
                                 new Gauge<Integer>() {
                                    @Override
                                    public Integer getValue() {
                                       if (conMapRef.get().containsKey(ip)) {
                                          return conMapRef.get().get(ip);
                                       } else {
                                          return 0;
                                       }
                                    }
                                 });
                        }

                     }
                     conMapRef.set(conMap);
                  } catch (Exception e) {
                     // TODO: handle exception
                  }
                  Util.sleep(10000);
               }
            }
         });
      }
   }

   //销毁前将运行状态变成false,同时将线程池关闭
   @PreDestroy
   private void close() {
      isRunning = false;
      executorService.shutdown();
   }
}

3.使用过滤器:实现filer接口,同时重写init方法、doFilter方法、destroy。通常会在doFilter进行逻辑处理。以前我写过一个url的拼接,基于CAS的重定向url拼接,当时也是基于filter实现的,当时也是在doFilter中实现的。

/**
 * 权限过滤器
 */
@Order(1)
@WebFilter(filterName = "WebAuthFilter", urlPatterns = "/*")
public class AuthFilter implements Filter {
   Logger log = LoggerFactory.getLogger(this.getClass().getName());

   @Autowired
   private Message01Service message01Service;

   @Autowired
   UserInfoHolder userInfoHolder;

   @Override
   public void init(FilterConfig filterConfig) throws ServletException {
   }

   @Override
   public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
         throws IOException, ServletException {
      HttpServletRequest request = (HttpServletRequest) req;
      HttpServletResponse response = (HttpServletResponse) resp;
      String uri = request.getRequestURI();
      if (skipUri(uri)) {
         chain.doFilter(request, response);
      } else {
         try {
            Cookie cookie = CookieUtil.getCookie(request, "userSessionId");
            if (cookie == null) {
               response.sendRedirect("/login");
            } else {
               String userId = DesUtil.decrypt(cookie.getValue());
               userInfoHolder.setUserId(userId);
               chain.doFilter(request, response);
            }

         } catch (Exception e) {
            log.error("login fail", e);
            response.sendRedirect("/login");
         } finally {
            message01Service.clearDbId();
            userInfoHolder.clear();
         }
      }
   }

   private List<String> skipUrlLst = new ArrayList<>();

   public AuthFilter() {     
      skipUrlLst=Arrays.asList("/login", ".js", ".css", ".jpg", ".woff", ".png", "/auth" ,"/cat","/hs","/message/getByTopic");
   }

   private boolean skipUri(String uri) {
      for(String t : skipUrlLst){
         if(uri.indexOf(t)!=-1){
            return true;
         }
      }
      return false;
   }

   @Override
   public void destroy() {
      // TODO Auto-generated method stub

   }

}

4.获取用户信息,使用threadLocal

/**
 * 默认用户信息holder
 */
@Service
public class DefaultUserInfoHolder implements UserInfoHolder {
   @Autowired
   private UserProviderService userProviderService;
   //使用threadLocal,用户idLocal
   private ThreadLocal<String> userIdLocal = new ThreadLocal<>();

   //获取用户信息
   @Override
   public UserInfo getUser() {
      String userId = userIdLocal.get();
      Map<String, UserInfo> mapUser = userProviderService.getUsers();
      if (mapUser.containsKey(userId)) {
         return mapUser.get(userId);
      }
      return null;
   }

   //获取用户id
   @Override
   public String getUserId() {
      return userIdLocal.get();

   }

   //设置用户id
   @Override
   public void setUserId(String userId) {
      userIdLocal.set(userId);

   }

   //执行清理操作Thread
   @Override
   public void clear() {
      userIdLocal.remove();
   }
}

这个在我们项目里面也是这样使用的,这里可以进行改进,可以将token放入到用户信息中去,使用观察者模式。

5.cookie工具类:

/**
 * cookie工具类
 */
public class CookieUtil {
    //获取用户名称
    public static String getUserName(HttpServletRequest request){
        //获取一个cookie数组
        Cookie[] cookies = request.getCookies();
        String userName="";
        if (cookies!=null) {
            //获取cookie
            Cookie cookie = getCookie(request, "userSessionId");
            if (cookie == null) {
                return "";
            }
            try {
                //返回对数据进行DES加密的数据
                return DesUtil.decrypt(cookie.getValue());
            } catch (Exception e) {
                return "";
            }

        }
        return userName;
    }

    //获取cookie
    public static Cookie getCookie(HttpServletRequest request, String key) {
        Cookie[] cks = request.getCookies();
        if (cks != null) {
            for (Cookie temp : cks) {
                if (temp.getName().equals(key))
                    return temp;
            }
        }
        return null;
    }
}

常用的获取cooike的工具类,够实用。当然里面还有很多值得学习的地方,慢慢学习吧!

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-12-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • dubbo源码学习二

    昨天我们已经知道dubbo的入口可以通过自定义标签找到DubboNamespaceHandler找到解析自定义标签的相关类了。还有4个我们需要关注:Config...

    路行的亚洲
  • 对前端传入的json对象解析成多个对象

    multiRequestBodyDemo(@MultiRequestBody("dog")

    路行的亚洲
  • rocketmq学习2

    从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfi...

    路行的亚洲
  • Java 编程技巧之数据结构

    编写代码的"老司机"也是如此,"老司机"之所以被称为"老司机",原因也是"无他,唯手熟尔"。编码过程中踩过的坑多了,获得的编码经验也就多了,总结的编码技巧也就更...

    用户1516716
  • Java 编程技巧之数据结构

    编写代码的"老司机"也是如此,"老司机"之所以被称为"老司机",原因也是"无他,唯手熟尔"。编码过程中踩过的坑多了,获得的编码经验也就多了,总结的编码技巧也就更...

    吴延宝
  • Java高并发之设计模式

    原文出处:http://www.yund.tech/zdetail.html?type=1&id=34e52a515cd0e4d120255c90f33396a...

    大道七哥
  • 仅需四步,整合SpringSecurity+JWT实现登录认证 !

    macrozheng
  • Java微信公众平台开发_03_消息管理之被动回复消息

    上一节,我们启用服务器配置的时候,填写了一个服务器地址(url),如下图,这个url就是回调url,是开发者用来接收微信消息和事件的接口URL 。也就是说,用户...

    shirayner
  • 猿实战03——猿首战之手把手教你撸品牌

    猿实战是一个原创系列文章,通过实战的方式,采用前后端分离的技术结合SpringMVC Spring Mybatis,手把手教你撸一个完整的电商系统,跟着教程走下...

    山旮旯的胖子
  • 从零开始写简易读写分离,不难嘛!

    温安适

扫码关注云+社区

领取腾讯云代金券