Apache Pulsar 是灵活的发布-订阅消息系统(Flexible Pub/Sub messaging),采用计算与存储分离的架构。雅虎在 2013 年开始开发 Pulsar ,于 2016 年首次开源,目前是 Apache 软件基金会的顶级项目。Pulsar 具有支持多租户、持久化存储、多机房跨区域数据复制、高吞吐、低延迟等特性。
Pulsar 集群主要由以下三部分组成:
身为⼀个 Pub/Sub 系统,⾸先的存在要素必然是 producer(⽣产者)。producer 发送数据给 Pulsar,将消息以 append 的形式追加到 topic 中。发送的数据是 key/value 形式的,并且数据会上 schema 的信息。Pulsar 会确保⼀个 producer 往 topic 发送的消息满⾜⼀定的 schema 格式。
既然有 producer 负责生产消息,那么相应地就有 consumer 负责消费消息。在 Pulsar 中 consumer 可以使用不同的订阅模式来接受消息。
Pulsar ⾥将 consumer 接收消息的过程称之为:subscription(订阅),类似于 Kafka 的 consumer group(消费组)。⼀个订阅⾥的所有 consumer,会作为⼀个整体去消费这个 topic ⾥的所有消息。Pulsar 有四种订阅模式:独占(exclusive)、故障转移(failover)、共享(shared)、共享键(key_shared)。
在 exclusive 模式下,一个 subscription 只允许被一个 consumer 用于订阅 topic ,如果多个 consumer 使用相同的 subscription 去订阅同一个 topic,则会发生错误。exclusive 是默认的订阅模式。如下图所示,Consumer A-0 和 Consumer A-1 都使用了相同的 subscription(相同的消费组),只有 Consumer A-0 被允许消费消息。
在 failover 模式下,多个 consumer 允许使用同一个 subscription 去订阅 topic。但是对于给定的 topic,broker 将选择⼀个 consumer 作为该 topic 的主 consumer ,其他 consumer 将被指定为故障转移 consumer 。当主 consumer 失去连接时,topic 将被重新分配给其中⼀个故障转移 consumer ,⽽新分配的 consumer 将成为新的主 consumer 。发⽣这种情况时,所有未确认的消息都将传递给新的主 consumer ,这个过程类似于 Kafka 中的 consumer 组重平衡(rebalance)。
如下图所示,Consumer B-0 是 topic 的主 consumer ,当 Consumer B-0 失去连接时,Consumer B-1 才能成为新的主 consumer 去消费 topic。
在 shared 模式下,多个 consumer 可以使用同一个 subscription 去订阅 topic。消息以轮询的方式分发给 consumer ,并且每条消费仅发送给一个 consumer 。当有 consumer 失去连接时,所有发送给该 consumer 但未被确认的消息将被重新安排,以便发送给该 subscription 上剩余的 consumer 。
如下图所示,Consumer C-1,Consumer C-2,Consumer C-3 以轮询的方式接受消息。
shared 模式有以下限制:
key_shared 是 Pulsar 2.4.0 以后⼀个新订阅模式。在 shared 模式下,多个 consumer 可以使用同一个 subscription 去订阅 topic。消息按照 key 分发给 consumer ,含有相同 key 的消息只被发送给同一个 consumer 。
如下图所示,不同的 consumer 只接受到对应 key 的消息。
key_shared 模式有以下限制:
cursor 是用来存储一个 subscription 中消费的状态信息(类似 Kafka 中的 offset,偏移量)。Pulsar 将 subscription 的 cursor 存储至 BookKeeper 的 ledger 中。
⽽最底层的 message 通常包含 Message ID,由以下几个部分组成:
Pulsar 中的 broker 是无状态的,不存储数据,真正的数据存储在 Bookeeper 上。每个 topic 的 partition 都会分配到某一个 borker 上,producer 和 consumer 则会连接到这个 broker,从而向该 topic 的 partition 发送和消费消息。broker 主要负责消息的复制与分发,数据的计算。
Pulsar 从一开始就支持多租户,topic 的名称是层级化的,最上层是租户(tenant),然后是命名空间(namespace),最后才是 topic。
{persistent|non-persistent}://tenant/namespace/topic
persistent|non-persistent
标识了 topic 的类型,默认情况下 topic 是持久化存储到磁盘上的。在 Pulsar 中支持了两种 ack 的机制,分别是单条 ack 和批量 ack。单条 ack(AckIndividual)是指 consumer 可以根据消息的 messageID 来针对某一个特定的消息进行 ack 操作;批量 ack(AckCumulative)是指一次 ack 多条消息。
默认情况下,Pulsar Broker 会对消息做如下处理:
但是,很多线上的生产环境下,这种默认行为并不能满足我们的生产需求,所以,Pulsar 提供了如下配置策略来覆盖这些行为:
上述两种策略的设置都是在 NameSpace 的级别进行设置。
backlog 是未被确认的消息的集合,它有一个大前提是,这些消息所在的 topic 是被 broker 所持久化的,在默认情况下,用户创建的 topic 都会被持久化。换句话说,broker 会将所有未确认或者未处理的消息都存放到 backlog 中。
需要注意的是,对 backlog 进行配置时,我们需要明确以下两点:
当超过设定的 backlog 的阈值,Pulsar 提供了以下三种策略供用户选择:
Retention 策略的设置提供了两种方式:
TTL 参数就像附在每条消息上的秒表,用于定义允许消息停留在未确认状态的时间。当 TTL 过期时,Pulsar 会自动将消息更改为已确认状态(并使其准备删除)。TTL 只去处理一件事情,就是将未被确认的消息变为被确认的状态,TTL 本身不会去涉及相应的删除操作。
producer 向 topic 的 partition 对应的 broker 发送消息。broker 以并行的方式将消息写到多个 bookie 中,当指定数量的 bookie 写入成功时,broker 会向 producer 响应消息写入成功。
consumer 向订阅 topic 的 partition 对应的 broker 请求消息,如果消息在 broker 的缓存中存在,则 broker 直接将消息返回给 consumer 。如果缓存中不存在,broker 去 bookie 中读取消息,然后返回给 consumer 。consumer 在完成消费后,向 broker 响应 ack 表示完成消费。consumer ack 的元数据也是会持久化在 bookie 中的。
根据个人对 Pulsar 和 Kafka 的理解,整理如下 Pulsar 和 Kafka 的名词对应表:
Pulsar | Kafka |
---|---|
Topic | Topic |
Partition | Partition |
Segment(Ledger) | Segment |
Bookie | Broker |
Broker | Client SDK |
Ensemble Size | metadata.broker.list |
Write Quorum Size (Qw) | Replica Number |
Ack Quorum Size (Qa) | request.required.acks |
部署 Pulsar 集群包括以下步骤(按顺序):
主机名 | IP地址 | 角色 | 端口号 |
---|---|---|---|
zookeeper1 | 192.168.1.191 | zookeeper | 2181 |
zookeeper2 | 192.168.1.192 | zookeeper | 2181 |
zookeeper3 | 192.168.1.193 | zookeeper | 2181 |
bookeeper1 | 192.168.1.194 | bookeeper | 3181 |
bookeeper2 | 192.168.1.195 | bookeeper | 3181 |
bookeeper3 | 192.168.1.196 | bookeeper | 3181 |
pulsar1 | 192.168.1.147 | broker | 8080(http协议),6650(pulsar协议) |
pulsar2 | 192.168.1.148 | broker | 8080(http协议),6650(pulsar协议) |
pulsar3 | 192.168.1.149 | broker | 8080(http协议),6650(pulsar协议) |
pulsar1 | 192.168.1.149 | pulsar-manager | 7750 |
下载 pulsar 发行版的二进制的包,里面包含了 zookeeper,bookeeper,pulsar 所需要的文件:
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/apache-pulsar-2.7.1-bin.tar.gz
包下载完成后,解压并进入到解压后的目录:
tar xvzf apache-pulsar-2.7.1-bin.tar.gz
cd apache-pulsar-2.7.1
解压后的文件目录包含以下子目录:
目录 | 内容 |
---|---|
bin | Pulsar 命令行工具,比如 pulsar 和 pulsar-admin |
conf | 配置文件,包含ZooKeeper,Bookeeper,Pulsar 等等 |
data | Zookeeper 和 Bookeeper 保存数据的目录 |
lib | Pulsar 使用的 JAR 文件 |
logs | 日志目录 |
修改所有 Zookeeper 节点的 conf/zookeeper.conf 配置文件:
# 设置Zookeeper数据存放目录。
dataDir=data/zookeeper
# 在配置文件中为每个节点添加一个 server.N行,其中N是ZooKeeper节点的编号。
server.1=192.168.1.191:2888:3888
server.2=192.168.1.192:2888:3888
server.3=192.168.1.193:2888:3888
在每个 Zookeeper 节点的 myid 文件中配置该节点在集群中的唯一ID。myid 文件应放在 dataDir 指定的目录下:
# 创建目录
mkdir -p data/zookeeper
# 每个Zookeeper节点的ID号不能重复,并且和server.N的编号对应,依次为1,2,3
echo 1 > data/zookeeper/myid
在每台 Zookeeper 节点启动 Zookeeper 服务:
bin/pulsar-daemon start zookeeper
Zookeeper 集群启动成功后,需要将一些 Pulsar 集群的元信息写入 ZooKeeper 集群的每个节点,由于数据在 ZooKeeper 集群内部会互相同步,因此只需要将元信息写入 ZooKeeper 的一个节点即可:
bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-1 \
--zookeeper 192.168.1.191:2181 \
--configuration-store 192.168.1.191:2181 \
--web-service-url http://192.168.1.147:8080,192.168.1.148:8080,192.168.1.149:8080 \
--broker-service-url pulsar://192.168.1.147:6650,192.168.1.148:6650,192.168.1.149:6650
参数说明如下:
参数 | 说明 |
---|---|
—cluster | pulsar 集群名字 |
--zookeeper | zookeeper 地址,只需要包含 zookeeer 集群中的任意一台机器即可 |
--configuration-store | 配置存储地址,只需要包含 zookeeer 集群中的任意一台机器即可 |
--web-service-url | pulsar 集群 web 服务的 URL 以及端口,默认的端口是8080 |
--broker-service-url | broker 服务的URL,用于与 pulsar 集群中的 brokers 进行交互,默认端口是 6650 |
Pulsar 集群中所有持久数据的存储都由 Bookeeper 负责。
修改所有 Bookeeper 节点的 conf/bookeeper.conf 配置文件,设置 Bookeeper 集群连接的 Zookeeper 信息:
zkServers=192.168.1.191:2181,192.168.1.192:2181,192.168.1.193:2181
在每台 Bookeeper 节点启动 Bookeeper 服务:
bin/pulsar-daemon start bookie
在任意一台 Bookeeper 节点上使用 Bookeeper shell 的 simpletest 命令,去校验集群内所有的 bookie 是否都已经启动,3 为 Bookeeper 节点数量。
bin/bookkeeper shell simpletest --ensemble 3 --writeQuorum 3 --ackQuorum 3 --numEntries 3
参数含义如下:
-a,--ackQuorum <arg> Ack quorum size (default 2) 当指定数量的 bookie ack 响应时,认为消息写入成功
-e,--ensemble <arg> Ensemble size (default 3) 写入数据的 bookie 节点数量
-n,--numEntries <arg> Entries to write (default 1000) 一批消息的消息数量
-w,--writeQuorum <arg> Write quorum size (default 2) 每条消息副本数量
这个命令会在集群上创建和 bookie 同等数量的 ledger,并往里面写一些条目,然后读取它,最后删除这个 ledger。
修改所有 Pulsar 节点的 conf/broker.conf 配置文件:
# 配置pulsar broker连接的zookeeper集群地址
zookeeperServers=192.168.1.191:2181,192.168.1.192:2181,192.168.1.193:2181
configurationStoreServers=192.168.1.191:2181,192.168.1.192:2181,192.168.1.193:2181
# broker数据端口
brokerServicePort=6650
# broker web服务端口
webServicePort=8080
# pulsar 集群名字,和前面zookeeper初始化集群元数据时配置的一样
clusterName=pulsar-cluster-1
# 创建一个ledger时使用的bookie数量
managedLedgerDefaultEnsembleSize=2
# 每个消息的副本数量
managedLedgerDefaultWriteQuorum=2
# 完成写操作前等待副本ack的数量
managedLedgerDefaultAckQuorum=2
在每台 Pulsar 节点启动 broker:
bin/pulsar-daemon start broker
修改 conf/client.conf 文件。
# pulsar集群web服务url
webServiceUrl=http://192.168.1.147:8080,192.168.1.148:8080,192.168.1.149:8080
# pulsar服务端口
# URL for Pulsar Binary Protocol (for produce and consume operations)
brokerServiceUrl=pulsar://192.168.1.147:6650,192.168.1.148:6650,192.168.1.149:6650
consumer 使用如下命令订阅 pulsar-test 这个主题的消息:
bin/pulsar-client consume \
persistent://public/default/pulsar-test \
-n 100 \
-s "consumer-test" \
-t "Exclusive"
如果不指定 --url
参数并且没有在 conf/client.conf
文件中指定 pulsar 集群连接信息,则默认连接的是 pulsar://localhost:6650/
。可以指定 --url pulsar://192.168.1.147:6650
或者 --url http://192.168.1.147:8080
与 broker 进行交互。
新开一个终端, producer 使用如下命令向 pulsar-test 主题生产一条消息,消息内容为 "Hello Pulsar":
bin/pulsar-client produce \
persistent://public/default/pulsar-test \
-n 1 \
-m "Hello Pulsar"
在 consumer 终端可以看到成功消费到了消息:
23:20:47.418 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
----- got message -----
key:[null], properties:[], content:Hello Pulsar
Pulsar manager 是用于管理和监控 Pulsar 集群的 WebUI 工具。Pulsar manager 可以管理多个 Pulsar 集群。github 地址:https://github.com/apache/pulsar-manager
wget https://dist.apache.org/repos/dist/release/pulsar/pulsar-manager/pulsar-manager-0.2.0/apache-pulsar-manager-0.2.0-bin.tar.gz
tar -zxvf apache-pulsar-manager-0.2.0-bin.tar.gz
cd pulsar-manager
tar -xvf pulsar-manager.tar
cd pulsar-manager
cp -r ../dist ui
./bin/pulsar-manager
创建用户名为 admin,密码为 apachepulsar 的超级管理员账号:
CSRF_TOKEN=$(curl http://192.168.1.147:7750/pulsar-manager/csrf-token)
curl \
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT http://192.168.1.147:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "myuser", "email": "chengzw258@163.com"}'
访问 http://192.168.1.147:7750/ui/index.html 登录 Pulsar manager:
点击 New Environment 添加 Pulsar 集群:
添加完成后可以查看并设置 Pulsar 集群的相关信息,例如查看 topic 信息:
访问 http://192.168.1.147:7750/bkvm 查看 bookie 信息,用户名:admin,密码:admin。
查看 ledger 信息:
pulsar 提供了压力测试的命令行工具,使用以下命令生产消息:
bin/pulsar-perf produce -r 100 -n 2 -s 1024 test-perf
# 输出内容,从左到右依次是:
# 每秒生产的消息数量:87.2条
# 每秒流量大小:0.7Mb
# 每秒生产失败的消息数:0
# 平均延迟:5.478ms
# 延迟中位数:4.462ms
# 95%的延迟在 11.262ms以内
# 99%的延迟在 25.802ms以内
# 99.9%的延迟在 43.757ms以内
# 99.99%的延迟在 51.956ms以内
# 最大延迟:51.956ms
... Throughput produced: 87.2 msg/s --- 0.7 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 5.478 ms - med: 4.642 - 95pct: 11.263 - 99pct: 25.802 - 99.9pct: 43.757 - 99.99pct: 51.956 - Max: 51.956
使用以下命令消费消息:
bin/pulsar-perf consume test-perf
# 输出内容,从左到右依次是:
# 每秒消费的消息数量:100.007条
# 每秒流量大小:0.781Mb
# 平均延迟:9.273ms
# 延迟中位数:9ms
# 95%的延迟在 14ms以内
# 99%的延迟在 15ms以内
# 99.9%的延迟在 28ms以内
# 99.99%的延迟在 34ms以内
# 最大延迟:34ms
... Throughput received: 100.007 msg/s -- 0.781 Mbit/s --- Latency: mean: 9.273 ms - med: 9 - 95pct: 14 - 99pct: 15 - 99.9pct: 28 - 99.99pct: 34 - Max: 34
在 Pulsar manager 界面可以 test-perf 这个 topic 有两个生产者在生产消息,有一个消费者正在消费消息:
查看 topic 的 存储状况: