专栏首页一杯82年的JAVA从0.5到1写个rpc框架 - 1:服务注册/发现(eureka)

从0.5到1写个rpc框架 - 1:服务注册/发现(eureka)

首先来实现一个服务注册发现的功能

- acuprpc
    + acuprpc-core  //server/client核心处理逻辑
    + acuprpc-spring-boot-starter  //server端服务扫描,client端动态代理,服务注册/发现

Eureka Server

spring-cloud-starter-eureka-server

Eureka Client

原理就是利用eureka提供的客户端类来向Eureka Server发送注册请求,把自己提供服务的地址和端口(rpc服务端口,不是springboot启动的http端口)告诉注册中心,这样其他客户端(包括自身)就可以请求Eureka Server获取需要的服务节点信息。

/**
 * 在服务中心注册的实例
 */
@Getter
@Slf4j
public class RpcInstance {

    private EurekaClient eurekaClient;

    private ApplicationInfoManager applicationInfoManager;

    private RpcConf rpcConf;

    public RpcInstance(RpcConf rpcConf) {
        RpcEurekaInstanceConfig instanceConfig = new RpcEurekaInstanceConfig();

        instanceConfig.setAppGroupName(rpcConf.getAppGroup());
        instanceConfig.setAppname(rpcConf.getAppName());
        instanceConfig.setNonSecurePort(rpcConf.getPort());
        instanceConfig.setIpAddress(IpUtil.INTRANET_IP);
        instanceConfig.setHostname(IpUtil.HOSTNAME);

        RpcEurekaClientConfig clientConfig = new RpcEurekaClientConfig();
        clientConfig.getServiceUrl().put("default", rpcConf.getDiscoveryServiceUrl());
        clientConfig.setRegisterWithEureka(rpcConf.isRegisterWithDiscovery());

        InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
        this.applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
        this.eurekaClient = new DiscoveryClient(applicationInfoManager, clientConfig);
        this.rpcConf = rpcConf;
        log.info("protocol server -> " + rpcConf.getRpcServerClass());
        log.info("protocol client -> " + rpcConf.getRpcClientClass());
    }

    public void start() {
        applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.STARTING);
    }

    public void started() {
        applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.UP);
    }

    public void shutdown() {
        applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
        eurekaClient.shutdown();
    }

    /**
     * 创建一个rpc server,根据配置的调用方式(实现类)生成对象
     */
    @SneakyThrows
    public RpcServer newRpcServer() {
        return rpcConf.getRpcServerClass().getConstructor(RpcInstance.class).newInstance(this);
    }

    /**
     * 创建一个rpc client,根据配置的调用方式(实现类)生成对象
     */
    @SneakyThrows
    public RpcClient newRpcClient(NodeInfo nodeInfo) {
        return rpcConf.getRpcClientClass().getConstructor(NodeInfo.class).newInstance(nodeInfo);
    }
}

starter

构建一个自己的spring boot starter,这样别的项目只需要引入这个依赖,就能使用starter提供的服务了。

- resources
    - META-INF
        spring.factories // 定义@Configuration类的路径,有了这个声明依赖starter的项目就能获得starter中提供的bean
        spring-configuration-metadata.json // 配置信息(可选),有了它在IDE中编辑application配置文件可以看到提示信息

rpc server 服务管理

作为rpc服务提供者,需要在应用启动时把有注解(@Rpc)的服务管理起来,这样接收到rpc请求后可以快速查询到指定对象,执行指定方法。

实现接口BeanPostProcessor的bean即可得到处理spring中的所有bean(每个bean初始化完成后会调用接口方法)。

public class RpcServiceScanner implements BeanPostProcessor {

    private RpcServer rpcServer;

    public RpcServiceScanner(RpcServer rpcServer) {
        this.rpcServer = rpcServer;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // AOP代理类需要拿到原始的类,不然读不到类上的注解
        Class<?> beanClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
        val nrpc = beanClass.getAnnotation(Rpc.class);
        if (nrpc == null) {
            return bean;
        }

        Method[] methods = beanClass.getDeclaredMethods();
        if (methods.length == 0) {
            return bean;
        }
        Map<String, MethodInfo> methodInfoMap = new HashMap<>();
        for (Method method : methods) {
            methodInfoMap.put(method.getName(), new MethodInfo(method));
        }

        Class<?>[] interfaces = beanClass.getInterfaces();
        if (interfaces.length == 0) {
            return bean;
        }
        // client是通过接口调用server的,因此并不知道具体实现类的路径,只有接口名,因此把所有接口都注册一遍
        for (Class<?> serviceInterface : interfaces) {
            rpcServer.registerService(
                    new RpcServiceInfo(rpcServer.getRpcInstance().getRpcConf().getAppName(),
                            serviceInterface.getCanonicalName()),
                    bean,
                    serviceInterface,
                    methodInfoMap);
        }
        return bean;
    }
}

rpc client 远程服务代理

作为服务调用者,可以通过接口像调用本地代码一样调用远程服务,原理就是为接口创建一个代理,在代理中进行远程调用。

这里使用主动创建代理的方式。

public class RpcServiceConsumer {

    private RpcClientManager rpcClientManager;

    public RpcServiceConsumer(RpcClientManager rpcClientManager) {
        this.rpcClientManager = rpcClientManager;
    }

    @SuppressWarnings("unchecked")
    public <T> T create(String appName, Class<T> serviceInterface) {
        RpcServiceInfo serviceInfo = new RpcServiceInfo(appName, serviceInterface.getCanonicalName());
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                new RpcInvocationHandler(serviceInfo, rpcClientManager));
    }
}
public class RpcInvocationHandler implements InvocationHandler {

    private RpcServiceInfo rpcServiceInfo;

    private RpcServiceManager rpcServiceManager;

    private RpcClient rpcClient;

    public RpcInvocationHandler(RpcServiceInfo rpcServiceInfo, RpcServiceManager rpcServiceManager) {
        this.rpcServiceInfo = rpcServiceInfo;
        this.rpcServiceManager = rpcServiceManager;
        tryInitRpcClient(false);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcMethodInfo methodInfo = new RpcMethodInfo(rpcServiceInfo, method.getName(), method.getGenericReturnType());
        return tryGetRpcClient().invoke(methodInfo, args);
    }

    private RpcClient tryGetRpcClient() {
        if (rpcClient == null) {
            tryInitRpcClient(true);
        }
        return rpcClient;
    }

    private synchronized void tryInitRpcClient(boolean throwError) {
        if (rpcClient != null) {
            return;
        }
        try {
            rpcClient = rpcServiceManager.lookup(rpcServiceInfo);
        } catch (Exception e) {
            if (throwError) {
                throw e;
            }
        }
    }
}

注册一个ApplicationListener,接收springboot程序准备完后的信号,然后告诉注册中心准备好了。

public class RpcApplicationListener implements ApplicationListener<ApplicationReadyEvent> {

    private RpcServer rpcServer;

    public RpcApplicationListener(RpcServer rpcServer) {
        this.rpcServer = rpcServer;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        rpcServer.started();
    }
}

1:服务注册/发现(eureka)

2:远程服务调用(grpc)

3:远程服务调用(thrift)

4:request filter

5:服务监控和管理(actuator)

6:调用异常节点自动重试

7:网关支持(gateway)

关注我查看本系列全部内容~

本文分享自微信公众号 - 一杯82年的JAVA(acupjava),作者:acupt

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-07-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 从0.5到1写个rpc框架 - 2:远程服务调用(grpc)

    gRPC是Google开源的跨语言远程服务调用(RPC)框架,通信协议用的HTTP/2,数据传输默认用的protocol buffers(一种轻便高效的结构化数...

    acupt
  • JAVA中有趣的移位操作

    上次介绍了JAVA中有趣的位运算,知道了位运算是直接对一个整形的二进制位进行操作,效率上比起加减乘除高不少,因此常运用在对性能很敏感的场景。

    acupt
  • 从0.5到1写个rpc框架 - 5:服务监控和管理(actuator)

    springboot项目中只要引入spring-boot-starter-actuator就可以得到一些管理服务的接口,比如停止服务,获取服务信息等。他用的并不...

    acupt
  • springboot集成shiro

    是不是发现一般权限模块都是领导(或者技术大神)写的,你只管用就好了?不想当领导的程序猿不是好厨子。废话少说,先简单说下怎么用,黑猫白猫,会写权限才可能是高级猫,...

    小尘哥
  • 初探Linux

    操作系统(Operating System,OS)是软件的一部分,它是硬件基础上的第一层软件,是硬件和其它软件沟通的桥梁。

    鱼丸葱面
  • kubectl 命令自动补全

    原文地址: https://blog.csdn.net/wenwenxiong/article/details/53105287

    二狗不要跑
  • SSM框架使用POI技术导出Excel表

    POI框架是Apache开源的可以导出导入Excel表的,本博客介绍在SSM(Spring+SpringMVC+Mybatis)项目里,如何使用POI框架,导出...

    SmileNicky
  • RPC编程

    RPC全称Remote Procedure Call,即远程方法调用。它的表现形式是:

    小蜜蜂
  • android 有阻尼下拉刷新列表的实现方法

    虽然效果图看起来样子不太好看,主要是因为那个蓝色的背景对不对,没关系,这只是一个背景而已,在了解了我们这个下拉刷新列表的实现之后,你就可以很轻松地修改这个背景,...

    砸漏
  • MySQL之source命令

    今天上班的时候,开发的同事拿过来一个.zip的压缩包文件,说是要把里面的数据倒入到数据库里面,本来想着是成型的SQL,只需要复制粘贴一下,倒入到数据库中就可以...

    AsiaYe

扫码关注云+社区

领取腾讯云代金券