我有一个开放的遥测运行一个python FastApi应用程序。痕迹被发送到Jaeger,我可以查看它们。
我正在做大量的IO密集型工作,所以我将与ThreadPoolExecutor并行进行。在ThreadPoolExecutor执行的函数中创建的Spans没有进入Jaeger。
有人能为我指明正确的方向吗?目前,我正在使用禁用并发性来记录跟踪以进行性能调试,但这在生产中行不通。
发布于 2022-03-03 09:31:33
我有更多的时间来研究这个问题,并创建了这个类来解决这个问题:
from opentelemetry import context as otel_context
class TracedThreadPoolExecutor(ThreadPoolExecutor):
"""Implementation of :class:`ThreadPoolExecutor` that will pass context into sub tasks."""
def __init__(self, tracer: Tracer, *args, **kwargs):
self.tracer = tracer
super().__init__(*args, **kwargs)
def with_otel_context(self, context: otel_context.Context, fn: Callable):
otel_context.attach(context)
return fn()
def submit(self, fn, *args, **kwargs):
"""Submit a new task to the thread pool."""
# get the current otel context
context = otel_context.get_current()
if context:
return super().submit(
lambda: self.with_otel_context(
context, lambda: fn(*args, **kwargs)
),
)
else:
return super().submit(lambda: fn(*args, **kwargs))
这个类可以如下所示:
from opentelemetry import trace
import multiprocessing
tracer = trace.get_tracer(__name__)
executor = TracedThreadPoolExecutor(tracer, max_workers=multiprocessing.cpu_count())
# from here you can use it as you would a regular ThreadPoolExecutor
result = executor.submit(some_work)
executor.shutdown(wait=True)
# do something with the result
https://stackoverflow.com/questions/70772036
复制相似问题