前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊PowerJob的ServerController

聊聊PowerJob的ServerController

原创
作者头像
code4it
发布2024-02-06 20:50:40
880
发布2024-02-06 20:50:40
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下PowerJob的ServerController

ServerController

tech/powerjob/server/web/controller/ServerController.java

代码语言:javascript
复制
@RestController
@RequestMapping("/server")
@RequiredArgsConstructor
public class ServerController implements ServerInfoAware {

    private ServerInfo serverInfo;
    private final TransportService transportService;

    private final ServerElectionService serverElectionService;

    private final AppInfoRepository appInfoRepository;

    private final WorkerClusterQueryService workerClusterQueryService;

    @GetMapping("/assert")
    public ResultDTO<Long> assertAppName(String appName) {
        Optional<AppInfoDO> appInfoOpt = appInfoRepository.findByAppName(appName);
        return appInfoOpt.map(appInfoDO -> ResultDTO.success(appInfoDO.getId())).
                orElseGet(() -> ResultDTO.failed(String.format("app(%s) is not registered! Please register the app in oms-console first.", appName)));
    }

    @GetMapping("/assertV2")
    public ResultDTO<WorkerAppInfo> assertAppNameV2(String appName) {
        Optional<AppInfoDO> appInfoOpt = appInfoRepository.findByAppName(appName);
        return appInfoOpt.map(appInfoDO -> {
                    WorkerAppInfo workerAppInfo = new WorkerAppInfo().setAppId(appInfoDO.getId());
                    return ResultDTO.success(workerAppInfo);
                }).
                orElseGet(() -> ResultDTO.failed(String.format("app(%s) is not registered! Please register the app in oms-console first.", appName)));
    }

    @GetMapping("/acquire")
    public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
        return ResultDTO.success(serverElectionService.elect(request));
    }

    @GetMapping("/hello")
    public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) {
        JSONObject res = new JSONObject();
        res.put("localHost", NetUtils.getLocalHost());
        res.put("serverInfo", serverInfo);
        res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));
        res.put("serverTimeTs", System.currentTimeMillis());
        res.put("serverTimeZone", TimeZone.getDefault().getDisplayName());
        res.put("appIds", workerClusterQueryService.getAppId2ClusterStatus().keySet());
        if (debug) {
            res.put("appId2ClusterInfo", JSON.parseObject(JSON.toJSONString(workerClusterQueryService.getAppId2ClusterStatus())));
        }

        try {
            res.put("defaultAddress", JSONObject.toJSON(transportService.defaultProtocol()));
        } catch (Exception ignore) {
        }

        return ResultDTO.success(res);
    }

    @Override
    public void setServerInfo(ServerInfo serverInfo) {
        this.serverInfo = serverInfo;
    }
}

ServerController实现了ServerInfoAware接口,它提供了assert、assertV2、acquire、hello接口;其中assert接口用于判断指定的appName是否存在,assertV2返回的是WorkerAppInfo;acquire委托给了serverElectionService.elect(request);hello接口返回server端的localhost、serverInfo、serverTime等信息

elect

tech/powerjob/server/remote/server/election/ServerElectionService.java

代码语言:javascript
复制
    public String elect(ServerDiscoveryRequest request) {
        if (!accurate()) {
            final String currentServer = request.getCurrentServer();
            // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功
            Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
            if (localProtocolInfoOpt.isPresent()) {
                if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
                    log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId());
                    return currentServer;
                }
            }
        }
        return getServer0(request);
    }

    private boolean accurate() {
        return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage;
    }    

ServerElectionService的elect方法接收ServerDiscoveryRequest,它先判断是否accurate(判断100以内的随机数是否小于accurateSelectServerPercentage,默认50),是则执行getServer0,否则则先判断ProtocolInfo的address是否是currentServer,是则直接返回,否则还是走getServer0

getServer0

代码语言:javascript
复制
    private String getServer0(ServerDiscoveryRequest discoveryRequest) {

        final Long appId = discoveryRequest.getAppId();
        final String protocol = discoveryRequest.getProtocol();
        Set<String> downServerCache = Sets.newHashSet();

        for (int i = 0; i < RETRY_TIMES; i++) {

            // 无锁获取当前数据库中的Server
            Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
            if (!appInfoOpt.isPresent()) {
                throw new PowerJobException(appId + " is not registered!");
            }
            String appName = appInfoOpt.get().getAppName();
            String originServer = appInfoOpt.get().getCurrentServer();
            String activeAddress = activeAddress(originServer, downServerCache, protocol);
            if (StringUtils.isNotEmpty(activeAddress)) {
                return activeAddress;
            }

            // 无可用Server,重新进行Server选举,需要加锁
            String lockName = String.format(SERVER_ELECT_LOCK, appId);
            boolean lockStatus = lockService.tryLock(lockName, 30000);
            if (!lockStatus) {
                try {
                    Thread.sleep(500);
                }catch (Exception ignore) {
                }
                continue;
            }
            try {

                // 可能上一台机器已经完成了Server选举,需要再次判断
                AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
                String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
                if (StringUtils.isNotEmpty(address)) {
                    return address;
                }

                // 篡位,如果本机存在协议,则作为Server调度该 worker
                final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
                if (targetProtocolInfo != null) {
                    // 注意,写入 AppInfoDO#currentServer 的永远是 default 的绑定地址,仅在返回的时候特殊处理为协议地址
                    appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
                    appInfo.setGmtModified(new Date());

                    appInfoRepository.saveAndFlush(appInfo);
                    log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                    return targetProtocolInfo.getExternalAddress();
                }
            }catch (Exception e) {
                log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
            } finally {
                lockService.unlock(lockName);
            }
        }
        throw new PowerJobException("server elect failed for app " + appId);
    }

getServer0执行一个循环(最大10次),它先根据appId获取AppInfoDO,然后通过activeAddress判断server是否存活,是则返回,否则重新进行server选举

activeAddress

代码语言:javascript
复制
    private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {

        if (downServerCache.contains(serverAddress)) {
            return null;
        }
        if (StringUtils.isEmpty(serverAddress)) {
            return null;
        }

        Ping ping = new Ping();
        ping.setCurrentTime(System.currentTimeMillis());

        URL targetUrl = ServerURLFactory.ping2Friend(serverAddress);
        try {
            AskResponse response = transportService.ask(Protocol.HTTP.name(), targetUrl, ping, AskResponse.class)
                    .toCompletableFuture()
                    .get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (response.isSuccess()) {
                // 检测通过的是远程 server 的暴露地址,需要返回 worker 需要的协议地址
                final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol);
                if (protocolInfo != null) {
                    downServerCache.remove(serverAddress);
                    ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class);
                    log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol);
                    // 4.3.3 升级 4.3.4 过程中,未升级的 server 还不存在 externalAddress,需要使用 address 兼容
                    return Optional.ofNullable(remoteProtocol.getExternalAddress()).orElse(remoteProtocol.getAddress());
                } else {
                    log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress);
                }
            }
        } catch (TimeoutException te) {
            log.warn("[ServerElection] server[{}] was down due to ping timeout!", serverAddress);
        } catch (Exception e) {
            log.warn("[ServerElection] server[{}] was down with unknown case!", serverAddress, e);
        }
        downServerCache.add(serverAddress);
        return null;
    }

activeAddress通过transportService.ask请求ping接口,1s超时,若成功则从downServerCache移除,返回remoteProtocol.getAddress(),若失败则将该serverAddress加入downServerCache

小结

ServerController实现了ServerInfoAware接口,它提供了assert、assertV2、acquire、hello接口;其中assert接口用于判断指定的appName是否存在,assertV2返回的是WorkerAppInfo;acquire委托给了serverElectionService.elect(request);hello接口返回server端的localhost、serverInfo、serverTime等信息。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ServerController
  • elect
    • getServer0
      • activeAddress
      • 小结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档