上篇博客讲到了Protocol接口的export()方法。
在这篇博客讲解前,希望读者可以先了解一下SPI机制:SPI(Service Provider Interface)服务提供者接口
是提供给服务厂商,或者框架扩展者的接口,例如JDK
中的java.sql.Driver
,dubbo
中的com.alibaba.dubbo.rpc.Protocol
等等。
不同服务厂商只需对接口进行自己的实现,然后在META-INF下services目录中创建以接口全限定名为名字的文件,然后写入自己的实现类的全限定名即可。
Java提供了一种服务发现类:Serviceloader,下面是一个小例子。
对java.sql.Driver接口添加两个实现类AImplDriver,BImplDriver
将两个实现类打包为jar,并且在META-INF/services/下添加java.sql.Driver文件,并将jar包添加到依赖中
编写Test.java进行测试
public class Test {
public static void main(String[] args) throws SQLException {
ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
Iterator<Driver> iterator = loader.iterator();
while (iterator.hasNext()) {
iterator.next().connect(null, null);
}
}
}
那么,理想情况下,应该是两个实现类都被加载,那么console应该打印
A Impl connector ......
B Impl connector ......
那么实际情况呢?
跟预料的一样。
其实mysql和oracle等的驱动包等等也是这样做的。
//file: java.sql.Driver
com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver
我认为提供SPI这种机制,可以实现良好的热插拔,加入减少jar包,无需修改代码,提供动态的服务发现。
dubbo针对SPI机制实现了自己的服务发现类ExtensionLoader
用以加载SPI的实现类。它会加载META-INF/dubbo/internal
、META-INF/dubbo
、META-INF/services
下的以SPI接口名命名的文件中的实现类。
然后根据自定义,或者javasist/jdk
生成的适配类来调用这些实现类。这里不对其进行多讲,想了解的可以百度dubbo spi
机制和查看ExtensionLoader
的实现。
因为我们没有提供默认的适配类,因此生成的Protocol实例为javasist生成的Protocol$Adapter适配类,它最终会调用根据我们url协议对应的Protocol实现类,因为当前我们的invoker的URL协议为registry所以应该调用RegistryPortocol的export()方法。为什么说最终呢?因为它会被构造方法为此SPI的SPI实现类所包装。
Exporter<?> exporter = protocol.export(wrapperInvoker);
#1 在本地暴露服务
#2 与注册中心建立连接获取Registry实例,一般上用的是项目中用的是zookeeper,这里就是ZookeeperRegistry
#3 向注册中心注册服务
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); #1
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker); #2
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
register(registryUrl, registedProviderUrl); #3
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
在此方法中,invoker的协议为dubbo,因为会调用DubboRegistry的export()方法,将暴露后返回的exporter缓存起来。
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
#1 将服务名+端口号作为key,netty服务端会根据key来调用对应的exporter,以调用invoker
#2 构建exporter对象,并缓存到exporterMap,以便后续进行选择调用。
#3 是否为本地存根
#4 启动netty服务端,开始监听。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url); #1
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); #2
exporterMap.put(key, exporter);
//export an stub service for dispatching event #3
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url); #4
optimizeSerialization(url);
return exporter;
}
完成以上工作后,netty客户端就可以根据key,在服务端找到对应的exporter,然后根据客户端传入的接口名方法名,参数等信息,调用invoker中ref对应的方法,然后服务端将结果返回给客户端就ok了!
#1 获取url中ip:port for example : 127.0.0.1:20880
#2 判断是客户端还是服务端
#3 没有服务端的话,就创建一个.
#4 如果已经存在服务端,则需要根据url中的心跳,和心跳超时时间等参数reset服务端。
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) { #2
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url)); #3
} else {
// server supports reset, use together with override
server.reset(url); #4
}
}
}
server = Exchangers.bind(url, requestHandler); 的作用是启动netty服务端,它的ChannelHandler逻辑是在requestHandler中
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); ####
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
reply()是对入站数据进行处理的方法,也就是客户端传给我们的要调用的类,方法的信息。
message已被解码为Invocation接口类型,invocation封装了获取方法名,参数类型,参数...等方法。
#1 从exporterMap中根据接口名和端口作为key获取exporter对象中的invoker以用来调用。
#2 如果是回调服务,则会检验此方法是否存在
#3 进行实际方法的调用,将返回结果用RpcResult封装,编码后返回给客户端。就完成了远程调用的操作
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv); #1
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { #2
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv); #3
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。