本文主要研究一下skywalking的spring-webflux-plugin
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.java
public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("handle");
}
@Override
public String getMethodsInterceptor() {
return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor";
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byName("org.springframework.web.reactive.DispatcherHandler");
}
}
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.java
public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
EnhancedInstance instance = getInstance(allArguments[0]);
ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
ContextCarrier carrier = new ContextCarrier();
CarrierItem next = carrier.items();
HttpHeaders headers = exchange.getRequest().getHeaders();
while (next.hasNext()) {
next = next.next();
List<String> header = headers.get(next.getHeadKey());
if (header != null && header.size() > 0) {
next.setHeadValue(header.get(0));
}
}
AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier);
span.setComponent(ComponentsDefine.SPRING_WEBFLUX);
SpanLayer.asHttp(span);
Tags.URL.set(span, exchange.getRequest().getURI().toString());
HTTP.METHOD.set(span, exchange.getRequest().getMethodValue());
instance.setSkyWalkingDynamicField(ContextManager.capture());
span.prepareForAsync();
ContextManager.stopSpan(span);
return ((Mono) ret).doFinally(s -> {
try {
Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE);
if (pathPattern != null) {
span.setOperationName(((PathPattern) pathPattern).getPatternString());
}
HttpStatus httpStatus = exchange.getResponse().getStatusCode();
// fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support
if (httpStatus != null) {
Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));
if (httpStatus.isError()) {
span.errorOccurred();
}
}
} finally {
span.asyncFinish();
}
});
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
public static EnhancedInstance getInstance(Object o) {
EnhancedInstance instance = null;
if (o instanceof DefaultServerWebExchange) {
instance = (EnhancedInstance) o;
} else if (o instanceof ServerWebExchangeDecorator) {
ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();
return getInstance(delegate);
}
return instance;
}
}
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.java
public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override
public String getConstructorInterceptor() {
return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor";
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return byName("org.springframework.web.server.adapter.DefaultServerWebExchange");
}
}
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.java
public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
}
}
DispatcherHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor增强org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor增强org.springframework.web.server.adapter.DefaultServerWebExchange的所有方法