
A Look at SkyWalking's http-async-client-plugin

Original article
Author: code4it
Last modified: 2020-03-11 10:07:36
Collected in the column: 码匠的流水账

This article takes a look at SkyWalking's http-async-client-plugin.

skywalking-plugin.def

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/resources/skywalking-plugin.def

httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncClientInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.SessionRequestInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncRequestExecutorInstrumentation
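  • SkyWalking's http-async-client-plugin defines three instrumentations: HttpAsyncClientInstrumentation, SessionRequestInstrumentation, and HttpAsyncRequestExecutorInstrumentation. All three are registered under the same plugin name, httpasyncclient-4.x.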
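
Because the key httpasyncclient-4.x is repeated on every line, loading this file with java.util.Properties would keep only the last entry, so a def file of this shape has to be read line by line. The following is a minimal sketch of that idea and not SkyWalking's actual loader; the class name PluginDefSketch and the method parse are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of SkyWalking: groups "name=class" lines by plugin name.
class PluginDefSketch {

    static Map<String, List<String>> parse(String defContent) {
        Map<String, List<String>> defines = new LinkedHashMap<>();
        for (String rawLine : defContent.split("\n")) {
            String line = rawLine.trim();
            // Skip blank lines and comment lines.
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int idx = line.indexOf('=');
            if (idx <= 0 || idx == line.length() - 1) {
                throw new IllegalArgumentException("Malformed plugin definition: " + line);
            }
            String name = line.substring(0, idx).trim();
            String className = line.substring(idx + 1).trim();
            // A repeated name appends to the list instead of overwriting it.
            defines.computeIfAbsent(name, k -> new ArrayList<>()).add(className);
        }
        return defines;
    }

    public static void main(String[] args) {
        String prefix = "httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.";
        String def = String.join("\n",
            prefix + "HttpAsyncClientInstrumentation",
            prefix + "SessionRequestInstrumentation",
            prefix + "HttpAsyncRequestExecutorInstrumentation");
        parse(def).forEach((name, classes) -> {
            System.out.println(name);
            classes.forEach(c -> System.out.println("  " + c));
        });
    }
}
```

Running main prints the single plugin name followed by its three instrumentation classes, in file order.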

HttpAsyncClientInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncClientInstrumentation.java

public class HttpAsyncClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
​
    private static final String ENHANCE_CLASS_MINIMAL = "org.apache.http.impl.nio.client.MinimalHttpAsyncClient";
    private static final String ENHANCE_CLASS_INTERNAL = "org.apache.http.impl.nio.client.InternalHttpAsyncClient";
    private static final String METHOD = "execute";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor";
    private static final String FIRST_ARG_TYPE = "org.apache.http.nio.protocol.HttpAsyncRequestProducer";
​
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }
​
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(METHOD).and(takesArguments(4)
                        .and(takesArgument(0, named(FIRST_ARG_TYPE))));
            }
​
            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }
​
            @Override
            public boolean isOverrideArgs() {
                return true;
            }
        }
        };
    }
​
    @Override
    protected ClassMatch enhanceClass() {
        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_MINIMAL, ENHANCE_CLASS_INTERNAL);
    }
}
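  • HttpAsyncClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor to enhance the execute method of org.apache.http.impl.nio.client.MinimalHttpAsyncClient and org.apache.http.impl.nio.client.InternalHttpAsyncClient, matching only the overload that takes four arguments and whose first argument is of type org.apache.http.nio.protocol.HttpAsyncRequestProducer. isOverrideArgs returns true, so the interceptor is allowed to replace the arguments passed to the original method.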
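
To make the matcher's selection rule concrete, here is a plain-reflection analogue of named(METHOD).and(takesArguments(4)).and(takesArgument(0, named(FIRST_ARG_TYPE))). It is only a sketch: it does not use Byte Buddy, and the Client interface and its parameter types are hypothetical stand-ins for the HttpAsyncClient classes.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class ExecuteMatcherSketch {

    // Hypothetical stand-ins for the HttpAsyncClient types; only the shapes matter.
    interface RequestProducer { }
    interface ResponseConsumer { }
    interface Context { }
    interface Callback { }

    interface Client {
        void execute(RequestProducer producer, ResponseConsumer consumer, Context context, Callback callback);
        void execute(RequestProducer producer, ResponseConsumer consumer, Callback callback);
        void execute(String uri, ResponseConsumer consumer, Context context, Callback callback);
        void close();
    }

    // Same three conditions as the matcher: method name, argument count, type name of argument 0.
    static boolean matches(Method m, String name, int argCount, String firstArgType) {
        return m.getName().equals(name)
            && m.getParameterCount() == argCount
            && m.getParameterCount() > 0
            && m.getParameterTypes()[0].getName().equals(firstArgType);
    }

    static List<Method> select(Class<?> type, String name, int argCount, String firstArgType) {
        List<Method> selected = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (matches(m, name, argCount, firstArgType)) {
                selected.add(m);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Only the first execute overload satisfies all three conditions.
        for (Method m : select(Client.class, "execute", 4, RequestProducer.class.getName())) {
            System.out.println(m);
        }
    }
}
```

Of the four methods on the stand-in interface, only the four-argument execute whose first parameter is a RequestProducer is selected; the three-argument overload and the String-based overload are skipped.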

HttpAsyncClientInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptor.java

public class HttpAsyncClientInterceptor implements InstanceMethodsAroundInterceptor {
​
​
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        HttpAsyncResponseConsumer consumer = (HttpAsyncResponseConsumer) allArguments[1];
        HttpContext context = (HttpContext) allArguments[2];
        FutureCallback callback = (FutureCallback) allArguments[3];
        allArguments[1] = new HttpAsyncResponseConsumerWrapper(consumer);
        allArguments[3] = new FutureCallbackWrapper(callback);
        CONTEXT_LOCAL.set(context);
    }
​
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }
​
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
​
    }
}
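  • HttpAsyncClientInterceptor implements InstanceMethodsAroundInterceptor. Its beforeMethod wraps the second argument in an HttpAsyncResponseConsumerWrapper and the fourth argument in a FutureCallbackWrapper, then calls CONTEXT_LOCAL.set(context) to stash the HttpContext (the third argument) in a ThreadLocal. CONTEXT_LOCAL is the static field declared in SessionRequestCompleteInterceptor, shown further down; the listing omits the static import. afterMethod and handleMethodException do nothing.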
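
The article does not list FutureCallbackWrapper, so the following is only a generic sketch of the technique that isOverrideArgs = true makes possible: replacing an argument with a decorator that runs extra logic and then delegates to the caller's original callback. The Callback interface, CallbackWrapper, and the onFinish hook are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CallbackWrapSketch {

    // Hypothetical stand-in for a completion callback with three outcomes.
    interface Callback<T> {
        void completed(T result);
        void failed(Exception ex);
        void cancelled();
    }

    // Decorator: runs a finishing hook, then delegates to the user's callback (which may be null).
    static final class CallbackWrapper<T> implements Callback<T> {
        private final Callback<T> delegate;
        private final Runnable onFinish;

        CallbackWrapper(Callback<T> delegate, Runnable onFinish) {
            this.delegate = delegate;
            this.onFinish = onFinish;
        }

        @Override
        public void completed(T result) {
            onFinish.run();
            if (delegate != null) {
                delegate.completed(result);
            }
        }

        @Override
        public void failed(Exception ex) {
            onFinish.run();
            if (delegate != null) {
                delegate.failed(ex);
            }
        }

        @Override
        public void cancelled() {
            onFinish.run();
            if (delegate != null) {
                delegate.cancelled();
            }
        }
    }

    // Analogue of beforeMethod: swap the fourth argument in place before the real method runs.
    @SuppressWarnings("unchecked")
    static void beforeMethod(Object[] allArguments, Runnable onFinish) {
        Callback<Object> original = (Callback<Object>) allArguments[3];
        allArguments[3] = new CallbackWrapper<>(original, onFinish);
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Callback<Object> userCallback = new Callback<Object>() {
            @Override public void completed(Object result) { events.add("user completed: " + result); }
            @Override public void failed(Exception ex) { events.add("user failed"); }
            @Override public void cancelled() { events.add("user cancelled"); }
        };
        Object[] allArguments = {"producer", "consumer", "context", userCallback};
        beforeMethod(allArguments, () -> events.add("hook ran"));
        ((Callback<Object>) allArguments[3]).completed("200 OK");
        System.out.println(events); // prints [hook ran, user completed: 200 OK]
    }
}
```

The caller never sees the wrapper: it still passes its own callback, and the callback still fires, but the hook gets a chance to run first on whichever thread completes the request.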

SessionRequestInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/SessionRequestInstrumentation.java

public class SessionRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
​
    private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor";
    private static final String COMPLETED_METHOD = "completed";
    private static final String TIMEOUT_METHOD = "timeout";
    private static final String FAILED_METHOD = "failed";
    private static final String CANCEL_METHOD = "cancel";
    private static final String SUCCESS_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor";
    private static final String FAIL_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor";
    private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.reactor.SessionRequestImpl";
​
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[]{new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return any();
            }
​
            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR_CLASS;
            }
        }
        };
    }
​
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(COMPLETED_METHOD);
            }
​
            @Override
            public String getMethodsInterceptor() {
                return SUCCESS_INTERCEPTOR_CLASS;
            }
​
            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        },new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(TIMEOUT_METHOD).or(named(FAILED_METHOD).or(named(CANCEL_METHOD)));
            }
​
            @Override
            public String getMethodsInterceptor() {
                return FAIL_INTERCEPTOR_CLASS;
            }
​
            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        }
        };
    }
​
    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}
  • SessionRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine and enhances org.apache.http.impl.nio.reactor.SessionRequestImpl. It applies org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor to every constructor, org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor to the completed method, and org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor to the timeout, failed, and cancel methods.

SessionRequestConstructorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestConstructorInterceptor.java

public class SessionRequestConstructorInterceptor implements InstanceConstructorInterceptor {
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        if (ContextManager.isActive()) {
            if (ContextManager.activeSpan().isExit()) {
                CONTEXT_LOCAL.remove();
                return;
            }
            ContextSnapshot snapshot = ContextManager.capture();
            objInst.setSkyWalkingDynamicField(new Object[]{snapshot, CONTEXT_LOCAL.get()});
        }
        CONTEXT_LOCAL.remove();
    }
}
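  • SessionRequestConstructorInterceptor implements InstanceConstructorInterceptor. In onConstruct, if a tracing context is active and the active span is not an exit span, it calls ContextManager.capture() and stores the snapshot, together with the HttpContext read from CONTEXT_LOCAL, in the instance's SkyWalking dynamic field. If the active span is an exit span, nothing is stored. In every case CONTEXT_LOCAL is cleared before the method returns.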

SessionRequestCompleteInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestCompleteInterceptor.java

public class SessionRequestCompleteInterceptor implements InstanceMethodsAroundInterceptor {
​
    public static ThreadLocal<HttpContext> CONTEXT_LOCAL = new ThreadLocal<HttpContext>();
​
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        Object[] array = (Object[]) objInst.getSkyWalkingDynamicField();
        if (array == null || array.length == 0) {
            return;
        }
        ContextSnapshot snapshot = (ContextSnapshot) array[0];
        ContextManager.createLocalSpan("httpasyncclient/local");
        if (snapshot != null) {
            ContextManager.continued(snapshot);
        }
        CONTEXT_LOCAL.set((HttpContext) array[1]);
​
​
    }
​
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }
​
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
​
    }
}
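  • SessionRequestCompleteInterceptor implements InstanceMethodsAroundInterceptor and declares the shared CONTEXT_LOCAL ThreadLocal. Its beforeMethod reads the array stored in the dynamic field and returns early if there is none. Otherwise it creates a local span named httpasyncclient/local, calls ContextManager.continued(snapshot) when the snapshot is non-null, and sets CONTEXT_LOCAL to the stored HttpContext.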
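
The capture-then-continue handoff between the constructor and the completed method can be sketched without the agent. In this simplified model a ThreadLocal string plays the role of the tracing context, and a field on the request object plays the role of the dynamic field. All names here (SnapshotHandoffSketch, TRACE_ID, SessionRequest, roundTrip) are hypothetical; the real ContextSnapshot carries more than an id.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SnapshotHandoffSketch {

    // Hypothetical stand-in for the agent's per-thread tracing context.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Plays the role of SessionRequestImpl plus its dynamic field.
    static final class SessionRequest {
        private final String snapshot;

        SessionRequest() {
            // Runs on the calling thread: analogue of ContextManager.capture().
            this.snapshot = TRACE_ID.get();
        }

        // Runs on another thread: analogue of ContextManager.continued(snapshot).
        String completed() {
            if (snapshot != null) {
                TRACE_ID.set(snapshot);
            }
            try {
                return TRACE_ID.get();
            } finally {
                TRACE_ID.remove();
            }
        }
    }

    // Creates the request on the current thread and completes it on a worker thread.
    static String roundTrip(String traceId) {
        ExecutorService reactor = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set(traceId);
            SessionRequest request = new SessionRequest();
            Callable<String> completion = request::completed;
            return reactor.submit(completion).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            reactor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("trace-42")); // prints trace-42
    }
}
```

The worker thread never had the id set directly; it sees it only because the request object carried the captured value across the thread boundary.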

SessionRequestFailInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestFailInterceptor.java

public class SessionRequestFailInterceptor implements InstanceMethodsAroundInterceptor {
​
​
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        //this means actual request will not started. so the span has not been created,we cannot log the status.
        CONTEXT_LOCAL.remove();
        objInst.setSkyWalkingDynamicField(null);
    }
​
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }
​
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
​
    }
}
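  • SessionRequestFailInterceptor implements InstanceMethodsAroundInterceptor. Its beforeMethod calls CONTEXT_LOCAL.remove() and objInst.setSkyWalkingDynamicField(null). As the code comment notes, on this path the actual request is never started, so no span has been created and there is no status to record; the interceptor only cleans up.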
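
The repeated CONTEXT_LOCAL.remove() calls across these interceptors matter because the threads involved are reused: a value left in a ThreadLocal stays visible to whatever runs next on that thread. A small sketch of the effect, using a single-thread executor as a stand-in for a reused thread (the class and method names are hypothetical, and this CONTEXT_LOCAL holds a String rather than an HttpContext):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalCleanupSketch {

    // Stand-in for the plugin's ThreadLocal.
    static final ThreadLocal<String> CONTEXT_LOCAL = new ThreadLocal<>();

    // Runs two tasks back to back on the same pooled thread and reports what the second one sees.
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Void> first = () -> {
                CONTEXT_LOCAL.set("request-1");
                if (cleanUp) {
                    CONTEXT_LOCAL.remove();
                }
                return null;
            };
            pool.submit(first).get();

            Callable<String> second = () -> CONTEXT_LOCAL.get();
            return pool.submit(second).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTaskSees(false)); // prints request-1 (stale value leaked)
        System.out.println(secondTaskSees(true));  // prints null
    }
}
```

Without the cleanup, the second task would pick up the first request's context, which in the plugin's case would mean attaching a span to the wrong request.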

HttpAsyncRequestExecutorInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncRequestExecutorInstrumentation.java

public class HttpAsyncRequestExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
​
    private static final String ENHANCE_CLASS = "org.apache.http.nio.protocol.HttpAsyncRequestExecutor";
    private static final String METHOD = "requestReady";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor";
​
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }
​
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(METHOD);
            }
​
            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }
​
            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        }
        };
    }
​
    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}
  • HttpAsyncRequestExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It uses org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor to enhance the requestReady method of org.apache.http.nio.protocol.HttpAsyncRequestExecutor.

HttpAsyncRequestExecutorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncRequestExecutorInterceptor.java

public class HttpAsyncRequestExecutorInterceptor implements InstanceMethodsAroundInterceptor {
​
​
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        HttpContext context = CONTEXT_LOCAL.get();
        CONTEXT_LOCAL.remove();
        if (context == null) {
            return;
        }
        final ContextCarrier contextCarrier = new ContextCarrier();
        HttpRequestWrapper requestWrapper = (HttpRequestWrapper) context.getAttribute(HttpClientContext.HTTP_REQUEST);
        HttpHost httpHost = (HttpHost) context.getAttribute(HttpClientContext.HTTP_TARGET_HOST);
​
        RequestLine requestLine = requestWrapper.getRequestLine();
        String uri = requestLine.getUri();
        String operationName = uri.startsWith("http") ? new URL(uri).getPath() : uri;
        int port = httpHost.getPort();
        AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, httpHost.getHostName() + ":" + (port == -1 ? 80 : port));
        span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT);
        Tags.URL.set(span, requestWrapper.getOriginal().getRequestLine().getUri());
        Tags.HTTP.METHOD.set(span, requestLine.getMethod());
        SpanLayer.asHttp(span);
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            requestWrapper.setHeader(next.getHeadKey(), next.getHeadValue());
        }
    }
​
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }
​
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
​
    }
}
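  • HttpAsyncRequestExecutorInterceptor implements InstanceMethodsAroundInterceptor. Its beforeMethod reads the HttpContext from CONTEXT_LOCAL, clears it, and returns if it is null. Otherwise it calls ContextManager.createExitSpan, using the request path as the operation name and host:port as the peer (falling back to port 80 when the port is -1). It then sets the component, the URL and HTTP method tags, marks the span layer as HTTP, and finally propagates every item from contextCarrier.items() as a header on requestWrapper.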
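
The two small derivations inside beforeMethod, the operation name and the peer address, can be tried in isolation. The sketch below copies the two expressions from the listing into standalone methods; the class name ExitSpanNamingSketch and the method names are hypothetical, and the example hosts are made up.

```java
import java.net.MalformedURLException;
import java.net.URL;

class ExitSpanNamingSketch {

    // Absolute URIs are reduced to their path; anything else is used unchanged.
    static String operationName(String uri) {
        if (!uri.startsWith("http")) {
            return uri;
        }
        try {
            return new URL(uri).getPath();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Not a valid URL: " + uri, e);
        }
    }

    // A port of -1 means "not specified", which the listing maps to 80.
    static String peer(String hostName, int port) {
        return hostName + ":" + (port == -1 ? 80 : port);
    }

    public static void main(String[] args) {
        System.out.println(operationName("http://example.com:8080/api/users?id=1")); // prints /api/users
        System.out.println(operationName("/api/users?id=1"));                        // prints /api/users?id=1
        System.out.println(peer("example.com", -1));                                 // prints example.com:80
        System.out.println(peer("example.com", 8443));                               // prints example.com:8443
    }
}
```

Two behaviours follow directly from the expressions: a relative request URI keeps its query string in the operation name, and the fallback port is 80 regardless of scheme.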

Summary

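SkyWalking's http-async-client-plugin defines three instrumentations: HttpAsyncClientInstrumentation, SessionRequestInstrumentation, and HttpAsyncRequestExecutorInstrumentation. The first wraps the response consumer and callback passed to execute and records the HttpContext; the second carries the tracing snapshot and the HttpContext through SessionRequestImpl; the third creates the exit span in requestReady and injects the trace headers into the outgoing request.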

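Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.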
