本文主要研究一下 SkyWalking 的 cluster-nacos-plugin。
skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/ClusterModuleNacosConfig.java
/**
 * Configuration bean for the Nacos cluster plugin.
 *
 * <p>Lombok generates a public getter and setter for every field below.
 */
@Getter
@Setter
public class ClusterModuleNacosConfig extends ModuleConfig {
    /** Service name under which OAP instances are registered in and queried from Nacos. */
    private String serviceName;

    /** Nacos server address, passed to the client as {@code PropertyKeyConst.SERVER_ADDR}. */
    private String hostPort;

    /** Nacos namespace, passed as {@code PropertyKeyConst.NAMESPACE}; defaults to {@code "public"}. */
    private String namespace = "public";
}
skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/ClusterModuleNacosProvider.java
/**
 * {@link ModuleProvider} named {@code "nacos"} for {@link ClusterModule}.
 *
 * <p>During {@link #prepare()} it builds a Nacos {@link NamingService} from the
 * plugin configuration and registers a single {@link NacosCoordinator} as the
 * implementation of both {@link ClusterRegister} and {@link ClusterNodesQuery}.
 */
public class ClusterModuleNacosProvider extends ModuleProvider {

    private final ClusterModuleNacosConfig config;
    private NamingService namingService;

    /** Creates the provider together with its (initially default) configuration bean. */
    public ClusterModuleNacosProvider() {
        this.config = new ClusterModuleNacosConfig();
    }

    /** @return the provider name, {@code "nacos"} */
    @Override
    public String name() {
        return "nacos";
    }

    /** @return the module this provider implements, {@link ClusterModule} */
    @Override
    public Class<? extends ModuleDefine> module() {
        return ClusterModule.class;
    }

    /** @return the configuration bean held by this provider */
    @Override
    public ModuleConfig createConfigBeanIfAbsent() {
        return config;
    }

    /**
     * Creates the Nacos naming service and registers the coordinator for both
     * cluster service interfaces.
     *
     * @throws ModuleStartException if the naming service cannot be created
     */
    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        namingService = createNamingService();

        final NacosCoordinator nacosCoordinator = new NacosCoordinator(namingService, config);
        registerServiceImplementation(ClusterRegister.class, nacosCoordinator);
        registerServiceImplementation(ClusterNodesQuery.class, nacosCoordinator);
    }

    /** No work is performed at start. */
    @Override
    public void start() throws ServiceNotProvidedException {
    }

    /** No work is performed after start-up completes. */
    @Override
    public void notifyAfterCompleted() throws ServiceNotProvidedException {
    }

    /** @return the modules this provider depends on: only {@code CoreModule.NAME} */
    @Override
    public String[] requiredModules() {
        return new String[] {CoreModule.NAME};
    }

    /**
     * Builds a {@link NamingService} from the configured server address and namespace.
     * Any failure, including one raised while filling the properties, is wrapped.
     */
    private NamingService createNamingService() throws ModuleStartException {
        try {
            final Properties nacosProperties = new Properties();
            nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, config.getHostPort());
            nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, config.getNamespace());
            return NamingFactory.createNamingService(nacosProperties);
        } catch (Exception e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }
}
skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/NacosCoordinator.java
/**
 * Cluster coordinator backed by a Nacos {@link NamingService}.
 *
 * <p>Implements both {@link ClusterRegister} (publishing this node) and
 * {@link ClusterNodesQuery} (listing the nodes registered under the configured
 * service name).
 */
public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {

    private final NamingService namingService;
    private final ClusterModuleNacosConfig config;
    /** Address registered by {@link #registerRemote}; {@code null} until registration succeeds. */
    private volatile Address selfAddress;

    /**
     * @param namingService Nacos client used for registration and lookup
     * @param config        plugin configuration supplying the service name
     */
    public NacosCoordinator(NamingService namingService, ClusterModuleNacosConfig config) {
        this.namingService = namingService;
        this.config = config;
    }

    /**
     * Lists the instances Nacos returns for the configured service name.
     * The entry whose address equals the one registered by this node is marked as self.
     *
     * @return one {@link RemoteInstance} per returned instance; empty if none
     * @throws ServiceQueryException if the Nacos lookup fails (the Nacos error is attached as cause)
     */
    @Override
    public List<RemoteInstance> queryRemoteNodes() {
        List<RemoteInstance> result = new ArrayList<>();
        try {
            List<Instance> instances = namingService.selectInstances(config.getServiceName(), true);
            if (CollectionUtils.isNotEmpty(instances)) {
                // Read the volatile field once so every instance is compared against the same value.
                final Address self = selfAddress;
                for (Instance instance : instances) {
                    Address address = new Address(instance.getIp(), instance.getPort(), false);
                    if (address.equals(self)) {
                        address.setSelf(true);
                    }
                    result.add(new RemoteInstance(address));
                }
            }
        } catch (NacosException e) {
            ServiceQueryException failure = new ServiceQueryException(e.getErrMsg());
            // Keep the original exception so its stack trace is not lost.
            failure.initCause(e);
            throw failure;
        }
        return result;
    }

    /**
     * Registers the given instance's host and port in Nacos under the configured
     * service name, then records the address as this node's own address and uses
     * its string form as the telemetry id.
     *
     * @param remoteInstance the instance to register
     * @throws ServiceRegisterException if registration fails (the failure is attached as cause)
     */
    @Override
    public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
        final Address address = remoteInstance.getAddress();
        try {
            namingService.registerInstance(config.getServiceName(), address.getHost(), address.getPort());
        } catch (Exception e) {
            ServiceRegisterException failure = new ServiceRegisterException(e.getMessage());
            // Keep the original exception so its stack trace is not lost.
            failure.initCause(e);
            throw failure;
        }
        // Only record the address once registration has succeeded.
        this.selfAddress = address;
        TelemetryRelatedContext.INSTANCE.setId(address.toString());
    }
}
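ClusterModuleNacosProvider继承了ModuleProvider，其prepare方法创建NamingService及NacosCoordinator，然后将NacosCoordinator注册为ClusterRegister及ClusterNodesQuery的实现；NacosCoordinator的registerRemote方法通过namingService.registerInstance注册当前节点并记录selfAddress，queryRemoteNodes方法通过namingService.selectInstances查询实例列表，并将地址与selfAddress相同的节点标记为self。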