前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >流式数据库PipelineDB之BF杂谈

流式数据库PipelineDB之BF杂谈

作者头像
公众号guangcity
发布2022-12-02 20:46:18
6930
发布2022-12-02 20:46:18
举报
文章被收录于专栏:光城(guangcity)

流式数据库 PipelineDB

1.导语

PipelineDB是一个PostgreSQL的一个流式数据库,是pg社区的一个扩展。

下面来引入PipelineDB里面的一些概念:

1.1 什么是流?

流是一种允许客户端将时序数据写入流视图的抽象管道。流里面的一行数据(或者简单称作 event),与数据表中的行数据是很相似的,并且二者的写入也是完全一致的。然而,流和数据表的语义是完全不同的。简单来说,在PipelineDB中是一个foreign table,该表仅仅写数据,不可读取。

代码语言:javascript
复制
public | wiki_stream              | foreign table | postgres

当从该表中读取报错如下:

代码语言:javascript
复制
postgres=# select * from wiki_stream;
ERROR:  "wiki_stream" is a stream
HINT:  Streams can only be read by a continuous view's FROM clause.

1.2 那如何要使用这些流/读取这些流?

此时便引出流视图,可以达到"流和表中的数据组合后作为输入并进行实时增量更新"的效果。

流数据一旦被流视图读取后就会被销毁,流数据不会存储在任何地方。只有诸如 SELECT * FROM that_view 查询返回的结果才会被持久化,也就是说,流视图可以被视为高吞吐量、实时的物化视图。

创建流视图如下语法,在原生SQL中扩展action行为。

代码语言:javascript
复制
CREATE VIEW name [WITH (action=materialize [, ...])]  AS query

使用如下:

代码语言:javascript
复制
---创建流
CREATE FOREIGN TABLE test_stream_targets_stream (x int) SERVER pipelinedb;

---创建流视图
CREATE VIEW test_stream_targets0 AS SELECT COUNT(*) FROM test_stream_targets_stream;

1.3 流转换

流转换是一个比较有趣的概念,可以在不存储数据的情况下,将数据作为另外一个流的输入或者写入流视图中。由于在进行流转换过程中数据不会被存储,因此流转换不支持聚合操作。

语法:

代码语言:javascript
复制
CREATE VIEW name (WITH action=transform [, outputfunc=function_name( arguments ) ]) AS query

例如:

代码语言:javascript
复制
---流转换 
CREATE VIEW t WITH (action=transform) AS
  SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
---将流转换的数据进行存储
CREATE VIEW v WITH (action=materialize) AS
  SELECT sum(y) FROM output_of('t');

当然,这里也可以传递outputfunc,可以自定义,function_name 是一个用户传入的函数,它的返回类型为 trigger,并且会作用到流转换的每一行输出上。arguments 是一系列逗号分隔的参数,在触发器执行时传给函数,只能为字符串常量。

例如:

代码语言:javascript
复制
CREATE VIEW ct1 WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('ct_stream0')) AS SELECT x::int % 4 AS x FROM ct_stream1 WHERE x > 10 AND x < 50;

2.流聚合

PipelineDB最核心的功能便是高性能的连续聚合。其中包括:Bloom Filter、Count-Min Sketch、Top-K等算法。这里着重学习Bloom Filter。

2.1 Bloom Filter

使用角度非常简单了,如下Demo:

代码语言:javascript
复制
CREATE TYPE test_cont_complex_type AS (
  x int,
  y int,
  z text
);

CREATE FOREIGN TABLE cont_complex_stream (r test_cont_complex_type, x int, y int, z text) SERVER pipelinedb;
CREATE VIEW test_cont_complex1 AS SELECT bloom_agg(r::test_cont_complex_type) FROM cont_complex_stream;

INSERT INTO cont_complex_stream (r) VALUES ((1, 1, 'hello'));
INSERT INTO cont_complex_stream (r) VALUES ((1, 2, 'world')::test_cont_complex_type);

SELECT bloom_cardinality(bloom_agg) FROM test_cont_complex1;

首先创建了一个自定义类型,创建了一个**流(cont_complex_stream)、流视图(test_cont_complex1)**。

随后对流插入数据,最后通过Bloom Filter查看Bloom中包含的元素数量。

假设继续往里面插入数据,最后查出来的数据数量依旧是2,这种聚合将在去重角度非常有用。

代码语言:javascript
复制
INSERT INTO cont_complex_stream (r) VALUES ((1, 1, 'hello'));
INSERT INTO cont_complex_stream (r) VALUES ((1, 2, 'world'));
INSERT INTO cont_complex_stream (r) VALUES ((1, 1, 'hello')::test_cont_complex_type);
INSERT INTO cont_complex_stream (r) VALUES ((1, 2, 'world')::test_cont_complex_type);

SELECT bloom_cardinality(bloom_agg) FROM test_cont_complex1;

2.2 BF实现

查看上述流视图的表结构,可以看到通过下面这个语句会创建出bloom类型。

代码语言:javascript
复制
CREATE VIEW test_cont_complex1 AS SELECT bloom_agg(r::test_cont_complex_type) FROM cont_complex_stream;

postgres=# \d test_cont_complex1
          View "public.test_cont_complex1"
  Column   | Type  | Collation | Nullable | Default 
-----------+-------+-----------+----------+---------
 bloom_agg | bloom |           |          |

bloom_agg是一个聚集函数,如下:

代码语言:javascript
复制
CREATE AGGREGATE bloom_agg(anyelement) (
  sfunc = bloom_agg_trans,
  stype = bloom,
  combinefunc = bloom_union_agg_trans,
  parallel = safe
);

其内部实现会调用bloom_agg_trans,代码位于:src/bloomfuncs.c,在该实现文件中实现了诸如:bloom_agg_transbloom_intersection_agg等聚合函数,而这些逻辑也都非常简单,调用底层src/bloom.c。

bloom_agg_trans会将所有的元素插入到Bloom Filter中,调用BF的create、add等操作,下面来看看BF底层实现,算法侧采用MurmurHash,像HBase、Impala都采用了这个算法。

BF几个参数:

  • n filter当中元素数量
  • p

假阳性概率

  • m filter当中位数量
  • k hash函数数量

在这里默认采用n=16384,p=0.02进行计算,计算公式如下:

代码语言:javascript
复制
n = ceil(m / (-k / log(1 - exp(log(p) / k))))
p = pow(1 - exp(-k / (m / n)), k)
m = ceil((n * log(p)) / log(1 / pow(2, log(2))));
k = round((m / n) * log(2));

算法侧插入、查询都比较简单,分别是|&,这里涉及到的一个问题是:如何支持不同pg类型?

在PipelineDB中处理的方式是统一类型位Bloom类型。

代码语言:javascript
复制
CREATE TYPE bloom (
  input = bloom_in,
  output = bloom_out,
  receive = bloom_recv,
  send = bloom_send,
  alignment = int4,
  storage = extended
);
CREATE FUNCTION bloom_in(cstring)
RETURNS bloom
AS 'MODULE_PATHNAME', 'bloom_in'
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;

CREATE FUNCTION bloom_out(bloom)
RETURNS cstring
AS 'MODULE_PATHNAME', 'bloom_out'
LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE;

......

BF添加元素时,会创建一个StringInfo,并将pg的不同类型Datum通过DatumToBytes转换为StringInfo结构,最后将data与len作为底层hash的key、len。

代码语言:javascript
复制
static BloomFilter *
bloom_add_datum(FunctionCallInfo fcinfo, BloomFilter *bloom, Datum elem)
{
 TypeCacheEntry *typ = (TypeCacheEntry *) fcinfo->flinfo->fn_extra;
 StringInfo buf;

 buf = makeStringInfo();
 DatumToBytes(elem, typ, buf);
 BloomFilterAdd(bloom, buf->data, buf->len);

 pfree(buf->data);
 pfree(buf);

 return bloom;
}
// MurmurHash3_128(const void *key, const Size len, const uint64_t seed, void *out)

https://pipelinedb-doc-cn.readthedocs.io/zh_CN/latest/introduction.html

本节完~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-11-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 光城 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 流式数据库 PipelineDB
    • 1.导语
      • 1.1 什么是流?
        • 1.2 那如何要使用这些流/读取这些流?
          • 1.3 流转换
            • 2.流聚合
              • 2.1 Bloom Filter
                • 2.2 BF实现
                相关产品与服务
                对象存储
                对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档