分布式作业 Elastic-Job-Lite 源码分析 —— 作业分片

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-sharding/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

1. 概述2. 作业分片条件3. 分配作业分片项4. 获取作业分片上下文集合666. 彩


1. 概述

本文主要分享 Elastic-Job-Lite 作业分片

涉及到主要类的类图如下( 打开大图 ):

  • 粉色的类在 com.dangdang.ddframe.job.lite.internal.sharding 包下,实现了 Elastic-Job-Lite 作业分片。
  • ShardingService,作业分片服务。
  • ShardingNode,作业分片数据存储路径。
  • ShardingListenerManager,作业分片监听管理器。

你行好事会因为得到赞赏而愉悦 同理,开源项目贡献者会因为 Star 而更加有动力 为 Elastic-Job 点赞!传送门

2. 作业分片条件

当作业满足分片条件时,不会立即进行作业分片分配,而是设置需要重新进行分片的标记,等到作业分片获取时,判断有该标记后执行作业分配。

设置需要重新进行分片的标记的代码如下:

// ShardingService.java
/**
* 设置需要重新分片的标记.
*/
public void setReshardingFlag() {
   jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}

// JobNodeStorage.java
/**
* 如果存在则创建作业节点.
* 如果作业根节点不存在表示作业已经停止, 不再继续创建节点.
* 
* @param node 作业节点名称
*/
public void createJobNodeIfNeeded(final String node) {
   if (isJobRootNodeExisted() && !isJobNodeExisted(node)) {
       regCenter.persist(jobNodePath.getFullPath(node), "");
   }
}
  • 调用 #setReshardingFlag() 方法设置需要重新分片的标记/${JOB_NAME}/leader/sharding/necessary。该 Zookeeper 数据节点是永久节点,存储空串( ""),使用 zkClient 查看如下: [zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/leader/sharding [necessary] [zk: localhost:2181(CONNECTED) 3] get /elastic-job-example-lite-java/javaSimpleJob/leader/sharding/necessary
  • 设置标记之后,通过调用 #isNeedSharding() 方法即可判断是否需要重新分片。 // ShardingService.java /** * 判断是否需要重分片. * * @return 是否需要重分片 */ public boolean isNeedSharding() { return jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY); } // JobNodeStorage.java /** * 判断作业节点是否存在. * * @param node 作业节点名称 * @return 作业节点是否存在 */ public boolean isJobNodeExisted(final String node) { return regCenter.isExisted(jobNodePath.getFullPath(node)); }

设置需要重新进行分片有 4 种情况

第一种,注册作业启动信息时。

// SchedulerFacade.java
public void registerStartUpInfo(final boolean enabled) {
   // ... 省略无关代码
   // 设置 需要重新分片的标记
   shardingService.setReshardingFlag();
  // ... 省略无关代码
}

第二种,作业分片总数( JobCoreConfiguration.shardingTotalCount )变化时。

// ShardingTotalCountChangedJobListener.java
class ShardingTotalCountChangedJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (configNode.isConfigPath(path)
               && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
           int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
           if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) { // 作业分片总数变化
               // 设置需要重新分片的标记
               shardingService.setReshardingFlag();
               // 设置当前分片总数
               JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
           }
       }
   }
}

第三种,服务器变化时。

// ShardingListenerManager.java
class ListenServersChangedJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (!JobRegistry.getInstance().isShutdown(jobName)
               && (isInstanceChange(eventType, path)
                   || isServerChange(path))) {
           shardingService.setReshardingFlag();
       }
   }

   private boolean isInstanceChange(final Type eventType, final String path) {
       return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
   }

   private boolean isServerChange(final String path) {
       return serverNode.isServerPath(path);
   }
}
  • 服务器变化有两种情况。
  • 第一种,#isServerChange(…) 服务器被开启或禁用。
  • 第二种,#isInstanceChange(…) 作业节点新增或者移除。

第四种,在《Elastic-Job-Lite 源码解析 —— 自诊断修复》详细分享。

3. 分配作业分片项

调用 ShardingService#shardingIfNecessary() 方法,如果需要分片且当前节点为主节点, 则作业分片。

总体流程如下顺序图:( 打开大图 ):

实现代码如下:

// ShardingService.java
/**
* 如果需要分片且当前节点为主节点, 则作业分片.
* 
* 如果当前无可用节点则不分片.
*/
public void shardingIfNecessary() {
   List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
   if (!isNeedSharding() // 判断是否需要重新分片
           || availableJobInstances.isEmpty()) {
       return;
   }
   // 【非主节点】等待 作业分片项分配完成
   if (!leaderService.isLeaderUntilBlock()) { // 判断是否为【主节点】
       blockUntilShardingCompleted();
       return;
   }
   // 【主节点】作业分片项分配
   // 等待 作业未在运行中状态
   waitingOtherJobCompleted();
   //
   LiteJobConfiguration liteJobConfig = configService.load(false);
   int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
   // 设置 作业正在重分片的标记
   log.debug("Job '{}' sharding begin.", jobName);
   jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
   // 重置 作业分片项信息
   resetShardingInfo(shardingTotalCount);
   // 【事务中】设置 作业分片项信息
   JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
   jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
   log.debug("Job '{}' sharding complete.", jobName);
}
  • 调用 #isNeedSharding() 方法判断是否需要重新分片。
  • 调用 LeaderService#isLeaderUntilBlock() 方法判断是否为主节点。作业分片项的分配过程:
    • 【主节点】执行作业分片项分配。
    • 【非主节点】等待作业分片项分配完成。
    • LeaderService#isLeaderUntilBlock() 方法在《Elastic-Job-Lite 源码分析 —— 主节点选举》「3. 选举主节点」有详细分享。
  • 调用 #blockUntilShardingCompleted() 方法【非主节点】等待作业分片项分配完成。 private void blockUntilShardingCompleted() { while (!leaderService.isLeaderUntilBlock() // 当前作业节点不为【主节点】 && (jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY) // 存在作业需要重分片的标记 || jobNodeStorage.isJobNodeExisted(ShardingNode.PROCESSING))) { // 存在作业正在重分片的标记 log.debug("Job '{}' sleep short time until sharding completed.", jobName); BlockUtils.waitingShortTime(); } }
    • 调用 #LeaderService#isLeaderUntilBlock() 方法判断是否为主节点。为什么上面判断了一次,这里又判断一次?主节点作业分片项分配过程中,不排除自己挂掉了,此时【非主节点】若选举成主节点,无需继续等待,当然也不能等待,因为已经没节点在执行作业分片项分配,所有节点都会卡在这里。
    • 作业需要重分片的标记作业正在重分片的标记 都不存在时,意味着作业分片项分配已经完成,下文 PersistShardingInfoTransactionExecutionCallback 类里我们会看到。
  • 调用 #waitingOtherJobCompleted() 方法等待作业未在运行中状态。作业是否在运行中需要 LiteJobConfiguration.monitorExecution = true,《Elastic-Job-Lite 源码分析 —— 作业执行》「4.6 执行普通触发的作业」有详细分享。
  • 调用 ConfigurationService#load(…) 方法从注册中心获取作业配置( 非缓存 ),避免主节点本地作业配置可能非最新的,主要目的是获得作业分片总数( shardingTotalCount )。
  • 调用 jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "") 设置作业正在重分片的标记 /${JOB_NAME}/leader/sharding/processing。该 Zookeeper 数据节点是临时节点,存储空串( "" ),仅用于标记作业正在重分片,无特别业务逻辑。
  • 调用 #resetShardingInfo(...) 方法重置作业分片信息。 private void resetShardingInfo(final int shardingTotalCount) { // 重置 有效的作业分片项 for (int i = 0; i < shardingTotalCount; i++) { jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getInstanceNode(i)); // 移除 /${JOB_NAME}/sharding/${ITEM_ID}/instance jobNodeStorage.createJobNodeIfNeeded(ShardingNode.ROOT + "/" + i); // 创建 /${JOB_NAME}/sharding/${ITEM_ID} } // 移除 多余的作业分片项 int actualShardingTotalCount = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT).size(); if (actualShardingTotalCount > shardingTotalCount) { for (int i = shardingTotalCount; i < actualShardingTotalCount; i++) { jobNodeStorage.removeJobNodeIfExisted(ShardingNode.ROOT + "/" + i); // 移除 /${JOB_NAME}/sharding/${ITEM_ID} } } }
  • 调用 JobShardingStrategy#sharding(…) 方法计算每个节点分配的作业分片项。《Elastic-Job-Lite 源码分析 —— 作业分片策略》有详细分享。
  • 调用 JobNodeStorage#executeInTransaction(...) + PersistShardingInfoTransactionExecutionCallback#execute() 方法实现在事务设置每个节点分配的作业分片项。 // PersistShardingInfoTransactionExecutionCallback.java class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback { /** * 作业分片项分配结果 * key:作业节点 * value:作业分片项 */ private final Map<JobInstance, List<Integer>> shardingResults; @Override public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception { // 设置 每个节点分配的作业分片项 for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) { for (int shardingItem : entry.getValue()) { curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)) , entry.getKey().getJobInstanceId().getBytes()).and(); } } // 移除 作业需要重分片的标记、作业正在重分片的标记 curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and(); curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and(); } } // JobNodeStorage.java /** * 在事务中执行操作. * * @param callback 执行操作的回调 */ public void executeInTransaction(final TransactionExecutionCallback callback) { try { CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and(); callback.execute(curatorTransactionFinal); curatorTransactionFinal.commit(); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } } [zk: localhost:2181(CONNECTED) 0] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance 192.168.3.2@-@31492
    • 设置临时数据节点 /${JOB_NAME}/sharding/${ITEM_ID}/instance 为分配的作业节点的作业实例主键( jobInstanceId )。使用 zkClient 查看如下:

作业分片项分配整体流程有点长,耐着心看,毕竟是核心代码哟。如果中间有任何疑问,欢迎给我公众号:芋道源码 留言。

4. 获取作业分片上下文集合

在《Elastic-Job-Lite 源码分析 —— 作业执行的》「4.2 获取当前作业服务器的分片上下文」中,我们可以看到作业执行器( AbstractElasticJobExecutor ) 执行作业时,会获取当前作业服务器的分片上下文进行执行。获取过程总体如下顺序图( 打开大图 ):

  • 橘色叉叉在《Elastic-Job-Lite 源码解析 —— 作业失效转移》有详细分享。

实现代码如下:

// LiteJobFacade.java
@Override
public ShardingContexts getShardingContexts() {
   // 【忽略,作业失效转移详解】获得 失效转移的作业分片项
   boolean isFailover = configService.load(true).isFailover();
   if (isFailover) {
       List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
       if (!failoverShardingItems.isEmpty()) {
           return executionContextService.getJobShardingContext(failoverShardingItems);
       }
   }
   // 作业分片,如果需要分片且当前节点为主节点
   shardingService.shardingIfNecessary();
   // 获得 分配在本机的作业分片项
   List<Integer> shardingItems = shardingService.getLocalShardingItems();
   // 【忽略,作业失效转移详解】移除 分配在本机的失效转移的作业分片项目
   if (isFailover) {
       shardingItems.removeAll(failoverService.getLocalTakeOffItems());
   }
   // 移除 被禁用的作业分片项
   shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
   // 获取当前作业服务器分片上下文
   return executionContextService.getJobShardingContext(shardingItems);
}
  • 调用 ShardingService#shardingIfNecessary() 方法,如果需要分片且当前节点为主节点,作业分片项分配不是每次都需要作业分片,必须满足「2. 作业分片条件」才执行作业分片
  • 调用 ShardingService#getLocalShardingItems()方法,获得分配在本机的作业分片项,即 /${JOB_NAME}/sharding/${ITEM_ID}/instance 为本机的作业分片项。 // ShardingService.java /** * 获取运行在本作业实例的分片项集合. * * @return 运行在本作业实例的分片项集合 */ public List<Integer> getLocalShardingItems() { if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { return Collections.emptyList(); } return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); } /** * 获取作业运行实例的分片项集合. * * @param jobInstanceId 作业运行实例主键 * @return 作业运行实例的分片项集合 */ public List<Integer> getShardingItems(final String jobInstanceId) { JobInstance jobInstance = new JobInstance(jobInstanceId); if (!serverService.isAvailableServer(jobInstance.getIp())) { return Collections.emptyList(); } List<Integer> result = new LinkedList<>(); int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount(); for (int i = 0; i < shardingTotalCount; i++) { // /${JOB_NAME}/sharding/${ITEM_ID}/instance if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) { result.add(i); } } return result; }
  • 调用 shardingItems.removeAll(executionService.getDisabledItems(shardingItems)),移除被禁用的作业分片项,即 /${JOB_NAME}/sharding/${ITEM_ID}/disabled 存在的作业分片项。 // ExecutionService.java /** * 获取禁用的任务分片项. * * @param items 需要获取禁用的任务分片项 * @return 禁用的任务分片项 */ public List<Integer> getDisabledItems(final List<Integer> items) { List<Integer> result = new ArrayList<>(items.size()); for (int each : items) { // /¨E123EJOB¨E95ENAME¨E125E/sharding/<annotation encoding="application style="color: rgb(150, 152, 150);overflow-wrap: inherit !important;word-break: inherit !important;" span="" class="hljs-comment" encoding=""application"><span class="katex-html" aria-hidden="true" style="overflow-wrap: inherit !important;word-break: inherit !important;"><span class="strut" style="height:1em;vertical-align:-0.25em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E123<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.09618em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">J<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">O<span class="mord mathit" style="margin-right:0.05017em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">B¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E95<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">N<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">A<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">M<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E125<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E/<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">h<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">r<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">d<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="margin-right:0.03588em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">g/{ITEM_ID}/disabled if (jobNodeStorage.isJobNodeExisted(ShardingNode.getDisabledNode(each))) { result.add(each); } } return result; } </span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05017em;"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit" style="margin-right:0.09618em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="strut" style="height:1em;vertical-align:-0.25em;"></span class="katex-html" aria-hidden="true"></annotation encoding="application>
  • 调用 ExecutionContextService#getJobShardingContext(…) 方法,获取当前作业服务器分片上下文。

获取当前作业服务器分片上下文

调用 ExecutionContextService#getJobShardingContext(...) 方法,获取当前作业服务器分片上下文:

// ExecutionContextService.java
public ShardingContexts getJobShardingContext(final List<Integer> shardingItems) {
   LiteJobConfiguration liteJobConfig = configService.load(false);
   // 移除 正在运行中的作业分片项
   removeRunningIfMonitorExecution(liteJobConfig.isMonitorExecution(), shardingItems);
   //
   if (shardingItems.isEmpty()) {
       return new ShardingContexts(buildTaskId(liteJobConfig, shardingItems), liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 
               liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter(), Collections.<Integer, String>emptyMap());
   }
   // 解析分片参数
   Map<Integer, String> shardingItemParameterMap = new ShardingItemParameters(liteJobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()).getMap();
   // 创建 分片上下文集合
   return new ShardingContexts(buildTaskId(liteJobConfig, shardingItems), //
           liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
           liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter(),
           getAssignedShardingItemParameterMap(shardingItems, shardingItemParameterMap)); // 获得当前作业节点的分片参数
}
  • 调用 #removeRunningIfMonitorExecution() 方法,移除正在运行中的作业分片项。 private void removeRunningIfMonitorExecution(final boolean monitorExecution, final List<Integer> shardingItems) { if (!monitorExecution) { return; } List<Integer> runningShardingItems = new ArrayList<>(shardingItems.size()); for (int each : shardingItems) { if (isRunning(each)) { runningShardingItems.add(each); // /¨E123EJOB¨E95ENAME¨E125E/sharding/<annotation encoding="application style="color: rgb(150, 152, 150);overflow-wrap: inherit !important;word-break: inherit !important;" span="" class="hljs-comment" encoding=""application"><span class="katex-html" aria-hidden="true" style="overflow-wrap: inherit !important;word-break: inherit !important;"><span class="strut" style="height:1em;vertical-align:-0.25em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E123<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.09618em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">J<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">O<span class="mord mathit" style="margin-right:0.05017em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">B¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E95<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">N<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">A<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">M<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E125<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E/<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">h<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">r<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">d<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="margin-right:0.03588em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">g/{ITEM_ID}/running } } shardingItems.removeAll(runningShardingItems); } private boolean isRunning(final int shardingItem) { return jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(shardingItem)); } </span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05017em;"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit" style="margin-right:0.09618em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="strut" style="height:1em;vertical-align:-0.25em;"></span class="katex-html" aria-hidden="true"></annotation encoding="application>
  • 使用 ShardingItemParameters 解析作业分片参数。例如作业分片参数( JobCoreConfiguration.shardingItemParameters="0=Beijing,1=Shanghai,2=Guangzhou") 解析结果:
    • ShardingItemParameters 代码清晰易懂,点击链接直接查看。
  • 调用 #buildTaskId(...) 方法,创建作业任务ID( ShardingContexts.taskId ): private String buildTaskId(final LiteJobConfiguration liteJobConfig, final List<Integer> shardingItems) { JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName); return Joiner.on("@-@").join(liteJobConfig.getJobName(), Joiner.on(",").join(shardingItems), "READY", null == jobInstance.getJobInstanceId() ? "127.0.0.1@-@1" : jobInstance.getJobInstanceId()); }
    • taskId = ${JOB_NAME} + @-@ + ${SHARDING_ITEMS} + @-@ + READY + @-@ + ${IP} + @-@ + ${PID}。例如:javaSimpleJob@-@0,1,2@-@READY@-@192.168.3.2@-@38330
  • 调用 #getAssignedShardingItemParameterMap(...) 方法,获得当前作业节点的分片参数。 private Map<Integer, String> getAssignedShardingItemParameterMap(final List<Integer> shardingItems, final Map<Integer, String> shardingItemParameterMap) { Map<Integer, String> result = new HashMap<>(shardingItemParameterMap.size(), 1); for (int each : shardingItems) { result.put(each, shardingItemParameterMap.get(each)); } return result; }
  • ShardingContexts,分片上下文集合。 public final class ShardingContexts implements Serializable {private static final long serialVersionUID = -4585977349142082152L; /** * 作业任务ID. */ private final String taskId; /** * 作业名称. */ private final String jobName; /** * 分片总数. */ private final int shardingTotalCount; /** * 作业自定义参数. * 可以配置多个相同的作业, 但是用不同的参数作为不同的调度实例. */ private final String jobParameter; /** * 分配于本作业实例的分片项和参数的Map. */ private final Map&lt;Integer, String&gt; shardingItemParameters; /** * 作业事件采样统计数. */ private int jobEventSamplingCount; /** * 当前作业事件采样统计数. */ @Setter private int currentJobEventSamplingCount; /** * 是否允许可以发送作业事件. */ @Setter private boolean allowSendJobEvent = true; }
    • jobEventSamplingCountcurrentJobEventSamplingCount 在 Elastic-Job-Lite 暂未还使用,在 Elastic-Job-Cloud 使用。

666. 彩蛋

旁白君:小伙伴,更新了干货嘛,双击 666。 芋道君:那必须的嘛,而且这么勤快更新!是不是应该分享一波朋友圈。

道友,赶紧上车,分享一波朋友圈!



原文发布于微信公众号 - 芋道源码(YunaiV)

原文发表时间:2018-11-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏个人分享

Spark Job的提交与task本地化分析(源码阅读八)

  我们又都知道,Spark中任务的处理也要考虑数据的本地性(locality),Spark目前支持PROCESS_LOCAL(本地进程)、NODE_LOCAL...

1152
来自专栏技术博客

Asp.Net Mvc3.0(MEF依赖注入实例)

在http://www.cnblogs.com/aehyok/p/3386650.html前面一节主要是对MEF进行简单的介绍。本节主要来介绍如何在Asp.Ne...

882
来自专栏AILearning

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Even...

1.7K6
来自专栏岑玉海

Hadoop源码系列(一)FairScheduler申请和分配container的过程

1、如何申请资源 1.1 如何启动AM并申请资源 1.1.1 如何启动AM val yarnClient = YarnClient.createYarnClie...

4313
来自专栏祝威廉

Spark Shuffle Write阶段磁盘文件分析

上篇写了 Spark Shuffle 内存分析 后,有不少人提出了疑问,大家也对如何落文件挺感兴趣的,所以这篇文章会详细介绍,Sort Based Shuff...

691
来自专栏芋道源码1024

分布式作业 Elastic-Job-Lite 源码分析 —— 作业数据存储

JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀。

1202
来自专栏听雨堂

Log4Net使用心得

winform程序使用Log4net   1.引用dll   2.添加log4net.config,设置“始终复制”   4.assemblyinfo.cs中添...

22010
来自专栏知识分享

关于STM32 IAP

转眼间天亮了...... 然后就想起了一个朋友QQ的个性签名:年轻人总是要为一些自己认为有意义的事情而废寝忘食,通宵达旦,直至白发方休........ 对了这篇...

5334
来自专栏岑玉海

Spark源码系列(一)spark-submit提交作业过程

前言 折腾了很久,终于开始学习Spark的源码了,第一篇我打算讲一下Spark作业的提交过程。 ? 这个是Spark的App运行图,它通过一个Driver来和集...

4736
来自专栏分布式系统进阶

Kafka源码分析-启动流程

使用getPropsFromArgs方法来获取各配置项, 然后将启动和停止动作全部代理给KafkaServerStartable类;

1970

扫码关注云+社区

领取腾讯云代金券