如果生产者一方的确认滞后很大,我们是否可以使用一些代理度量来监视Kafka broker。
我们正在使用datadog监控生产者和卡夫卡经纪人方面。可以看出,出厂延迟超过10秒。但是,在代理方面,我觉得只使用message.in.rate和kafka.net.bytes_in.rate并不是很有效。我们最好在代理端有一些滞后指标来指示the broker is fully loaded to acknowledge back the producer.。
另外,我们只对分区领导者使用kafka.acks = 1。
我想知道是否有人在这方面有一些经验,任何建议都是欢迎的。)预先感谢。
如果一个人可以从kafka集群中的一个主题推送和消费,该集群由3个代理组成,运行在kubernetes和auto_scaling disabled;中,这是否可以用来得出结论,即我们可以成功地推送和消费集群中的所有其他主题?
Total brokers in the cluster: 3
Min-InSyncReplicas: 2
No. of partitions for eahc topic: 25
我对卡夫卡很陌生。我有以下架构:
1) 2 servers runing application logics and database, can I write kafka producer on these servers wrapped with docker container ?
2) 1 server reserved for kafka broker and zookeeper
3) 1 sever reserved for kafka comsumer
我很困惑
1) whether I can run kafka producer, broker and consumer
我想使用kafka-console-producer cli工具向使用事务客户端的主题发送消息。我认为我的配置是正确的,但是当我运行cli工具时,我得到了以下异常:
java.lang.IllegalStateException: Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.
因此,看起来kafka控制台生产者,我相信它实例化了kafka.tools.ConsoleProducer类,要么没有调用initTransaction
KafkaStream在很长一段时间后无法生成数据。(超过设定的过期时间)
在记录错误消息后,甚至KafkaStream也死了。
例外情况如下:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at o
我已经编写了自己的序列化程序来将java对象发布到主题。我将serializer.class属性设置为自定义序列化程序。在运行生产者时,我会得到以下异常。有人能帮我吗?
异常
Exception in thread "main" java.lang.NoSuchMethodException: com.xxxx.CustomFileSerializer.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3082)
at java
我正在开发Spring和Apache --尝试使用用户定义的配置-
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.demo.Consumer.consume(com.example.demo.User) throws java.io.IOException]
Bea
我对kafka很陌生,我使用Kafka生产者Java。面对这个问题,卡夫卡,Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION。
人们已经写到,只有在没有交易的情况下,才应该调用producer.abortTransaction() .知道如何检查飞行中是否有交易吗?以及如何清除/阻止它们?
这是我的密码:
try {
producer.send(record, new Callback() {
@Override
publ
我的应用程序需要将不同的记录发送到不同的主题。我的应用程序正在使用同样的Kafka集群。由于应用程序使用相同的Kafka集群,创建一个生产者工厂就足够了(如果需要更多的话,请告诉我)。
在我看来,我有两个选择。
对两个主题使用相同的kafkaTemplate,并按以下主题调用send方法(请假定我使用了春季默认的Kafka生产者配置)。在这里,我们需要为每个调用传递主题&对于多个主题,我们使用相同的Kafka模板。
class ProducerService {
@Autowired
private KafkaTemplate<GenericRecord, Gen
我是卡夫卡的新手。我有关于kafka配置的问题。
我想使用如下所示的独立服务器,
server1:卡夫卡生产者server2:卡夫卡经纪人,卡夫卡消费者,动物园管理员
但是,我不能向broker发送消息。我收到了这个错误消息。
在控制台生产者(Server1)上,控制台标准输出错误消息`
[2016-05-24 16:41:11,823] ERROR Error when sending message to topic twitter with key: null, value: 3 bytes with error: Failed to update metadata after 6000