首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码分析ElasticJob分片机制

源码分析ElasticJob分片机制

作者头像
丁威
修改2019-06-11 17:59:45
1.5K0
修改2019-06-11 17:59:45
举报
文章被收录于专栏:中间件兴趣圈中间件兴趣圈

本文将重点分析ElasticJob的分片机制。

分片机制概述

ElasticJob分片工作机制: 1、ElasticJob在启动时,首先会启动是否需要重新分片的监听器。 代码入口如下:

1ListenerManager#startAllListeners {
2     ...; 
3     shardingListenerManager.start();
4     ...
5}

2、任务执行之前需要获取分片信息,如果需要重新分片,则由主服务器执行分片算法,其他从服务器等待直到分片完成。 代码入口如下:

1AbstractElasticJobExecutor#execute {
2    ...; 
3    jobFacade.getShardingContexts();
4    ...;
5}

其中其核心实现为jobFacade.getShard-ingContexts()。

判断是否需要重新分片

ElasticJob的事件监听管理器实现类为AbstractListenerManager,主要是在分片节点发送变化后,设置需要重新分片标记。

其类图为:

  • JobNodeStorage jobNodeStorage:Job node操作A-PI。

其核心方法:

  1. public abstract void start():启动监听管理器,由子类具体实现。
  2. protected void addDataListener(Tr-eeCacheListener listener):增加事件监听器。

ElasticJob的选主监听管理器、分片监听器管理器、故障转移监听管理器等都是AbstractListenerManager的子类。 分片相关的监听管理器类图如图所示:

  • ShardingListenerManager:分片监听管理器。
  • ShardingTotalCountChangedJobListener:监听总分片数量事件管理器,是TreeCacheListener(curator的事件监听器)子类。
  • ListenServersChangedJobListener:任务job服务器数量(运行时实例)发生变化后的事件监听器。
源码分析分片数量变更监听器
 1class ShardingTotalCountChangedJobListener extends AbstractJobListener {
 2        @Override
 3        protected void dataChanged(final String path, final Type eventType, final String data) {
 4            if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
 5                int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
 6                if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
 7                    shardingService.setReshardingFlag();
 8                    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
 9                }
10            }
11        }
12    }

job配置的分片总节点数发生变化监听器(ElasticJob允许通过Web界面修改每个任务配置的分片总数量)。

job的配置信息存储在{namespace}/job-name/config节点上,存储内容为json格式的配置信息。

如果{namespace}/jobname/config节点的内容发生变化后,zk会触发该节点的节点数据变化事件,如果zk中存储的分片节点数量与内存中的分片数量(JobRe-gistry.getInstance())不相同的话,调用ShardingService设置需要重新分片标记(创建{namespace}/jobname/leader/s-harding/necessary持久节点)并更新内存中的分片节点总数,表示下次任务调度开始之前首先需要重新分片。

源码分析分片节点实例监听器
 1class ListenServersChangedJobListener extends AbstractJobListener {
 2        @Override
 3        protected void dataChanged(final String path, final Type eventType, final String data) {
 4            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
 5                shardingService.setReshardingFlag();
 6            }
 7        }
 8        private boolean isInstanceChange(final Type eventType, final String path) {
 9            return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
10        }
11        private boolean isServerChange(final String path) {
12            return serverNode.isServerPath(path);
13        }
14    }

分片节点(实例数)发生变化事件监听器,当新的分片节点加入或原的分片实例宕机后,需要进行重新分片。

当{namespace}/jobname/servers或{na-mespace}/jobname/instances路径下的节点数量一旦发生变化,如果检测到发生变化,则设置需要重新分片标识。下次调度任务开始之前首先执行分片逻辑,分片信息计算完毕后再开始执行任务处理逻辑。

具体分片逻辑实现

上面详细分析了分片监听管理器,其职责就是监听特定的ZK目录,当发生变化后判断是否需要设置重新分片的标记,如果设置了需要重新分片标记后,在什么时候触发重新分片呢?

每个调度任务在执行之前,首先需要获取分片信息(分片上下文环境),然后根据分片信息从服务器拉取不同的数据,进行任务处理,其源码入口为: com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#execute。具体实现为jobFacade.getShardingConte-xts()方法。

具体实现方法代码为:

 1LiteJobFacade#getShardingContexts
 2public ShardingContexts getShardingContexts() {
 3        boolean isFailover = configService.load(true).isFailover();     // @1
 4        if (isFailover) {
 5            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
 6            if (!failoverShardingItems.isEmpty()) {
 7                return executionContextService.getJobShardingContext(failoverShardingItems);
 8            }
 9        }
10        shardingService.shardingIfNecessary();   // @2
11        List<Integer> shardingItems = shardingService.getLocalShardingItems(); // @3
12        if (isFailover) {
13            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
14        }
15        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));  // @4
16        return executionContextService.getJobShardingContext(shardingItems);  // @5
17    }

代码@1:是否启动故障转移,本篇重点关注ElasticJob的分片机制,故障转移在下篇文章中详细介绍,本文假定不开启故障转移功能。 代码@2:如果有必要,则执行分片,如果不存在分片信息(第一次分片)或需要重新分片,则执行分片算法,接下来详细分析分片的实现逻辑。 代码@3:获取本地的分片信息。遍历所有分片信息{namespace}/jobname/shar-ding/{分片item}下所有instance节点,判断其值jobinstanceId是否与当前的jobIn-stanceId相等,相等则认为是本节点的分片信息。 代码@4:移除本地禁用分片,本地禁用分片的存储目录为{namespace}/jobna-me/sharding/{分片item}/disable。 代码@5:返回当前节点的分片上下文环境,这个主要是根据配置信息(与当前的分片实例,构建ShardingContexts对象。

shardingIfNecessary详解
 1/**
 2     * 如果需要分片且当前节点为主节点, 则作业分片.
 3     * 
 4     * <p>
 5     * 如果当前无可用节点则不分片.
 6     * </p>
 7     */
 8    public void shardingIfNecessary() {
 9        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances(); // @1
10        if (!isNeedSharding() || availableJobInstances.isEmpty()) {  // @2
11            return;
12        }
13        if (!leaderService.isLeaderUntilBlock()) {  // @3
14            blockUntilShardingCompleted();           //@4
15            return;
16        }
17        waitingOtherJobCompleted();                  // @5
18        LiteJobConfiguration liteJobConfig = configService.load(false);
19        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();  // @5
20        log.debug("Job '{}' sharding begin.", jobName);
21        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");     // @6
22        resetShardingInfo(shardingTotalCount);  // @7
23        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());  // @8
24        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));   // @9
25        log.debug("Job '{}' sharding complete.", jobName);
26    }

代码@1:获取当前可用实例,首先获取{namespace}/jobname/instances目录下的所有子节点,并且判断该实例节点的IP所在服务器是否可用,{namespace}/j-obname/servers/ip节点存储的值如果不是DISABLE,则认为该节点可用。 代码@2:如果不需要重新分片({nam-espace}/jobname/leader/sharding/nece-ssary节点不存在)或当前不存在可用实例,则返回。 代码@3,判断是否是主节点,如果当前正在进行主节点选举,则阻塞直到选主完成,阻塞这里使用的代码如下:

 1while (!hasLeader() && serverService.hasAvailableServers()) {   
 2  aa// 如果不存在主节点摈弃有可用的实例,则Thread.sleep()一下,触发一次选主。
 3  log.info("Leader is electing, waiting for {} ms", 100);
 4  BlockUtils.waitingShortTime();
 5  if (!JobRegistry.getInstance().isShutdown(jobName) &&    
 6    serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
 7    electLeader();
 8  }
 9}
10return isLeader();

代码@4:如果当前节点不是主节点,则等待分片结束。分片是否结束的判断依据是:{namespace}/jobname/leader/s-harding/necessary节点存在或{namesp-ace}/jobname/leader/sharding/processi-ng节点存在(表示正在执行分片操作),如果分片未结束,使用Thread.sleep方法阻塞100毫米后再试。 代码@5:能进入到这里,说明该节点是主节点。主节点在执行分片之前,首先等待该批任务全部执行完毕。判断是否有其他任务在运行的方法是判断是否存在{namespace}/jobname/sharding/{分片item}/running,如果存在,则使用Th-read.sleep(100),然后再判断。 代码@6:创建临时节点{namespace}/j-obname/leader/sharding/processing节点,表示分片正在执行。 代码@7:重置分片信息。先删除{nam-espace}/jobname/sharding/{分片item}/instance节点,然后创建{namespace}/jobname/sharding/{分片item}节点(如有必要)。然后根据当前配置的分片总数量,如果当前{namespace}/jobname/sh-arding子节点数大于配置的分片节点数则删除多余的节点(从大到小删除)。 代码@8:获取配置的分片算法类,常用的分片算法为平均分片算(AverageAll-ocationJobShardingStrategy)。 代码@9:在一个事务内创建 相应的分片实例信息{namespace}/jobname/{分片item}/instance,节点存放的内容为JobIn-stance实例的ID。 在ZK中执行事务操作:JobNodeStora-ge#executeInTransaction

 1/**
 2     * 在事务中执行操作.
 3     * 
 4     * @param callback 执行操作的回调
 5     */
 6    public void executeInTransaction(final TransactionExecutionCallback callback) {
 7        try {
 8            CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();  // @1
 9            callback.execute(curatorTransactionFinal);   // @2
10            curatorTransactionFinal.commit();                 //@3
11        //CHECKSTYLE:OFF
12        } catch (final Exception ex) {
13        //CHECKSTYLE:ON
14            RegExceptionHandler.handleException(ex);
15        }
16    }

代码@1,使用CuratorFrameworkFac-tory的inTransaction()方法,级联调用ch-eck(),最后通过and()方法返回Curat-orTransactionFinal实例,由该实例执行事务中的所有更新节点命令。然后执行commit()命令统一提交(该方法可以保证要么全部成功,要么全部失败)。 代码@2,通过回调PersistShardingInfo-TransactionExecutionCallback方法执行具体的逻辑。 代码@3,提交事务,下面是该部分代码:

 1ShardingService$PersistShardingInfoTransactionExecutionCallback 
 2class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
 3       private final Map<JobInstance, List<Integer>> shardingResults;
 4        @Override
 5        public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
 6            for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
 7                for (int shardingItem : entry.getValue()) {
 8                    curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), 
 9                         entry.getKey().getJobInstanceId().getBytes()).and();   // @1
10                }
11            }
12            curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();   // @2
13            curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();  // @3
14        }
15    }

代码@1:所谓的分片,主要是创建{na-mespace}/jobname/sharding/{分片item}/instance,节点内容为JobInstance ID。 代码@2:删除{namespace}/jobname-/leader/sharding/necessary节点。 代码@3:删除{namespace}/jobname/leader/sharding/processing节点,表示分片结束。 下面以一张分片流程图来结束本节的讲述:

本文重点介绍了ElasticJob分片逻辑,其主要分成两个部分:

1、监听分片节点的数量是否发送变化,一点发送变化,就在ZK中增加一个持久节点,该节点的存在就表示需要重新分片。

2、任务执行之前,首先判断第一步节点是否存在,如果存在,则首先进行重新分片。


本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-12-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 中间件兴趣圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分片机制概述
  • 判断是否需要重新分片
    • 源码分析分片数量变更监听器
      • 源码分析分片节点实例监听器
      • 具体分片逻辑实现
        • shardingIfNecessary详解
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档