PipelineDB是一个PostgreSQL的一个流式数据库,是pg社区的一个扩展。
下面来引入PipelineDB里面的一些概念:
流是一种允许客户端将时序数据写入流视图的抽象管道。流里面的一行数据(或者简单称作 event),与数据表中的行数据是很相似的,并且二者的写入也是完全一致的。然而,流和数据表的语义是完全不同的。简单来说,在PipelineDB中是一个foreign table,该表仅仅写数据,不可读取。
public | wiki_stream | foreign table | postgres
当从该表中读取报错如下:
postgres=# select * from wiki_stream;
ERROR: "wiki_stream" is a stream
HINT: Streams can only be read by a continuous view's FROM clause.
此时便引出流视图,可以达到"流和表中的数据组合后作为输入并进行实时增量更新"的效果。
流数据一旦被流视图读取后就会被销毁,流数据不会存储在任何地方。只有诸如 SELECT * FROM that_view
查询返回的结果才会被持久化,也就是说,流视图可以被视为高吞吐量、实时的物化视图。
创建流视图如下语法,在原生SQL中扩展action行为。
CREATE VIEW name [WITH (action=materialize [, ...])] AS query
使用如下:
---创建流
CREATE FOREIGN TABLE test_stream_targets_stream (x int) SERVER pipelinedb;
---创建流视图
CREATE VIEW test_stream_targets0 AS SELECT COUNT(*) FROM test_stream_targets_stream;
流转换是一个比较有趣的概念,可以在不存储数据的情况下,将数据作为另外一个流的输入或者写入流视图中。由于在进行流转换过程中数据不会被存储,因此流转换不支持聚合操作。
语法:
CREATE VIEW name (WITH action=transform [, outputfunc=function_name( arguments ) ]) AS query
例如:
---流转换
CREATE VIEW t WITH (action=transform) AS
SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
---将流转换的数据进行存储
CREATE VIEW v WITH (action=materialize) AS
SELECT sum(y) FROM output_of('t');
当然,这里也可以传递outputfunc,可以自定义,function_name 是一个用户传入的函数,它的返回类型为 trigger
,并且会作用到流转换的每一行输出上。arguments 是一系列逗号分隔的参数,在触发器执行时传给函数,只能为字符串常量。
例如:
CREATE VIEW ct1 WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('ct_stream0')) AS SELECT x::int % 4 AS x FROM ct_stream1 WHERE x > 10 AND x < 50;
PipelineDB最核心的功能便是高性能的连续聚合。其中包括:Bloom Filter、Count-Min Sketch、Top-K等算法。这里着重学习Bloom Filter。
使用角度非常简单了,如下Demo:
CREATE TYPE test_cont_complex_type AS (
x int,
y int,
z text
);
CREATE FOREIGN TABLE cont_complex_stream (r test_cont_complex_type, x int, y int, z text) SERVER pipelinedb;
CREATE VIEW test_cont_complex1 AS SELECT bloom_agg(r::test_cont_complex_type) FROM cont_complex_stream;
INSERT INTO cont_complex_stream (r) VALUES ((1, 1, 'hello'));
INSERT INTO cont_complex_stream (r) VALUES ((1, 2, 'world')::test_cont_complex_type);
SELECT bloom_cardinality(bloom_agg) FROM test_cont_complex1;
首先创建了一个自定义类型,创建了一个**流(cont_complex_stream)、流视图(test_cont_complex1)**。
随后对流插入数据,最后通过Bloom Filter查看Bloom中包含的元素数量。
假设继续往里面插入数据,最后查出来的数据数量依旧是2,这种聚合将在去重角度非常有用。
INSERT INTO cont_complex_stream (r) VALUES ((1, 1, 'hello'));
INSERT INTO cont_complex_stream (r) VALUES ((1, 2, 'world'));
INSERT INTO cont_complex_stream (r) VALUES ((1, 1, 'hello')::test_cont_complex_type);
INSERT INTO cont_complex_stream (r) VALUES ((1, 2, 'world')::test_cont_complex_type);
SELECT bloom_cardinality(bloom_agg) FROM test_cont_complex1;
查看上述流视图的表结构,可以看到通过下面这个语句会创建出bloom类型。
CREATE VIEW test_cont_complex1 AS SELECT bloom_agg(r::test_cont_complex_type) FROM cont_complex_stream;
postgres=# \d test_cont_complex1
View "public.test_cont_complex1"
Column | Type | Collation | Nullable | Default
-----------+-------+-----------+----------+---------
bloom_agg | bloom | | |
bloom_agg是一个聚集函数,如下:
CREATE AGGREGATE bloom_agg(anyelement) (
sfunc = bloom_agg_trans,
stype = bloom,
combinefunc = bloom_union_agg_trans,
parallel = safe
);
其内部实现会调用bloom_agg_trans,代码位于:src/bloomfuncs.c,在该实现文件中实现了诸如:bloom_agg_trans
、bloom_intersection_agg
等聚合函数,而这些逻辑也都非常简单,调用底层src/bloom.c。
bloom_agg_trans
会将所有的元素插入到Bloom Filter中,调用BF的create、add等操作,下面来看看BF底层实现,算法侧采用MurmurHash,像HBase、Impala都采用了这个算法。
BF几个参数:
假阳性概率
在这里默认采用n=16384,p=0.02进行计算,计算公式如下:
n = ceil(m / (-k / log(1 - exp(log(p) / k))))
p = pow(1 - exp(-k / (m / n)), k)
m = ceil((n * log(p)) / log(1 / pow(2, log(2))));
k = round((m / n) * log(2));
算法侧插入、查询都比较简单,分别是|
与&
,这里涉及到的一个问题是:如何支持不同pg类型?
在PipelineDB中处理的方式是统一类型位Bloom类型。
CREATE TYPE bloom (
input = bloom_in,
output = bloom_out,
receive = bloom_recv,
send = bloom_send,
alignment = int4,
storage = extended
);
CREATE FUNCTION bloom_in(cstring)
RETURNS bloom
AS 'MODULE_PATHNAME', 'bloom_in'
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
CREATE FUNCTION bloom_out(bloom)
RETURNS cstring
AS 'MODULE_PATHNAME', 'bloom_out'
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;
......
BF添加元素时,会创建一个StringInfo,并将pg的不同类型Datum通过DatumToBytes转换为StringInfo结构,最后将data与len作为底层hash的key、len。
static BloomFilter *
bloom_add_datum(FunctionCallInfo fcinfo, BloomFilter *bloom, Datum elem)
{
TypeCacheEntry *typ = (TypeCacheEntry *) fcinfo->flinfo->fn_extra;
StringInfo buf;
buf = makeStringInfo();
DatumToBytes(elem, typ, buf);
BloomFilterAdd(bloom, buf->data, buf->len);
pfree(buf->data);
pfree(buf);
return bloom;
}
// MurmurHash3_128(const void *key, const Size len, const uint64_t seed, void *out)
https://pipelinedb-doc-cn.readthedocs.io/zh_CN/latest/introduction.html
本节完~