clickhouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS),由俄罗斯最大的搜索公司Yandex开发,于2016年开源,采用c++开发。
OLAP(Online analytical processing),即联机分析处理,主要用于支持企业决策管理分析。数据库概念最初源于1962年Kenneth Iverson发表的名为“A Programming Language” (APL)的著作,它第一次提出了处理操作和多维变量的的数学表达式,后来APL语言由IBM实现。
OLAP委员会对联机分析处理的定义为:使分析人员、管理人员或执行人员能够从多种角度对从原始数据中转化出来 的、能够真正为用户所理解的、并真实反映企业维特性的信息进行快速、一致、交互的存取,从而获得对数据更深 入了解的一类软件技术.
第一款OLAP产品Express于1975年问世,随着被Oracle收购后繁荣发展了30余年,最后由继任者Oracle 9i替代。这么多年过去,基本的OLAP理念和数据模型仍然未变。
OLAP这个名词是数据库之父Edgar F. Codd于1993年在文章《Providing OLAP (On-Line Analytical Processing) to User-Analysts: An IT Mandate》提出,他总结了OLAP产品的12个原则,随后OLAP产品相继问世并逐渐形成今天的格局。
OLAP按存储器的数据存储格式分为基于多维数据库的MOLAP和交互分析的的ROLAP。
列式数据对于大多数查询而言,处理速度至少提高了100倍
行式
列式
分析场景中,我们一般会读大量的行而取少量的列,在列式存储结构下,我们只需要取对应的列数据就可以,不参与计算的列完全不会被扫描到,这会极大的降低磁盘 IO 的消耗。
基于列式存储的结构,同一列中的数据属于同一类型,压缩效果会更加显著。列存储往有着高达十倍甚至更高的压缩比,节省了大量的存储空间,降低了存储成本。
SIMD(Single Instruction Multiple Data)即单条指令操作多条数据,它是通过数据并行以提高性能的一种方式,可以简单理解为在寄存器层面对程序中的数据做并行处理,Clickhouse 在能够提升计算效率的地方大量使用了 SIMD,通过使用 SIMD,基本上能带来几倍的性能提升,像阿里云的 PolarDB-X 也引入了向量化执行引擎,为表达式计算带来了几十倍的性能提升。
分布式领域存在一条定律,计算移动比数据移动更加划算,这也是其核心所在,将数据的计算直接发放到数据所在的服务器,多机并行处理,再把最终的结果汇集在一起;另外 Clickhouse 也通过线程级别并行的方式为效率进一步提速,极致去利用服务器的资源。
◆多样化的表引擎
ClickHouse可以在任何具有x86_64,AArch64或PowerPC64LE CPU架构的Linux,FreeBSD或Mac OS X上运行。
官方预构建的二进制文件通常针对x86_64进行编译,并利用SSE 4.2指令集,因此,除非另有说明,支持它的CPU使用将成为额外的系统需求。下面是检查当前CPU是否支持SSE 4.2的命令:
$ grep -q sse4_2 /proc/cpuinfo && echo "SSE 4.2 supported" || echo "SSE 4.2 not supported"
Ubuntu的官方预编译deb软件包。运行以下命令来安装包:
sudo apt-get install apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4echo "deb https://repo.clickhouse.com/deb/stable/ main/" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get updatesudo apt-get install -y clickhouse-server clickhouse-clientsudo service clickhouse-server startclickhouse-client
1、安装docker
2、安装ClickHouse
客户端:docker pull yandex/clickhouse-client 服务端:docker pull yandex/clickhouse-server
3、命令启动镜像或者docker控制台启动
docker run -d --name ch-server --ulimit nofile=262144:262144 -p 8123:8123 -p 9000:9000 -p 9009:9009 yandex/clickhouse-server
4、启动命令
clickhouse-client
方法二:y
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPGsudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
服务端启动命令:clickhouse-server
客户端启动命令:clickhouse-client
创建的时候将ENGINE参数声明为MergeTree()
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name (
name1 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
name2 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
省略...
) ENGINE = MergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, 省略...]
PARTITION BY [选填]:分区键,用于指定表数据以何种标准进行分区。
ORDER BY[选填]:排序键
PRIMARY KEY [选填] :主键
SAMPLE BY [选填]:抽样表达式,用于声明数据以何种标 准进行采样
SETTINGS:index_granularity [选填]:它表示索引的粒度,默认值为 8192。也就是说,MergeTree的索引在默认情况下,每间隔8192行数据 才生成一条索引,稀疏索引。
partition:分区目录,下面存放这各类数据文件,相同分区的数据,会被合并到同一个分区目录,不同的分区,数据永远不会被合并到一起。
MergeTree分区目录的完整 物理名称并不是只有ID而已,在ID之后还跟着一串奇怪的数字,例如 20190511_0,分区ID如何生成的。
命名规则
PartitionID_MinBlockNum_MaxBlockNum_Level
属于同一个分区的多个目录,在合并之后会生成一个全新的目 录,目录中的索引和数据文件也会相应地进行合并。新目录名称的合并方式遵循以下规则,其中:
分区目录从创建、合并到删除的整个过程
分区目录在发生合并之后,旧的分区目 录并没有被立即删除,而是会存留一段时间。但是旧的分区目录已不 再是激活状态(active=0),所以在数据查询时,它们会被自动过滤 掉。
MergeTree的主键使用PRIMARY KEY定义,待主键定义之后, MergeTree会依据index_granularity间隔(默认8192行),为数据表生成 一级索引并保存至primary.idx文件内,索引数据按照PRIMARY KEY排 序。相比使用PRIMARY KEY定义,更为常见的简化形式是通过 ORDER BY指代主键。
primary.idx文件内的一级索引采用稀疏索引实现。
稀疏索引的优势是显而易见的,它仅需使用少量的索引标记就能 够记录大量数据的区间位置信息,且数据量越大优势越为明显。以默 认的索引粒度(8192)为例,MergeTree只需要12208行索引标记就能为 1亿行数据记录提供索引。由于稀疏索引占用空间小,所以primary.idx 内的索引数据常驻内存,取用速度自然极快。
索引粒度 通过index_granularity这个参数设置,默认是8092。
数据以indexgranularity的粒度(默认8192)被标记成多个小的区 间,其中每个区间最多8192行数据。MergeTree使用MarkRange表示一 个具体的区间,并通过start和end表示其具体的范围。indexgranularity 的命名虽然取了索引二字,但它不单只作用于一级索引(.idx),同时 也会影响数据标记(.mrk)和数据文件(.bin)。
如果使用CounterID作为主 键(ORDER BY CounterID),则每间隔8192行数据就会取一次 CounterID的值作为索引值,索引数据最终会被写入primary.idx文件进行 保存
如果使用多个主键,例如ORDER BY(CounterID,EventDate),则每 间隔8192行可以同时取CounterID与EventDate两列的值作为索引值
MarkRange:一个具体的数据段,MarkRange与索引编号对应,使用start和end两个属性表示其区间范围。
索引查询其实就是两个数值区间的交集判断。其中,一个区间是由基于主键的查询条件转换而来的条件区间;而另一个区间是MarkRange对应的数值区间。
假如现在有一份测试数据,共192行记 录。其中,主键ID为String类型,ID的取值从A000开始,后面依次为 A001、A002……直至A192为止。MergeTree的索引粒度 index_granularity=3,根据索引的生成规则,primary.idx文件内的索引数据会如下图所示。
根据索引数据,MergeTree会将此数据片段划分成192/3=64个小的 MarkRange,两个相邻MarkRange相距的步长为1。其中,所有 MarkRange(整个数据片段)的最大数值区间为[A000,+inf),如下图所示。
查询过程
granularity与index_granularity的关系
indexgranularity定 义了数据的粒度,而granularity定义了聚合信息汇总的粒度。换言之, granularity定义了一行跳数索引能够跳过多少个indexgranularity区间的 数据。
二级索引类型
压缩数据块
MergeTree在数据具体的写入过程中,会依照索引粒度,按批次获取数据并进行处理。如果把一批数据 的未压缩大小设为size,压缩前数据字节大小,严格控制在64kb-1MB之间。
写入过程:
数据标记的生成规则
分区、索引、标记和压缩数据, 现在将它们聚在一块进行一番总结。接下来,就分别从写入过程、查 询过程,以及数据标记与压缩数据块的三种对应关系的角度展开介绍。
数据标记与压缩数据块的对应关系:一对一、多对一、一对多。
表引擎可以分为6个系列,分别是合并树、外部存储、内存、文件、接口和其他,每一个系列的 表引擎都有着独自的特点与使用场景。
除了基础表引擎MergeTree之 外,常用的表引擎还有ReplacingMergeTree、SummingMergeTree、 AggregatingMergeTree、CollapsingMergeTree和 VersionedCollapsingMergeTree
有三类存储策略
MergeTree拥有主键,但是它的主键却没有唯一键的约束,ReplacingMergeTree为了数据去重而设计的,它能够在合并分区时删除重复的数据。
创建ReplacingMergeTree表的方法,替换Engine即可
ENGINE = ReplacingMergeTree(ver)//ver是选填参数,会指定一个UInt*、Date或者DateTime类型的字段作为版本号
创建ReplacingMergeTree数据表方法
CREATE TABLE replace_table( id String,
code String,
create_time DateTime
)ENGINE = ReplacingMergeTree()PARTITION BY toYYYYMM(create_time)ORDER BY (id,code) //根据id与code去重
PRIMARY KEY id
只有在相同的数据分区内重复的数据才可以被删除,而不同数 据分区之间的重复数据依然不能被剔除
总结:ReplacingMergeTree在去除重复数据时,确实是以ORDER BY排序键为基准的,而不是PRIMARY KEY。
终端用户只需要查询数据的汇总结果,不关心明细数据,则使用SummingMergeTree 引擎
SummingMergeTree能够在合并分区的时候按照预先定义的条件聚合汇总数据,将同一分组下的多行数据汇总合并成一行,这样既减少了数据行,又降低了后续汇总查询的开销。
CREATE TABLE summing_table( id String,
city String,
v1 UInt32,
v2 Float64,
create_time DateTime
)ENGINE = SummingMergeTree()PARTITION BY toYYYYMM(create_time)ORDER BY (id, city)
PRIMARY KEY id
执行optimize强制进行触发和合并操作:
optimize TABLE summing_table FINAL
AggregatingMergeTree能够在合并分区的时候,按照预先定义的条件聚合数据。同时,根据预先定义的 聚合函数计算数据并通过二进制的格式存入表内。将同一分组下的多 行数据聚合成一行,既减少了数据行,又降低了后续聚合查询的开销。
AggregatingMergeTree更为常见的应用方式是结合物化视图使用, 将它作为物化视图的表引擎。而这里的物化视图是作为其他数据表上 层的一种查询视图。
ENGINE = AggregatingMergeTree()
例子:
CREATE TABLE agg_table(id String,
city String,
code AggregateFunction(uniq,String),value AggregateFunction(sum,UInt32),
create_time DateTime
)ENGINE = AggregatingMergeTree()PARTITION BY toYYYYMM(create_time)ORDER BY (id,city)
PRIMARY KEY id
上例中列字段id和city是聚合条件,等同于下面的语义GROUP BY id,city,
CollapsingMergeTree就是一种通过以增代删的思路,支持行级数据 修改和删除的表引擎,通过定义一个sign标记位字段,记录数据行的 状态。如果sign标记为1,则表示这是一行有效的数据;如果sign标记 为-1,则表示这行数据需要被删除,相互抵消。
ENGINE = CollapsingMergeTree(sign)
例子:
CREATE TABLE collpase_table( id String,
code Int32,
create_time DateTime, sign Int8)ENGINE = CollapsingMergeTree(sign)PARTITION BY toYYYYMM(create_time)ORDER BY id
--修改前的源数据, 它需要被修改INSERT INTO TABLE collpase_table VALUES('A000',100,'2019-02-20 00:00:00',1)--镜像数据, ORDER BY字段与源数据相同(其他字段可以不同),sign取反为-1,它会和源数据折叠INSERT INTO TABLE collpase_table VALUES('A000',100,'2019-02-20 00:00:00',-1)--修改后的数据 ,sign为1INSERT INTO TABLE collpase_table VALUES('A000',120,'2019-02-20 00:00:00',1)
--修改前的源数据, 它需要被删除INSERT INTO TABLE collpase_table VALUES('A000',100,'2019-02-20 00:00:00',1)--镜像数据, ORDER BY字段与源数据相同, sign取反为-1, 它会和源数据折叠INSERT INTO TABLE collpase_table VALUES('A000',100,'2019-02-20 00:00:00',-1)
VersionedCollapsingMergeTree对数 据的写入顺序没有要求,在同一个分区内,任意顺序的数据都能够完 成折叠操作.
ENGINE = VersionedCollapsingMergeTree(sign,ver)
CREATE TABLE ver_collpase_table( id String,
code Int32,
create_time DateTime, sign Int8,
ver UInt8
)ENGINE = VersionedCollapsingMergeTree(sign,ver)PARTITION BY toYYYYMM(create_time)ORDER BY id
Docker安装HDFS
拉取hadoop镜像
docker pull singularities/hadoop
创建docker-compose.yml文件
version: "2"services: namenode: image: singularities/hadoop command: start-hadoop namenode hostname: namenode environment: HDFS_USER: hdfsuser ports:
- "8020:8020"
- "14000:14000"
- "50070:50070"
- "50075:50075"
- "10020:10020"
- "13562:13562"
- "19888:19888"
datanode: image: singularities/hadoop command: start-hadoop datanode namenode environment: HDFS_USER: hdfsuser links:
- namenode
执行
[root@localhost hadoop]# docker-compose up -dCreating network "hadoop_default" with the default driver
Creating hadoop_namenode_1 ... doneCreating hadoop_datanode_1 ... done
生成3个datanode
[root@localhost hadoop]# docker-compose scale datanode=3WARNING: The scale command is deprecated. Use the up command with the --scale flag instead.
Starting hadoop_datanode_1 ... doneCreating hadoop_datanode_2 ... doneCreating hadoop_datanode_3 ... done
列出容器查看
[root@localhost hadoop]# docker psCONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES19f9685e286f singularities/hadoop "start-hadoop data..." 48 seconds ago Up 46 seconds 8020/tcp, 9000/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 19888/tcp, 50010/tcp, 50020/tcp, 50070/tcp, 50075/tcp, 50090/tcp, 50470/tcp, 50475/tcp hadoop_datanode_3e96b395f56e3 singularities/hadoop "start-hadoop data..." 48 seconds ago Up 46 seconds 8020/tcp, 9000/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 19888/tcp, 50010/tcp, 50020/tcp, 50070/tcp, 50075/tcp, 50090/tcp, 50470/tcp, 50475/tcp hadoop_datanode_25a26b1069dbb singularities/hadoop "start-hadoop data..." 8 minutes ago Up 8 minutes 8020/tcp, 9000/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 19888/tcp, 50010/tcp, 50020/tcp, 50070/tcp, 50075/tcp, 50090/tcp, 50470/tcp, 50475/tcp hadoop_datanode_1a8656de09ecc singularities/hadoop "start-hadoop name..." 8 minutes ago Up 8 minutes 0.0.0.0:8020->8020/tcp, 0.0.0.0:10020->10020/tcp, 0.0.0.0:13562->13562/tcp, 0.0.0.0:14000->14000/tcp, 9000/tcp, 50010/tcp, 0.0.0.0:19888->19888/tcp, 0.0.0.0:50070->50070/tcp, 50020/tcp, 50090/tcp, 50470/tcp, 0.0.0.0:50075->50075/tcp, 50475/tcp hadoop_namenode_1
打开浏览器,查看效果图
http://localhost:50070/dfshealth.html#tab-overview
进入某台docker容器
//拿到container iddocker ps 执行命令进入容器docker exec -it e96b395f56e3 bash
执行HDFS命令
# 查看所有命令hadoop fs# 创建目录hadoop fs -mkdir /hdfs #在根目录下创建hdfs文件夹# 查看目录hadoop fs -ls / #列出根目录下的文件列表# 创建多级目录hadoop fs -mkdir -p /hdfs/d1/d2# 上传文件到HDFSecho "hello world" >> local.txt #创建文件hadoop fs -put local.txt /hdfs/ #上传文件到hdfs# 下载hdfs文件hadoop fs -get /hdfs/local.txt# 删除hdfs中的文件hadoop fs -rm /hdfs/local.txt# 删除hdfs中的目录hadoop fs -rmdir /hdfs/d1/d2
docker 容器里安装一下clickhouse,进行通信
sudo apt-get install apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4echo "deb https://repo.clickhouse.com/deb/stable/ main/" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get updatesudo apt-get install -y clickhouse-server clickhouse-clientsudo service clickhouse-server startclickhouse-client
hadoop fs -mkdir /clickhouse
hadoop fs -chown -R clickhouse:clickhouse /clickhouse
创建HDFS数据表
CREATE TABLE hdfs_table10(id UInt32,
code String,name String)ENGINE = HDFS('hdfs://namenode:8020/clickhouse/hdfs_table10','CSV')
写入数据
INSERT INTO hdfs_table10 SELECT number,concat('code',toString(number)),concat('n',toString(number)) FROM numbers(5)
-- 再次插入就会失败,报文件已存在。一般是csv文件已经在hdfs中存在了,我们直接建表直接去读
查询数据
select * from hdfs_table10
这种方式与使用Hive类似,我们直接可以将HDFS对应的文件映射成ClickHouse中的一张表,这样就可以使用SQL操作HDFS上的文件了。注意:ClickHouse并不能够删除HDFS上的数据,当我们在ClickHouse客户端中删除了对应的表,只是删除了表结构,HDFS上的文件并没有被删除,这一点跟Hive的外部表十分相似。
服务器安装mysql
apt-get install mysql-server//启动服务service mysql start//进入服务mysql -uroot -p
MySQL表引擎可以与MySQL数据库中的数据表建立映射,并通过 SQL向其发起远程查询,包括SELECT和INSERT,它的声明方式如 下:
ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[,
replace_query, 'on_duplicate_clause'])
·replacequery默认为0,对应MySQL的REPLACE INTO语法。如果 将它设置为1,则会用REPLACE INTO代替INSERT INTO。·onduplicateclause默认为0,对应MySQL的ON DUPLICATE KEY 语法。如果需要使用该设置,则必须将replacequery设置成0。
CREATE TABLE dolphin_scheduler_table00(id UInt32,name String)ENGINE = MySQL('127.0.0.1:3306', 'test','dolphin_scheduler_table', 'root', '')
插入数据
INSERT INTO TABLE dolphin_scheduler_table00 VALUES (1,'流程1')
查询Mysql 表 dolphinschedulertable ,发现数据已经被远程写入了。
kafka表引擎的声明方式
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port,... ', //表示Broker服务的地址列表、多个地址之间使用逗号分隔,如broker_1,broker_2
kafka_topic_list = 'topic1,topic2,...', //表示订阅消息主题的名称列表
kafka_group_name = 'group_name', //表示消费组的名称,
kafka_format = 'data_format'[,] //表示用于解析消息的数据格式
[kafka_row_delimiter = 'delimiter_symbol'] //表示判定一行数据的结束符,默认值为'\0'
[kafka_schema = ''] //对应Kafka的schema参数
[kafka_num_consumers = N] //表示消费者的数量,默认值为1
[kafka_skip_broken_messages = N]
[kafka_commit_every_batch = N //表示执行Kafka commit的频率
注意:带方括号的为选填项
CREATE TABLE kafka_test( id UInt32,
code String, name String) ENGINE = Kafka()SETTINGS
kafka_broker_list = 'hdp1.nauu.com:6667',
kafka_topic_list = 'sales-queue',
kafka_group_name = 'chgroup',
kafka_format = 'JSONEachRow',
kafka_skip_broken_messages = 100
1、在生产环境中、或者在实际应用场景中、应当避免使用SELECT * 形式来查询数据,因为通配符*对于采用列式存储的ClickHouse而言没有任何好处。假如面对一张拥有数百个列字段的数据表,下面这两条 SELECT语句的性能可能会相差100倍之多,因为 * 会查询所有列字段。
--使用通配符*与按列按需查询相比,性能可能相差100倍SELECT * FROM datasets.hits_v1;SELECT WatchID FROM datasets.hits_v1;
◆各OLAP引擎对比
来源:
https://www.toutiao.com/article/7108275894850028032/?log_from=a264805d7c4ed_1655341720493
“IT大咖说”欢迎广大技术人员投稿,投稿邮箱:aliang@itdks.com
来都来了,走啥走,留个言呗~
IT大咖说 | 关于版权
由“IT大咖说(ID:itdakashuo)”原创的文章,转载时请注明作者、出处及微信公众号。投稿、约稿、转载请加微信:ITDKS10(备注:投稿),茉莉小姐姐会及时与您联系!
感谢您对IT大咖说的热心支持!