首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >无法从互联网上通过SSL连接到Kafka,而是从卡夫卡荚内部工作。

无法从互联网上通过SSL连接到Kafka,而是从卡夫卡荚内部工作。
EN

Stack Overflow用户
提问于 2022-05-05 14:22:56
回答 2查看 418关注 0票数 0

我已经将Kafka 纹子部署到我的Kubernetes集群中。

我的目标是能够通过SSL侦听器(端口9094)连接到Kafka主题,并能够生成/使用消息。

症状

代码语言:javascript
复制
[2022-05-05 10:20:11,252] ERROR Error when sending message to topic public.kafka.incoming.events with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Topic public.kafka.incoming.events not present in metadata after 60000 ms.

该消息不能有效地由生产者发送或由消费者接收。

当选择负载均衡器或入口作为类型时(即:kafka.listeners.external.type)时,条子-簇-运算符荚的日志。

代码语言:javascript
复制
failed to reconcile ZookeeperScalingException: Failed to connect to Zookeeper my-cluster-zookeeper-0.my-cluster-zookeeper-nodes.my-kafka-project.svc:2181. Connection was not ready in 300000 ms.

不确定这是否与我的Kubernetes集群配置问题有关。当我选择NodePort -它的工作,如预期和卡夫卡+动物园是正常启动。

为了使这个故事更简短,我有:

1.激活调试标志:

log4j.logger.org.apache.kafka=INFO -> log4j.logger.org.apache.kafka=DEBUG

我能从卡夫卡服务器上读取日志。

2.配置/生成签名证书、密钥库/信任库JK以及我使用的配置如下:

代码语言:javascript
复制
cat <<EOT > ssl.properties
security.protocol=SSL
ssl.protocol=TLSv1.2
bootstrap.servers=my-cluster-kafka-external-bootstrap:9094
ssl.truststore.location=/home/kafka/user-truststore.jks
ssl.truststore.password=<password>
ssl.keystore.location=/home/kafka/user-keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>
ssl.endpoint.identification.algorithm=
EOT

3.案例1和不起作用的案例2之间的区别,这也是为什么我要问这个问题。

  1. 案例1:这是正常工作的情况,我从运行Kafka服务器的吊舱中连接到Kafka主题。
  2. 案例2:在第二种情况下,连接不像预期的那样工作,连接到一个暴露在互联网上的load-balancer(i.e )。然后通过专用网络 (10.20.10.0/24)建立负载均衡器连接,后者与Kubernetes集群通信。

案例1(这是我在这里帮助解决这个问题的原因)。

从卡夫卡服务器舱(在集群内)进行测试。

生产者

代码语言:javascript
复制
[kafka@my-cluster-kafka-0 ~]$ /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-external-bootstrap:9094 --topic public.kafka.incoming.events --producer.config consumer-ssl.properties 
>ttesttt
>

消费者

代码语言:javascript
复制
[kafka@my-cluster-kafka-0 ~]$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-0.my-cluster-kafka-brokers.my-kafka-project.svc.cluster.local:9094 --topic public.kafka.incoming.events --consumer.config consumer-ssl.properties
ttesttt

确认

基于Kafka服务器日志的确认。

  • 当生产者成功连接时
代码语言:javascript
复制
2022-05-05 13:40:07,988 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-xx.xxx.xx4.xx7:45150-42 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/xx.xxx.xx4.xx7:45150], selector=sun.nio.ch.EPollSelectorImpl@5c2628ee, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost 'xx.xxx.xx4.xx7' peerPort 45150 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
2022-05-05 13:40:07,989 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /xx.xxx.xx4.xx7 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
  • 当消费者成功连接时
代码语言:javascript
复制
2022-05-05 13:43:56,053 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-10.15.19.76:40094-43 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/10.15.19.76:40094], selector=sun.nio.ch.EPollSelectorImpl@7df0ddcc, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost '10.15.19.76' peerPort 40094 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-10]
2022-05-05 13:43:56,053 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /10.15.19.76 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-10]
2022-05-05 13:43:56,323 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-xx.xxx.xx4.xx7:1270-43 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/xx.xxx.xx4.xx7:1270], selector=sun.nio.ch.EPollSelectorImpl@323eac4e, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost 'xx.xxx.xx4.xx7' peerPort 1270 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-11]
2022-05-05 13:43:56,323 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /xx.xxx.xx4.xx7 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-11]
2022-05-05 13:43:56,335 INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group console-consumer-27420 in Empty state. Created a new member id console-consumer-f9cb320c-a5cf-4e40-a686-6dbd1d37cdd6 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-3]
2022-05-05 13:43:56,340 INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-27420 in state PreparingRebalance with old generation 0 (__consumer_offsets-26) (reason: Adding new member console-consumer-f9cb320c-a5cf-4e40-a686-6dbd1d37cdd6 with group instance id None) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-6]
2022-05-05 13:43:59,342 INFO [GroupCoordinator 0]: Stabilized group console-consumer-27420 generation 1 (__consumer_offsets-26) with 1 members (kafka.coordinator.group.GroupCoordinator) [executor-Rebalance]
2022-05-05 13:43:59,359 INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-f9cb320c-a5cf-4e40-a686-6dbd1d37cdd6 for group console-consumer-27420 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-7]
2022-05-05 13:43:59,468 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-xx.xxx.xx4.xx7:1584-43 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/xx.xxx.xx4.xx7:1584], selector=sun.nio.ch.EPollSelectorImpl@5c2628ee, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost 'xx.xxx.xx4.xx7' peerPort 1584 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
2022-05-05 13:43:59,468 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /xx.xxx.xx4.xx7 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]

案例2(这不起作用,也是我问这个问题的原因)

测试从我的本地机器(通过互联网连接)。

生产者

代码语言:javascript
复制
$ /opt/kafka_2.13-3.1.0/bin/kafka-console-producer.sh --bootstrap-server internet-reachable-domain-name:9094 --topic public.kafka.incoming.events --producer.config consumer-ssl.properties 
....long SSL/TLS hand-shake....
)
ttesttttt
javax.net.ssl|ALL|16|kafka-producer-network-thread | console-producer|2022-05-05 10:05:04.469 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine

注意字符'>‘是如何不显示在第一行上的。

消费者

代码语言:javascript
复制
$ /opt/kafka_2.13-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server internet-reachable-domain-name:9094 --topic public.kafka.incoming.events --consumer.config consumer-ssl.properties 
javax.net.ssl|DEBUG|01|main|2022-05-05 10:08:29.034 EDT|Finished.java:535|Consuming server Finished handshake message (
"Finished": {
  "verify data": {
    0000: 6A 3A 3D F6 87 81 A0 36   FF 08 77 CB 
  }'}
)
javax.net.ssl|ALL|01|main|2022-05-05 10:08:37.815 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 10:08:48.159 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 10:09:06.713 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 10:09:37.193 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 10:10:11.936 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine

注意消息是如何到达的,即使我输入了大小写#1的生产者的消息,它也不会显示在这个使用者上。

确认

基于Kafka服务器日志的确认。

  • 当生产者连接
代码语言:javascript
复制
2022-05-05 14:14:43,507 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-10.15.30.64:19952-46 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/10.15.30.64:19952], selector=sun.nio.ch.EPollSelectorImpl@5c2628ee, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost '10.15.30.64' peerPort 19952 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
2022-05-05 14:14:43,507 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /10.15.30.64 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-12]
  • 当消费者正在连接时
代码语言:javascript
复制
2022-05-05 14:16:02,620 DEBUG [SslTransportLayer channelId=10.15.19.76:9094-10.15.219.64:42153-47 key=channel=java.nio.channels.SocketChannel[connected local=/10.15.19.76:9094 remote=/10.15.219.64:42153], selector=sun.nio.ch.EPollSelectorImpl@7df0ddcc, interestOps=1, readyOps=0] SSL handshake completed successfully with peerHost '10.15.219.64' peerPort 42153 peerPrincipal 'CN=my-user' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' (org.apache.kafka.common.network.SslTransportLayer) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-10]
2022-05-05 14:16:02,621 DEBUG [SocketServer listenerType=ZK_BROKER, nodeId=0] Successfully authenticated with /10.15.219.64 (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(EXTERNAL-9094)-SSL-10]

其他信息

我使用的选项是:

export KAFKA_OPTS="-Djavax.net.debug=ssl:handshake:verbose"

从卡夫卡客户那里获得更多信息。这就是我在第二种情况下从制片人那里看到的。

代码语言:javascript
复制
javax.net.ssl|ALL|01|main|2022-05-05 09:54:50.374 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:55:23.285 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:55:48.477 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:56:13.718 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:56:50.532 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:57:25.793 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:58:01.707 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:58:30.806 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine
javax.net.ssl|ALL|01|main|2022-05-05 09:59:07.867 EDT|SSLEngineImpl.java:752|Closing outbound of SSLEngine

卡夫卡版本

这两个客户都是相同的:

代码语言:javascript
复制
# Local device
$ /opt/kafka_2.13-3.1.0/bin/kafka-topics.sh --version
3.1.0 (Commit:37edeed0777bacb3)

# Kafka server pod
$ /opt/kafka/bin/kafka-topics.sh --version
3.1.0 (Commit:37edeed0777bacb3)
EN

回答 2

Stack Overflow用户

发布于 2022-05-09 21:16:24

感谢@OneCricketeer提供的信息。

你提供的链接是我必须要看的。问题是,在我的Kubernetes集群中,输入选项或负载均衡器没有工作,卡夫卡没有开始。只有nodePort选项有效,但这不足以解决我遇到的问题。

最后,我用的是肉桂,而不是strimzi,https://bitnami.com/stack/kafka/helm,它的效果就像魅力。

我依赖于@橡胶鸭在kubernetes集群外访问bitnami/kafka上提供的示例

我只需要确保通过负载均衡器传送端口。

命令:

代码语言:javascript
复制
helm install kafka-bitnami --values bitnami-values-external.yaml bitnami/kafka

文件:bitnami-values Foreignal.yaml

代码语言:javascript
复制
replicaCount: 3
externalAccess:
  enabled: true
  autoDiscovery:
    enabled: false
    image:
      registry: docker.io
      repository: bitnami/kubectl
      tag: 1.23.4-debian-10-r17
      pullPolicy: IfNotPresent
      pullSecrets: []
    resources:
      limits: {}
      requests: {}
  service:
    type: NodePort
    port: 9094
    loadBalancerIPs: []
    loadBalancerSourceRanges: []
    nodePorts:
      - 30000
      - 30001
      - 30002
    useHostIPs: false
    annotations: {}
    domain: <domain-name which points to my load-balancer public IP>

请注意,replicaCount必须大大增加nodePorts项的数量。

负载平衡器

负载均衡器配置将具有如下内容:

代码语言:javascript
复制
frontend-1 port: 30000 -> backend-1 port: 30000, server ips: [set of private worker node ips]
frontend-1 port: 30001 -> backend-1 port: 30001, server ips: [set of private worker node ips]
frontend-1 port: 30002 -> backend-1 port: 30002, server ips: [set of private worker node ips]

也许从这里开始,我可以回到strimzi,尝试调整/修复它,因为我更了解它的工作原理。

票数 0
EN

Stack Overflow用户

发布于 2022-05-18 20:06:27

斯特里齐是一个很好的平台,可以在Kubernetes上运行卡夫卡。由于您还没有共享kafka CR文件,我假设您已经完成了以下设置,以便从k8s集群之外访问Kafka -

代码语言:javascript
复制
# ...
listeners:
# ...
  - name: external
    port: 9094
    type: loadbalancer
    tls: true
# ...

确保你有如下的资源-

有了这个配置和正确的Authn/Authz,您就可以从k8s集群之外访问Kafka了。请检查这个文章 (PS -由我编写),它深入研究了如何在k8s集群中启用Authn/Authz,并讨论了如何从k8s集群内部和外部访问Kafka。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72128817

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档