I have deployed Strimzi Kafka to my Kubernetes cluster.
My goal is to connect to a Kafka topic through the SSL listener (port 9094) and to produce/consume messages.
Symptoms
[2022-05-05 10:20:11,252] ERROR Error when sending message to topic public.kafka.incoming.events with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
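org.apache.kafka.common.errors.TimeoutException: Topic public.kafka.incoming.events not present in metadata after 60000 ms.
The message cannot be sent by the producer or received by the consumer.
Logs from the strimzi-cluster-operator pod when loadbalancer or ingress is selected as the type (i.e. kafka.listeners.external.type):
failed to reconcile ZookeeperScalingException: Failed to connect to Zookeeper my-cluster-zookeeper-0.my-cluster-zookeeper-nodes.my-kafka-project.svc:2181. Connection was not ready in 300000 ms.
I am not sure whether this is related to a configuration problem in my Kubernetes cluster. When I choose NodePort, it works as expected, and Kafka and ZooKeeper start normally.
To make a long story short, I have:
1. Enabled the debug flag: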
log4j.logger.org.apache.kafka=INFO -> log4j.logger.org.apache.kafka=DEBUG
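This lets me read the debug logs from the Kafka server.
2. Configured/generated the signed certificates and the JKS keystore/truststore. The configuration I use is as follows: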
cat <<EOT > ssl.properties
security.protocol=SSL
ssl.protocol=TLSv1.2
bootstrap.servers=my-cluster-kafka-external-bootstrap:9094
ssl.truststore.location=/home/kafka/user-truststore.jks
ssl.truststore.password=<password>
ssl.keystore.location=/home/kafka/user-keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>
ssl.endpoint.identification.algorithm=
EOT
3. Compared Case 1, which works, with Case 2, which does not. The difference between them is why I am asking this question.
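As a quick sanity check on the client configuration from step 2, here is a minimal sketch that reports which SSL client keys are absent or empty in a properties file. The helper names and the list of required keys are assumptions taken from the ssl.properties listing above. They are not part of Kafka or Strimzi.

```python
# Hypothetical helper: the key list simply mirrors the ssl.properties file above.
REQUIRED_KEYS = (
    "security.protocol",
    "bootstrap.servers",
    "ssl.truststore.location",
    "ssl.truststore.password",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(text, required=REQUIRED_KEYS):
    """Return the required keys that are absent or have an empty value."""
    props = parse_properties(text)
    return [key for key in required if not props.get(key)]
```

Running missing_keys(open(path).read()) on whichever file is actually passed to --producer.config or --consumer.config should return an empty list. ssl.endpoint.identification.algorithm is deliberately left out of the required list because it is intentionally empty in the configuration above.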
Case 1 (this works; it is included here to help resolve the issue)
Testing from the Kafka server pod (inside the cluster).
Producer
[kafka@my-cluster-kafka-0 ~]$ /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-external-bootstrap:9094 --topic public.kafka.incoming.events --producer.config consumer-ssl.properties
>ttesttt
>
Consumer
[kafka@my-cluster-kafka-0 ~]$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-0.my-cluster-kafka-brokers.my-kafka-project.svc.cluster.local:9094 --topic public.kafka.incoming.events --consumer.config consumer-ssl.properties
ttesttt
Confirmation
Confirmation based on the Kafka server logs.
2022-05-05 13:40:07,988 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-xx.xxx.xx4.xx7:45150-42 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/xx.xxx.xx4.xx7:45150], selector=sun.nio.ch.EPollSelectorImpl@5c2628ee, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost 'xx.xxx.xx4.xx7' peerPort 45150 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
2022-05-05 13:40:07,989 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /xx.xxx.xx4.xx7 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
2022-05-05 13:43:56,053 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-10.15.19.76:40094-43 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/10.15.19.76:40094], selector=sun.nio.ch.EPollSelectorImpl@7df0ddcc, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost '10.15.19.76' peerPort 40094 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-10]
2022-05-05 13:43:56,053 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /10.15.19.76 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-10]
2022-05-05 13:43:56,323 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-xx.xxx.xx4.xx7:1270-43 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/xx.xxx.xx4.xx7:1270], selector=sun.nio.ch.EPollSelectorImpl@323eac4e, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost 'xx.xxx.xx4.xx7' peerPort 1270 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-11]
2022-05-05 13:43:56,323 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /xx.xxx.xx4.xx7 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-11]
2022-05-05 13:43:56,335 INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group console-consumer-27420 in Empty state. Created a new member id console-consumer-f9cb320c-a5cf-4e40-a686-6dbd1d37cdd6 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-3]
2022-05-05 13:43:56,340 INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-27420 in state PreparingRebalance with old generation 0 (__consumer_offsets-26) (reason: Adding new member console-consumer-f9cb320c-a5cf-4e40-a686-6dbd1d37cdd6 with group instance id None) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-6]
2022-05-05 13:43:59,342 INFO [GroupCoordinator 0]: Stabilized group console-consumer-27420 generation 1 (__consumer_offsets-26) with 1 members (kafka.coordinator.group.GroupCoordinator) [executor-Rebalance]
2022-05-05 13:43:59,359 INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-f9cb320c-a5cf-4e40-a686-6dbd1d37cdd6 for group console-consumer-27420 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-7]
2022-05-05 13:43:59,468 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-xx.xxx.xx4.xx7:1584-43 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/xx.xxx.xx4.xx7:1584], selector=sun.nio.ch.EPollSelectorImpl@5c2628ee, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost 'xx.xxx.xx4.xx7' peerPort 1584 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
2022-05-05 13:43:59,468 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /xx.xxx.xx4.xx7 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
Case 2 (this does not work and is the reason I am asking this question)
Testing from my local machine (connecting over the internet).
Producer
$ /opt/kafka_2.13-3.1.0/bin/kafka-console-producer.sh --bootstrap-server internet-reachable-domain-name:9094 --topic public.kafka.incoming.events --producer.config consumer-ssl.properties
....long SSL/TLS hand-shake....
)
ttesttttt
javax.net.ssl|ALL|16|kafka-producer-network-thread | console-producer|2022-05-05 10:05:04.469 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
Notice how the '>' prompt character is not shown on the first line.
Consumer
$ /opt/kafka_2.13-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server internet-reachable-domain-name:9094 --topic public.kafka.incoming.events --consumer.config consumer-ssl.properties
javax.net.ssl|DEBUG|01|main|2022-05-05 10:08:29.034 EDT|Finished.java:535|Consuming server Finished handshake message (
"Finished": {
"verify data": {
0000: 6A 3A 3D F6 87 81 A0 36 FF 08 77 CB
}'}
)
javax.net.ssl|ALL|01|main|2022-05-05 10:08:37.815 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 10:08:48.159 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 10:09:06.713 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 10:09:37.193 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
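javax.net.ssl|ALL|01|main|2022-05-05 10:10:11.936 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
Notice how no message arrives: even when I type a message into the Case 1 producer, it does not show up in this consumer.
Confirmation
Confirmation based on the Kafka server logs.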
2022-05-05 14:14:43,507 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-10.15.30.64:19952-46 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/10.15.30.64:19952], selector=sun.nio.ch.EPollSelectorImpl@5c2628ee, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost '10.15.30.64' peerPort 19952 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
2022-05-05 14:14:43,507 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /10.15.30.64 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
2022-05-05 14:16:02,620 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-10.15.219.64:42153-47 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/10.15.219.64:42153], selector=sun.nio.ch.EPollSelectorImpl@7df0ddcc, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost '10.15.219.64' peerPort 42153 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-10]
2022-05-05 14:16:02,621 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /10.15.219.64 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-10]
Additional information
The option I used is:
export KAFKA_OPTS="-Djavax.net.debug=ssl:handshake:verbose"
This gets more information from the Kafka client. Here is what I see from the producer in Case 2:
javax.net.ssl|ALL|01|main|2022-05-05 09:54:50.374 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:55:23.285 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:55:48.477 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:56:13.718 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:56:50.532 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:57:25.793 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:58:01.707 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:58:30.806 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:59:07.867 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
Kafka version
Both clients are the same version:
# Local device
$ /opt/kafka_2.13-3.1.0/bin/kafka-topics.sh --version
3.1.0 (Commit:37edeed0777bacb3)
# Kafka server pod
$ /opt/kafka/bin/kafka-topics.sh --version
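3.1.0 (Commit:37edeed0777bacb3)
Posted on 2022-05-09 21:16:24
Thanks to @OneCricketeer for the information.
The link you provided was exactly what I needed to read. The problem is that in my Kubernetes cluster the ingress and loadbalancer options did not work, and Kafka did not start. Only the nodePort option worked, but that was not enough to solve the problem I was facing.
In the end I used Bitnami instead of Strimzi (https://bitnami.com/stack/kafka/helm), and it works like a charm.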
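I relied on the example provided by @橡胶鸭 in "Accessing bitnami/kafka outside the kubernetes cluster".
I just had to make sure the ports were forwarded through the load balancer.
Command:
helm install kafka-bitnami --values bitnami-values-external.yaml bitnami/kafka
File: bitnami-values-external.yaml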
replicaCount: 3
externalAccess:
  enabled: true
  autoDiscovery:
    enabled: false
    image:
      registry: docker.io
      repository: bitnami/kubectl
      tag: 1.23.4-debian-10-r17
      pullPolicy: IfNotPresent
      pullSecrets: []
    resources:
      limits: {}
      requests: {}
  service:
    type: NodePort
    port: 9094
    loadBalancerIPs: []
    loadBalancerSourceRanges: []
    nodePorts:
      - 30000
      - 30001
      - 30002
    useHostIPs: false
    annotations: {}
    domain: <domain-name which points to my load-balancer public IP>
Note that replicaCount must match the number of nodePorts items.
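The note about replicaCount and nodePorts can be checked before running helm install. This is a minimal sketch, assuming the intended rule is one nodePorts entry per replica and that the values file has already been loaded into a plain dict. check_external_access is a hypothetical helper, not part of Helm or the Bitnami chart.

```python
def check_external_access(values):
    """Return a list of problems found in a parsed values dict (empty if none)."""
    problems = []
    replicas = values["replicaCount"]
    service = values.get("externalAccess", {}).get("service", {})
    node_ports = service.get("nodePorts", [])

    # Assumed rule: with a NodePort service, each broker replica needs its own port.
    if service.get("type") == "NodePort" and len(node_ports) != replicas:
        problems.append(
            "replicaCount is %d but nodePorts has %d entries"
            % (replicas, len(node_ports))
        )
    # Two brokers cannot share the same node port.
    if len(set(node_ports)) != len(node_ports):
        problems.append("nodePorts contains duplicate ports")
    return problems


# Same shape as the values file above, written by hand to avoid a YAML dependency.
values = {
    "replicaCount": 3,
    "externalAccess": {
        "service": {"type": "NodePort", "nodePorts": [30000, 30001, 30002]},
    },
}
print(check_external_access(values))
```

An empty list means the two settings agree. Removing one of the ports from the list makes the function report the mismatch.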
Load balancer
The load balancer configuration will look something like this:
frontend-1 port: 30000 -> backend-1 port: 30000, server ips: [set of private worker node ips]
frontend-1 port: 30001 -> backend-1 port: 30001, server ips: [set of private worker node ips]
frontend-1 port: 30002 -> backend-1 port: 30002, server ips: [set of private worker node ips]
Maybe from here I can go back to Strimzi and try to tweak/fix it, now that I understand better how it works.
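To confirm that the load balancer really forwards every port in the mapping above, a plain TCP check from the external machine can help. This is a minimal sketch, assuming each broker is reached as <domain>:<nodePort>. The function names are hypothetical, and a successful TCP connection only shows that the port is forwarded, not that the TLS handshake or the Kafka protocol works.

```python
import socket


def external_endpoints(domain, node_ports):
    """Pair the load balancer domain with each forwarded node port."""
    return [(domain, port) for port in node_ports]


def is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened in time."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution failures.
        return False


def unreachable(endpoints, timeout=3.0):
    """Return the endpoints that could not be reached."""
    return [(host, port) for host, port in endpoints
            if not is_reachable(host, port, timeout)]
```

For example, unreachable(external_endpoints("kafka.example.com", [30000, 30001, 30002])) should return an empty list when all three ports are forwarded. kafka.example.com is a placeholder for the domain that points to the load balancer.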
Posted on 2022-05-18 20:06:27
https://stackoverflow.com/questions/72128817