什么是点点直连
Dubbo正常的请求模型,都是消费端从注册中心拉取服务提供者列表,然后采用适当的负载均衡策略,挑选出一个服务提供者URL,随机发起请求。
但是,Dubbo也给我们提供了一种方式,可以在没有注册中心的时候,直接使用提前设置好的URL发起请求,或者在有注册的中心的时候,绕过注册中心,使用设置好的URL发起请求,这种方式也被称为点点直连。
那么点点直连在实际项目开发过程中,究竟有没有用处呢?
下面我们跟随着实际需求的视角,具体来看看吧。
订单系统这边由于入库订单的状态异常,导致该笔订单消息及时无法推送到供应商系统,从而阻碍了该笔订单在供应商侧的功能运转。
为了争取最短时间内恢复这笔订单的功能运转,我们需要尽快修改这条推送记录在数据库的状态,此时我们可能会想到以下几个做法:
Java代码从编译到执行的流程如下所示:
开发者编写的“Java 源代码”被编译后变成 class 字节码文件,然后字节码文件被 JVM 加载,直到变成可使用的类。
在这样的开发过程中,动态编译一般有两种方式:
出于简单性考虑,本文使用groovy插件实现java代码的动态编译。
由于需要将用于修复的代码上传到生产环境的机器上执行,因此每一个生产环境服务都需要对外暴露一个接口,用于接收动态调试请求:
由于修复代码需要上传到生产环境执行,因此为了避免引发不必要的产线事故,我们一般会拿某台机器节点做个小范围的验证,也就是说,这里需要用到一开始讲到的点点直连技术。
那么下一个问题就来了,如何实现点点直连呢?
Dubbo在ReferenceConfig的父类ReferenceConfigBase类中提供了一个名为Url的字段:
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
/**
* The url for peer-to-peer invocation
* 专为点到点连接而设计的
*/
protected String url;
....
那么该字段的构成规则是怎样的呢? 又是如何起的作用的呢?
下面我们来简单追踪一下url被使用到的地方:
//ReferenceConfig
private T createProxy(Map<String, String> referenceParameters) {
...
// 是否配置了客户端用户点点直连的Url
if (StringUtils.isNotEmpty(url)) {
//如果消费者端配置了url属性,那么dubbo会认为该rul是一个点对点地址,或者是一个注册中心的地址
parseUrl(referenceParameters);
} else {
// dubbo走从注册中心拉取服务提供者url那套逻辑
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
aggregateUrlFromRegistry(referenceParameters);
}
}
createInvokerForRemote();
...
URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
referenceParameters.get(INTERFACE_KEY), referenceParameters);
consumerUrl = consumerUrl.setScopeModel(getScopeModel());
consumerUrl = consumerUrl.setServiceModel(consumerModel);
MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
parseUrl方法负责解析用户配置的点对点直连URL:
//ReferenceConfig
private void parseUrl(Map<String, String> referenceParameters) {
//按照空格,;对消费者端设置的url进行切分,这里说明一个url属性中,我们可以通过空格或者;设置多个服务提供者的直连地址
//或者指定一个或者多个专属的注册中心地址
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (ArrayUtils.isNotEmpty(us)) {
for (String u : us) {
//解析当前url字符串,并解析为一个Dubbo提供的URL对象返回
URL url = URL.valueOf(u);
//url内部对象urlAddress对象的path属性--具体看下图
if (StringUtils.isEmpty(url.getPath())) {
//大部分情况下我们不会指定path,因此一般path值默认为服务接口名
url = url.setPath(interfaceName);
}
...
//判断我们设置的是否是一个注册中心地址
if (UrlUtils.isRegistry(url)) {
//添加进urls集合保存,并且使用REFER_KEY属性表明当前url代表的是注册中心地址
urls.add(url.putAttribute(REFER_KEY, referenceParameters));
} else {
//将referenceParameters集合中的参数以&的形式拼接在当前url后面,类比http的请求参数url的拼接方式
//然后将拼接完整的url添加进urls集合
URL peerUrl = getScopeModel().getApplicationModel().getBeanFactory().getBean(ClusterUtils.class).mergeUrl(url, referenceParameters);
peerUrl = peerUrl.putAttribute(PEER_KEY, true);
urls.add(peerUrl);
}
}
}
}
dubbo://127.0.0.1:80/dubbo.dubboSpi.HelloService?application=generic-call-consumer&async=true&background=false&generic=true&interface=dubbo.dubboSpi.HelloService¶m=value&pid=4600®ister.ip=192.168.18.131&side=consumer&sticky=false&timeout=7000
从parseUrl方法逻辑可知,dubbo会将客户端各种配置参数以类似http请求参数的url拼接方式组织起来。
createInvokerForRemote方法负责构造发起请求调用的Invoker对象:
private void createInvokerForRemote() {
//此处我们的urls集合中的url只有一个,有多个逻辑这里跳过不看
//如果urls的长度为1,说明只有一个服务提供者,则直接通过protocolSPI.refer方法创建一个Invoker实例,
//如果这个服务提供者不是注册中心,则使用StaticDirectory对这个Invoker进行包装。
//StaticDirectory是Dubbo框架中的一个类,用于将一组Invoker封装成一个目录,以便消费者调用
if (urls.size() == 1) {
URL curUrl = urls.get(0);
//这里根据urlAddress内部的protocol属性作为key,通过dubbo的SPI机制寻找对应协议的实现类
//这里实际调用的是DubboProtocol的refer方法,因为我们这里urlAddress的protocol值为dubbo
invoker = protocolSPI.refer(interfaceClass, curUrl);
//如果当前url并非指代一个注册中心地址
if (!UrlUtils.isRegistry(curUrl)) {
List<Invoker<?>> invokers = new ArrayList<>();
invokers.add(invoker);
//默认情况下Cluster会通过Registry拿到一堆服务提供方的IP地址列表后,然后通过一定的路由和负载均衡策略决定具体选择调用哪一个Provider
invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
}
} else {
...
}
}
public class Protocol$Adaptive
implements Protocol {
public Invoker refer(Class clazz, URL uRL) throws RpcException {
String string;
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
if (string == null) {
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + uRL + ") use keys([protocol])");
}
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(uRL.getScopeModel(), Protocol.class);
Protocol protocol = scopeModel.getExtensionLoader(Protocol.class).getExtension(string);
return protocol.refer(clazz, uRL);
}
...
DubboProtocol的refer方法实现如下:
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
...
return protocolBindingRefer(type, url);
}
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
...
// create rpc invoker.
// dubbo总共分为十层,各个层之间的交互主要是通过Inovker完成的,可以理解分层的实现是Invoker套Invoker
//这里只需要知道invoker的doInvoke方法中会完成本层应该做的逻辑
//例如这里DubboInvoker会在protocol层完成相关逻辑处理
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
...
return invoker;
}
通过上面的分析可知,dubbo为我们在客户端配置中提供了一个url参数用来实现点点直连,url的构成规则为:
[protocol://][username:password@][host:port]/[path][?k1=v1&k2=v2]
dubbo://127.0.0.1:80/dubbo.dubboSpi.HelloService?application=generic-call-consumer&async=true&background=false&generic=true&interface=dubbo.dubboSpi.HelloService¶m=value&pid=4600®ister.ip=192.168.18.131&side=consumer&sticky=false&timeout=7000
可见dubbo的url 的构成规则,居然和 http 的构成规则如出一辙,那我们试着通过赋值 url 为dubbo://[机器IP结点]:[机器IP提供Dubbo服务的端口]
,应该就大功告成了。
准备一个页面,填入 5 个字段信息,接口类名、接口方法名、接口方法参数类名、指定的 URL 节点、修复问题的 Java 代码,然后将这 5 个字段通过 HTTP 请求发往 Web 服务器,Web 服务器接收到请求后组装泛化所需对象,最后通过泛化调用的形式完成功能修复。
@RestController
public class DynamicDebugController {
private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";
@PostMapping("/gateway/dynamic/debug/request")
public Object repairRequest(@RequestBody DynamicDebugRequest dynamicDebugRequest) {
// 将入参的req转为下游方法的入参对象,并发起远程调用
return commonInvoke(dynamicDebugRequest);
}
private Object commonInvoke(DynamicDebugRequest dynamicDebugRequest) {
// 然后试图通过类信息对象想办法获取到该类对应的实例对象
ReferenceConfig<GenericService> referenceConfig =
createReferenceConfig(dynamicDebugRequest.getClassName(), dynamicDebugRequest.getUrl());
// 远程调用
GenericService genericService = referenceConfig.get();
return genericService.$invoke(
dynamicDebugRequest.getMtdName(),
new String[]{dynamicDebugRequest.getParameterTypeName()},
new Object[]{dynamicDebugRequest.getParamsMap()});
}
private static ReferenceConfig<GenericService> createReferenceConfig(String className, String url) {
DubboBootstrap dubboBootstrap = DubboBootstrap.getInstance();
// 设置应用服务名称
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName(dubboBootstrap.getApplicationModel().getApplicationName());
// 设置注册中心的地址
RegistryConfig registryConfig = new RegistryConfig(zookeeperAddress);
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(applicationConfig);
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(className);
// 设置泛化调用形式
referenceConfig.setGeneric("true");
// 设置默认超时时间5秒
referenceConfig.setTimeout(5 * 1000);
// 设置点对点连接的地址
referenceConfig.setUrl(url);
return referenceConfig;
}
}
@Setter
@Getter
public class DynamicDebugRequest {
/**
* <h2>接口类名,例:com.provider.one.DynamicDebugService</h2>
**/
private String className;
/**
* <h2>接口方法名,例:dynamicDebug</h2>
**/
private String mtdName;
/**
* <h2>接口方法参数类名,例:com.provider.one.DynamicRequest</h2>
**/
private String parameterTypeName;
/**
* <h2>指定的URL节点,例:dubbo://ip:port</h2>
**/
private String url;
/**
* <h2>可以是调用具体接口的请求参数,也可以是修复问题的Java代码</h2>
**/
private Map<String,String> paramsMap;
}
public class Provider {
private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";
public static void main(String[] args) throws InterruptedException {
//启动内嵌的zk
new EmbeddedZooKeeper(2181, false).start();
//创建ApplicationConfig
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName("dynamic-debug-service-provider");
//创建注册中心配置
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(zookeeperAddress);
//新建服务实现类,注意要使用GenericService接收
DynamicDebugService helloService = new DynamicDebugServiceImpl();
//创建服务相关配置
ServiceConfig<DynamicDebugService> service = new ServiceConfig<>();
service.setApplication(applicationConfig);
service.setRegistry(registryConfig);
service.setInterface(DynamicDebugService.class);
service.setRef(helloService);
service.export();
new CountDownLatch(1).await();
}
}
public interface DynamicDebugService {
/**
* 定义了一个专门处理万能修复逻辑的Dubbo接口
*/
Object dynamicDebug(Map<String,String> req);
}
public class DynamicDebugServiceImpl implements DynamicDebugService {
private final GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
@SneakyThrows
@Override
public Object dynamicDebug(Map<String,String> req) {
// 编译 Java 代码,然后变成 JVM 可识别的 Class 对象信息
Class<?> javaClass = compile(req.get("code"));
//和spring结合的扩展思路: 创建实例对象,并经过spring的后置处理
// Object bean = instantiationAndPostProcessBean(javaClass);
//这里没有和spring结合,直接简单实例化即可
Object bean = javaClass.newInstance();
if(!(bean instanceof Function)){
throw new IllegalArgumentException("动态java类并非Function类型");
}
Function<Map<String,String>, Object> function = (Function) bean;
// 执行单例对象的方法即可
return function.apply(req);
}
/**
* 利用 groovy-all.jar 中的 groovyClassLoader 来编译 Java 代码
*/
private Class<?> compile(String javaCode) {
return groovyClassLoader.parseClass(javaCode);
}
/**
* 实例化bean,并经过spring的所有后置处理,但是不放入Spring容器中
*/
private Object instantiationAndPostProcessBean(Class<?> javaClass) {
return ((DefaultListableBeanFactory) SpringUtil.getBeanFactory()).createBean(javaClass);
}
}
测试,首先我们需要准备一个测试类:
public class TestJavaCode implements Function<Map<String,String>,String> {
@Override
public String apply(Map<String,String> s) {
System.out.println("执行动态方法: "+s);
return "res: "+s;
}
}
该类会作为请求参数传递给动态调试控制器,然后由动态调试控制器通过泛化调用,来调用服务端的动态调试服务接口,最终执行测试的apply方法。
{
"className":"com.provider.one.DynamicDebugService",
"mtdName":"dynamicDebug",
"parameterTypeName":"java.util.Map",
"url":"dubbo://192.168.154.1:20880/com.provider.one.DynamicDebugService",
"paramsMap": {
"code": "package com.provider.one.code;import java.util.function.Function;public class TestJavaCode implements Function<Map<String,String>,String> {@Override public String apply(Map<String,String> s) {System.out.println(\"执行动态方法: \"+s);return \"res: \"+s;}}"
}
}
哪些应用场景需要用到点点直连呢?
点点直连实现简单来说,分为如下几步: