前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊nacos的ConfigDataChangeEvent

聊聊nacos的ConfigDataChangeEvent

作者头像
code4it
发布2019-10-22 17:21:18
7320
发布2019-10-22 17:21:18
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下nacos的ConfigDataChangeEvent

Event

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/utils/event/EventDispatcher.java

代码语言:javascript
复制
    public interface Event {
    }
  • Event是个空接口,仅仅用于标名事件

ConfigDataChangeEvent

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigDataChangeEvent.java

代码语言:javascript
复制
public class ConfigDataChangeEvent implements Event {

    final public boolean isBeta;
    final public String dataId;
    final public String group;
    final public String tenant;
    final public String tag;
    final public long lastModifiedTs;

    public ConfigDataChangeEvent(String dataId, String group, long gmtModified) {
        this(false, dataId, group, gmtModified);
    }

    public ConfigDataChangeEvent(boolean isBeta, String dataId, String group, String tenant, long gmtModified) {
        if (null == dataId || null == group) {
            throw new IllegalArgumentException();
        }
        this.isBeta = isBeta;
        this.dataId = dataId;
        this.group = group;
        this.tenant = tenant;
        this.tag = null;
        this.lastModifiedTs = gmtModified;
    }

    public ConfigDataChangeEvent(boolean isBeta, String dataId, String group, long gmtModified) {
        this(isBeta, dataId, group, StringUtils.EMPTY, gmtModified);
    }

    public ConfigDataChangeEvent(boolean isBeta, String dataId, String group, String tenant, String tag,
                                 long gmtModified) {
        if (null == dataId || null == group) {
            throw new IllegalArgumentException();
        }
        this.isBeta = isBeta;
        this.dataId = dataId;
        this.group = group;
        this.tenant = tenant;
        this.tag = tag;
        this.lastModifiedTs = gmtModified;
    }

}
  • ConfigDataChangeEvent实现了Event接口,它包含有isBeta、dataId、group、tenant、tag、lastModifiedTs属性

AbstractEventListener

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/utils/event/EventDispatcher.java

代码语言:javascript
复制
    static public abstract class AbstractEventListener {

        public AbstractEventListener() {
            /**
             * automatic register
             */
            EventDispatcher.addEventListener(this);
        }

        /**
         * 感兴趣的事件列表
         *
         * @return event list
         */
        abstract public List<Class<? extends Event>> interest();

        /**
         * 处理事件
         *
         * @param event event
         */
        abstract public void onEvent(Event event);
    }
  • AbstractEventListener接口定义了interest、onEvent两个抽象方法

AsyncNotifyService

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java

代码语言:javascript
复制
@Service
public class AsyncNotifyService extends AbstractEventListener {

    @Override
    public List<Class<? extends Event>> interest() {
        List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>();
        // 触发配置变更同步通知
        types.add(ConfigDataChangeEvent.class);
        return types;
    }

    @Override
    public void onEvent(Event event) {

        // 并发产生 ConfigDataChangeEvent
        if (event instanceof ConfigDataChangeEvent) {
            ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
            long dumpTs = evt.lastModifiedTs;
            String dataId = evt.dataId;
            String group = evt.group;
            String tenant = evt.tenant;
            String tag = evt.tag;
            List<?> ipList = serverListService.getServerList();

            // 其实这里任何类型队列都可以
            Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
            for (int i = 0; i < ipList.size(); i++) {
                queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
            }
            EXECUTOR.execute(new AsyncTask(httpclient, queue));
        }
    }

    @Autowired
    public AsyncNotifyService(ServerListService serverListService) {
        this.serverListService = serverListService;
        httpclient.start();
    }

    //......
}
  • AsyncNotifyService继承了AbstractEventListener接口,其interest返回的列表包含了ConfigDataChangeEvent;其onEvent方法判断是ConfigDataChangeEvent,则会通过serverListService.getServerList()获取ipList,然后遍历该列表往queue添加NotifySingleTask,然后使用该queue创建AsyncTask并提交线程池执行

AsyncTask

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java

代码语言:javascript
复制
    class AsyncTask implements Runnable {

        public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
            this.httpclient = httpclient;
            this.queue = queue;
        }

        @Override
        public void run() {
            executeAsyncInvoke();
        }

        private void executeAsyncInvoke() {
            while (!queue.isEmpty()) {
                NotifySingleTask task = queue.poll();
                String targetIp = task.getTargetIP();
                if (serverListService.getServerList().contains(
                    targetIp)) {
                    // 启动健康检查且有不监控的ip则直接把放到通知队列,否则通知
                    if (serverListService.isHealthCheck()
                        && ServerListService.getServerListUnhealth().contains(targetIp)) {
                        // target ip 不健康,则放入通知列表中
                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                            task.getLastModified(),
                            LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
                        // get delay time and set fail count to the task
                        asyncTaskExecute(task);
                    } else {
                        HttpGet request = new HttpGet(task.url);
                        request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
                            String.valueOf(task.getLastModified()));
                        request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
                        if (task.isBeta) {
                            request.setHeader("isBeta", "true");
                        }
                        httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
                    }
                }
            }
        }

        private Queue<NotifySingleTask> queue;
        private CloseableHttpAsyncClient httpclient;

    }
  • AsyncTask实现了Runnable接口,其run方法执行executeAsyncInvoke方法;该方法会不断从queue取出NotifySingleTask,然后判断目标ip是否健康不健康则延迟再次执行该task;如果是健康的话则使用httpclient向目标地址发送get请求,然后注册AsyncNotifyCallBack

AsyncNotifyCallBack

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java

代码语言:javascript
复制
    class AsyncNotifyCallBack implements FutureCallback<HttpResponse> {

        public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) {
            this.task = task;
            this.httpClient = httpClient;
        }

        @Override
        public void completed(HttpResponse response) {

            long delayed = System.currentTimeMillis() - task.getLastModified();

            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                ConfigTraceService.logNotifyEvent(task.getDataId(),
                    task.getGroup(), task.getTenant(), null, task.getLastModified(),
                    LOCAL_IP,
                    ConfigTraceService.NOTIFY_EVENT_OK, delayed,
                    task.target);
            } else {
                log.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}",
                    task.target, task.getDataId(), task.getGroup(), task.getLastModified(), response.getStatusLine().getStatusCode());
                ConfigTraceService.logNotifyEvent(task.getDataId(),
                    task.getGroup(), task.getTenant(), null, task.getLastModified(),
                    LOCAL_IP,
                    ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
                    task.target);

                //get delay time and set fail count to the task
                asyncTaskExecute(task);

                LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
                    task.target, task.getDataId(), task.getGroup(), task.getLastModified());

                MetricsMonitor.getConfigNotifyException().increment();
            }
            HttpClientUtils.closeQuietly(response);
        }

        @Override
        public void failed(Exception ex) {

            long delayed = System.currentTimeMillis() - task.getLastModified();
            log.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}",
                task.target, task.getDataId(), task.getGroup(), task.getLastModified(), ex.toString());
            ConfigTraceService.logNotifyEvent(task.getDataId(),
                task.getGroup(), task.getTenant(), null, task.getLastModified(),
                LOCAL_IP,
                ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed,
                task.target);

            //get delay time and set fail count to the task
            asyncTaskExecute(task);
            LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
                task.target, task.getDataId(), task.getGroup(), task.getLastModified());

            MetricsMonitor.getConfigNotifyException().increment();
        }

        @Override
        public void cancelled() {

            LogUtil.notifyLog.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}",
                task.target, task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED");

            //get delay time and set fail count to the task
            asyncTaskExecute(task);
            LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
                task.target, task.getDataId(), task.getGroup(), task.getLastModified());

            MetricsMonitor.getConfigNotifyException().increment();
        }

        private NotifySingleTask task;
        private CloseableHttpAsyncClient httpClient;
    }
  • AsyncNotifyCallBack实现了FutureCallback接口,其completed方法判断请求是否是HttpStatus.SC_OK,不是的话会再次延迟调度该任务;其failed、cancelled方法也是会再次延时调度该任务

小结

ConfigDataChangeEvent实现了Event接口,它包含有isBeta、dataId、group、tenant、tag、lastModifiedTs属性;AsyncNotifyService继承了AbstractEventListener接口,其interest返回的列表包含了ConfigDataChangeEvent;其onEvent方法判断是ConfigDataChangeEvent,则会通过serverListService.getServerList()获取ipList,然后遍历该列表往queue添加NotifySingleTask,然后使用该queue创建AsyncTask并提交线程池执行

doc

  • ConfigDataChangeEvent
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Event
  • ConfigDataChangeEvent
  • AbstractEventListener
  • AsyncNotifyService
  • AsyncTask
  • AsyncNotifyCallBack
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档