前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse案例:查询结果不一致

ClickHouse案例:查询结果不一致

原创
作者头像
Yannic
发布2020-11-16 11:11:52
12.5K1
发布2020-11-16 11:11:52
举报

问题背景

某用户反馈其使用的ClickHouse集群同样的查询返回了不同的结果,是否是ClickHouse数据不能够保证一致性,还是集群有问题。

对于数据库来说,查询数据的准确性至关重要,我查询确定的数据你给我返回不一致的结果,那这结果还有何可用性而言,因此这个问题对用户的重要性不言而喻。

在收到用户反馈的这个问题后,第一时间和用户确认了用户具体的使用情况。

  1. 在集群中的各个节点创建本地表,表引擎为Kafka同时创建了对应的视图(消费Kafka里的数据);
  2. 创建分布式表,表引擎Distributed,汇总视图;
  3. 多次执行同一条查询返回了不一致的结果。

查询数据是通过分布式表来进行的,要想弄清楚为何每次查询返回的数据不一致,首先就需要弄清楚分布式表的原理。

分布式表

具有分布式引擎的表本身不存储任何数据,但可以在多个节点上进行分布式查询。读取会自动并行化进行,无需参数配置或手动干预。 查询时随机选择某个shard的replica进行读取。如果表有索引优先使用索引。

分布式引擎参数:服务器配置文件中的集群名远程数据库名远程表名数据分片键(可选)。

代码语言:txt
复制
Distributed(logs, default, hits[, sharding_key])

查询时将从集群中每个服务器上的default.hits表中读取数据。

本文示例的集群配置如下:

代码语言:txt
复制
<?xml version="1.0" encoding="UTF-8"?>
<yandex>
    <clickhouse_remote_servers>
        <default_cluster>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>10.0.3.27</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>10.0.3.41</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>10.0.3.46</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>10.0.3.26</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
    </clickhouse_remote_servers>
    <zookeeper-servers>
        <node>
            <host>10.0.3.12</host>
            <port>2181</port>
        </node>
        <node>
            <host>10.0.3.3</host>
            <port>2181</port>
        </node>
        <node>
            <host>10.0.3.23</host>
            <port>2181</port>
        </node>
    </zookeeper-servers>
</yandex>

集群名:default_cluster,包括两个分片,每个分片两个副本。

分片:各个分片(服务器)包含不同的数据(为了读取所有数据,必须访问所有分片)。

副本:多个相同冗余的服务器(读取数据时可以访问任何一个副本上的数据)。

当指定了副本时,读取的操作将为每个分片选择一个可用副本。也可以配置用于负载均衡的算法(访问副本的首选项(load_balancing = random/nearest_hostname/first_or_random/round_robin)–具体参阅官方文档load_balancing设定

问题复现

集群:

代码语言:txt
复制
┌─cluster─────────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─estimated_recovery_time─┐
│ default_cluster │         1 │            1 │           1 │ 10.0.3.27 │ 10.0.3.27    │ 9000 │        1 │ default │                  │            0 │                       0 │
│ default_cluster │         1 │            1 │           2 │ 10.0.3.41 │ 10.0.3.41    │ 9000 │        0 │ default │                  │            0 │                       0 │
│ default_cluster │         2 │            1 │           1 │ 10.0.3.46 │ 10.0.3.46    │ 9000 │        0 │ default │                  │            0 │                       0 │
│ default_cluster │         2 │            1 │           2 │ 10.0.3.26 │ 10.0.3.26    │ 9000 │        0 │ default │                  │            0 │                       0 │
└─────────────────┴───────────┴──────────────┴─────────────┴───────────┴──────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────────────┘

如上所示:集群中有两个分片,每个分片两个2副本

shard_num 1: 10.0.3.27 10.0.3.41

shard_num 1: 10.0.3.46 10.0.3.26

创建本地非复制表、创建分布式表、向分布式表中插入数据、查询分布式表

代码语言:txt
复制
CREATE TABLE test.ddl_test ON cluster default_cluster(
  `Year` UInt16,
  `Quarter` UInt8,
  `Month` UInt8,
  `DayofMonth` UInt8,
  `DayOfWeek` UInt8,
  ...
) ENGINE = MergeTree
PARTITION BY Year
ORDER BY (Carrier, FlightDate)
SETTINGS index_granularity = 8192;
代码语言:txt
复制
CREATE TABLE test.ddl_all ON cluster default_cluster AS test.ddl_test
ENGINE = Distributed(default_cluster, test, ddl_test, rand())
代码语言:txt
复制
10.0.3.27 :) INSERT INTO ddl_all SELECT * FROM ontime;

INSERT INTO ddl_all SELECT *
FROM ontime

↘ Progress: 185.13 million rows, 134.51 GB (413.80 thousand rows/s., 300.65 MB/s.) ██████████ 99%Ok.

0 rows in set. Elapsed: 447.398 sec. Processed 185.13 million rows, 134.51 GB (413.80 thousand rows/s., 300.65 MB/s.)
代码语言:txt
复制
# clickhouse-client -h 10.0.3.27 <<< "select count(1) from test.ddl_all"
142975770
# clickhouse-client -h 10.0.3.27 <<< "select count(1) from test.ddl_all"
134720581

可以看到相同的查询语句返回了不同的结果。

代码语言:txt
复制
## 41与27同一分片并将internal_replication设定为false
# clickhouse-client -h 10.0.3.41  <<< "select count(1) from test.ddl_test" 
92562599
# clickhouse-client -h 10.0.3.27  <<< "select count(1) from test.ddl_test" 
92562599
## 46与26同一分片并将internal_replication设定为true
# clickhouse-client -h 10.0.3.46  <<< "select count(1) from test.ddl_test" 
50413171
# clickhouse-client -h 10.0.3.26  <<< "select count(1) from test.ddl_test" 
42157982

# clickhouse-client -h 10.0.3.41 <<< "select count(1) from test.ddl_all"
142975770
# clickhouse-client -h 10.0.3.27 <<< "select count(1) from test.ddl_all"
142975770
# clickhouse-client -h 10.0.3.46 <<< "select count(1) from test.ddl_all"
142975770
# clickhouse-client -h 10.0.3.26 <<< "select count(1) from test.ddl_all"
134720581

# clickhouse-client -h 10.0.3.27 --max_parallel_replicas=2 <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.26 --max_parallel_replicas=2 <<< "select count(1) from test.ddl_all"
227283180
# clickhouse-client -h 10.0.3.41 --max_parallel_replicas=2 <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.46 --max_parallel_replicas=2 <<< "select count(1) from test.ddl_all"
235538369

原理分析

  1. 首先查询Distributed表引擎的过程是先通过本地的表查询,和当前节点处于同一个分片下的Replication副本节点不会接收到查询的请求,和当前节点不同的分片会随机选择这个分片中的一个副本发送请求,然后再聚合各个分片返回的数据最后返回最终结果。
  2. 查询的过程中如果指定--max_parallel_replicas大于1个,会同时并行向多个(前项指定)不和当前节点处于同一个分片下的副本发送查询请求,之后会聚合这些接收请求的副本返回的结果(不会去除重复),因此可以看到上一章节中我们通过指定--max_parallel_replicas=2在节点10.0.3.46和10.0.3.26 上都查询出了多于正确结果185133752的数量。
  3. 分布式表和本地表关联紧密(类似视图概念),如果同一分片中设定了<internal_replication>false</internal_replication>通过分布式表插入数据会同时向多个副本写入,这样每个副本都有完整的数据,此时通过Distributed表引擎查询分布式表则可以返回正确的结果。但这种情况可能会导致最终的各个副本状态不一致(如果不使用Zookeeper来进行协调,任何单一节点的中断都会导致最终数据的不一致)。

最后我们通过将集群中的所有分片都设定:<internal_replication>false</internal_replication>

后再执行以上试验,得到了如下的结果:

代码语言:txt
复制
# clickhouse-client -h 10.0.3.41  <<< "select count(1) from test.ddl_local"
92567953
# clickhouse-client -h 10.0.3.27  <<< "select count(1) from test.ddl_local"
92567953
# clickhouse-client -h 10.0.3.46  <<< "select count(1) from test.ddl_local"
92565799
# clickhouse-client -h 10.0.3.26  <<< "select count(1) from test.ddl_local"
92565799
# clickhouse-client -h 10.0.3.41  <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.27  <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.46  <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.26  <<< "select count(1) from test.ddl_all"
185133752
# clickhouse-client -h 10.0.3.26 --max_parallel_replicas=2  <<< "select count(1) from test.ddl_all"
277701705

可以看到结果如上诉分析的,正常查询分布式表能够得到正确的结果,使用 --max_parallel_replicas=2指定同时并行查询的副本数量为2得到了有冗余的结果277701705,这不是正确的结果185133752。

解决方案

  1. 使用Replicated MergeTree family表
  2. 建议不要通过Distributed表插入数据(适合查询)
  3. 副本数大于等于2的时候,分布式表一定要建立在Replicated引擎本地表上,这样能够避免遇到很多异常情况。

参考文献

1 https://clickhouse.tech/docs/en/engines/table-engines/special/distributed/

2 https://github.com/ClickHouse/ClickHouse/issues/5835

3 https://github.com/ClickHouse/ClickHouse/issues/1443

4 https://clickhouse.tech/docs/zh/engines/table-engines/special/distributed/

5 https://clickhouse.tech/docs/en/operations/settings/settings/#settings-load_balancing

6 https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replication/

7 https://github.com/ClickHouse/ClickHouse/issues/1854

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题背景
  • 分布式表
  • 问题复现
  • 原理分析
  • 解决方案
  • 参考文献
相关产品与服务
弹性 MapReduce
弹性 MapReduce (EMR) 是基于云原生技术和泛 Hadoop 生态开源技术的安全、低成本、高可靠的开源大数据平台。提供易于部署及管理的 Hive、Spark、HBase、Flink、StarRocks、Iceberg、Alluxio 等开源大数据组件,帮助客户高效构建云端企业级数据湖技术架构。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档