首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Druid架构与实现

Druid架构与实现

原创
作者头像
charmer
修改2021-04-16 17:57:54
1.5K0
修改2021-04-16 17:57:54
举报

Druid是实时分析型数据库(OLAP),利用面向列的存储布局、分布式非共享体系结构和先进的索引结构,可以在秒量级的延迟内查询百亿行表。

Introduction

Druid产生的动机是处理机器生产的,海量而低价的数据。以往想要处理这种数据需要配备专门的硬件、团队,价格昂贵,因此这部分数据往往被浪费。

Hadoop用商品机器实现可靠大量数据,成功解决了存储问题,但是访问数据的时效性仍然没有得到保证。druid团队尝试使用RMDB、NoSQL实现实时查询,虽然实现了实时查询,却存在大量问题,比如数据可能无法简单二维表示、灵活性差无法聚合扩展以及无法连续升级等等。

于是,介于海量数据存储和生产级别的查询性能保证之间的空缺,便是druid的生存之地。Druid的实现借鉴了大量优秀的经验,比如OLAP系统、交互查询系统、内存数据库以及分布式数据存储。

解决的问题

  1. RDBMS和NoSQL无法同时提供低延迟数据采集和交互式应用所需的查询功能。
  2. 多用户、高可用。
  3. 允许用户和告警系统实时作出决策。

架构

druid集群包含了大量完全独立运行的节点(进程),因此集群内通信故障对数据可用性的影响几乎忽略不记。

目前0.20.0版本druid有6类进程,又归类为三组:

  1. master:
    1. coordinator node: 负责调度数据,通过zookeeper间接控制historical node对数据的操作
    2. overload node: 负责调度数据摄取的工作,通过zookeeper调度、指派middle manager node
  2. data:
    1. middle manager node: 负责数据摄取
    2. historical node: 负责存储数据、查询数据
  3. query:
    1. broker node: 负责将查询转发到data组
    2. router node: (可选),负责路由查询到broker和data组

由于篇幅限制,本文只介绍最为重要的middle manager node、historical node、broker node和coordinator node。

Real-Time Nodes

Real-Time Node数据摄取及查询流程
Real-Time Node数据摄取及查询流程

real-time node在系统中的具体实现是middle-manager node, 该节点提供ingest和query两个功能。在运行期间,节点对通过的事件生成索引,此时便可查询。节点只专注于小时间范围内收集事件,并定期的hand off到Historical Nodes。同时,节点不与其他节点直接相连,而是通过zookeeper宣布自己联机状态和服务的数据。

节点为所有传入的数据维护一个内存索引缓冲区(JVM Heap-based buffer),随着事件的接收而增量填充,并且也可以查询。为了避免JVM堆溢出,节点会定期或者达到用户设置的最大行数时将内存中的索引持久化存储到外存。而持久话存储的格式在后文存储格式中。每个持久话存储的索引都是只读的,在查询请求时,节点会将数据加载到非堆内存中。

节点会组织一个定期后台任务,搜索本地的持久存储的索引。该任务会将一段时间内的持久存储索引合并并构建,我们称之为segment。在hand off阶段,节点首先将数据打包存储在deep storage中(一般是分布式存储)。而ingest、persist、merge和hand off四步是流动的,任何过程中都不会丢失数据。为了详细描绘这个过程,我们用下图来解释:

数据ingest、persist、merge和hand off过程
数据ingest、persist、merge和hand off过程

节点在13:37开始运行,只会接受这个小时或者下个小时内的数据。当数据被摄取,节点向zookeeper宣布它身上有13:00到14:00时间段内的一个segment。每隔十分钟(自定义的),节点会将内存中的索引写入到外存。在13:00到14:00时间段即将结束时,节点很可能会开始摄取14:00到15:00的数据。当节点真的这么做,它会在内存中常见一个新的索引,然后宣布它同时对外提供14:00到15:00时间段内的一个segment。节点不会立刻合并外存中13:00到14:00的索引,而是等待一段时间(自定义的),当这个窗口时间结束,节点将合并13:00到14:00的索引为一个只读的segment,然后将其hand off(发送到deep storage)。只有当这个segment在集群中被另一个节点(historical node)宣布提供服务,本节点才会删除数据并宣布不提供服务。

可用性和扩展性

middle-manager node是数据消费者,通常,出于数据持久性目的,消息总线(e.g. kafka)位于消息生产者和middle-manager node之间,正常情况下数据消耗的时间为几百毫秒。

使用消息总线目的有二:

  1. 缓冲区。消息总线保持位置偏移,指示middle-manager node在事件流中读取的offset。因此可以手动调整offset。每每需要持久话内存中的索引到外存时,middle-manager node都会更新offset。在节点挂了并恢复时,若持久话存储没有丢失,middle-manager node可以从磁盘中加载所有持久化索引,并继续从它提交的最后一个偏移量读取数据。
  2. 支持多middle-manager node同时读取数据。多个节点可以同时读取同一时间段数据,从而创建事件的副本。同时,这种模式允许对数据流分区,以便多个节点每个消耗数据流的一部分,使得可以无缝添加midele-manager node(最大支持500MB/s)的速度消耗原始数据)。
offset用处
offset用处

Historical Nodes

historical node加载、服务middle-manager node提供的只读segment块。在实际工作流中,historical node加载的数据是不可变的,是查询的主要工作节点。节点之间遵循无耦合架构,之间没有任何交集,只知道如何加载、删除和服务只读的segment。

middle-manager node类似,historical node也会往zookeeper宣布在线状态和服务的数据。加载、删除段的指令通过zookeeper发送,并包含有关段在deep storage存储位置以及如何解压和处理段的信息。在下载被要求的segment之前首先检查自己的cache中是否已经存在。在处理完成后,就会在zookeeper中宣布段可被查询(queryable)。本地cache允许快速更新和重启historical node。启动时节点检查缓存并同样在zookeeper宣布。

historical node从deep storage加载segment
historical node从deep storage加载segment

得益于只读的segment,historical node具有读一致性。同时,节点支持简单的并行化模型,historical nodes可以同时扫描和聚合只读块。

分层(Tiers)

historical nodes可以分组在不同的层中,给定层中所有节点配置相同。可以给每层设置不同的性能和容错参数。分层是为了让优先级不同的segment可以根据重要性分布。例如,可以设置hot层,之中的historical node具有大量内存和cpu,因此可以配置去下载更为频繁使用的segment。

这里可以看看这篇知乎工程师的经验:知乎 Druid 集群优化实践

可用性

zookeeper只控制segment的新增删除,不会影响已经在cache中的segment的可用性。由于查询是通过HTTP请求的,因此zookeeper出问题不会影响现有数据的查询。

Broker Nodes

broker nodehistorical nodemiddle-manager node的查询路由。通过查询zookeeper中发布的元数据,broker node将得知哪些段queryable,并且在哪些节点。broker node将查询路由到正确的节点,并合并最终的结果返回给调用者。

缓存

broker node使用LRU策略。cache使用本地堆内存或者外部分布式键值对存储服务。每每broker node收到查询请求,它首先将确定查询涉及的segment。某些段的结果可能已经存储于缓存中,无需重新计算。若不存在,则broker node将计算转发到正确的middle-manager nodehistorical node中。一旦收到historical node的结果,broker node将结果缓存到本地。注意,middle-manager node的查询是实时查询,结果不缓存(仍然会变化)。

broker node处理查询过程
broker node处理查询过程
可用性

在zookeeper挂了时,仍然可以查询数据。如果broker node无法与zookeeper通信,它将使用集群的最后一个已知视图,并依此将查询转发。也就是说假设集群的数据不再变动流通。

Coordinator Nodes

coordinator node主要负责数据管理和segment在historical node的分布。coordinator node控制historical node加载新segment、删除过期segment并复制segment,移动segment以保证负载均衡。为了保证视图稳定,druid使用multi-version concurrency control swapping protocal来管理只读segment。若任何只读segment包含的数据被较新的segment完全废弃,则过期的segment将被删除。coordinator node会经历一个领导者选举(leader-election)过程,以选择出一个作为coordinator功能的节点,其余的节点当作冗余备份。

coordinator node定期运行以确定集群的当前状态。它通过比较集群的预期状态和运行时集群的实际状态来做出决策。同样的,coordinator node也会与zookeeper维护一个连接。与此同时,还会维护与metadata storage的连接。其中的一个关键信息是一张表,包含了所有应该由historical node提供服务的段的列表。这张表可以被任何创建段的进程更新,比如middle-manager nodemetadata storage中还包含一个规则表(rule table),控制如何在集群中创建、删除和复制segment。

规则(rules)

规则控制如何从集群加载和删除historical node的segment。规则指定如何将segment分配给不同的historical node tiers,并且每一层中应该存在多少segment的备份。规则控制何时完全删除segment。规则在一段时间内制定的。比如:用户可以制定规则以将一个月的数据段加载到hot层中,将一年数据加载到层中,删除其他旧数据。

coordinator nodemetadata storage中加载一组规则,规则可能指定某一个数据源(datasource)或是所有数据源的默认规则。coordinator node将遍历所有可用的segment,并将每个segment与应用于它们的第一条规则匹配。

负载均衡

在生产环境中,查询通常会涉及几十个甚至几百个段。由于每个historical node都有有限的资源,因此必须在集群之间分配segment,以确保集群负载不会太不平衡。然而,确定最佳的负载均衡需要一些关于查询寻模式的知识。通常,查询覆盖单个数据源中跨越连续时间间隔的最近segment。平均而言,访问较小segment的查询速度更快。

这些查询模式建议以更高的速率复制最近的historical node的数据,将时间上接近不同historical node的大型segment分开,并将来自不同数据源的数据段放在一起。为了在集群中优化分配和平衡segment,druid使用一个cost-based优化过程,考虑了segment的数据源、最近性和大小。该算法这里不提。

副本

coordinator node会控制不同的historical node加载相同的一段副本。而副本数量是可以配置的,需要更高的容错率可以设置更多副本数。复制segment的方式与正常segment一致,并也遵循相同的负载均衡算法。通过副本的方式使得druid中单个historical node故障变得无所谓。同样,通过这种策略,我们可以无缝地将historical node脱机、更新、备份与软件升级。(有点疑惑,这样需要人工一个个节点停机操作后开机,这不SRE)

可用性

coordinator node依赖zookeeper和metadata storage

coordinator node利用zookeeper确定集群中已存在哪些historical node。若zookeeper不可用,coordinator node将无法再发送分配、平衡和删除segment的指令。但是不会影响数据可用性。

druid使用metadata storage存储操作管理信息和关于集群中应该存在哪些段元数据信息。若metadata storage不可用,仅仅会导致coordinator node无法获取这些信息,此时coordinator node无法再执行它的任务,但是集群中broker nodehistorical nodemiddle-manager node仍然正常运行并且可以查询(queryable)。

总而言之,如果负责协调的外部依赖挂了,集群将保持现状。

存储格式

druid使用列向存储数据,同样最适合使用于聚合事件流(aggregating event streams)。列存储在查询时可以只加载和扫描所需内容,降低了负载。druid的列共有三种类型,如前文所述,并且使用不同的压缩方法来降低在内存和磁盘上存储的成本。

比如字符串。直接存储字符串是不必要的,druid使用字典压缩的方式存储字符串。对于每个字符串将之映射为一个唯一的整数标识符,于是可以用一个整数数组和一个map来表示原本的字符串列。而结果整数数组非常适合压缩,druid使用LZF算法压缩。

在实际情况的OLAP工作流中,往往查询是对满足某一dimension集合规范的某一metrics集合的聚合结果。并且,dimension往往是字符串(string),metric往往是数值。druid为字符串列创建了额外的反向索引,以便只扫描与特定查询筛选相关的行。这种使用位图方式,执行boolean运算,在搜索引擎中经常见到。druid使用Concise压缩算法压缩位图。

段存储格式

Druid存储的数据格式是列向的表,其中列的类型可以分为三类:

  1. timestamp:目前一个表中只有一个,是OLAP的基础。
  2. dimension:存储可以直接获取的数据,字符串、整形、浮点型,可以过滤聚合。
  3. metric:存储二次获取的值,可以聚合。

对于timestamp和metric列来说,存储非常简单,用LZ4(可自定义)压缩的整数或者浮点数组存储。一旦查询确认要哪些行,便会解压这些行,提取相关行,然后适用所需的聚合运算获取结果。

对于字符串dimension列来说就不一样了,因为字符串dimension支持过滤和聚合操作。它会存储三种数据结构(第三种bitmap可以自定义是否需要):

  1. 将值映射到整数id的字典
  2. 使用第一步中字典进行编码的列表
  3. 对于列中每一个不同的值,标识哪些行包含该值的位图

现在考虑下druid官网给出的例子:

数据例子
数据例子

对于这样一个小表中Page这一字符串dimension列,在druid中会有以下三个数据结构:

1: 将值映射到整数id的字典
{
	"Justin Bieber": 0,
	"Ke$ha"        : 1
}

2. 使用第一步中字典进行编码的列表
[
 0,
 0,
 1,
 1
]

3. 对于列中每一个不同的值,标识哪些行包含该值的位图
value="Justin Bieber": [1,1,0,0]
value="Ke$ha":         [0,0,1,1]

那么为什么要设计这三种数据结构呢?字典将字符串映射到整数,以便字符串可以在2、3中紧凑表示,同时避免了重复字符串占用大量存储。而3中的bitmap(这里用作倒排索引)可以进行快速过滤操作(比如AND、OR)。在过滤时,druid仅需考虑目标列bitmap非0行即可;在groupby时,也只需将非0行取出进行其他操作。

注意,bitmap是比较消耗资源的,位图大小是数据行数*列基数,虽然这是一个非常稀疏,高度可压缩的位图,可以考虑在重复字符串很少、不常用作聚合过滤操作的字符串diemension中禁用。

性能

配置

我们集群配置如下:

12个historical node,每个8核16G,拥有2T缓存。

20个middlemanager node,每个6核16G,拥有200G缓存。

数据ingestion

ingestion性能不好衡量,每个task的peon虚拟机配置不同,每个task的时长、数据源数量不同,每个核心频率不同,内存speed不同,甚至数据表的字段、有无特殊处理都不同,因此不好衡量。

由于远未到druid数据摄取极限,目前我们集群中最高达到60'0000条数据/minute/core。

druid开发团队给出的数据: 10-100K events/second/core

数据查询

我设计了一组sql查询实验,基于80亿量级数据查询。结果见下文:

test1
select model, count(*) as cnt from "tencent-json_kv_3"
where app_id='620' group by model order by cnt desc
test2
select model, os, count(*) as cnt from "tencent-json_kv_3"
where app_id='620' group by model, os order by cnt desc
test3
select SUBSTRING(app_id, 1, 2) AS "app_id_substring", model, os, count(*) as cnt from "tencent-json_kv_3"
where app_id='620' group by 1, model, os order by cnt desc
test4
select count(DISTINCT os) as os, TIME_CEIL(__time, 'PT00H05M') from "tencent-json_kv_3"
where app_id='620'
group by 2
test5
select TIME_CEIL(__time, 'PT00H05M'), MAX(op) from "tencent-json_kv_3"
where app_id='620' AND op <= 9000 AND page_id IS NOT NULL
group by 1
order by 2

总结

druid的使用比较繁琐,有大量配置需要不断测试比较,不同类型、不同情况又有不同的推荐参数,比较吃经验。另外,druid查询缓存命中对查询性能影响极大,命中情况下查询性能能到几秒甚至几毫秒。

对于一个OLAP来说,druid实现了它要求的功能,甚至超越了自己设下的目标。但是配置又过于繁琐,很多参数选项配置实际上重复了,完全可以用部分参数推导剩下的部分。

由于druid仍在成长期,后续改进可能日新月异。希望druid能成为apache的另一面招牌。

最后总结一下druid的适用情况:

Druid适用于

  1. 数据经常插入而很少更新、删除
  2. 查询一般是聚合查询与非组查询(Group By),部分检索和扫描查询
  3. 数据查询延迟要求在100毫秒到几秒之间
  4. 数据有时间字段
  5. 多表场景下,每次查询只命中一个大的分布式表,而且会命中多个小的loopup表
  6. 场景中包含high cardinality data columns(这里不太好表诉,用原文。例如:URL,用户ID),并且需要对其进行快速计数和排序
  7. 需要从Kafka、HDFS对象存储中加载数据

Druid不适用于

  1. 根据主键对数据低延迟更新
  2. 延迟完全不是关键的离线数据系统
  3. 场景包括大连接,并且可以容忍大量时间花销

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Introduction
  • 解决的问题
  • 架构
    • Real-Time Nodes
      • 可用性和扩展性
    • Historical Nodes
      • 分层(Tiers)
      • 可用性
    • Broker Nodes
      • 缓存
      • 可用性
    • Coordinator Nodes
      • 规则(rules)
      • 负载均衡
      • 副本
      • 可用性
  • 存储格式
    • 段存储格式
    • 性能
      • 配置
        • 数据ingestion
          • 数据查询
            • test1
            • test2
            • test3
            • test4
            • test5
        • 总结
          • Druid适用于
            • Druid不适用于
            相关产品与服务
            大数据
            全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档