首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >T咖啡馆|数据采集多级持久化与可重放

T咖啡馆|数据采集多级持久化与可重放

原创
作者头像
行者深蓝
发布2025-08-23 13:55:26
发布2025-08-23 13:55:26
1250
举报

IT咖啡馆|数据采集多级持久化与可重放(技术方案 v1)

目标读者:平台/SRE/数据工程团队、应用开发者、架构评审委员会undefined文档状态:v1 草案(可直接落地 PoC)undefined关键词:Vector、OpenTelemetry Collector、Kafka、OpenObserve、TimescaleDB、pgvector、Apache AGE、HLL、Benthos


0. 摘要(TL;DR)

  • 核心主张:在“端—边—域—库”的链路上,构建 端到端至少一次(At-least-once) 的传输语义和 多级持久化,并引入 旁路可重放总线(Kafka) 作为真源。近线侧用 OpenObserve 快速检索/告警,离线/准实时用 ETL→PostgreSQL 做明细与聚合沉淀,支持关系/向量/基数估计等分析。
  • 两条链路: 1) 主链 OTLP:边缘 Vector(或 App/SDK)→ 区域 OTel Gateway(WAL+队列)→ 扇出 OpenObserve(近线) & Kafka(旁路)。 2) 旁路 Kafka:边缘可选直写 Kafka.*_raw,后续可从 Kafka 做回放与 ETL。
  • 不丢包设计:Vector 缓冲+背压 → OTelcol file_storage(WAL)+sending_queue → Kafka acks=all, RF≥3 → OO WAL→对象存储;再叠加 事件 ID(event_id)幂等 UPSERT 去重。
  • 多区域:各区自洽(OO/Kafka/PG 独立),A 为主区提供统一查询/汇总与跨区镜像/聚合。

1. 范围与非目标

1.1 范围

  • 采集:日志(Filelog, Journald, 应用日志)、指标(Prometheus/Host/Process)、链路(OTLP Traces)。
  • 传输:OTLP(gRPC/HTTP)与 Kafka 并行。
  • 网关:区域 OTel Collector(多副本,LB 前置)。
  • 按需落地:OpenObserve(近线检索/告警)、Kafka(真源与重放)、PostgreSQL(明细/聚合/关系/向量)。
  • ETL:Benthos/Redpanda Connect/Flink(选一种即可先跑通)。

1.2 非目标

  • 不在本方案内替代现有 APM/SIEM 的全部功能;先满足 高可靠采集与可重放 的基础设施级能力。

2. 总体架构(OTLP 优先、两条链路)

代码语言:bash
复制
[边缘节点/主机/Pod]
  ├─ Vector(Filelog/Prom/进程指标→统一转 OTLP)
  │    └─ 本地内存/磁盘缓冲 + 背压
  │         → OTLP/gRPC →  区域 OTel Gateway(持久化队列/WAL)
  │         ↘(可选旁路)→ Kafka.<region>.logs_raw / metrics_raw / traces_raw
  └─ 应用/SDK Trace → 直接 OTLP → 区域 OTel Gateway

[区域网关(多副本,LB 前置)]
  ├─ OTelcol(接 OTLP/Kafka;file_storage + sending_queue)
  │    └─ Exporter 扇出:
  │        → OpenObserve(检索/告警/可视化)
  │        → Kafka.<region>(旁路重放与 ETL 真源)

[存储与分析]
  ├─ OpenObserve(对象存储/本地盘 + WAL;Logs/Metrics/Traces)
  ├─ Kafka(RF≥3)→ ETL(Benthos/Redpanda Connect/Flink)
  │        → PostgreSQL(明细 & 汇总:Timescale/pgvector/AGE/HLL)
  └─ (可选)cLoki/qryn 或 LGTM 替代检索面

设计动机:端兜压、域兜险、旁路兜底。主链保证近线可视化与告警,旁路留存原始真源,以备回放、回溯与下游再加工。


3. 多区域设计 / 区域内拓扑

3.1 区域内拓扑(A 区示例)

代码语言:bash
复制
[Producers / Agents / Apps]
  └─ (OTLP gRPC/HTTP, Filelog/Prom→OTLP)
        → OTel Gateway (Region-A, N副本, LB)
              ├─ 统一打标签/规范化/生成 event_id
              ├─ 扇出 → OpenObserve.A(近线检索/告警)
              └─ 扇出 → Kafka.A(*_raw 旁路:logs/metrics/traces)
                     ↘(ETL)→ PostgreSQL.A(明细/聚合/关系/向量)

3.2 多区域(A 主 + B/C 从;统一查询入口)

  • 各区独立:OpenObserve.A/B/C、Kafka.A/B/C、Postgres.A/B/C。
  • 主区(A) 提供统一查询/汇总(API Gateway / Query Proxy);默认“就近或主区”路由。
  • 跨区镜像:
    • 关键数据(*_norm 或抽样 traces)双写 A+B。
    • 或以 MirrorMaker2/Cluster Linking 做 Kafka 跨区镜像到中央集群;中央再做归一化与合并视图。

4. “不丢包设计”的专业化表述

4.1 端到端 At-least-once 传输语义 + 多级持久化

  • 边缘层:Vector 磁盘/内存缓冲 + 背压(出口拥塞时不丢,优先囤在边缘)。
  • 区域网关:OTelcol file_storage(WAL) + sending_queue(进程重启/下游不可用时持久化并重试)。
  • 旁路总线:Kafka 作为“真源与回放库”(*_raw 主题,RF≥3,acks=all + min.insync.replicas≥2),保留期 ≥ 回放窗口。
  • 落地层:OpenObserve 侧 WAL→对象存储(Parquet),降低成本并简化横向扩展。

4.2 扇出与去重策略

  • 区域 OTel Gateway 一写多发:OpenObserve(近线检索/告警) + Kafka(回放与 ETL 真源)。
  • 在 Gateway 统一生成 event_id(内容指纹/雪花/ULID);PG 侧以 UPSERT 幂等入库,允许“重复不缺失”。
  • 采样与降噪:在 Vector/OTel 侧进行(如仅保留 Error/Warn、TopK 热点或 trace tail-based sampling)。

4.3 SRE 可观测与演练

  • 监控指标:Exporter 失败率、Gateway 发送队列长度、Vector 缓冲水位、Kafka Lag、PG UPSERT 冲突率、OO ingest/compaction 延迟等。
  • 演练项:断网 30–60 分钟、重启 Gateway/ETL、下游限流(429/5xx)、历史主题回放验证统计闭合。

5. 关键参数与 Topic 规划

5.1 Kafka 主题

  • logs_raw:Key=event_id(或 service+ts_hash),压缩 zstd,分区数按峰值 EPS 规划(基线:每分区 10–20k msg/s)。
  • metrics_raw:Key=series_id(metric+labels 哈希),冷热分离可用 *_rollup
  • traces_raw:Key=trace_id,保证同一 trace 尽量同分区
  • (可选)*_norm:下游归一化后的“去重/标准化”流,Topic cleanup.policy=compact 以便去重/校正。

5.2 可靠性与吞吐参数

  • Broker:default.replication.factor>=3min.insync.replicas>=2
  • Producer:acks=allenable.idempotence=truelinger.ms=5~20batch.size=512KB~1MBmax.in.flight.requests.per.connection=1~5
  • 保留:retention.ms ≥ 7–30 天(按回放窗口),segment.bytes=1~2GB

6. 事件 ID(event_id)规范

6.1 生成原则

  • 幂等性:同一事件在多链路/多次传输下 event_id 必须一致。
  • 抗碰撞:使用 内容指纹(xxhash64/128 或 blake3) + 归一化字段(ts_bucket|service|source|fingerprint(payload))。
  • 可追溯:可包含 trace_id/span_id(若存在)。

6.2 伪代码(Go)

代码语言:go
复制
func EventID(e Event) string {
  // 1) 归一化:去除波动字段(如动态偏移、读写偏移量等)
  base := fmt.Sprintf("%s|%s|%s|%d|%s",
    e.Service, e.Source, e.Kind, e.Timestamp.UnixMilli()/100, Canonicalize(e.Payload))
  // 2) 指纹:xxhash 或 blake3
  sum := xxhash.Sum64([]byte(base))
  // 3) 可选:混入 trace/span 以增强可定位性
  return fmt.Sprintf("%s_%016x", e.TraceID, sum)
}

7. OTel Gateway(Collector)落地配置示例(精简)

代码语言:yaml
复制
receivers:
  otlp:
    protocols:
      grpc: { max_recv_msg_size_mib: 64 }
      http: {}
  kafka:
    brokers: ["kafka-a-1:9092","kafka-a-2:9092","kafka-a-3:9092"]
    topic: logs_raw
    encoding: otlp_proto

processors:
  attributes/eventid:
    actions:
      - key: event_id
        action: insert
        value: "${env:EVENT_ID}" # 实际用自定义扩展或 connector 生成
  batch: { send_batch_size: 10000, timeout: 5s }
  memory_limiter: { check_interval: 1s, limit_percentage: 75, spike_limit_percentage: 15 }
  resourcedetection/system:
    detectors: [system]

exporters:
  otlphttp/oo:
    endpoint: https://oo.region-a/api/default/
    headers: { Authorization: "Basic xxx" }
    tls: { insecure_skip_verify: false }
  kafka/raw:
    brokers: ["kafka-a-1:9092","kafka-a-2:9092","kafka-a-3:9092"]
    topic: logs_raw
    encoding: otlp_proto

extensions:
  file_storage:
    directory: /var/lib/otelcol/wal

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers: [otlp]
      processors: [resourcedetection/system, attributes/eventid, batch, memory_limiter]
      exporters: [otlphttp/oo, kafka/raw]
  telemetry:
    logs: { level: info }
    metrics: { address: ":8888" }
  # 关键:全局发送队列(persist)
  queue:
    enabled: true
    num_consumers: 8
    storage: file_storage

说明:实际 event_id 生成可通过 connector/processor 扩展实现(Go),以上仅示意。


8. Vector(边缘)落地配置示例(精简)

代码语言:toml
复制
# sources
[sources.filelog]
  type = "file"
  include = ["/var/log/**/*.log"]
  ignore_older = 604800 # 7d

[sources.metrics]
  type = "prometheus_scrape"
  endpoints = ["http://localhost:9100/metrics","http://localhost:9256/metrics"]

# transforms(可选:降噪/打标签)
[transforms.add_fields]
  type = "remap"
  inputs = ["filelog"]
  source = '''
  .service = env!("SERVICE","unknown")
  .region  = env!("REGION","a")
  '''

# sinks(OTLP 主链)
[sinks.to_otlp]
  type = "otlp"
  inputs = ["add_fields","metrics"]
  endpoint = "https://otel-gw.region-a:4317"
  request.concurrency = 2
  # 缓冲与重试
  buffer.type = "disk"
  buffer.max_size = 20_000_000_000 # 20GB per node
  buffer.when_full = "block"       # 背压
  acknowledgements.enabled = true
  retry.max_duration = 24h

# (可选)旁路直写 Kafka
[sinks.to_kafka]
  type = "kafka"
  inputs = ["add_fields"]
  bootstrap_servers = "kafka-a-1:9092,kafka-a-2:9092"
  topic = "logs_raw"
  encoding.codec = "json"
  acknowledgements.enabled = true

说明:Vector 的 磁盘缓冲 + 背压 是边缘“兜压”的关键;可按节点盘量做 10–50GB 级别配置。


9. OpenObserve(OO)建议

  • 存储:对象存储(S3 兼容)为主,本地盘为热缓存;WAL 启用。
  • 分层:短保留(热)+ 长保留(冷,对象存储),按租户/索引策略管理成本。
  • 索引少索引、重标签;字段规范化(service, level, event_id, trace_id)。
  • 多租户:基于组织/项目维度;鉴权走 Token/Basic;Ingress/网关做速率与配额控制。
  • 告警:近线基于向量/阈值/模板,告警消息可回发 Kafka 以供编排。

10. ETL → PostgreSQL(Timescale/pgvector/AGE/HLL)

10.1 Benthos 管道(示例)

代码语言:yaml
复制
input:
  kafka_franz:
    addresses: [ kafka-a-1:9092, kafka-a-2:9092 ]
    topics: [ logs_raw ]
    consumer_group: etl-logs-v1

pipeline:
  processors:
    - mapping: |
        root.ts = this.ts
        root.service = this.service
        root.level = this.level.or("INFO")
        root.event_id = this.event_id
        root.msg = this.message
        root.labels = this.labels

output:
  sql_insert:
    driver: postgres
    dsn: postgres://etl:***@pg-a:5432/obs?sslmode=disable
    table: logs_events
    columns: [ts, service, level, event_id, msg, labels]
    args_mapping: |
      root = [ this.ts, this.service, this.level, this.event_id, this.msg, this.labels ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS logs_events (
        ts timestamptz not null,
        service text,
        level text,
        event_id text primary key,
        msg text,
        labels jsonb
      );

10.2 PG 表与索引(建议)

  • 日志(明细) logs_events
    • event_id PK,ts 按 Timescale hypertable 分区;labels JSONB。
    • 倒排/GIN:CREATE INDEX ON logs_events USING gin (labels);
    • 去重:UPSERT ON CONFLICT(event_id) DO NOTHING/UPDATE
  • 指标(点/分桶) metrics_points(ts, metric, value, labels);存储直方图/分位数可建陪表。
  • 链路 traces_spans(trace_id, span_id, parent_id, service, ts, dur, attrs)
  • 向量搜索semantic_objects(id uuid, ts, service, kind, text, embedding vector(1024)) + pgvector
  • 关系图Apache AGE 建图谱 CALLS(服务依赖),用于故障扩散分析。
  • 基数估计HLL 存常见维度去重计数(user_id/ip/url 等)。

11. 查询面与统一入口

  • 主区(A)Query Proxy
    • 对接 OpenObserve.A/B/C(多集群路由)与 PG.A(物化视图/聚合)。
    • “就近优先,主区兜底”;失败降级只读。
  • Grafana:OO/PG 双数据源;看板模板化(SLO、TopK、异常分布、错误热力)。

12. 安全与多租户

  • 传输安全:全链路 TLS;OTLP mTLS(证书由私有 CA/Cert-Manager 签发)。
  • 鉴权:OTel Gateway 限流/配额;Kafka ACL(SASL/SCRAM 或 mTLS);PG 细粒度角色(最小权限)。
  • 隔离:租户级 Token/前缀;Topic 按租户/环境划分;PG schema 按租户/域分库分表。
  • 审计:采集/变更/回放均落审计日志(Kafka audit_log)。

13. 容量规划(首版估算方法)

  • 日志吞吐:峰值 EPS(每秒事件数)× 平均消息大小 = 带宽;按 每分区 10–20k msg/s 估分区;初始 RF=3。
  • 边缘缓冲:按“最长断链时间 × 峰值带宽 × 安全系数1.3” 规划 Vector 磁盘缓冲;典型 10–50GB/节点。
  • Gateway WAL:按“重试窗口 × 平均入站速率 × 副本数”估算;初始 50–200GB/副本。
  • OO 对象存储:按保留期与压缩比;日志 Parquet 压缩 3–6× 为常见范围。
  • PG:热层(近 7–14 天)+ 冷层(归档/分区);开启 Timescale/TOAST 压缩策略。

14. 运维与演练(Runbook)

  • 常见告警
    • Vector buffer 使用率 > 80%,OTelcol 发送队列滞留 > 5 分钟。
    • Kafka ISR 降低、Lag 升高、控制器切换频繁。
    • OO ingest 错误率、compaction 延迟、对象存写入失败。
    • PG autovacuum 积压、物化视图刷新延迟。
  • 故障演练
    • 断网 30–60 分钟,验证回放后 计数闭合(OO/PG 与 Kafka 消费偏移核对)。
    • 强制滚动重启 Gateway/ETL,确认 WAL/队列无数据丢失。
    • 旁路回放:回放 logs_raw 指定时间窗,重跑 ETL,校验去重与幂等。

15. 渐进式落地路线图

1) PoC(2–3 周):单区打通 Vector→OTel→OO 与 Kafka→Benthos→PG 全链路;验证 event_id 与 UPSERT 去重。

2) 小规模试点(1–2 月):TOP 3 服务接入;完成看板/告警;演练断链与回放。

3) 多区域推广(2–3 月):A 主区上线统一查询;B/C 镜像与跨区回放;形成运维制度与 SLO。

4) 增强(持续):引入 cLoki/qryn 替代面评估、Trace Tail-based Sampling、异常检测/根因分析(向量+图)。


16. 风险与备选

  • 风险:跨团队接口不一致、字段规范化不足、链路黑盒化导致定位困难、对象存访问带宽受限等。
  • 缓解:统一 Schema/标签字典;灰度接入;强制仪表化(采集自身指标);默认旁路 Kafka 保底。
  • 备选
    • 检索面:cLoki / qryn / LGTM;
    • ETL:Redpanda Connect / Flink;
    • DB:ClickHouse(日志聚合与冷数据仓)。

17. 附录 A:字段与标签规范(节选)

  • 核心标签:service, env, region, host, pod, event_id, trace_id, span_id, level, code
  • 时间:统一 ts(毫秒),入库做时区归一化(UTC)。
  • 规范化:消息体尽量结构化(JSON);自由文本落 msg;高维标签谨慎(HLL估计 + TopK 采样)。

18. 附录 B:版本与依赖(建议)

  • Vector ≥ 0.36;OTel Collector ≥ 0.98;OpenObserve ≥ 最新稳定版;Kafka ≥ 3.6;Timescale ≥ 2.14;pgvector ≥ 0.7;AGE ≥ 1.5;Benthos ≥ 4.x。

结束语:先把“可靠采集、可回放、可校验闭合”这三件事跑顺,再在此之上做更聪明的分析与自动化。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • IT咖啡馆|数据采集多级持久化与可重放(技术方案 v1)
    • 0. 摘要(TL;DR)
    • 1. 范围与非目标
      • 1.1 范围
      • 1.2 非目标
    • 2. 总体架构(OTLP 优先、两条链路)
    • 3. 多区域设计 / 区域内拓扑
      • 3.1 区域内拓扑(A 区示例)
      • 3.2 多区域(A 主 + B/C 从;统一查询入口)
    • 4. “不丢包设计”的专业化表述
      • 4.1 端到端 At-least-once 传输语义 + 多级持久化
      • 4.2 扇出与去重策略
      • 4.3 SRE 可观测与演练
    • 5. 关键参数与 Topic 规划
      • 5.1 Kafka 主题
      • 5.2 可靠性与吞吐参数
    • 6. 事件 ID(event_id)规范
      • 6.1 生成原则
      • 6.2 伪代码(Go)
    • 7. OTel Gateway(Collector)落地配置示例(精简)
    • 8. Vector(边缘)落地配置示例(精简)
    • 9. OpenObserve(OO)建议
    • 10. ETL → PostgreSQL(Timescale/pgvector/AGE/HLL)
      • 10.1 Benthos 管道(示例)
      • 10.2 PG 表与索引(建议)
    • 11. 查询面与统一入口
    • 12. 安全与多租户
    • 13. 容量规划(首版估算方法)
    • 14. 运维与演练(Runbook)
    • 15. 渐进式落地路线图
    • 16. 风险与备选
    • 17. 附录 A:字段与标签规范(节选)
    • 18. 附录 B:版本与依赖(建议)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档