
Background
Kafka started out as a log/message system that was very popular with ops teams; paired with the ELK stack it was a joy to use. As Kafka gradually evolved into a streaming platform, developers got involved and business systems started integrating with it as well. Business requirements inevitably push newcomers into working with Kafka, and sooner or later their curiosity demands looking up one specific record in a topic. As the team running the service you can hardly turn the business side down, so the only option used to be writing a consumer and searching through the messages by hand.
A streaming database is a database designed specifically for processing large volumes of real-time streaming data. Unlike traditional databases, which store data in batches before processing it, a streaming database processes data as soon as it is produced, enabling real-time insight and analysis. Unlike traditional stream processing engines, which do not retain data, a streaming database can store data and answer user queries against it. Streaming databases are ideal for latency-critical applications such as real-time analytics, fraud detection, network monitoring, and the Internet of Things (IoT), and they can simplify the technology stack.
KSQL Overview

KSQL is the streaming SQL engine for Apache Kafka. It lets you run stream processing tasks as SQL statements, lowering the barrier to entry for stream processing and providing a simple, fully interactive SQL interface for working with data in Kafka. You no longer need to write code in a programming language such as Java or Python. KSQL is open source (Apache 2.0 licensed), distributed, scalable, reliable, and real-time. It supports a rich set of stream processing operations, including aggregations, joins, windowing, sessionization (capturing all click-stream events within a single visitor's website session), and more.

KSQL is quite different from SQL in a relational database. Traditional SQL statements are one-off operations: both queries and updates run against the data set as it exists at that moment. KSQL queries and updates, by contrast, run continuously, and the data set can grow without bound. What KSQL really performs are transformations, in other words, stream processing.
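For example, a traditional SQL query returns a result set once and terminates, whereas a KSQL push query such as the one below (a minimal sketch, assuming a pageviews stream has already been registered over a Kafka topic) keeps running and emits a new row every time a matching event is appended to the underlying topic:
-- runs until cancelled; every new pageview event produces a new output row
SELECT userid, pageid FROM pageviews EMIT CHANGES;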
Apache Kafka is a popular choice for data pipelines. KSQL makes it simple to transform data within the pipeline, preparing messages so that they land cleanly in another system.
Track, understand, and manage infrastructure, applications, and data sources by quickly building real-time dashboards, generating metrics, and creating custom alerts and messages.
Navigate and browse your data in Kafka.
Identify patterns and detect anomalies in real-time data with millisecond latency, so that anomalous events are surfaced correctly and fraudulent activity can be handled separately.
Create data-driven, real-time experiences and insights for users.
Understand and deliver sensor data how and where it needs to be.
KSQL Use Cases
On the one hand, KSQL lets you define business-level metrics that are available in real time. Low-level metrics cannot tell us how an application actually behaves, so defining custom metrics from the raw events the application produces gives a much better picture of its health. On the other hand, KSQL can be used to define a standard for an application and check whether its behavior in production matches expectations.
CREATE TABLE error_counts AS
SELECT error_code, count(*) FROM monitoring_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
WHERE type = 'ERROR'
GROUP BY error_code;
CREATE TABLE possible_fraud AS
SELECT card_number, count(*)
FROM authorization_attempts
WINDOW TUMBLING (SIZE 5 SECONDS)
GROUP BY card_number
HAVING count(*) > 3;
KSQL turns event streams into time-series data containing numeric values, which a visualization tool can then display in a UI. This makes it possible to detect many security-threatening behaviors, such as fraud and intrusions, and KSQL offers a real-time, simple, and complete solution for it, as the possible_fraud query above illustrates.
CREATE STREAM vip_users AS
SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id
WHERE u.level = 'Platinum';
Most data processing goes through an ETL (Extract, Transform, Load) pipeline, and such systems are usually driven by scheduled batch jobs whose latency is often unacceptable. With KSQL and Kafka connectors, batch data integration can be turned into online data integration. For example, a stream-table join can enrich events with metadata stored in a table, or sensitive fields can be filtered out of the data before it is shipped to another system.
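As a sketch of that second case, sensitive fields can be obfuscated before the data ever leaves Kafka. The payments stream, its columns, and the target topic below are made-up names used purely for illustration; the query relies on ksqlDB's MASK scalar function to blank out the card number:
-- hypothetical payments stream; card_number is masked before landing in the payments_out topic
CREATE STREAM payments_out WITH (KAFKA_TOPIC='payments_out', VALUE_FORMAT='JSON') AS
SELECT userid, MASK(card_number) AS card_number, amount
FROM payments
EMIT CHANGES;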
For complex applications, Kafka's native Streams API may be the better fit. But for simple applications, or for people who would rather not program in Java, KSQL is the better choice.
KSQL Architecture

KSQL runs as a standalone server. Multiple KSQL servers can form a cluster, and server instances can be added dynamically. The cluster is fault tolerant: if one server fails, the others take over its work. The KSQL command-line client issues queries to the cluster through a REST API and can inspect stream and table metadata, query data, and check query status. Because it is built on the Streams API, KSQL inherits its elasticity, state management, and fault tolerance, along with exactly-once semantics. The KSQL server embeds these capabilities and adds a distributed SQL engine, automatic byte-code generation to speed up queries, and a REST API for query execution and management.
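Since the CLI is just a REST client, that same API can also be called directly with curl; a minimal sketch (point the URL at your own ksqlDB listener, here the 192.168.56.117:8088 address used later in this post):
# list registered streams via the ksqlDB REST endpoint; the response comes back as JSON
curl -X POST http://192.168.56.117:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d '{"ksql": "SHOW STREAMS;", "streamsProperties": {}}'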
Kafka + KSQL: Disrupting the Traditional Database

A traditional relational database puts the table at the center; the log is merely an implementation detail. In an event-centric world the roles are reversed: the log becomes the core, and tables are essentially views built on top of it. New events are continuously appended to the log, and the table's state changes as a result. With Kafka as the central log and KSQL as the engine, we can create whatever materialized views we need, and those views are updated continuously.

KSQL is built on Kafka's Streams API, so its two core abstractions are the stream and the table.
A stream is unbounded structured data. Records can be appended to a stream continuously, but existing records never change: they are neither modified nor deleted.
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
WITH (kafka_topic='pageviews', value_format='JSON');
A table is a view over a stream, representing a collection of mutable data. It is similar to a traditional database table, except that it carries streaming semantics such as time windows, and its data is mutable.
CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR)
WITH (kafka_topic='users', value_format='DELIMITED');
Quick start
Official documentation: https://docs.confluent.io/platform/current/platform-quickstart.html

Install, configure, and start the services

#1. Unpack the archive and set environment variables
[root@c7-docker confluent-6.1.1]#
ln -s /opt/confluent-6.1.1/ /opt/confluent
export CONFLUENT_HOME=/opt/confluent
echo 'export CONFLUENT_HOME=/opt/confluent' >> /etc/profile
export PATH=$PATH:$CONFLUENT_HOME/bin
echo 'export PATH=$PATH:$CONFLUENT_HOME/bin' >> /etc/profile
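# (a quick sanity check, assuming a login shell that sources /etc/profile)
source /etc/profile
echo $CONFLUENT_HOME   # should print /opt/confluent
which confluent        # should resolve to /opt/confluent/bin/confluent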
#2. Install the kafka-connect-datagen connector
#connector doc: https://docs.confluent.io/home/connect/overview.html
#plugin install directory: /opt/confluent-6.1.1/share/confluent-hub-components
[root@c7-docker confluent-6.1.1]# grep 'plugin.path' /opt/confluent-6.1.1/etc/ -r
/opt/confluent-6.1.1/etc/kafka/connect-standalone.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/kafka/connect-standalone.properties:plugin.path=/usr/share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/kafka/connect-distributed.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/kafka/connect-distributed.properties:plugin.path=/usr/share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/ksqldb/connect.properties:# plugin.path=
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-standalone.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-standalone.properties:plugin.path=share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-distributed.properties:# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
/opt/confluent-6.1.1/etc/schema-registry/connect-avro-distributed.properties:plugin.path=share/java,/opt/confluent-6.1.1/share/confluent-hub-components
/opt/confluent-6.1.1/etc/kafka-connect-replicator/replicator-connect-distributed.properties:#plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
/opt/confluent-6.1.1/etc/kafka-connect-replicator/replicator-connect-standalone.properties:#plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
#To build and install from source instead: git clone https://github.com/confluentinc/kafka-connect-datagen.git
# git checkout v0.4.0
# mvn clean package
# confluent-hub install target/components/packages/confluentinc-kafka-connect-datagen-0.4.0.zip
### Usage: confluent-hub install : install a component from either Confluent Hub or from a local file
[root@c7-docker confluent-6.1.1]# confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache License 2.0
https://www.apache.org/licenses/LICENSE-2.0
Downloading component Kafka Connect Datagen 0.5.0, provided by Confluent, Inc. from Confluent Hub and installing into /opt/confluent-6.1.1//share/confluent-hub-components
Adding installation directory to plugin path in the following files:
/opt/confluent-6.1.1//etc/kafka/connect-distributed.properties
/opt/confluent-6.1.1//etc/kafka/connect-standalone.properties
/opt/confluent-6.1.1//etc/schema-registry/connect-avro-distributed.properties
/opt/confluent-6.1.1//etc/schema-registry/connect-avro-standalone.properties
Completed
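# (optional) verify that the connector really landed in the plugin path; the directory name may differ by version
ls /opt/confluent-6.1.1/share/confluent-hub-components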
#3. Edit the config files (ksqlDB binds to localhost:8088 by default; change it so that SQL submitted from the remote Control Center at http://192.168.56.7:9021/ does not fail)
[root@c7-docker confluent-6.1.1]# cd etc/
[root@c7-docker etc]# ls
cli confluent-control-center-fe confluent-metadata-service kafka ksqldb
confluent-common confluent-hub-client confluent-rebalancer kafka-connect-replicator rest-utils
confluent-control-center confluent-kafka-mqtt confluent-security kafka-rest schema-registry
[root@c7-docker etc]# grep ':8088' * -r
confluent-control-center/control-center.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-minimal.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center-minimal.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-dev.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
confluent-control-center/control-center-dev.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
confluent-control-center/control-center-production.properties:#confluent.controlcenter.ksql.ksqlDB.url=http://ksql:8088
confluent-control-center/control-center-production.properties:confluent.controlcenter.ksql.ksqlDB.url=http://192.168.56.117:8088
ksqldb/ksql-server.properties:#listeners=http://0.0.0.0:8088
ksqldb/ksql-server.properties:listeners=http://192.168.56.117:8088
ksqldb/ksql-server.properties:# listeners=http://[::]:8088
ksqldb/ksql-server.properties:# listeners=https://0.0.0.0:8088
ksqldb/ksql-production-server.properties:#listeners=http://0.0.0.0:8088
ksqldb/ksql-production-server.properties:listeners=http://192.168.56.117:8088
ksqldb/ksql-production-server.properties:# listeners=http://[::]:8088
ksqldb/ksql-production-server.properties:# listeners=https://0.0.0.0:8088
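# the listener edit above can also be scripted; a sketch only, substitute your own host IP for 192.168.56.117
sed -i 's|^#listeners=http://0.0.0.0:8088|listeners=http://192.168.56.117:8088|' /opt/confluent-6.1.1/etc/ksqldb/ksql-server.properties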
#4. Start the services and check the logs
[root@c7-docker etc]# confluent local services start
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.007829
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
[root@c7-docker etc]# ls /tmp/confluent.007829
connect control-center kafka kafka-rest ksql-server schema-registry zookeeper
#data files and log files:
[root@c7-docker lib]# ls /tmp/confluent.007829/
connect control-center kafka kafka-rest ksql-server schema-registry zookeeper
[root@c7-docker lib]# ls /tmp/confluent.007829/connect/
connect.properties connect.stdout data logs
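# the state of the local stack can be re-checked at any time with the status subcommand
[root@c7-docker etc]# confluent local services status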

Parameters for creating the datagen source connectors from the Control Center UI:
#a. Generate test data for the pageviews topic in AVRO format:
{
"name": "datagen-pageviews",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "pageviews",
"max.interval": "100",
"quickstart": "pageviews"
}
#b. Generate test data for the users topic in AVRO format:
{
"name": "datagen-users",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "users",
"max.interval": "1000",
"quickstart": "users"
}
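#Alternatively (a sketch, not part of the original quick start): the same connector can be created without the UI by POSTing the settings above, wrapped in a "config" object, to the Kafka Connect REST API, which listens on port 8083 by default:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "datagen-pageviews",
        "config": {
          "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "kafka.topic": "pageviews",
          "max.interval": "100",
          "quickstart": "pageviews"
        }
      }'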
Inspect data and create tables or streams

#create a stream
create stream users( rowkey int key, username varchar) with( kafka_topic='users',value_format='JSON') ;
#create a table (note: LIMIT is not allowed in a persistent CREATE TABLE ... AS SELECT, so it is omitted here)
create table tab_gender_count with (KEY_FORMAT='JSON') AS select gender, COUNT(*) AS numusers from users GROUP BY gender EMIT CHANGES;
#read stream/table data
select * from users emit changes limit 1 ;
select * from tab_gender_count emit changes limit 1 ;
######################### Creating tables/streams with ksqlDB:
### ksqlDB example 1 (the Kafka topic is created automatically)
# ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
# WITH (kafka_topic='locations', value_format='json', partitions=1);
#parameter notes:
# kafka_topic : Name of the Kafka topic underlying the stream. In this case it will be automatically created because it doesn't exist yet, but streams may also be created over topics that already exist.
# value_format : Encoding of the messages stored in the Kafka topic. For JSON encoding, each row will be stored as a JSON object whose keys/values are column names/values.
# partitions : Number of partitions to create for the locations topic. Note that this parameter is not needed for topics that already exist.
### ksqlDB example 2 (using an existing Kafka topic)
# ksql> create stream users( rowkey int key, username varchar) with( KAFKA_TOPIC='users',VALUE_FORMAT='JSON');
# Message
#----------------
# Stream created
#----------------
# ksql> insert into users(username) values('a');
# ksql> insert into users(username) values('b');
# ksql> select 'hello,'+ username as greeting from users emit changes;
#+----------------------------------------------------------------------------------------------------------------------------------------------------------+
#|GREETING |
#+----------------------------------------------------------------------------------------------------------------------------------------------------------+
#|hello,b |
#|hello,a |
# create a stream for the pageviews topic :
ksql> CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
# create a table for the users topic:
ksql> CREATE TABLE users (id VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
ksql> set 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
pageviews | 1 | 1
users | 1 | 1
---------------------------------------------------------------
######################### 1. Non-persistent (transient) query
ksql> SELECT * FROM pageviews EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|VIEWTIME |USERID |PAGEID |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|129321 |User_1 |Page_40 |
Limit Reached
Query terminated
ksql> SELECT * from users EMIT CHANGES LIMIT 1;
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|ID |REGISTERTIME |USERID |REGIONID |GENDER |
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|User_5 |1489800608800 |User_5 |Region_5 |MALE |
Limit Reached
Query terminated
######################### 2. Persistent query (as a stream): filter the pageviews stream down to female users and save the result to the pageviews_female topic
ksql> SELECT users.id AS userid, pageid, regionid
FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
WHERE gender = 'FEMALE'
EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID |PAGEID |REGIONID |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_3 |Page_89 |Region_7 |
Limit Reached
Query terminated
ksql> CREATE STREAM pageviews_female AS
SELECT users.id AS userid, pageid, regionid
FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
WHERE gender = 'FEMALE'
EMIT CHANGES;
ksql> SELECT * from pageviews_female EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID |PAGEID |REGIONID |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_8 |Page_97 |Region_4 |
Limit Reached
Query terminated
######################### 3. Persistent query: keep only rows whose regionid ends in 8 or 9 and save the result to the pageviews_enriched_r8_r9 topic
ksql> SELECT * FROM pageviews_female
WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID |PAGEID |REGIONID |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_9 |Page_95 |Region_8 |
Limit Reached
Query terminated
ksql> CREATE STREAM pageviews_female_like_89
WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', VALUE_FORMAT='AVRO')
AS SELECT * FROM pageviews_female
WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
EMIT CHANGES;
ksql> SELECT * from pageviews_female_like_89 EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|USERID |PAGEID |REGIONID |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|User_8 |Page_67 |Region_8 |
Limit Reached
Query terminated
######################### 4. Persistent query: count pageviews (stream) per REGIONID and GENDER in 30-second tumbling windows, keep only counts > 1, and save the result as a table (topic pageviews_regions)
ksql> SELECT gender, regionid, COUNT(*) AS numusers
FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
WINDOW TUMBLING (SIZE 30 SECOND)
GROUP BY gender, regionid
HAVING COUNT(*) > 1 EMIT CHANGES LIMIT 1;
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|GENDER |REGIONID |NUMUSERS |
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|FEMALE |Region_6 |2 |
Limit Reached
Query terminated
ksql> CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')
AS SELECT gender, regionid, COUNT(*) AS numusers
FROM pageviews LEFT JOIN users ON pageviews.userid = users.id
WINDOW TUMBLING (SIZE 30 SECOND)
GROUP BY gender, regionid
HAVING COUNT(*) > 1
EMIT CHANGES;
ksql> SELECT * from pageviews_regions EMIT CHANGES LIMIT 1;
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|KSQL_COL_0 |WINDOWSTART |WINDOWEND |NUMUSERS |
+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|OTHER|+|Region_3 |1623913830000 |1623913860000 |3 |
Limit Reached
Query terminated
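######################### 5. Managing persistent queries: the CSAS/CTAS queries created above keep running on the server after the CLI exits; they can be listed and stopped from the CLI (the query id below is only a placeholder, use the id that SHOW QUERIES prints)
ksql> SHOW QUERIES;
ksql> TERMINATE CTAS_PAGEVIEWS_REGIONS_5;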
References: https://blog.csdn.net/Allenzyg/article/details/107693321 https://blog.csdn.net/eyeofeagle/article/details/117999231