前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Kafka和ksqlDB构建和部署实时流处理ETL引擎

使用Kafka和ksqlDB构建和部署实时流处理ETL引擎

作者头像
IT大咖说
发布2020-12-29 15:39:11
2.5K0
发布2020-12-29 15:39:11
举报
文章被收录于专栏:IT大咖说IT大咖说

投入生产,启用强大的搜索功能-从设计决策到幕后的一切

> Image By Author

在Koverhoop,我们正在保险,医疗保健,房地产和离线分析领域建立一系列大型项目。对于我们的多租户团体保险经纪平台klient.ca,我们将建立强大的搜索功能。我们希望我们的搜索结果在键入时出现。以下是我们能够实现的目标,在本文中,我将讨论核心基础架构,我们如何完全自动化其部署以及如何也可以非常快速地对其进行设置。

> GIF By Author: Search Capability in Action

另外,将其视为一个分为两部分的系列,其中我将讨论以下内容:

第1部分:了解用于增强此搜索功能的堆栈,并使用Docker和docker-compose进行部署。(这个帖子)

第2部分:使用Kubernetes对这些服务进行可扩展的生产部署。(尚未发布)

问题定义与决策

为了构建快速,实时的搜索引擎,我们必须做出某些设计决策。我们使用Postgres作为主要数据库。因此,我们可以使用以下选项:

· 直接在Postgres数据库中查询我们在搜索栏中键入的每个字符。

· 使用像Elasticsearch这样的有效搜索数据库。

考虑到我们已经是一个多租户应用程序,要搜索的实体也可能需要大量的联接(如果我们使用Postgres)进行处理,并且我们计划的规模很大,因此我们决定不使用前者直接查询数据库的选项。

因此,我们必须决定一种可靠,有效的方式,将数据从Postgres实时迁移到Elasticsearch。再次做出以下决定:

· 使用Logstash定期查询Postgres数据库,并将数据发送到Elasticsearch。

· 在我们的应用程序中使用Elasticsearch客户端,然后对Postgres和Elasticsearch中的数据进行CRUD。

· 使用基于事件的流引擎,该引擎从Postgres的预写日志中检索事件,将事件流传输到流处理服务器,充实流并将其下沉到Elasticsearch。

选项1很快就删除了,因为它不是实时的,即使我们以较短的间隔查询,也会给Postgres服务器带来很大的负担。在其他两种选择之间进行选择可能是不同公司的不同决定。如果选择选项2,我们可以预见用例的一些问题;如果Elasticsearch确认更新较慢,可能会减慢我们的应用程序的速度,或者在出现不一致的情况下,我们如何重试插入一个事件或一组事件?

因此,我们决定继续构建基于事件的队列基础结构。另外,因为我们已经计划了一些适合基于事件的未来用例和服务,例如通知服务,数据仓库等。事不宜迟,让我们直接跳到解决方案和服务的概述。

服务基本概述

为了实现基于事件的流基础架构,我们决定使用Confluent Kafka Stack。

以下是我们提供的服务:

> Source: Confluent Inc.

Apache Kafka:Kafka是Confluent平台的核心。它是一个基于开源的分布式事件流平台。这将是我们数据库事件(插入,更新和删除)的主要存储区域。

Kafka Connect:我们使用Kafka-connect从Debezium的Postgres连接器将数据提取到Kafka中,该连接器从Postgres WAL文件中获取事件。在接收器端,我们使用ElasticSearch Connector将数据处理并将数据加载到Elasticsearch中。Connect可以作为独立应用程序运行,也可以作为生产环境的容错和可扩展服务运行。

ksqlDB:ksqlDB允许基于Kafka中的数据构建流处理应用程序。它在内部使用Kafka流,在事件发生时对其进行转换。我们用它来充实特定流的事件,并将其与Kafka中已经存在的其他表的预先存在的事件(可能与搜索功能相关)进行混合,例如,根表中的tenant_id。

> Image By Author: ksqlDB with Apache Kafka

使用ksqlDB,就像编写SQL查询以过滤,聚合,联接和丰富数据一样容易。例如,假设我们正在接收有关两个主题的事件流,其中包含与brand和brand_products有关的信息。考虑到这是一个多租户数据源,我们需要使用目前仅与品牌相关联的tenant_id来丰富brand_products。然后,我们可以使用这些丰富的记录,并将它们以非规范化的形式存储在Elasticsearch中(以使搜索有效)。

我们可以使用以下主题设置KStream:

代码语言:javascript
复制
CREATE STREAM “brands” 
WITH (
 kafka_topic = ‘store.public.brands’, 
 value_format = ‘avro’
);

要仅使用几列并按ID对流进行分区,我们可以创建一个称为riched_brands的新流:

代码语言:javascript
复制
CREATE STREAM “enriched_brands” 
WITH (
    kafka_topic = ‘enriched_brands’
) 
AS 
    SELECT 
        CAST(brand.id AS VARCHAR) as “id”, 
        brand.tenant_id as “tenant_id”, 
        brand.name as “name” 
    FROM 
        “brands” brand 
    PARTITION BY 
        CAST(brand.id AS VARCHAR) 
    EMIT CHANGES;”

然后可以通过KTable中的最新偏移量来实现事件集。我们使用它,以便我们可以将品牌活动的当前状态与其他流结合起来。

代码语言:javascript
复制
CREATE TABLE “brands_table” 
AS 
    SELECT 
        id as “id”, 
        latest_by_offset(tenant_id) as “tenant_id”
    FROM 
        “brands” group by id 
    EMIT CHANGES;

现在,我们添加了一个名为brand_products的新流,该流具有一个字段brand_id,但没有tenant_id。

代码语言:javascript
复制
CREATE STREAM “brand_products” 
WITH (
 kafka_topic = ‘store.public.brand_products’, 
 value_format = ’avro’ 
);

我们可以使用以下联接查询通过tenant_id丰富brand_products:

代码语言:javascript
复制
CREATE STREAM “enriched_brand_products” 
WITH (
    kafka_topic = ‘enriched_brand_products’ 
) AS 
    SELECT 
        “brand”.“id” as”brand_id”, 
        ”brand”.”tenant_id” as ”tenant_id”, 
        CAST(brand_product.id AS VARCHAR) as ”id”,
        brand_product.name AS ”name” 
    FROM 
        ”brand_products” AS brand_product 
    INNER JOIN ”brands_table” ”brand” 
    ON 
        brand_product.brand_id = ”brand”.”id” 
    PARTITION BY 
        CAST(brand_product.id AS VARCHAR) 
    EMIT CHANGES;

架构注册表:它是Kafka上的一层,用于存储您在Kafka中提取的事件的元数据。它基于AVRO模式,并提供用于存储和检索它们的REST接口。它有助于确保某些模式兼容性检查及其随时间的演变。

配置栈

我们使用Docker和docker-compose来配置和部署我们的服务。下面准备好构建以docker-compose文件编写的服务,该文件将运行Postgres,Elasticsearch和Kafka相关服务。我还将说明下面提到的每项服务。

Postgres和Elasticsearch

代码语言:javascript
复制
postgres:
    build: services/postgres
    container_name: oeso_postgres
    volumes:
      - database:/var/lib/postgresql/data
    env_file:
      - .env
    ports:
      - 5432:5432
    networks:
      - project_network
      
代码语言:javascript
复制
elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.0
    container_name: elasticsearch
    volumes:
      - ./services/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro
      - elasticsearch-database:/usr/share/elasticsearch/data
    env_file:
      - .env
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - project_network

为了从源数据库中流式传输事件,我们需要启用逻辑解码以允许从其日志中进行复制。与Postgres一样,这些日志称为预写日志(WAL),并且将它们写入文件中。我们需要一个逻辑解码插件,在我们的示例中是wal2json,以提取有关持久性数据库更改的易于阅读的信息,以便可以将其作为事件发送给Kafka。

有关设置所需扩展名的信息,请参考此Postgres Dockerfile。

对于Elasticsearch和Postgres,我们在环境文件中指定一些必要的变量,以使用用户名,密码等进行设置。

ZooKeeper

代码语言:javascript
复制
 zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - project_network

通常,Zookeeper充当Kafka等分布式平台的集中服务,该平台存储所有元数据,例如Kafka节点的状态,并跟踪主题或分区。

有计划在没有ZooKeeper的情况下运行Kafka,但是目前,这是管理集群的必要条件。

Kafka Broker

代码语言:javascript
复制
broker:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    networks:
      - project_network

为了简单起见,我们将设置一个单节点Kafka集群。在本系列的第2部分中将讨论有关多个代理集群的更多信息。

了解我们在此处为Kafka代理进行的一些配置尤其重要。

侦听器由于Kafka被设计为分布式平台,因此我们需要提供某些方式,以允许Kafka经纪人在内部彼此通信,并根据您的网络结构在外部与其他客户端通信。因此,侦听器是主机,端口和协议的组合。

→KAFKA_LISTENERS这是kafka绑定到的主机,端口和协议组合接口的列表。默认情况下,它设置为0.0.0.0。在所有接口上监听。

→KAFKA_ADVERTISED_LISTENERS的值再次是主机和端口的组合,客户端将使用这些端口连接到kafka代理。因此,如果客户端在docker内,则可以使用broker:9092连接到代理,如果docker外部有客户端,则将其返回localhost:9092进行连接。我们还需要提及映射到用于建立连接的适当协议的侦听器名称。

→KAFKA_LISTENER_SECURITY_PROTOCOL_MAP在此,我们将用户定义的侦听器名称映射到我们要用于通信的协议;它可以是PLAINTEXT(未加密)或SSL(已加密)。这些名称在KAFKA_LISTENERS和KAFKA_ADVERTISED_LISTENERS中进一步使用,以对主机/ ip使用适当的协议。

由于我们仅配置了一个单节点Kafka集群,因此返回的地址或向任何客户端宣传的地址都属于同一代理本身。

模式注册

代码语言:javascript
复制
schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
    networks:
      - project_network

对于单节点架构注册表,我们指定Kafka集群用于存储架构相关数据的Zookeeper连接字符串。

Kafka连接

代码语言:javascript
复制
connect:
    image: confluentinc/cp-kafka-connect:5.4.3
    hostname: connect
    container_name: connect
    volumes:
      - "./producers/debezium-debezium-connector-postgresql/:/usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/"
      - "./consumers/confluentinc-kafka-connect-elasticsearch/:/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/"
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    networks:
      - project_network

我们看到一些新参数,例如:

→CONNECT_BOOTSTRAP_SERVERS:一组主机和端口组合,用于建立与Kafka集群的初始连接。

→CONNECT_KEY_CONVERTER:用于将密钥从连接格式序列化为与Kafka兼容的格式。同样,对于CONNECT_VALUE_CONVERTER,我们使用AvroConverter进行序列化。

为我们的源连接器和接收器连接器映射卷并在CONNECT_PLUGIN_PATH中指定它们非常重要

ksqlDB数据库

代码语言:javascript
复制
ksqldb-server:
    image: confluentinc/ksqldb-server:0.11.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    volumes:
      - "./producers/debezium-debezium-connector-postgresql/:/usr/share/kafka/plugins/debezium-debezium-connector-postgresql/"
      - "./consumers/confluentinc-kafka-connect-elasticsearch/:/usr/share/kafka/plugins/confluentinc-kafka-connect-elasticsearch/"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
    networks:
      - project_network

如果您不打算使用Kafka-Connect,并且不需要独立于ksql扩展Kafka-Connect,则可以为ksql设置嵌入式连接配置。这也从ksqldb服务器公开连接端点

代码语言:javascript
复制
ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.11.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    networks:
      - project_network

在测试或开发环境中时,使用ksqldb-cli服务尝试并测试流非常方便。即使在生产环境中,如果您想探索事件流或Ktables,也可以;或手动创建或过滤流。尽管建议您使用ksql或kafka客户端或其REST端点自动执行流,表或主题的创建,我们将在下面讨论。

> Image By Author: A more detailed look at our architecture, so far.

初始化数据流

代码语言:javascript
复制
streams-init:
    build: jobs/streams-init
    container_name: streams-init
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - ksqldb-server
      - ksqldb-cli
      - postgres
      - elasticsearch
      - connect
    env_file:
      - .env
    environment:
      ZOOKEEPER_HOSTS: "zookeeper:2181"
      KAFKA_TOPICS: "brands, brand_products"
    networks:
      - project_network

该服务的目的是初始化流并配置Kafka和我们正在使用的其他服务中的内容。在部署时,我们不想在服务器上手动创建主题,流,连接等。因此,我们利用为每个服务提供的REST服务,并编写一个Shell脚本来自动化该过程。

我们的安装脚本如下所示:

代码语言:javascript
复制
#!/bin/bash

# Setup ENV variables in connectors json files
sed -i "s/POSTGRES_USER/${POSTGRES_USER}/g" connectors/postgres.json
sed -i "s/POSTGRES_PASSWORD/${POSTGRES_PASSWORD}/g" connectors/postgres.json
sed -i "s/POSTGRES_DB/${POSTGRES_DB}/g" connectors/postgres.json
sed -i "s/ELASTIC_PASSWORD/${ELASTIC_PASSWORD}/g" connectors/elasticsearch.json

# Simply wait until original kafka container and zookeeper are started.
export WAIT_HOSTS=zookeeper:2181,broker:9092,schema-registry:8081,ksqldb-server:8088,elasticsearch:9200,connect:8083
export WAIT_HOSTS_TIMEOUT=300
/wait

# Parse string of kafka topics into an array
# https://stackoverflow.com/a/10586169/4587961
kafkatopicsArrayString="$KAFKA_TOPICS"
IFS=', ' read -r -a kafkaTopicsArray <<< "$kafkatopicsArrayString"

# A separate variable for zookeeper hosts.
zookeeperHostsValue=$ZOOKEEPER_HOSTS

# Terminate all queries
curl -s -X "POST" "http://ksqldb-server:8088/ksql" \
         -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
         -d '{"ksql": "SHOW QUERIES;"}' | \
  jq '.[].queries[].id' | \
  xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -d '{"ksql": "TERMINATE 'foo';"}'
           

# Drop All Tables
curl -s -X "POST" "http://ksqldb-server:8088/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
             -d '{"ksql": "SHOW TABLES;"}' | \
      jq '.[].tables[].name' | \
      xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \
               -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
               -d '{"ksql": "DROP TABLE \"foo\";"}'


# Drop All Streams
curl -s -X "POST" "http://ksqldb-server:8088/ksql" \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -d '{"ksql": "SHOW STREAMS;"}' | \
    jq '.[].streams[].name' | \
    xargs -Ifoo curl -X "POST" "http://ksqldb-server:8088/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
             -d '{"ksql": "DROP STREAM \"foo\";"}'
             

# Create kafka topic for each topic item from split array of topics.
for newTopic in "${kafkaTopicsArray[@]}"; do
    # https://kafka.apache.org/quickstart
    curl -X DELETE http://elasticsearch:9200/enriched_$newTopic --user elastic:${ELASTIC_PASSWORD}
    curl -X DELETE http://schema-registry:8081/subjects/store.public.$newTopic-value
    kafka-topics --create --topic "store.public.$newTopic" --partitions 1 --replication-factor 1 --if-not-exists --zookeeper "$zookeeperHostsValue"
    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @schemas/$newTopic.json http://schema-registry:8081/subjects/store.public.$newTopic-value/versions

done

curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE STREAM \\"brands\\" WITH (kafka_topic = \'store.public.brands\', value_format = \'avro\');", "streamsProperties": {} }'
curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE STREAM \\"enriched_brands\\" WITH ( kafka_topic = \'enriched_brands\' ) AS SELECT CAST(brand.id AS VARCHAR) as \\"id\\", brand.tenant_id as \\"tenant_id\\", brand.name as \\"name\\" from \\"brands\\" brand partition by CAST(brand.id AS VARCHAR) EMIT CHANGES;", "streamsProperties": {} }'

curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE STREAM \\"brand_products\\" WITH ( kafka_topic = \'store.public.brand_products\', value_format = \'avro\' );", "streamsProperties": {} }'
curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE TABLE \\"brands_table\\" AS SELECT id as \\"id\\", latest_by_offset(tenant_id) as \\"tenant_id\\" FROM \\"brands\\" group by id EMIT CHANGES;", "streamsProperties": {} }'
curl -X "POST" "http://ksqldb-server:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d $'{ "ksql": "CREATE STREAM \\"enriched_brand_products\\" WITH ( kafka_topic = \'enriched_brand_products\' ) AS SELECT \\"brand\\".\\"id\\" as \\"brand_id\\", \\"brand\\".\\"tenant_id\\" as \\"tenant_id\\", CAST(brand_product.id AS VARCHAR) as \\"id\\", brand_product.name AS \\"name\\" FROM \\"brand_products\\" AS brand_product INNER JOIN \\"brands_table\\" \\"brand\\" ON brand_product.brand_id = \\"brand\\".\\"id\\" partition by CAST(brand_product.id AS VARCHAR) EMIT CHANGES;", "streamsProperties": {} }'

curl -X DELETE http://connect:8083/connectors/enriched_writer
curl -X "POST" -H "Content-Type: application/json" --data @connectors/elasticsearch.json http://connect:8083/connectors

curl -X DELETE http://connect:8083/connectors/event_reader
curl -X "POST" -H "Content-Type: application/json" --data @connectors/postgres.json http://connect:8083/connectors

这是当前对我们有效的方法:

→在对它们运行任何作业之前,请确保所有服务均已准备就绪;→我们需要确保主题存在于Kafka上,或者我们创建新的主题;→即使有任何架构更新,我们的流也应该可以正常工作;→再次进行连接,以说明基础数据源或接收器的密码或版本更改。

共享此安装脚本的目的仅是演示一种自动化这些管道的方法。完全相同的设置可能对您不起作用,但是对于自动化工作流并避免在任何环境下的每个部署上避免进行任何手动工作的想法仍然相同。

要为您快速启动并运行此数据基础架构,请参阅以下Github存储库:https://github.com/behindthescenes-group/oesophagus

因此,克隆存储库并执行以下操作:

cp default.env .env

docker-compose up -d

..在您的终端上。

在商店Postgres数据库中创建brand和brand_products表:

代码语言:javascript
复制
CREATE TABLE brands (
    id serial PRIMARY KEY,
    name VARCHAR (50),
    tenant_id INTEGER
);
CREATE TABLE brand_products (
    id serial PRIMARY KEY,
    brand_id INTEGER,
    name VARCHAR(50)
);

在品牌表中插入一些记录:

代码语言:javascript
复制
INSERT INTO brands VALUES(1, 'Brand Name 1', 1);
INSERT INTO brands VALUES(2, 'Brand Name 2', 1);
INSERT INTO brands VALUES(3, 'Brand Name 3', 2);
INSERT INTO brands VALUES(4, 'Brand Name 4', 2);

以及brand_products表中的一些记录:

代码语言:javascript
复制
INSERT INTO brand_products VALUES(1, 1, 'Product Name 1');
INSERT INTO brand_products VALUES(2, 2, 'Product Name 2');
INSERT INTO brand_products VALUES(3, 3, 'Product Name 3');
INSERT INTO brand_products VALUES(4, 4, 'Product Name 4');
INSERT INTO brand_products VALUES(5, 1, 'Product Name 5');

看到brand_products在Elasticsearch中丰富了tenant_id:

curl localhost:9200/enriched_brand_products/_search --user elastic:your_password

我将继续为上述存储库做出贡献;使用Kubernetes为多节点Kafka基础架构添加部署配置;写更多的连接器;仅使用所需的服务来实现即插即用体系结构的框架。请随时为此做出贡献,或者让我知道您在当前设置中遇到的任何数据工程问题。

下一步

我希望本文能为您提供一个有关部署和运行完整的Kafka堆栈的合理思路,以构建一个实时流处理应用程序的基本而有效的用例。

根据产品或公司的性质,部署过程可能会有所不同,以满足您的要求。在本系列的下一部分中,我确实有计划解决此类系统的可扩展性方面的问题,这将涉及在完全相同的用例上在Kubernetes上部署此类基础架构。

(本文由闻数起舞翻译自Sahil Malhotra的文章《Building and Deploying a Real-Time Stream Processing ETL Engine with Kafka and ksqlDB》,转载请注明出处,原文链接:https://towardsdatascience.com/enabling-a-powerful-search-capability-building-and-deploying-a-real-time-stream-processing-etl-a27ecb0ab0ae)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IT大咖说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题定义与决策
  • 服务基本概述
  • 配置栈
  • Postgres和Elasticsearch
  • ZooKeeper
  • Kafka Broker
  • 模式注册
  • Kafka连接
  • ksqlDB数据库
  • 初始化数据流
  • 下一步
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档