Target audience: platform/SRE/data engineering teams, application developers, architecture review boards
Document status: v1 draft (ready to take straight into a PoC)
Keywords: Vector, OpenTelemetry Collector, Kafka, OpenObserve, TimescaleDB, pgvector, Apache AGE, HLL, Benthos
[Edge nodes / hosts / Pods]
 ├─ Vector (filelog / Prometheus / process metrics → converted to unified OTLP)
 │   └─ local in-memory/disk buffering + backpressure
 │       → OTLP/gRPC → regional OTel Gateway (persistent queue / WAL)
 │       ↘ (optional bypass) → Kafka.<region>.logs_raw / metrics_raw / traces_raw
 └─ application/SDK traces → OTLP directly → regional OTel Gateway
[Regional gateway (multiple replicas behind a load balancer)]
 ├─ OTel Collector (receives OTLP/Kafka; file_storage + sending_queue)
 │   └─ exporter fan-out:
 │       → OpenObserve (search / alerting / visualization)
 │       → Kafka.<region> (bypass for replay; source of truth for ETL)
[Storage & analytics]
 ├─ OpenObserve (object storage / local disk + WAL; logs / metrics / traces)
 ├─ Kafka (RF ≥ 3) → ETL (Benthos / Redpanda Connect / Flink)
 │     → PostgreSQL (detail & rollups: Timescale / pgvector / AGE / HLL)
 └─ (optional) cLoki/qryn or an LGTM stack as an alternative query layer
Design rationale: the edge absorbs load spikes, the region absorbs failures, and the bypass is the safety net. The primary chain delivers near-real-time visualization and alerting, while the bypass retains the raw source of truth for replay, backfill, and downstream reprocessing.
[Producers / Agents / Apps]
 └─ (OTLP gRPC/HTTP, filelog/Prometheus → OTLP)
     → OTel Gateway (Region-A, N replicas, load-balanced)
         ├─ unified tagging / normalization / event_id generation
         ├─ fan-out → OpenObserve.A (near-real-time search / alerting)
         └─ fan-out → Kafka.A (*_raw bypass: logs / metrics / traces)
               ↘ (ETL) → PostgreSQL.A (detail / aggregates / relations / vectors)
Gateway reliability: file_storage (WAL) + sending_queue, so data is persisted and retried across process restarts or downstream outages.
Bypass durability: *_raw topics with RF ≥ 3, acks=all and min.insync.replicas ≥ 2; retention ≥ the replay window.
Idempotency: every event carries an event_id (content fingerprint / snowflake ID / ULID); the PostgreSQL side ingests via UPSERT, so the guarantee is "duplicates allowed, nothing missing".

Topic design:
- logs_raw: key = event_id (or service+ts_hash), zstd compression, partition count sized from peak EPS (baseline: 10–20k msg/s per partition).
- metrics_raw: key = series_id (hash of metric + labels); hot/cold separation can use *_rollup topics.
- traces_raw: key = trace_id, so spans of the same trace land in the same partition whenever possible.
- *_norm: the deduplicated/normalized streams produced downstream; set cleanup.policy=compact on these topics to allow dedup and corrections.

Broker settings: default.replication.factor >= 3, min.insync.replicas >= 2.
Producer settings: acks=all, enable.idempotence=true, linger.ms=5–20, batch.size=512KB–1MB, max.in.flight.requests.per.connection=1–5 (a client-side Go sketch follows below).
Retention: retention.ms ≥ 7–30 days (sized to the replay window), segment.bytes=1–2GB.
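To make the producer settings above concrete, here is a minimal Go sketch against the franz-go client; the library choice, the reuse of the broker list from the diagrams, and the sample record are assumptions for illustration (franz-go enables acks=all and idempotent writes by default).

package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// acks=all + idempotence are franz-go defaults; linger/batch/compression
	// mirror the settings listed above.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("kafka-a-1:9092", "kafka-a-2:9092", "kafka-a-3:9092"),
		kgo.RequiredAcks(kgo.AllISRAcks()),                  // acks=all
		kgo.ProducerLinger(10*time.Millisecond),             // linger.ms ≈ 5–20
		kgo.ProducerBatchMaxBytes(1<<20),                    // batch.size ≈ 1MB
		kgo.ProducerBatchCompression(kgo.ZstdCompression()), // zstd
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Key by event_id so replays of the same event land in the same partition.
	rec := &kgo.Record{Topic: "logs_raw", Key: []byte("evt-123"), Value: []byte(`{"msg":"hello"}`)}
	if err := client.ProduceSync(context.Background(), rec).FirstErr(); err != nil {
		log.Fatal(err)
	}
}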
event_id rules: the event_id must be identical on the primary chain and the bypass. Build the content fingerprint from (ts_bucket|service|source|fingerprint(payload)) and mix in trace_id/span_id when present.

package eventid

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

func EventID(e Event) string {
	// 1) Normalize: strip volatile fields (dynamic offsets, read/write positions, etc.)
	//    and bucket the timestamp to 100 ms so retransmissions hash identically.
	base := fmt.Sprintf("%s|%s|%s|%d|%s",
		e.Service, e.Source, e.Kind, e.Timestamp.UnixMilli()/100, Canonicalize(e.Payload))
	// 2) Fingerprint: xxhash (or blake3)
	sum := xxhash.Sum64([]byte(base))
	// 3) Optional: mix in the trace ID to make the event easier to locate
	return fmt.Sprintf("%s_%016x", e.TraceID, sum)
}
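The Event type and the Canonicalize helper are referenced above but not defined. A minimal sketch of both follows, assuming the payload is JSON; the struct fields and the volatile field names (offset, read_bytes, written_bytes) are illustrative placeholders, not part of the original design.

// Same package as the EventID sketch above.
package eventid

import (
	"encoding/json"
	"time"
)

// Event is an assumed shape for the normalized record; adjust to your schema.
type Event struct {
	Service   string
	Source    string
	Kind      string
	Timestamp time.Time
	TraceID   string
	Payload   []byte // JSON body
}

// Canonicalize renders the payload in a stable form: volatile fields are dropped
// and keys are emitted in sorted order (encoding/json sorts map keys), so the
// same logical event always yields the same fingerprint input.
func Canonicalize(payload []byte) string {
	var m map[string]any
	if err := json.Unmarshal(payload, &m); err != nil {
		// Not JSON: fall back to the raw bytes.
		return string(payload)
	}
	// Illustrative volatile fields; replace with whatever churns in your data.
	for _, k := range []string{"offset", "read_bytes", "written_bytes"} {
		delete(m, k)
	}
	out, err := json.Marshal(m)
	if err != nil {
		return string(payload)
	}
	return string(out)
}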
receivers:
  otlp:
    protocols:
      grpc: { max_recv_msg_size_mib: 64 }
      http: {}
  kafka:
    brokers: ["kafka-a-1:9092","kafka-a-2:9092","kafka-a-3:9092"]
    topic: logs_raw
    encoding: otlp_proto

processors:
  attributes/eventid:
    actions:
      - key: event_id
        action: insert
        value: "${env:EVENT_ID}"   # placeholder: generate via a custom extension/connector in practice
  batch: { send_batch_size: 10000, timeout: 5s }
  memory_limiter: { check_interval: 1s, limit_percentage: 75, spike_limit_percentage: 15 }
  resourcedetection/system:
    detectors: [system]

exporters:
  otlphttp/oo:
    endpoint: https://oo.region-a/api/default/
    headers: { Authorization: "Basic xxx" }
    tls: { insecure_skip_verify: false }
    # Key point: persistent sending queue backed by the file_storage extension
    sending_queue: { enabled: true, num_consumers: 8, storage: file_storage }
  kafka/raw:
    brokers: ["kafka-a-1:9092","kafka-a-2:9092","kafka-a-3:9092"]
    topic: logs_raw
    encoding: otlp_proto
    sending_queue: { enabled: true, num_consumers: 8, storage: file_storage }

extensions:
  file_storage:
    directory: /var/lib/otelcol/wal

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers: [otlp]
      # memory_limiter first, batch last
      processors: [memory_limiter, resourcedetection/system, attributes/eventid, batch]
      exporters: [otlphttp/oo, kafka/raw]
  telemetry:
    logs: { level: info }
    metrics: { address: ":8888" }
Note: in practice the event_id can be generated by a custom connector/processor extension written in Go; the attributes snippet above is only illustrative.
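As a sketch of what such an extension's core could look like, the function below stamps an event_id attribute onto every log record through the collector's pdata API. Only the mutation step is shown; the processor factory, configuration plumbing, and pipeline registration that a real otelcol component needs are omitted, and the fingerprint recipe simply mirrors the earlier EventID sketch.

package eventidprocessor

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
	"go.opentelemetry.io/collector/pdata/plog"
)

// stampEventIDs walks every log record and inserts an event_id attribute,
// derived from service.name, a 100 ms timestamp bucket, and the body text.
func stampEventIDs(ld plog.Logs) {
	rls := ld.ResourceLogs()
	for i := 0; i < rls.Len(); i++ {
		rl := rls.At(i)
		service := ""
		if v, ok := rl.Resource().Attributes().Get("service.name"); ok {
			service = v.Str()
		}
		sls := rl.ScopeLogs()
		for j := 0; j < sls.Len(); j++ {
			lrs := sls.At(j).LogRecords()
			for k := 0; k < lrs.Len(); k++ {
				lr := lrs.At(k)
				bucket := lr.Timestamp().AsTime().UnixMilli() / 100
				base := fmt.Sprintf("%s|%d|%s", service, bucket, lr.Body().AsString())
				lr.Attributes().PutStr("event_id", fmt.Sprintf("%016x", xxhash.Sum64([]byte(base))))
			}
		}
	}
}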
# sources
[sources.filelog]
type = "file"
include = ["/var/log/**/*.log"]
ignore_older = 604800 # 7d

[sources.metrics]
type = "prometheus_scrape"
endpoints = ["http://localhost:9100/metrics","http://localhost:9256/metrics"]

# transforms (optional: noise reduction / tagging)
[transforms.add_fields]
type = "remap"
inputs = ["filelog"]
source = '''
.service = get_env_var("SERVICE") ?? "unknown"
.region = get_env_var("REGION") ?? "a"
'''

# sinks (OTLP primary chain)
[sinks.to_otlp]
type = "otlp"
inputs = ["add_fields","metrics"]
endpoint = "https://otel-gw.region-a:4317"
request.concurrency = 2
request.retry_max_duration_secs = 86400 # keep retrying for up to 24h
# buffering and backpressure
buffer.type = "disk"
buffer.max_size = 20_000_000_000 # 20GB per node
buffer.when_full = "block" # backpressure
acknowledgements.enabled = true

# (optional) bypass: write directly to Kafka
[sinks.to_kafka]
type = "kafka"
inputs = ["add_fields"]
bootstrap_servers = "kafka-a-1:9092,kafka-a-2:9092"
topic = "logs_raw"
encoding.codec = "json"
acknowledgements.enabled = true
Note: Vector's disk buffering plus backpressure is what lets the edge absorb load spikes; size the buffer at roughly 10–50 GB per node depending on available disk.
ETL example (Benthos): consume logs_raw and normalize the key fields (service, level, event_id, trace_id).

input:
  kafka_franz:
    addresses: [ kafka-a-1:9092, kafka-a-2:9092 ]
    topics: [ logs_raw ]
    consumer_group: etl-logs-v1

pipeline:
  processors:
    - mapping: |
        root.ts = this.ts
        root.service = this.service
        root.level = this.level.or("INFO")
        root.event_id = this.event_id
        root.msg = this.message
        root.labels = this.labels

output:
  sql_insert:
    driver: postgres
    dsn: postgres://etl:***@pg-a:5432/obs?sslmode=disable
    table: logs_events
    columns: [ts, service, level, event_id, msg, labels]
    args_mapping: |
      root = [ this.ts, this.service, this.level, this.event_id, this.msg, this.labels ]
    # appended to the generated INSERT so replays are idempotent
    suffix: ON CONFLICT (event_id) DO NOTHING
    init_statement: |
      CREATE TABLE IF NOT EXISTS logs_events (
        ts timestamptz not null,
        service text,
        level text,
        event_id text primary key,
        msg text,
        labels jsonb
      );
PostgreSQL model:
- logs_events: event_id as primary key, ts partitioned as a Timescale hypertable, labels as JSONB; add CREATE INDEX ON logs_events USING gin (labels); and ingest with ON CONFLICT (event_id) DO NOTHING/UPDATE (a DDL sketch follows this list).
- metrics_points: (ts, metric, value, labels); add companion tables if histograms/quantiles are stored.
- traces_spans: (trace_id, span_id, parent_id, service, ts, dur, attrs).
- semantic_objects (id uuid, ts, service, kind, text, embedding vector(1024)) with pgvector.
- Apache AGE builds a CALLS graph (service dependencies) for fault-propagation analysis.
- HLL keeps deduplicated counts for common dimensions (user_id / ip / url, etc.).
- An audit_log records replay/correction operations.
Replay: pick a time window from logs_raw, re-run the ETL, and verify dedup and idempotency.
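A minimal migration sketch for this model, written in Go against pgx; the driver choice, the dedup_rollup table, and the demo insert are assumptions for illustration. One TimescaleDB constraint worth noting: unique constraints on a hypertable must include the partitioning column, so the primary key below is (event_id, ts) rather than event_id alone.

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
)

// Illustrative DDL for the detail/rollup model described above.
var ddl = []string{
	`CREATE EXTENSION IF NOT EXISTS timescaledb`,
	`CREATE EXTENSION IF NOT EXISTS vector`, // pgvector
	`CREATE EXTENSION IF NOT EXISTS hll`,    // postgresql-hll

	`CREATE TABLE IF NOT EXISTS logs_events (
	     ts       timestamptz NOT NULL,
	     service  text,
	     level    text,
	     event_id text NOT NULL,
	     msg      text,
	     labels   jsonb,
	     PRIMARY KEY (event_id, ts)  -- Timescale: unique keys must include ts
	 )`,
	`SELECT create_hypertable('logs_events', 'ts', if_not_exists => TRUE)`,
	`CREATE INDEX IF NOT EXISTS logs_events_labels_gin ON logs_events USING gin (labels)`,

	`CREATE TABLE IF NOT EXISTS semantic_objects (
	     id        uuid PRIMARY KEY,
	     ts        timestamptz,
	     service   text,
	     kind      text,
	     text      text,
	     embedding vector(1024)
	 )`,

	// Hypothetical per-day dedup-count rollup using postgresql-hll.
	`CREATE TABLE IF NOT EXISTS dedup_rollup (
	     day     date,
	     service text,
	     dim     text,              -- e.g. user_id / ip / url
	     counts  hll DEFAULT hll_empty(),
	     PRIMARY KEY (day, service, dim)
	 )`,
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://etl:***@pg-a:5432/obs") // DSN from the Benthos example
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	for _, stmt := range ddl {
		if _, err := conn.Exec(ctx, stmt); err != nil {
			log.Fatalf("ddl failed: %v", err)
		}
	}

	// Idempotent ingest shape used by the ETL: duplicates are silently dropped.
	_, err = conn.Exec(ctx,
		`INSERT INTO logs_events (ts, service, level, event_id, msg, labels)
		 VALUES (now(), 'demo', 'INFO', 'evt-1', 'hello', '{}')
		 ON CONFLICT (event_id, ts) DO NOTHING`)
	if err != nil {
		log.Fatal(err)
	}
}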
Rollout plan:
1) PoC (2–3 weeks): stand up the full Vector→OTel→OO and Kafka→Benthos→PG chain in a single region; verify event_id generation and UPSERT dedup (a small verification check is sketched after this list).
2) Small pilot (1–2 months): onboard the top 3 services; finish dashboards and alerts; rehearse link-break and replay scenarios.
3) Multi-region rollout (2–3 months): bring unified querying online in primary region A; mirror to B/C with cross-region replay; establish operating procedures and SLOs.
4) Enhancements (ongoing): evaluate cLoki/qryn as an alternative query layer, trace tail-based sampling, anomaly detection and root-cause analysis (vectors + graph).
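For the dedup/idempotency verification in step 1, one simple check is to count rows in the replayed window before and after re-running the ETL; with the UPSERT in place the two counts should match. The sketch below assumes the logs_events table and the pgx driver from the earlier sketches, with placeholder window bounds.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/jackc/pgx/v5"
)

// countWindow returns how many log events landed in [from, to).
// Run it once before and once after replaying the same window from logs_raw;
// identical counts mean the UPSERT path really is idempotent.
func countWindow(ctx context.Context, conn *pgx.Conn, from, to time.Time) (int64, error) {
	var n int64
	err := conn.QueryRow(ctx,
		`SELECT count(*) FROM logs_events WHERE ts >= $1 AND ts < $2`, from, to).Scan(&n)
	return n, err
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://etl:***@pg-a:5432/obs")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	from := time.Now().Add(-24 * time.Hour) // placeholder replay window
	to := time.Now()
	n, err := countWindow(ctx, conn, from, to)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("rows in window: %d\n", n)
}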
Field conventions: service, env, region, host, pod, event_id, trace_id, span_id, level, code.
Timestamps: ts in milliseconds, normalized to UTC on ingest.
Message body: keep msg lean; treat high-cardinality labels with care (HLL estimates + TopK sampling).
Closing note: get "reliable collection, replayability, and a verifiable closed loop" running smoothly first, and only then build smarter analysis and automation on top.
Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, contact cloudcommunity@tencent.com for removal.