目标:在“OTLP 优先、双链路(检索/可视化 + 真源旁路)”的大架构下,构建以 PostgreSQL 为“二级分析域”的 AIOps/LLM Agent 数据底座,覆盖 时序趋势(TimescaleDB)、向量检索(pgvector)、服务关系图(Apache AGE)、高基数与 TopK(HLL/Toolkit) 等能力,并与 OpenObserve/Grafana 的一线使用体验解耦。
[边缘节点/主机/Pod]
├─ Vector(Filelog / Prometheus / 进程指标 → 统一转 OTLP)
│ └─ 本地内存/磁盘缓冲 + 背压(retries + WAL-like)
│ → OTLP/gRPC → 区域 OTel Gateway(持久化队列 / file_storage)
│ ↘(可选旁路)→ Kafka.<region>.{logs_raw,metrics_raw,traces_raw}
└─ 应用/SDK Trace → 直接 OTLP → 区域 OTel Gateway
[区域网关(多副本,LB 前置)]
├─ OTelcol(接 OTLP/Kafka;file_storage + sending_queue)
│ └─ Exporter 扇出:
│ → OpenObserve(检索/告警/可视化)
│ → Kafka.<region>(旁路重放与 ETL 真源)
[存储与分析]
├─ OpenObserve(对象存储/本地盘 + WAL;Logs/Metrics/Traces)
├─ Kafka(RF≥3)→ ETL(Benthos/Redpanda Connect/Flink)
│ → PostgreSQL(二级分析域:Timescale/pgvector/AGE/HLL)
└─ (可选)cLoki/qryn 或 LGTM 替代检索面
定位:OpenObserve/LGTM 提供一线检索与可视化;PostgreSQL 提供深度分析与 AIOps Agent 的统一数据接口(趋势/相似性/拓扑因果/TopK)。
核心扩展:
- timescaledb:超表/连续聚合/压缩/保留策略
- timescaledb_toolkit:百分位/近似去重(HLL)/时间加权等超函数
- vector(pgvector):嵌入向量检索(HNSW/IVFFLAT)
- age(Apache AGE):原生 Cypher 图查询与图算法
- hll(可选,Citus/PG HLL):近似去重、基数估计(若不使用 Toolkit 的 HLL)
- pg_partman(可选):细粒度分区管理(若不完全依赖 Timescale)
- btree_gin / btree_gist / pg_trgm:复合索引与模糊搜索优化

硬件与实例建议:
- 开启 wal_compression;shared_buffers 以 25% RAM 起步;effective_io_concurrency 依据盘数调优。
- 备份采用 pgBackRest
(全量 + 增量 + 归档 WAL)。CREATE TABLE metrics_points (
-- One row per metric sample; turned into a TimescaleDB hypertable on ts below.
ts timestamptz NOT NULL,
metric text NOT NULL, -- e.g. http_server_duration_seconds
service text NOT NULL, -- logical service name
endpoint text NULL,
value double precision NOT NULL,
unit text NULL,
labels jsonb NOT NULL, -- extra labels (index only a few; keep high-cardinality keys unindexed)
host text NULL,
pod text NULL,
cluster text NULL,
tenant text NOT NULL DEFAULT 'default' -- multi-tenant isolation key (targeted by the RLS example later)
);
SELECT create_hypertable('metrics_points','ts', chunk_time_interval => interval '1 day');
-- Hot-data index (time + dimension prefix); be cautious indexing high-cardinality labels
CREATE INDEX ON metrics_points (service, metric, ts DESC);
-- jsonb_path_ops GIN is smaller than the default opclass but supports only @> containment
CREATE INDEX ON metrics_points USING GIN ((labels) jsonb_path_ops);
-- Compression and retention policies
ALTER TABLE metrics_points SET (
timescaledb.compress,
timescaledb.compress_orderby = 'ts DESC',
timescaledb.compress_segmentby = 'service,metric'
);
-- Compress chunks older than 7 days; drop chunks older than 90 days
SELECT add_compression_policy('metrics_points', INTERVAL '7 days');
SELECT add_retention_policy('metrics_points', INTERVAL '90 days');
连续聚合(5m/1h)
-- 5-minute continuous aggregate over raw metric samples.
-- FIX: timescaledb_toolkit's approx_percentile takes the percentile FIRST,
-- then the percentile_agg sketch; the original had the arguments reversed.
CREATE MATERIALIZED VIEW metrics_5m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', ts) AS bucket,
service, metric,
avg(value) AS avg_v,
max(value) AS p100,
approx_percentile(0.99, percentile_agg(value)) AS p99 -- requires timescaledb_toolkit
FROM metrics_points
GROUP BY bucket, service, metric;
-- Refresh the window [now-7d, now-5m) every minute
SELECT add_continuous_aggregate_policy('metrics_5m',
start_offset => INTERVAL '7 days',
end_offset => INTERVAL '5 minutes',
schedule_interval=> INTERVAL '1 minute');
CREATE TABLE logs_events (
ts timestamptz NOT NULL,
service text NOT NULL,
level text NULL, -- info/warn/error
message text NOT NULL,
-- Fingerprint over the full message text. NOTE(review): md5 of the raw message
-- means any variable fragment (ids, timestamps) yields a distinct fingerprint;
-- consider template extraction upstream if grouping quality matters.
fingerprint text GENERATED ALWAYS AS ( md5(coalesce(message,'')) ) STORED,
attrs jsonb NOT NULL,
trace_id text NULL,
span_id text NULL,
host text NULL,
pod text NULL,
cluster text NULL,
tenant text NOT NULL DEFAULT 'default'
);
SELECT create_hypertable('logs_events','ts', chunk_time_interval => interval '1 day');
CREATE INDEX ON logs_events (service, ts DESC);
CREATE INDEX ON logs_events (level, ts DESC);
CREATE INDEX ON logs_events (fingerprint, ts DESC);
-- jsonb_path_ops GIN: supports @> containment queries on attrs only
CREATE INDEX ON logs_events USING GIN ((attrs) jsonb_path_ops);
ALTER TABLE logs_events SET (
timescaledb.compress,
timescaledb.compress_orderby = 'ts DESC',
timescaledb.compress_segmentby = 'service,level'
);
-- Logs compress earlier (3 days) and are retained shorter (30 days) than metrics
SELECT add_compression_policy('logs_events', INTERVAL '3 days');
SELECT add_retention_policy('logs_events', INTERVAL '30 days');
TopK(5m 窗口)
-- Approximate Top-5 error/warn fingerprints per service per 5-minute bucket.
-- FIX: toolkit's freq_agg requires a min-frequency threshold as its first
-- argument; topn_agg(n, value) tracks the top n values directly (SpaceSaving)
-- and is read back with the topn() accessor, so it fits this use better.
CREATE MATERIALIZED VIEW logs_topk_5m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', ts) AS bucket,
service,
topn_agg(5, fingerprint) AS top5_fingerprints
FROM logs_events
WHERE level IN ('error','warn')
GROUP BY bucket, service;
说明:若使用 timescaledb_toolkit,可用其频次/Top-N 超函数(freq_agg 或 topn_agg,配合 topn 访问器)得到近似 Top-K;若不用 Toolkit,可退化为 count(*) … ORDER BY count DESC LIMIT k 的连续聚合(成本更高)。
CREATE TABLE traces_spans (
ts_start timestamptz NOT NULL,
ts_end timestamptz NOT NULL,
trace_id text NOT NULL,
span_id text NOT NULL,
parent_span text NULL,
service text NOT NULL,
endpoint text NULL,
kind text NULL, -- server/client/internal
status text NULL, -- ok/error
attrs jsonb NOT NULL,
-- Span duration derived from the two timestamps, materialized at write time
duration_ms double precision GENERATED ALWAYS AS (extract(epoch FROM (ts_end-ts_start))*1000) STORED
);
SELECT create_hypertable('traces_spans','ts_start', chunk_time_interval => interval '1 day');
CREATE INDEX ON traces_spans (service, ts_start DESC);
CREATE INDEX ON traces_spans (status, ts_start DESC);
-- NOTE(review): no index on (trace_id, span_id); add one if looking up all
-- spans of a single trace is a common access path.
错误率/延迟趋势(5m)
-- Per-service latency / error-rate trend in 5-minute buckets.
-- FIX: approx_percentile takes the percentile first, then the sketch
-- (the original had the arguments reversed).
CREATE MATERIALIZED VIEW svc_latency_err_5m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', ts_start) AS bucket,
service,
avg(duration_ms) AS avg_ms,
approx_percentile(0.95, percentile_agg(duration_ms)) AS p95_ms, -- timescaledb_toolkit
sum(CASE WHEN status='error' THEN 1 ELSE 0 END)::float / count(*) AS err_rate
FROM traces_spans
GROUP BY bucket, service;
CREATE EXTENSION IF NOT EXISTS vector; -- pgvector must be preinstalled
-- Unified store for embedded objects (logs, traces, docs, incidents) used by
-- the Agent's similarity search.
CREATE TABLE semantic_objects (
id bigserial PRIMARY KEY,
ts timestamptz NOT NULL DEFAULT now(),
object_type text NOT NULL, -- 'log','trace','doc','incident'
service text NULL,
"window" tstzrange NULL, -- source-data time window; WINDOW is a reserved word in PostgreSQL, so the column must be quoted
title text NULL,
content text NOT NULL,
meta jsonb NOT NULL DEFAULT '{}'::jsonb,
embedding vector(1024) -- dimension must match the chosen embedding model
);
-- Pick one: HNSW (good realtime recall, memory-oriented) or IVFFLAT (batch build, lower memory)
CREATE INDEX ON semantic_objects USING hnsw (embedding vector_cosine_ops) WITH (m=16, ef_construction=200);
-- CREATE INDEX ON semantic_objects USING ivfflat (embedding vector_cosine_ops) WITH (lists=200);
常用相似检索
-- Nearest similar incidents / runbooks for a query embedding :qvec
SELECT so.id,
       so.object_type,
       so.service,
       so.ts,
       so.title,
       1 - (so.embedding <=> :qvec) AS score -- cosine similarity from cosine distance
FROM semantic_objects AS so
WHERE so.object_type IN ('incident','doc')
ORDER BY so.embedding <=> :qvec -- same distance expression lets the ANN index drive the ordering
LIMIT 20;
-- Apache AGE: must be loaded per session, with ag_catalog on the search path
LOAD 'age';
SET search_path = ag_catalog, "$user", public;
SELECT * FROM create_graph('svc_topo');
-- Graph is (re)built daily or hourly by aggregating traces_spans:
-- nodes:  Service{name}
-- edges:  (:Service)-[:CALLS {p95_ms, err_rate, volume, window}]->(:Service)
-- Example query: from service A, find paths within 3 hops containing a high-error edge.
-- NOTE(review): $svc is a Cypher parameter; AGE only substitutes it when a
-- parameter map is supplied as cypher()'s third argument via a prepared
-- statement — confirm the calling code actually passes it.
SELECT * FROM cypher('svc_topo', $$
MATCH p = (s:Service {name: $svc})- [e:CALLS*1..3] -> (t:Service)
WHERE ANY (edge IN e WHERE edge.err_rate > 0.02)
RETURN p
$$) as (p agtype);
示例:logs_raw → PG(明细+TopK 预聚合写入)
# Benthos / Redpanda Connect pipeline: Kafka logs_raw -> PostgreSQL logs_events.
# FIX: YAML nesting restored (the published snippet was flattened) and the
# required sql_insert `args_mapping` added — `columns` alone does not tell the
# output which message fields feed the INSERT placeholders.
input:
  kafka:
    addresses: [ "kafka-1:9092", "kafka-2:9092" ]
    topics: [ "logs_raw" ]
    consumer_group: "etl-logs-pg"
pipeline:
  processors:
    - bloblang: |
        root.ts = this.ts
        root.service = this.service
        root.level = this.level
        root.message = this.message
        root.attrs = this
        root.trace_id = this.trace_id
        root.span_id = this.span_id
        root.host = this.host
        root.pod = this.pod
        root.cluster = this.cluster
output:
  sql_insert:
    driver: "postgres"
    # NOTE(review): sslmode=disable — enable TLS outside trusted networks.
    dsn: "postgres://etl:***@pg:5432/obs?sslmode=disable"
    table: "public.logs_events"
    columns: ["ts","service","level","message","attrs","trace_id","span_id","host","pod","cluster"]
    args_mapping: |
      root = [ this.ts, this.service, this.level, this.message, this.attrs,
               this.trace_id, this.span_id, this.host, this.pod, this.cluster ]
向量化(可异步):logs → embed → semantic_objects,用 http 处理器调用内部 Embedding 服务,将 text → embedding 后再 sql_insert。
metrics_raw / traces_raw 同理写入 PG;去重/幂等键建议用 (service, ts, metric) 与 (trace_id, span_id
)。[sources.app]
# Vector edge agent: tail application log files from disk
type = "file"
include = ["/var/log/app/*.log"]
read_from = "beginning"
[transforms.to_otlp]
# Normalize records with VRL before shipping
type = "remap"
inputs = ["app"]
source = '''
.service = "svc.api"
.level = .level || "info"
.message = string!(.message)
'''
[sinks.otel]
# NOTE(review): confirm this sink type name exists in your Vector release —
# OTLP sink support is version-dependent.
type = "opentelemetry"
inputs = ["to_otlp"]
endpoint = "https://otel-gw.region.svc.plus:4317"
request.concurrency = 4
# NOTE(review): Vector's documented option is `acknowledgements.enabled = true`;
# verify `acknowledgement = "basic"` is accepted by your version.
acknowledgement = "basic"
healthcheck.enabled = true
# 本地缓冲
# Local disk buffer so the edge survives gateway outages (backpressure-friendly)
[sinks.otel.buffer]
type = "disk"
max_size = 10737418240 # 10GiB
# Regional OTel Collector: OTLP (+ Kafka replay) in, fan-out to OpenObserve and
# Kafka bypass topics. FIX: YAML nesting restored (the published snippet was
# flattened to one column, which is not valid collector config).
receivers:
  otlp:
    protocols:
      grpc: { max_recv_msg_size_mib: 32 }
  # NOTE(review): this kafka receiver is declared but not referenced by any
  # pipeline below — wire it in or remove it.
  kafka:
    brokers: ["k1:9092", "k2:9092"]
    topic: traces_raw
processors:
  # memory_limiter must run first in each pipeline to shed load before batching
  memory_limiter:
    check_interval: 1s
    limit_percentage: 75
    spike_limit_percentage: 15
  batch:
    send_batch_size: 10000
    timeout: 5s
exporters:
  otlphttp/openobserve:
    endpoint: https://oo.region.svc.plus/api/default/
    headers:
      Authorization: "Basic ..."
  kafka/logs_raw:
    brokers: ["k1:9092", "k2:9092"]
    topic: logs_raw
  kafka/metrics_raw:
    brokers: ["k1:9092", "k2:9092"]
    topic: metrics_raw
extensions:
  # NOTE(review): for the persistent queue described in the architecture, each
  # exporter also needs `sending_queue.storage: file_storage` configured.
  file_storage:
    directory: /var/lib/otelcol/storage
service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [otlphttp/openobserve, kafka/logs_raw]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [otlphttp/openobserve, kafka/metrics_raw]
    traces:
      # NOTE(review): the architecture diagram also bypasses traces to
      # Kafka (traces_raw); add a kafka/traces_raw exporter if intended.
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [otlphttp/openobserve]
输入:{ service, window, symptom }
输出:{ trend, anomalies, topk_errors, impacted_paths, similar_incidents }
关键查询
-- (1) Trend + anomalies (simple baseline: bucket is anomalous when its mean
--     sits above its own p95)
SELECT bucket,
       avg_ms,
       p95_ms,
       err_rate,
       avg_ms > p95_ms AS anomaly
FROM svc_latency_err_5m
WHERE service = :svc
  AND bucket >= :t0
  AND bucket < :t1
ORDER BY bucket;
-- (2) Top-K error fingerprints per recent bucket.
-- FIX: `(topn_values(agg)).* AS alias` is invalid (no alias on a `.*`
-- expansion, and topn_values is not a toolkit function); use the toolkit
-- topn() accessor, which expands the aggregate into one row per value.
SELECT bucket,
       topn(top5_fingerprints) AS fingerprint
FROM logs_topk_5m
WHERE service = :svc AND bucket >= :t0 AND bucket < :t1
ORDER BY bucket DESC
LIMIT 10;
-- (3) 受影响路径(Cypher)
-- 由外层服务执行 Cypher RPC,或将结果落库后查询
-- (4) Similar historical incidents / runbooks for this service (vector search)
SELECT so.id,
       so.title,
       1 - (so.embedding <=> :qvec) AS score
FROM semantic_objects AS so
WHERE so.object_type IN ('incident','doc')
  AND so.service = :svc
ORDER BY so.embedding <=> :qvec
LIMIT 10;
- 高基数标签治理:可将标签规范化为字典表 label_kv(id, k, v),事实表仅存 label_ids int[]。
- 压缩分段按 service,metric segment;数据分层:热 7~30 天、冷 90~180 天。
- 多租户:表含 tenant 列,用视图 + RLS(Row Level Security)隔离:

ALTER TABLE metrics_points ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON metrics_points
  USING (tenant = current_setting('app.tenant', true));

- 变更管理:用 sqitch / atlas 管理 DDL;CI 校验 EXPLAIN 计划变化。
- 运维:启用 pg_stat_statements;对慢视图加物化与刷新策略;定期 VACUUM (VERBOSE, ANALYZE)。
- 备份:pgBackRest 多地备份,并演练 PITR(Point-in-time Recovery)。

落地清单:
- 建 metrics_points / logs_events / traces_spans,开启 5m 连续聚合;Grafana 指向 PG 视图验收。
- 建 semantic_objects 与基础相似检索。

-- A) 指标趋势 + 百分位
SELECT *
FROM metrics_5m
WHERE service = 'svc.api'
  AND metric = 'http_server_duration_seconds'
  AND bucket >= now() - interval '6 hours'
ORDER BY bucket;
-- B) Top-5 error fingerprints in the last 30 minutes, with up to 3 sample
--    messages each.
-- FIX: the original placed CROSS JOIN LATERAL after WHERE (invalid SQL) and
-- used LIMIT inside array_agg (not supported); samples are capped with a
-- window-function rank instead, and the log scan is time-bounded to match
-- the 30-minute intent.
WITH t AS (
    -- Expand the Top-K aggregates of the window into individual fingerprints
    SELECT DISTINCT f.fingerprint
    FROM logs_topk_5m AS k
    CROSS JOIN LATERAL topn(k.top5_fingerprints) AS f(fingerprint)
    WHERE k.service = 'svc.api'
      AND k.bucket >= now() - interval '30 minutes'
),
ranked AS (
    -- Rank messages per fingerprint (newest first) and count occurrences
    SELECT l.fingerprint,
           l.message,
           row_number() OVER (PARTITION BY l.fingerprint ORDER BY l.ts DESC) AS rn,
           count(*)     OVER (PARTITION BY l.fingerprint)                    AS n_events
    FROM logs_events AS l
    JOIN t USING (fingerprint)
    WHERE l.service = 'svc.api'
      AND l.level IN ('error','warn')
      AND l.ts >= now() - interval '30 minutes'
)
SELECT fingerprint,
       array_agg(message ORDER BY rn) AS samples
FROM ranked
WHERE rn <= 3
GROUP BY fingerprint
ORDER BY max(n_events) DESC
LIMIT 5;
-- C) Upsert a high-error edge into the graph (illustrative).
-- FIX: cypher() is a set-returning function — it is executed with
-- SELECT ... FROM cypher(...), never with INSERT INTO cypher(...).
-- NOTE(review): the $s/$t/$p95/... Cypher parameters must be supplied as
-- cypher()'s third argument via a prepared statement.
SELECT * FROM cypher('svc_topo', $$
MATCH (s:Service {name: $s}), (t:Service {name: $t})
MERGE (s)-[e:CALLS]->(t)
SET e.p95_ms=$p95, e.err_rate=$er, e.volume=$vol, e.window=$win
$$) as (nothing agtype);
-- D) Semantically similar historical incidents for a query embedding :qvec
SELECT s.id,
       s.title,
       1 - (s.embedding <=> :qvec) AS score
FROM semantic_objects AS s
WHERE s.object_type = 'incident'
ORDER BY s.embedding <=> :qvec
LIMIT 10;
本方案强调“检索面与分析面解耦”:一线检索=OpenObserve/LGTM,深度分析=PostgreSQL 二级域。在不改变你现有 OTel/OO 的情况下,把 AIOps/LLM Agent 需要的趋势、相似性、拓扑与 TopK 统一到 PG 层,做到“可扩、可回灌、可近似、可控成本”。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。