前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >0915-7.1.7-Kafka Connectors for SAP HANA测试

0915-7.1.7-Kafka Connectors for SAP HANA测试

作者头像
Fayson
发布2024-04-10 11:22:55
1200
发布2024-04-10 11:22:55
举报
文章被收录于专栏:Hadoop实操Hadoop实操

作者:孙小波

1 简介及基础环境

1.1 Kafka Connectors for SAP简介

Kafka原生没有提供SAP HANA的Connector,GitHub开源项目Kafka Connectors for SAP提供了kafka与SAP之间的Connector,可实现定时全量或增量的拉取SAP HANA数据发送到Kafka。详细信息,参考GitHub:https://github.com/SAP/kafka-connect-sap/tree/master

1.2 测试环境信息

Kafka版本:2.5.0.7.1.7.2013-1(cloudera) SAP HANA版本:HDB (ver. 2.00.048.04.1612945474) 测试以下两种情况:

  • • Kerberos环境下,全量和增量拉取HANA数据到kafka
  • • 非kerberos环境下,全量和增量拉取HANA数据到kafka

1.3 JAR包准备

需要准备两个jar包,SAP HANA的驱动和kafka-connect-sap项目打包。kafka-connect-sap选择对应的版本下载打包即可。或者直接在GitHub release中下载jar:https://github.com/SAP/kafka-connect-sap/releases。例如下载源码打包:

代码语言:javascript
复制
cd Downloads/kafka-connect-sap-master-2.8.1
sudo /Applications/IntelliJ\ IDEA.app/Contents/plugins/maven/lib/maven3/bin/mvn clean install -DskipTests -e
ll modules/scala_2.12/target

选择一台kafka客户端节点,将kafka-connector-hana_2.12-0.9.5-SNAPSHOT.jar放到kafka客户端节点的/var/lib/kafka/或者/opt/cloudera/parcels/CDH/lib/kafka/libs/目录下;SAP HANA驱动ngdbc-2.12.9.jar放在/opt/cloudera/parcels/CDH/lib/kafka/libs/下。

1.4 配置文件准备

关于Kafka Connect Standalone的配置文件释义,可参考:https://kafka.apache.org/25/documentation.html#connect_configuring 关于Kafka Connectors for SAP的配置文件释义,可参考: https://github.com/SAP/kafka-connect-sap#configuration

1.4.1 worker-nokrb.properties

worker-nokrb.properties: 非kerberos环境Kafka Connect Standalone模式的 启动配置文件,配置文件名称可以自定义命名。

代码语言:javascript
复制
bootstrap.servers=hqcncdptst03l:9192
config.storage.replication.factor=1
config.storage.topic=connect-configs
connect.prometheus.metrics.port=28186
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
metrics.jetty.server.port=28084
offset.flush.interval.ms=60000
offset.storage.replication.factor=1
offset.storage.topic=connect-offsets
plugin.path=/var/lib/kafka
rest.extension.classes=com.cloudera.dim.kafka.metrics.JmxJsonMetricsRestExtension
rest.port=28183
ssl.client.auth=none
status.storage.replication.factor=1
status.storage.topic=connect-status
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
offset.storage.file.filename=/opt/hana_test/hana_offset.txt

1.4.2 worker.properties

worker.properties:kerberos环境Kafka Connect Standalone模式的 启动配置文件,配置文件名称可以自定义命名。

代码语言:javascript
复制
bootstrap.servers=hqcncdptst01l:9092,hqcncdptst02l:9092,hqcncdptst03l:9092
config.storage.replication.factor=1
config.storage.topic=connect-configs
connect.prometheus.metrics.port=28096
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
metrics.jetty.server.port=28094
offset.flush.interval.ms=60000
offset.storage.replication.factor=1
offset.storage.topic=connect-offsets
plugin.path=/var/lib/kafka
rest.extension.classes=com.cloudera.dim.kafka.metrics.JmxJsonMetricsRestExtension
rest.port=28093
status.storage.replication.factor=1
status.storage.topic=connect-status
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
producer.acks = 1
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true  \
        keyTab="/opt/kafka-jaas/hive.keytab" \
        principal="hive@HADOOP.COM";
sasl.mechanism = GSSAPI
sasl.kerberos.service.name = kafka
security.protocol = SASL_PLAINTEXT
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true  \
        keyTab="/opt/kafka-jaas/hive.keytab" \
        principal="hive@HADOOP.COM";
producer.sasl.mechanism = GSSAPI
producer.sasl.kerberos.service.name = kafka
producer.security.protocol = SASL_PLAINTEXT
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true  \
        keyTab="/opt/kafka-jaas/hive.keytab" \
        principal="hive@HADOOP.COM";
consumer.sasl.mechanism = GSSAPI
consumer.sasl.kerberos.service.name = kafka
consumer.security.protocol = SASL_PLAINTEXT
offset.storage.file.filename = /opt/hana_test/hana_offset.txt

1.4.3 hana_source_full.properties

hana_source_full.properties:全量拉取HANA数据的配置文件,配置文件名称可以自定义命名。

代码语言:javascript
复制
name=hana_test-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=hana_test
connection.url=jdbc:sap://10.xxx.xxx.xxx:33015?encrypt=true&validateCertificate=false
connection.user=username
connection.password=xxxx
hana_test.table.name="BI_CONNECT"."MAT_SD_TEST_KAFKA"
hana_test.poll.interval.ms=60000

1.4.4 hana_source_incr.properties

hana_source_incr.properties:增量拉取HANA数据的配置文件,配置文件名称可以自定义命名。

代码语言:javascript
复制
name=hana_test-incr-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
mode=incrementing
topics=hana_incr_test
connection.url=jdbc:sap://10.xxx.xxx.xxx:33015?encrypt=true&validateCertificate=false
connection.user=username
connection.password=xxxxx
hana_incr_test.table.name="BI_CONNECT"."MAT_SD_TEST_KAFKA"
hana_incr_test.poll.interval.ms=30000
hana_incr_test.incrementing.column.name=GROUP_ID

2 非Kerberos环境测试

2.1 全量测试

1.启动kafka connect-standalone任务:

代码语言:javascript
复制
/opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh worker-nokrb.properties hana_source_full.properties

每间隔60s全量拉取一次"BI_CONNECT"."MAT_SD_TEST_KAFKA"数据,发送到Kafka topic hana_test中。

2.查看SAP HANA数据

3.Kafka启动一个控制台消费者查看拉取数据情况

代码语言:javascript
复制
kafka-console-consumer --topic hana_test --from-beginning --bootstrap-server $(hostname):9192

2.2 增量测试

1.启动kafka connect-standalone任务:

代码语言:javascript
复制
/opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh worker-nokrb.properties hana_source_incr.properties

2.HANA插入1条数据

3.Kafka启动一个控制台消费者查看拉取数据情况

代码语言:javascript
复制
kafka-console-consumer --topic hana_incr_test --from-beginning --bootstrap-server $(hostname):9192

3 Kerberos测试

3.1 全量测试

1.启动kafka connect-standalone任务:

代码语言:javascript
复制
/opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh worker.properties hana_source_full.properties

每间隔60s全量拉取一次"BI_CONNECT"."MAT_SD_TEST_KAFKA"数据,发送到Kafka topic hana_test中。

2.查看SAP HANA数据

3.Kafka启动一个控制台消费者查看拉取数据情况

代码语言:javascript
复制
# 准备一个kafka jaas文件和client.properties文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka-jaas/jaas-hive.conf"
kafka-console-consumer --topic hana_test --from-beginning --bootstrap-server $(hostname):9092 --consumer.config client.properties

3.2 增量测试

1.启动kafka connect-standalone任务:

代码语言:javascript
复制
/opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh worker.properties hana_source_incr.properties

2.HANA插入1条数据

代码语言:javascript
复制
INSERT INTO "BI_CONNECT"."MAT_SD_TEST_KAFKA" VALUES (1010,'xxx','message_10');

3.Kafka启动一个控制台消费者查看拉取数据情况

代码语言:javascript
复制
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka-jaas/jaas-hive.conf"
kafka-console-consumer --topic hana_incr_test --from-beginning --bootstrap-server $(hostname):9092 --consumer.config client.properties

4 DELETE和UPDATE测试

在kerberos环境补充测试delete和update情况。

4.1 DELETE

全量拉取模式下:

  1. 1. 在HANA中删除一条数据
代码语言:javascript
复制
delete from "BI_CONNECT"."MAT_SD_TEST_KAFKA" where GROUP_ID = 1011
  1. 1. 启动Kafka Connect Standalone任务后,查看Kafka consumer
代码语言:javascript
复制
{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"U_NAME"},{"type":"string","optional":true,"field":"U_MESS"}],"optional":false,"name":"bi_connectmat_sd_test_kafka"},"payload":{"GROUP_ID":1010,"U_NAME":"xxx","U_MESS":"message_10"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"U_NAME"},{"type":"string","optional":true,"field":"U_MESS"}],"optional":false,"name":"bi_connectmat_sd_test_kafka"},"payload":{"GROUP_ID":1012,"U_NAME":"xxx","U_MESS":"message_12_new"}}

数据结果中没有已删除的数据,说明删除的 数据没有发送到kafka。增量拉取模式下:

  1. 1. 在HANA中删除一条数据
代码语言:javascript
复制
delete from "BI_CONNECT"."MAT_SD_TEST_KAFKA" where GROUP_ID = 1010

2.Kafka Connect Standalone任务后,查看Kafka consumer 删除的删除没有发送到Kafka topic

3.查看Kafka connect standalone任务输出日志

代码语言:javascript
复制
23/08/24 10:02:56 INFO querier.IncrColTableQuerier: 1
23/08/24 10:02:56 INFO hana.HANASourceTask: Closing this query for IncrColTableQuerier{name='"BI_CONNECT"."MAT_SD_TEST_KAFKA"', topic='hana_incr_test'}
23/08/24 10:02:56 INFO hana.HANASourceTask: No updates for IncrColTableQuerier{name='"BI_CONNECT"."MAT_SD_TEST_KAFKA"', topic='hana_incr_test'}
23/08/24 10:02:56 INFO hana.HANASourceTask: Start polling records from HANA
23/08/24 10:02:56 INFO hana.HANASourceTask: Waiting 30000 ms to poll from IncrColTableQuerier{name='"BI_CONNECT"."MAT_SD_TEST_KAFKA"', topic='hana_incr_test'}

日志显示没有数据更新。

4.2 UPDATE

全量拉取模式下:

1.在HANA中update一条数据

代码语言:javascript
复制
update "BI_CONNECT"."MAT_SD_TEST_KAFKA" set U_MESS = 'message_12_new' where GROUP_ID = 1012

2.Kafka Connect Standalone任务后,查看Kafka consumer

代码语言:javascript
复制
{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"U_NAME"},{"type":"string","optional":true,"field":"U_MESS"}],"optional":false,"name":"bi_connectmat_sd_test_kafka"},"payload":{"GROUP_ID":1012,"U_NAME":"xxx","U_MESS":"message_12_new"}}

HANA数据更改后,全量拉取发送到kafka。增量拉取模式下:

1.在HANA中更新两条数据

代码语言:javascript
复制
update "BI_CONNECT"."MAT_SD_TEST_KAFKA" set U_MESS = 'message_9_new' where GROUP_ID = 1009
update "BI_CONNECT"."MAT_SD_TEST_KAFKA" set  GROUP_ID = 10088 where U_MESS = 'message_8'

分别更新GROUP_ID和U_MESS字段值。

2.Kafka Connect Standalone任务后,查看Kafka consumer

代码语言:javascript
复制
{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"U_NAME"},{"type":"string","optional":true,"field":"U_MESS"}],"optional":false,"name":"bi_connectmat_sd_test_kafka"},"payload":{"GROUP_ID":10088,"U_NAME":"xxx","U_MESS":"message_8"}}

结果显示,GROUP_ID更新的数据会拉取发送到Kafka,因为我们在hana_source_incr.properties配置文件中指定了参数hana_incr_test.incrementing.column.name=GROUP_ID,以GROUP_ID的变化来判断数据增量。所以,只有配置文件中指定了incrementing.column.name的column发生变化,才算是增量数据,才能发送到Kafka。

4.3 测试总结

1.在全量拉取模式下,将通过指定的全量拉取间隔时间定期拉取全量数据发送到Kafka;数据始终以HANA查询出来的数据为准,未发生变化的数据和发生变化的数据,都会全量发送到Kafka topic。

2.在增量拉取模式下,需要指定HANA Table的一个column为增量列,无论该column是否为primary key以下结论都符合:

  • • 当更新的数据是配置文件指定增加的column时,更新后的数据发送到kafka topic。
  • • 当更新的数据是非配置文件指定增加的column时,不会发送到Kafka topic。
  • • delete数据时,delete的数据是检测不到更新的,不会发送到kafka topic。

5 其他问题处理及注意事项

1.修改Kafka connect standalone启动脚本 启动脚本中指定了log4j配置文件,但实际不存在,会报错:

代码语言:javascript
复制
java.io.FileNotFoundException: /opt/cloudera/parcels/CDH/lib/kafka/bin/../config/connect-log4j.properties (No such file or directory

可以注释掉以下内容解决:

代码语言:javascript
复制
vim /opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi

2.kafka connect standalone启动是有部分报错,例如:org.reflections.ReflectionsException: could not get type for name org.springframework.beans.factory.FactoryBean,可以忽略不管。

3.在Kafka connect standalone配置文件中,需要指定offset存文件地址。可以先创建一个空文件。

代码语言:javascript
复制
offset.storage.file.filename = /opt/hana_test/hana_offset.txt

4.kerberos环境下,Kafka connect standalone配置中,consumer和producer的认证类型和jaas配置需要分别以conmuser.xxx和producer.xxx单独指定。

代码语言:javascript
复制
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true  \
        keyTab="/opt/kafka-jaas/hive.keytab" \
        principal="hive@HADOOP.COM";
producer.sasl.mechanism = GSSAPI
producer.sasl.kerberos.service.name = kafka
producer.security.protocol = SASL_PLAINTEXT
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true  \
        keyTab="/opt/kafka-jaas/hive.keytab" \
        principal="hive@HADOOP.COM";
consumer.sasl.mechanism = GSSAPI
consumer.sasl.kerberos.service.name = kafka
consumer.security.protocol = SASL_PLAINTEXT
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-04-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 简介及基础环境
    • 1.1 Kafka Connectors for SAP简介
      • 1.2 测试环境信息
        • 1.3 JAR包准备
          • 1.4 配置文件准备
            • 1.4.1 worker-nokrb.properties
            • 1.4.2 worker.properties
            • 1.4.3 hana_source_full.properties
            • 1.4.4 hana_source_incr.properties
        • 2 非Kerberos环境测试
          • 2.1 全量测试
            • 2.2 增量测试
            • 3 Kerberos测试
              • 3.1 全量测试
                • 3.2 增量测试
                • 4 DELETE和UPDATE测试
                  • 4.1 DELETE
                    • 4.2 UPDATE
                      • 4.3 测试总结
                      • 5 其他问题处理及注意事项
                      相关产品与服务
                      腾讯云服务器利旧
                      云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档