A Look at PowerJob's WorkerHealthReporter

code4it
Published 2023-12-26 18:03:51
Included in the column: 码匠的流水账

This article takes a look at PowerJob's WorkerHealthReporter.

WorkerHealthReporter

tech/powerjob/worker/background/WorkerHealthReporter.java

@Slf4j
@RequiredArgsConstructor
public class WorkerHealthReporter implements Runnable {

    private final WorkerRuntime workerRuntime;

    @Override
    public void run() {

        // No server available, so nothing can be reported
        String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
        if (StringUtils.isEmpty(currentServer)) {
            log.warn("[WorkerHealthReporter] no available server,fail to report health info!");
            return;
        }

        SystemMetrics systemMetrics;

        if (workerRuntime.getWorkerConfig().getSystemMetricsCollector() == null) {
            systemMetrics = SystemInfoUtils.getSystemMetrics();
        } else {
            systemMetrics = workerRuntime.getWorkerConfig().getSystemMetricsCollector().collect();
        }

        WorkerHeartbeat heartbeat = new WorkerHeartbeat();

        heartbeat.setSystemMetrics(systemMetrics);
        heartbeat.setWorkerAddress(workerRuntime.getWorkerAddress());
        heartbeat.setAppName(workerRuntime.getWorkerConfig().getAppName());
        heartbeat.setAppId(workerRuntime.getAppId());
        heartbeat.setHeartbeatTime(System.currentTimeMillis());
        heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
        heartbeat.setProtocol(workerRuntime.getWorkerConfig().getProtocol().name());
        heartbeat.setClient("KingPenguin");
        heartbeat.setTag(workerRuntime.getWorkerConfig().getTag());

        // Report the number of trackers
        heartbeat.setLightTaskTrackerNum(LightTaskTrackerManager.currentTaskTrackerSize());
        heartbeat.setHeavyTaskTrackerNum(HeavyTaskTrackerManager.currentTaskTrackerSize());
        // Whether the worker is overloaded
        if (workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() <= LightTaskTrackerManager.currentTaskTrackerSize() || workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum() <= HeavyTaskTrackerManager.currentTaskTrackerSize()){
            heartbeat.setOverload(true);
        }
        // Get the list of currently deployed containers
        heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
        // Send the request
        if (StringUtils.isEmpty(currentServer)) {
            return;
        }
        // log
        log.info("[WorkerHealthReporter] report health status,appId:{},appName:{},isOverload:{},maxLightweightTaskNum:{},currentLightweightTaskNum:{},maxHeavyweightTaskNum:{},currentHeavyweightTaskNum:{}" ,
                heartbeat.getAppId(),
                heartbeat.getAppName(),
                heartbeat.isOverload(),
                workerRuntime.getWorkerConfig().getMaxLightweightTaskNum(),
                heartbeat.getLightTaskTrackerNum(),
                workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum(),
                heartbeat.getHeavyTaskTrackerNum()
        );

        TransportUtils.reportWorkerHeartbeat(heartbeat, currentServer, workerRuntime.getTransporter());
    }
}

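WorkerHealthReporter implements the Runnable interface. Its run method first obtains currentServer (returning early with a warning if none is available), then collects systemMetrics, builds a WorkerHeartbeat, and finally reports it via TransportUtils.reportWorkerHeartbeat.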
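
The overload flag in run is easy to misread because of the long condition, so here is a minimal sketch of the same comparison in isolation. The class and method names are hypothetical and are not PowerJob APIs, and the limits used in main are arbitrary values chosen for illustration.

```java
// Hypothetical sketch: OverloadCheckSketch is an illustrative name, not a PowerJob class.
final class OverloadCheckSketch {

    private OverloadCheckSketch() {
    }

    /**
     * Mirrors the condition in run(): the worker counts as overloaded as soon as
     * either tracker count reaches (or exceeds) its configured maximum.
     */
    static boolean isOverloaded(int maxLight, int currentLight, int maxHeavy, int currentHeavy) {
        return maxLight <= currentLight || maxHeavy <= currentHeavy;
    }

    public static void main(String[] args) {
        // The limits below are arbitrary sample values.
        System.out.println(isOverloaded(1024, 10, 64, 3));   // both counts below their limits
        System.out.println(isOverloaded(1024, 1024, 64, 3)); // light trackers exactly at the limit
        System.out.println(isOverloaded(1024, 10, 64, 70));  // heavy trackers above the limit
    }
}
```

Because the comparison is `<=`, a worker that sits exactly at its limit is already reported as overloaded.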

reportWorkerHeartbeat

tech/powerjob/worker/common/utils/TransportUtils.java

    public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) {
        final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_HEARTBEAT, address);
        transporter.tell(url, req);
    }

    public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
        HandlerLocation handlerLocation = new HandlerLocation()
                .setRootPath(rootPath)
                .setMethodPath(handlerPath);
        return new URL()
                .setServerType(serverType)
                .setAddress(Address.fromIpv4(address))
                .setLocation(handlerLocation);
    }    

reportWorkerHeartbeat sends the request via transporter.tell; the rootPath is server and the handlerPath is workerHeartbeat.
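
The fluent setters and the one-way tell call can be sketched as follows. Everything here is hypothetical: the class names only imitate the style of the quoted code, and the way the two path segments are joined in toPath is an assumption, since the article does not show how the real URL is rendered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: imitates the chained-setter style of the quoted HandlerLocation.
final class HandlerLocationSketch {
    private String rootPath;
    private String methodPath;

    HandlerLocationSketch setRootPath(String rootPath) {
        this.rootPath = rootPath;
        return this; // returning this is what allows the calls to be chained
    }

    HandlerLocationSketch setMethodPath(String methodPath) {
        this.methodPath = methodPath;
        return this;
    }

    /** Assumed rendering; the real format is not shown in the article. */
    String toPath() {
        return "/" + rootPath + "/" + methodPath;
    }
}

/** One-way transport: tell() returns nothing, so the caller never waits for a reply. */
interface OneWayTransporterSketch {
    void tell(String path, Object request);
}

final class HeartbeatRoutingSketch {

    static String heartbeatPath() {
        return new HandlerLocationSketch()
                .setRootPath("server")
                .setMethodPath("workerHeartbeat")
                .toPath();
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // A recording transporter stands in for the real network layer.
        OneWayTransporterSketch transporter = (path, request) -> sent.add(path + " <- " + request);
        transporter.tell(heartbeatPath(), "heartbeat");
        System.out.println(sent);
    }
}
```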

processWorkerHeartbeat

tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

    @Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
    public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
        long startMs = System.currentTimeMillis();
        WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
                .setAppName(heartbeat.getAppName())
                .setAppId(heartbeat.getAppId())
                .setVersion(heartbeat.getVersion())
                .setProtocol(heartbeat.getProtocol())
                .setTag(heartbeat.getTag())
                .setWorkerAddress(heartbeat.getWorkerAddress())
                .setDelayMs(startMs - heartbeat.getHeartbeatTime())
                .setScore(heartbeat.getSystemMetrics().getScore());
        processWorkerHeartbeat0(heartbeat, event);
        monitorService.monitor(event);
    }

The processWorkerHeartbeat method converts the heartbeat into a WorkerHeartbeatEvent, then calls processWorkerHeartbeat0 and monitorService.monitor(event).

processWorkerHeartbeat0

tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

    protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
        WorkerClusterManagerService.updateStatus(heartbeat);
    }

processWorkerHeartbeat0 updates the status via WorkerClusterManagerService.updateStatus(heartbeat).
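
The split between processWorkerHeartbeat in the abstract handler and processWorkerHeartbeat0 in the implementation reads like a template method. The sketch below illustrates that shape under that assumption; the class names and the trace list are hypothetical and exist only to make the call order observable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template-method shape; not the PowerJob classes themselves.
abstract class AbstractHeartbeatHandlerSketch {
    final List<String> trace = new ArrayList<>();

    /** Fixed skeleton: build the event data, delegate to the subclass, then monitor. */
    final long handle(long heartbeatTimeMs, long nowMs) {
        long delayMs = nowMs - heartbeatTimeMs; // same idea as setDelayMs(startMs - heartbeatTime)
        trace.add("event-built");
        handle0(heartbeatTimeMs);
        trace.add("monitored");
        return delayMs;
    }

    /** The only step a concrete handler has to supply. */
    protected abstract void handle0(long heartbeatTimeMs);
}

final class RecordingHeartbeatHandlerSketch extends AbstractHeartbeatHandlerSketch {
    @Override
    protected void handle0(long heartbeatTimeMs) {
        trace.add("status-updated");
    }
}
```

Calling `new RecordingHeartbeatHandlerSketch().handle(1_000L, 1_250L)` runs the three steps in order and returns the delay computed from the two timestamps.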

WorkerClusterManagerService.updateStatus

tech/powerjob/server/remote/worker/WorkerClusterManagerService.java

    public static void updateStatus(WorkerHeartbeat heartbeat) {
        Long appId = heartbeat.getAppId();
        String appName = heartbeat.getAppName();
        ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
        clusterStatusHolder.updateStatus(heartbeat);
    }

updateStatus first obtains the ClusterStatusHolder for the appId, creating it if it does not exist yet, and then updates its status.

ClusterStatusHolder.updateStatus

tech/powerjob/server/remote/worker/ClusterStatusHolder.java

    public void updateStatus(WorkerHeartbeat heartbeat) {

        String workerAddress = heartbeat.getWorkerAddress();
        long heartbeatTime = heartbeat.getHeartbeatTime();

        WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
            WorkerInfo wf = new WorkerInfo();
            wf.refresh(heartbeat);
            return wf;
        });
        long oldTime = workerInfo.getLastActiveTime();
        if (heartbeatTime < oldTime) {
            log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
            return;
        }

        workerInfo.refresh(heartbeat);

        List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
        if (!CollectionUtils.isEmpty(containerInfos)) {
            containerInfos.forEach(containerInfo -> {
                Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
                infos.put(workerAddress, containerInfo);
            });
        }
    }

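ClusterStatusHolder's updateStatus method first obtains the workerInfo and checks whether heartbeatTime is earlier than lastActiveTime. If so, it logs a warning and returns; otherwise it calls workerInfo.refresh(heartbeat) and finally records heartbeat.getContainerInfos() in containerId2Infos.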
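
The stale-heartbeat guard can be sketched on its own, reduced to a map from worker address to last active time. The class below is hypothetical and keeps only the timestamp, whereas the quoted code stores a full WorkerInfo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: tracks only timestamps, unlike the real ClusterStatusHolder.
final class HeartbeatRegistrySketch {
    private final Map<String, Long> lastActiveByAddress = new ConcurrentHashMap<>();

    /** Returns true if the heartbeat was accepted, false if it was dropped as stale. */
    boolean update(String workerAddress, long heartbeatTime) {
        // First heartbeat from this address registers the worker.
        long oldTime = lastActiveByAddress.computeIfAbsent(workerAddress, ignore -> heartbeatTime);
        if (heartbeatTime < oldTime) {
            return false; // older than what is already recorded
        }
        lastActiveByAddress.put(workerAddress, heartbeatTime);
        return true;
    }

    /** Last accepted heartbeat time, or null for an unknown address. */
    Long lastActive(String workerAddress) {
        return lastActiveByAddress.get(workerAddress);
    }
}
```

The comparison is strict, so a heartbeat carrying the same timestamp as the recorded one is still accepted. The sketch makes no attempt to make the compare-then-write step atomic.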

refresh

tech/powerjob/server/common/module/WorkerInfo.java

    public void refresh(WorkerHeartbeat workerHeartbeat) {
        address = workerHeartbeat.getWorkerAddress();
        lastActiveTime = workerHeartbeat.getHeartbeatTime();
        protocol = workerHeartbeat.getProtocol();
        client = workerHeartbeat.getClient();
        tag = workerHeartbeat.getTag();
        systemMetrics = workerHeartbeat.getSystemMetrics();
        containerInfos = workerHeartbeat.getContainerInfos();

        lightTaskTrackerNum = workerHeartbeat.getLightTaskTrackerNum();
        heavyTaskTrackerNum = workerHeartbeat.getHeavyTaskTrackerNum();

        if (workerHeartbeat.isOverload()) {
            overloading = true;
            lastOverloadTime = workerHeartbeat.getHeartbeatTime();
            log.warn("[WorkerInfo] worker {} is overload!", getAddress());
        } else {
            overloading = false;
        }
    }

WorkerInfo's refresh method updates lastActiveTime, overloading, and other fields from the workerHeartbeat.
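
One detail of refresh is worth isolating: lastOverloadTime is written only when the heartbeat reports overload, so it keeps its old value after the worker recovers. A minimal sketch of that behavior, using a hypothetical class that holds just the three relevant fields:

```java
// Hypothetical sketch: keeps only the fields needed to show the overload bookkeeping.
final class OverloadStateSketch {
    private boolean overloading;
    private long lastOverloadTime;
    private long lastActiveTime;

    void refresh(long heartbeatTime, boolean overload) {
        lastActiveTime = heartbeatTime;
        if (overload) {
            overloading = true;
            lastOverloadTime = heartbeatTime;
        } else {
            overloading = false; // lastOverloadTime is left untouched on purpose
        }
    }

    boolean isOverloading() {
        return overloading;
    }

    long getLastOverloadTime() {
        return lastOverloadTime;
    }

    long getLastActiveTime() {
        return lastActiveTime;
    }
}
```

After an overloaded heartbeat followed by a healthy one, the flag is cleared while the time of the last overload remains available.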

DisconnectedWorkerFilter

tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java

@Slf4j
@Component
public class DisconnectedWorkerFilter implements WorkerFilter {

    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        boolean timeout = workerInfo.timeout();
        if (timeout) {
            log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime());
        }
        return timeout;
    }
}

DisconnectedWorkerFilter implements the WorkerFilter interface; its filter method returns workerInfo.timeout(), where true means the worker is filtered out.

timeout

tech/powerjob/server/common/module/WorkerInfo.java

    private static final long WORKER_TIMEOUT_MS = 60000;

    public boolean timeout() {
        long timeout = System.currentTimeMillis() - lastActiveTime;
        return timeout > WORKER_TIMEOUT_MS;
    }

The timeout method computes the difference between the current time and lastActiveTime and compares it with the default WORKER_TIMEOUT_MS (60s).
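
The same check can be written with the current time passed in, which makes the boundary easy to verify. This is a sketch with hypothetical names; the quoted code reads System.currentTimeMillis() directly.

```java
// Hypothetical sketch: the clock is a parameter so the boundary can be tested deterministically.
final class WorkerTimeoutSketch {
    static final long WORKER_TIMEOUT_MS = 60_000L;

    private WorkerTimeoutSketch() {
    }

    /** True only when strictly more than WORKER_TIMEOUT_MS has passed since the last heartbeat. */
    static boolean isTimedOut(long nowMs, long lastActiveTimeMs) {
        return nowMs - lastActiveTimeMs > WORKER_TIMEOUT_MS;
    }
}
```

Since the comparison is `>`, a worker whose last heartbeat is exactly 60000 ms old is not yet considered timed out.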

getSuitableWorkers

tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

    public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {

        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());

        workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));

        DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
        switch (dispatchStrategy) {
            case RANDOM:
                Collections.shuffle(workers);
                break;
            case HEALTH_FIRST:
                workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
                break;
            default:
                // do nothing
        }

        // Cap the cluster size (0 means no limit)
        if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
            workers = workers.subList(0, jobInfo.getMaxWorkerCount());
        }
        return workers;
    }

    private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        for (WorkerFilter filter : workerFilters) {
            if (filter.filter(workerInfo, jobInfo)) {
                return true;
            }
        }
        return false;
    }    

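The getSuitableWorkers method removes every worker for which filterWorker(workerInfo, jobInfo) returns true, orders the remainder according to the dispatch strategy, and caps the list at maxWorkerCount when that value is greater than 0.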
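
The filter, sort, and cap steps can be sketched end to end with plain Java types. The classes below are hypothetical stand-ins (a worker is reduced to an address, a score, and a timed-out flag), and only the health-first ordering is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for WorkerInfo with just the fields the sketch needs.
final class WorkerSketch {
    final String address;
    final int score;
    final boolean timedOut;

    WorkerSketch(String address, int score, boolean timedOut) {
        this.address = address;
        this.score = score;
        this.timedOut = timedOut;
    }
}

final class WorkerSelectionSketch {

    private WorkerSelectionSketch() {
    }

    static List<String> select(List<WorkerSketch> candidates,
                               List<Predicate<WorkerSketch>> filters,
                               int maxWorkerCount) {
        List<WorkerSketch> workers = new ArrayList<>(candidates);
        // A worker is dropped as soon as any filter returns true.
        workers.removeIf(w -> filters.stream().anyMatch(f -> f.test(w)));
        // Health-first ordering: highest score first.
        workers.sort((a, b) -> Integer.compare(b.score, a.score));
        // 0 means "no limit", matching the comment in the quoted code.
        if (maxWorkerCount > 0 && workers.size() > maxWorkerCount) {
            workers = workers.subList(0, maxWorkerCount);
        }
        List<String> addresses = new ArrayList<>();
        for (WorkerSketch w : workers) {
            addresses.add(w.address);
        }
        return addresses;
    }
}
```

The sketch uses Integer.compare rather than subtracting scores, a small design choice that sidesteps integer overflow for extreme values; the quoted code subtracts directly.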

Summary

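PowerJob's WorkerHealthReporter implements the Runnable interface. Its run method first obtains currentServer, then collects systemMetrics, builds a WorkerHeartbeat, and finally reports it via TransportUtils.reportWorkerHeartbeat. reportWorkerHeartbeat sends the request via transporter.tell, with rootPath server and handlerPath workerHeartbeat. The server updates the status via WorkerClusterManagerService.updateStatus(heartbeat), which mainly calls WorkerInfo's refresh method to update lastActiveTime, overloading, and other fields from the workerHeartbeat. DisconnectedWorkerFilter implements the WorkerFilter interface; its filter method returns workerInfo.timeout(), so workers whose heartbeat has timed out are excluded.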

Originally published on 2023-12-25 on the WeChat official account 码匠的流水账.
