前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊PowerJob的DesignateServer

聊聊PowerJob的DesignateServer

原创
作者头像
code4it
发布2024-01-17 09:13:26
1120
发布2024-01-17 09:13:26
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下PowerJob的DesignateServer

DesignateServer

tech/powerjob/server/remote/server/redirector/DesignateServer.java

代码语言:javascript
复制
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DesignateServer {

    /**
     * 转发请求需要 AppInfo 下的 currentServer 信息,因此必须要有 appId 作为入参,该字段指定了 appId 字段的参数名称,默认为 appId
     * @return appId 参数名称
     */
    String appIdParameterName() default "appId";
}

DesignateServer注解定义了appIdParameterName属性,默认是appId

DesignateServerAspect

tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java

代码语言:javascript
复制
@Slf4j
@Aspect
@Component
@Order(0)
@RequiredArgsConstructor
public class DesignateServerAspect {

    private final TransportService transportService;
    private final AppInfoRepository appInfoRepository;

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Around(value = "@annotation(designateServer))")
    public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable {

        // 参数
        Object[] args = point.getArgs();
        // 方法名
        String methodName = point.getSignature().getName();
        // 类名
        String className = point.getSignature().getDeclaringTypeName();
        Signature signature = point.getSignature();
        // 方法签名
        MethodSignature methodSignature = (MethodSignature) signature;
        String[] parameterNames = methodSignature.getParameterNames();
        String[] parameterTypes = Arrays.stream(methodSignature.getParameterTypes()).map(Class::getName).toArray(String[]::new);

        Long appId = null;
        for (int i = 0; i < parameterNames.length; i++) {
            if (StringUtils.equals(parameterNames[i], designateServer.appIdParameterName())) {
                appId = Long.parseLong(String.valueOf(args[i]));
                break;
            }
        }

        if (appId == null) {
            throw new PowerJobException("can't find appId in params for:" + signature);
        }

        // 获取执行机器
        AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new PowerJobException("can't find app info"));
        String targetServer = appInfo.getCurrentServer();

        // 目标IP为空,本地执行
        if (StringUtils.isEmpty(targetServer)) {
            return point.proceed();
        }

        // 目标IP与本地符合则本地执行
        if (Objects.equals(targetServer, transportService.defaultProtocol().getAddress())) {
            return point.proceed();
        }

        log.info("[DesignateServerAspect] the method[{}] should execute in server[{}], so this request will be redirect to remote server!", signature.toShortString(), targetServer);
        // 转发请求,远程执行后返回结果
        RemoteProcessReq remoteProcessReq = new RemoteProcessReq()
                .setClassName(className)
                .setMethodName(methodName)
                .setParameterTypes(parameterTypes)
                .setArgs(args);

        final URL friendUrl = ServerURLFactory.process2Friend(targetServer);

        CompletionStage<AskResponse> askCS = transportService.ask(Protocol.HTTP.name(), friendUrl, remoteProcessReq, AskResponse.class);
        AskResponse askResponse = askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);

        if (!askResponse.isSuccess()) {
            throw new PowerJobException("remote process failed: " + askResponse.getMessage());
        }

        // 考虑范型情况
        Method method = methodSignature.getMethod();
        JavaType returnType = getMethodReturnJavaType(method);

        return OBJECT_MAPPER.readValue(askResponse.getData(), returnType);
    }

    //......
}    

DesignateServerAspect拦截了@DesignateServer注解,它先解析方法参数名,取出参数名与@DesignateServer的appIdParameterName一致的参数值,再通过appInfoRepository.findById找到AppInfoDO,获取appInfo.getCurrentServer();若currentServer就是本机则执行point.proceed(),否则构建RemoteProcessReq,通过transportService.ask转发请求

示例

tech/powerjob/server/core/instance/InstanceLogService.java

代码语言:javascript
复制
    /**
     * 获取日志的下载链接
     * @param appId AOP 专用
     * @param instanceId 任务实例 ID
     * @return 下载链接
     */
    @DesignateServer
    public String fetchDownloadUrl(Long appId, Long instanceId) {
        String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
        log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
        return url;
    }

fetchDownloadUrl指定了@DesignateServer注解,会根据appId的值限定在指定server执行

小结

PowerJob的DesignateServer注解定义了appIdParameterName属性,默认是appId;DesignateServerAspect拦截了@DesignateServer注解,它判断currentServer就是本机则执行point.proceed(),否则构建RemoteProcessReq,通过transportService.ask转发请求到指定server执行。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DesignateServer
  • DesignateServerAspect
  • 示例
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档