前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo下的多版本并行开发测试解决方案(服务路由)

Dubbo下的多版本并行开发测试解决方案(服务路由)

作者头像
石臻臻的杂货铺[同名公众号]
发布2021-07-14 10:33:38
6860
发布2021-07-14 10:33:38
举报
文章被收录于专栏:kafka专栏

在很久之前的文章多版本并行开发测试解决方案 中挖了个坑 今天来给填上; 今天主要讲解实现方案;

主要思路

  1. 给不同版本的dubbo服务打上 标签version上
  2. 在dubbo 提供和消费的出入口上 带上 标签version
  3. 服务消费进行路由的时候 给他找到相同标签version的提供者 进行消费;如果没有就给它稳定版本

是不是很简单,就是打个标签,然后路由的时候找相同服务嘛

简单代码

打标签

写个Register的Wrapper类 将标签注册上去 这里我是将标签绑定到了dubbo的属性application; 放在哪里自己决定能读取到就行;

代码语言:javascript
复制
/**
 * @author shirenchuang
 * Registry 的包装类
 * 修改URL 中的Application
 * @date 2019/12/5  8:35 下午
 */

public class DevVersionRegisterWrapper implements Registry {

    private static final Logger logger = LoggerFactory.getLogger("devVersion");


    private Registry registry;

    /**
     * 注入Register
     * @param registry
     */
    public DevVersionRegisterWrapper(Registry registry) {
        this.registry = registry;
    }

    @Override
    public URL getUrl() {
        return DevVersionRegisterFactoryWrapper.changeApplication(registry.getUrl());
    }

    @Override
    public boolean isAvailable() {
        return registry.isAvailable();
    }

    @Override
    public void destroy() {
        registry.destroy();
    }

    @Override
    public void register(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }

    @Override
    public void unregister(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        registry.subscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }

    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        registry.unsubscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }

    @Override
    public List<URL> lookup(URL url) {
        return registry.lookup(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
}

写一个RegistryFactory的包装类

代码语言:javascript
复制
/**
 * @author shirenchuang
 * RegistryFactory 的包装类,在注册的时候 修改一下 Application
 * 如果是 迭代环境则把Appliacation=Application_迭代版本号
 * @date 2019/12/5  8:29 下午
 */

public class DevVersionRegisterFactoryWrapper implements RegistryFactory {

    private static final Logger logger = LoggerFactory.getLogger("devVersion");


    private RegistryFactory registryFactory;
    /**
     * 注入RegisterFactory
     */
    public DevVersionRegisterFactoryWrapper(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }

    @Override
    public Registry getRegistry(URL url) {
        //获取当前环境的迭代版本号
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            logger.info("=====启动的服务是迭代版本服务  devVersion:{}=====",MyThreadLocal.localVersion);
            System.out.println("====启动的服务是迭代版本服务  devVersion:"+MyThreadLocal.localVersion);
            return new DevVersionRegisterWrapper(registryFactory.getRegistry(changeApplication(url)));
        }
        logger.info("=====启动的服务是稳定版本====");
        System.out.println("=====启动的服务是稳定版本====");
        return registryFactory.getRegistry(url);
    }

    public static URL changeApplication(URL url){
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            String applicationKey = url.getParameter(Constants.APPLICATION_KEY)+MyThreadLocal.spiltString+MyThreadLocal.localVersion;
            URL url2 = url.addParameter(Constants.APPLICATION_KEY,
                    applicationKey);

            logger.info("=====迭代版本服务修改 Application key:{} =====",applicationKey);
            return url2;
        }
        return url;
    }
}

服务路由

Invoker 包装类

代码语言:javascript
复制
/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterInvoker<T> implements Invoker<T> {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");

    private final Directory<T> directory;

    private final Invoker<T> invoker;

    public DevVersionClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
        this.directory = directory;
        this.invoker = invoker;
    }

    @Override
    public URL getUrl() {
        return directory.getUrl();
    }

    @Override
    public boolean isAvailable() {
        return directory.isAvailable();
    }

    @Override
    public void destroy() {
        this.invoker.destroy();
    }

    @Override
    public Class<T> getInterface() {
        return directory.getInterface();
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // 找到迭代版本号
        return doDevVersionInvoke(invocation, null);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private Result doDevVersionInvoke(Invocation invocation, RpcException e) {
        Result result ;
        Invoker<T> minvoker;

        List<Invoker<T>> devVersionInvokers = selectDevVersionInvoker(invocation);
        if (devVersionInvokers==null||devVersionInvokers.size()==0) {
            logger.error("没有找到服务啊~~~~ ");
            throw new RpcException("没有找到服务啊~~~~");
        } else {
            minvoker = devVersionInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getDevVersionExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getDevVersionExceptionMessage(e, me), me.getCause());
        }
        return result;
    }

    private String getDevVersionExceptionMessage(Throwable t, Throwable mt) {
        String msg = "devVersion error : " + mt.getMessage();
        if (t != null) {
            msg = msg + ", invoke error is :" + StringUtils.toString(t);
        }
        return msg;
    }


    /**
     * 获取对应迭代版本服务
     * @param invocation
     * @return
     */
    private List<Invoker<T>> selectDevVersionInvoker(Invocation invocation) {
        List<Invoker<T>> invokers = null;
        if (invocation instanceof RpcInvocation) {
            try {
                /**其实我们也可以给directory生生一个代理类,来做帅选操作**/
                invokers = directory.list(invocation);
                //经过了dubbo的栓选之后,我们来找自己需要的Invokes
                String devVersion = MyThreadLocal.getDevVersion();
                List<Invoker<T>> newInvokers = new ArrayList<>();
                List<Invoker<T>> stableInvokers = new ArrayList<>();

                for (Invoker invoker : invokers){
                    URL providerUrl ;
                    //获取应用名称
                    Method getProviderUrl = invoker.getClass().getDeclaredMethod("getProviderUrl");
                    getProviderUrl.setAccessible(true);
                    providerUrl = (URL)getProviderUrl.invoke(invoker);
                    String application  = providerUrl.getParameter(Constants.APPLICATION_KEY);

                    if(StringUtils.isEmpty(devVersion)){
                        if(application.indexOf(MyThreadLocal.spiltString)==-1){
                            //不是迭代过来或者本身不是迭代的请求    只能访问非迭代版本
                            newInvokers.add(invoker);
                        }
                    }else {
                        //是迭代的请求  就需要找对应的迭代服务
                        if(application.indexOf(MyThreadLocal.spiltString)!=-1){
                            String version = application.substring(application.indexOf(MyThreadLocal.spiltString)+5);
                            if(version.equals(devVersion)){
                                newInvokers.add(invoker);
                            }
                        }
                    }

                    //找到稳定环境
                    if(application.indexOf(MyThreadLocal.spiltString)==-1){
                        stableInvokers.add(invoker);
                    }



                }
                if(newInvokers==null||newInvokers.size()==0){
                    String serviceName = directory.getInterface().getName();
                    if(StringUtils.isEmpty(devVersion)){
                        String error = "=====当前消费者自身版本和迭代传递版本均为稳定版本~ ,但是没有找到将要消费的服务=>"+serviceName+" 的稳定版本!!";
                        logger.error(error);
                        throw new RuntimeException(error);
                    }else {
                        // 请求的是迭代服务, 但是迭代服务没有找到,退而求其次调用稳定环境  )
                        if(stableInvokers!=null&&stableInvokers.size()>0){
                            StringBuffer sb = new StringBuffer();
                            sb.append("=======当前cap请求的版本为:").append(devVersion)
                                    .append(";往后传递版本").append(devVersion).append("; 将要消费的服务:").append(serviceName)
                                    .append("没有找到与之对应的迭代版本;将会调用稳定版本");
                            logger.info(sb.toString());
                            return stableInvokers;
                        }else {
                            //可能有其他的迭代服务,但是不调用
                            logger.error("当前请求迭代版本:{},但是不存在迭代服务,也没有找到稳定服务;{},{},{}",devVersion,serviceName);
                            throw new RuntimeException("当前请求迭代版本:"+devVersion+",但是不存在迭代服务,也没有找到稳定服务;"+serviceName);
                        }
                    }
                }else {
                    return newInvokers;
                }

            } catch (RpcException e) {

                logger.error("获取 迭代版本 的服务时 发生错误~~:"+ directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                        , e);
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }
        return invokers;
    }



    @Override
    public String toString() {
        return "invoker :" + this.invoker + ",directory: " + this.directory;
    }


    public static void main(String[] args) {

        String application = "application"+MyThreadLocal.spiltString+"1.0.1";
        boolean b = application.indexOf(MyThreadLocal.spiltString)==-1;
        application = application.substring(application.indexOf(MyThreadLocal.spiltString)+5);
        System.out.print(application);

    }
}
代码语言:javascript
复制
/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterWrapper implements Cluster {

    private Cluster cluster;

    public DevVersionClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        //如果自己是迭代环境,则使用包装


        return new DevVersionClusterInvoker(directory,
                this.cluster.join(directory));
    }

}

拦截器 带入 标签Version

消费者拦截器

代码语言:javascript
复制
/**
 * @Description 消费别人服务的时候会走到这里
 *              要把 迭代版本号 放到参数里面传到 服务提供者
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.CONSUMER})
public class DevVersionConsumerFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        String traceId = TraceUtil.getTraceId();
        RpcContext.getContext().setAttachment("myTraceId",traceId);

        String toDevVersion = MyThreadLocal.getDevVersion();
        RpcContext.getContext().setAttachment("devVersion",toDevVersion);
        doLog(invoker,invocation,traceId);
        return invoker.invoke(invocation);

    }

    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName + "." + method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append("=== ConsumerFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(MyThreadLocal.getFromVersion())
                .append("; 往后传递版本:").append(MyThreadLocal.getDevVersion())
        .append(" ;调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }
}

提供者拦截器

代码语言:javascript
复制
/**
 * @Description 当前服务提供者在被真正调用之前获取 消费者过来的迭代版本号
 *              然后保存在本地线程变量中,在调用其他dubbo服务的时候 要带上版本号
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.PROVIDER})
public class DevVersionProviderFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        String fromTraceId = RpcContext.getContext().getAttachment("myTraceId");
        TraceUtil.traceLocal.set(fromTraceId);
        String myTraceId = TraceUtil.getTraceId();

        String fromDevVersion = RpcContext.getContext().getAttachment("devVersion");
        //放入到本地线程存放
        MyThreadLocal.devVersion.set(fromDevVersion);
        doLog(invoker,invocation,myTraceId);
        return invoker.invoke(invocation);
    }


    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName + "." + method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append(" ProviderFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append("; 往后传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append(" ;将被调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }



}

标签的存取

在dubbo服务启动的时候 通过Jvm参数传入; 透传的标签Version通过ThreadLocal保存;

代码语言:javascript
复制
public class MyThreadLocal {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");


    public static ThreadLocal<String> devVersion = new TransmittableThreadLocal();
    //public static ThreadLocal devVersion = new ThreadLocal();

    /**用户Application评价的固定字符;**/
    public static String spiltString = "_dev_";

    public static String localVersion  ;

    static {
        localVersion = System.getProperty("localVersion");
        logger.info("====devVersion:{}   ========",devVersion);
        System.out.println("s====devVersion:   ========"+devVersion);
    }


    public static String getFromVersion(){
        return devVersion.get();
    }

    /**
     * 如果本地变量没有  则可能是第一个发起方;
     * 则去当前服务的版本号,然后一直传递下去;
     * @return
     */
    public static String getDevVersion(){
        String fromVersion = getFromVersion();
        if(!StringUtils.isEmpty(fromVersion)){
            return fromVersion;
        }
        return localVersion;
    }

}

dubbo spi的配置

将上面的DevVersionRegisterFactoryWrapper DevVersionClusterWrapper DevVersionProviderFilter DevVersionConsumerFilter配置一下使其生效

重点问题说明

  1. 上面的只是一个扩展Jar包; 要做的无侵入性; 不能让具体业务修改代码和依赖 参考我的解决方案: 我写的dubbo扩展jar包如何无侵入的给别人使用
  2. ThreadLocal在线程池的情况下 值传递会有问题; 使用阿里开源的 TTL解决;
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/01/08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 主要思路
  • 简单代码
    • 打标签
      • 服务路由
        • 拦截器 带入 标签Version
          • 标签的存取
            • dubbo spi的配置
            • 重点问题说明
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档