前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊powerjob的执行机器地址

聊聊powerjob的执行机器地址

原创
作者头像
code4it
发布2024-03-16 17:30:10
610
发布2024-03-16 17:30:10
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下powerjob的执行机器地址(designatedWorkers)

SaveJobInfoRequest

powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java

代码语言:javascript
复制
@Data
public class SaveJobInfoRequest {

    /**
     * id of the job. set null to create or non-null to update the job.
     */
    private Long id;

    //......

    /* ************************** PowerJob-worker cluster property ************************** */
    /**
     * Designated PowerJob-worker nodes. Blank value indicates that there is
     * no limit. Non-blank value means to run the corresponding machine(s) only.
     * example: 192.168.1.1:27777,192.168.1.2:27777
     */
    private String designatedWorkers;
    /**
     * Max count of PowerJob-worker nodes.
     */
    private Integer maxWorkerCount = 0;

    //......
}    

SaveJobInfoRequest定义了designatedWorkers,用于指定woker节点的地址

JobInfoDO

powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java

代码语言:javascript
复制
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(indexes = {
        @Index(name = "idx01_job_info", columnList = "appId,status,timeExpressionType,nextTriggerTime"),
})
public class JobInfoDO {


    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
    @GenericGenerator(name = "native", strategy = "native")
    private Long id;

    //......

    /**
     * 指定机器运行,空代表不限,非空则只会使用其中的机器运行(多值逗号分割)
     */
    private String designatedWorkers;
    /**
     * 最大机器数量
     */
    private Integer maxWorkerCount;

    //......

}    

JobInfoDO定义了designatedWorkers,用于指定运行的机器地址

DesignatedWorkerFilter

powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java

代码语言:javascript
复制
@Slf4j
@Component
public class DesignatedWorkerFilter implements WorkerFilter {

    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {

        String designatedWorkers = jobInfo.getDesignatedWorkers();

        // no worker is specified, no filter of any
        if (StringUtils.isEmpty(designatedWorkers)) {
            return false;
        }

        Set<String> designatedWorkersSet = Sets.newHashSet(SJ.COMMA_SPLITTER.splitToList(designatedWorkers));

        for (String tagOrAddress : designatedWorkersSet) {
            if (tagOrAddress.equals(workerInfo.getTag()) || tagOrAddress.equals(workerInfo.getAddress())) {
                return false;
            }
        }

        return true;
    }

}

DesignatedWorkerFilter会根据jobInfo.getDesignatedWorkers()来进行过滤,在指定机器列表内的返回false,否则返回true

getSuitableWorkers

powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

代码语言:javascript
复制
    public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {

        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());

        workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));

        DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
        switch (dispatchStrategy) {
            case RANDOM:
                Collections.shuffle(workers);
                break;
            case HEALTH_FIRST:
                workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
                break;
            default:
                // do nothing
        }

        // 限定集群大小(0代表不限制)
        if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
            workers = workers.subList(0, jobInfo.getMaxWorkerCount());
        }
        return workers;
    }

WorkerClusterQueryService的getSuitableWorkers会为该job选出合适的worker,它会将filterWorker返回true的机器给删除掉

小结

powerjob的执行机器地址(designatedWorkers)用于指定该job运行的worker机器列表,DesignatedWorkerFilter会根据jobInfo.getDesignatedWorkers()来进行过滤,在指定机器列表内的返回false,否则返回true,WorkerClusterQueryService的getSuitableWorkers会为该job选出合适的worker,它会将filterWorker返回true的机器给删除掉。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SaveJobInfoRequest
  • JobInfoDO
  • DesignatedWorkerFilter
  • getSuitableWorkers
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档