因此,我测试了我的生产者的弹性,我有一个对象,我想发送给kafka (以json格式),然后根据结果,它将更新我的数据库。onSuccess回调非常完美,但是onFailure根本不触发,这使得我无法在发送失败的情况下完成我的目标,甚至无法测试它。
我尝试将我的对象发送到一个不存在的主题,我所得到的只是一些错误日志,而且我知道它有异常,因为LoggingProducerListener这么说。我检查了KafkaTemplate的代码,在通知LoggingProducerListener之前,它应该按照预期的方式注册错误。这是代码和日志。
final ListenableFuture<Se
我有这个简单的生产者和主题有两个分区。我想将前十条消息发送到分区0,其余的十条消息发送到另一个分区。有可能吗?
package com.company;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class Main {
pu
我正在开发一个应用程序,在这个应用程序中,我从IBM队列中读取,处理该消息,然后将该消息发送到Kafka主题。我正试图处理卡夫卡经纪人破产的案子。在这种情况下,我希望我的应用程序回滚事务,并尝试再次写入Kafka主题,进行X次重试,然后将消息发送到备份队列。但是,我在messageChannel.send调用中抛出异常有困难。我无法使它超时。应用程序在messageChannel.send调用上被无限阻塞。这是我的代码:
@Component
public class MainQueueListener {
@Value("${mq.queueName}")
我想用topic1处理消费者的数据,然后把信息发送回卡夫卡到topic2
Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka
我的尝试:
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ to
我有一个困难的时间故障排除,什么应该是一个简单的卡夫卡问题。
我正在尝试发布一些来自kafak-console-producer的消息。当我输入“嗨”时,我得到:
/opt/kafka/bin/kafka-console-producer.sh --topic test --broker-list 172.17.0.21:9092
hi
[2016-01-25 12:56:19,839] ERROR Error when sending message to topic test with key: null, value: 2 bytes with error: Batch Expired
我使用的是scala、spark和Kafka。我有两个问题。
1.如何确认Kafka broker(服务端)中存在topic?
2.如何确认Kafka服务器(bootstrap服务器)是否正在运行?
object kafkaProducer extends App {
def sendMessages(): Unit = {
//define topic
val topic = "spark-topic" // how can i confirm this topic is exist in kafka server ?
//define produc
我是卡夫卡和的新手。当使用Admin与Kakfa汇合.NET客户端时,我正在寻求有关如何实现失败弹性的建议。我正在使用Admin客户端来创建一个主题,如果在启动期间它还不存在的话。
最初,我试图使用实现一个简单的等待和重试策略,如下所示。我预计这将重新尝试一个create操作,以获得可配置的尝试次数。在每次重试尝试之间,都有一个可配置的短等待延迟。如果所有重试尝试都已用尽,则会发出致命错误信号,应用程序将优雅地退出。
等待和重试策略
public static AsyncRetryPolicy BrokerWaitAndRetry(short retryCount, short waitSec
我可以通过io.restassured.RestAssured确认端点在单元测试中工作。然而,在我启动服务之后,每个端点总是返回一页系统信息,例如 # HELP kafka_producer_node_request_total The total number of requests sent
# TYPE kafka_producer_node_request_total counter
kafka_producer_node_request_total{client_id="kafka-producer-metric-message-out",kafka_version
生产者代码将从光盘中读取.mp4视频文件,并将其发送给卡夫卡,后者显然可以打印"Message sent to the Kafka Topic java_in_use_topic Successfully",但consumer.poll是空的:
@RestController
@RequestMapping(value = "/javainuse-kafka/")
public class ApacheKafkaWebController {
@GetMapping(value = "/producer")
public String produ
我正在尝试在AWS中创建一个Lambda,作为MSK主题的生产者。所有的亚马逊网络服务文档都说要创建一个新的EC2实例,但是因为我的Lambda在同一个VPC中,所以我觉得应该可以这样做。我对此非常陌生,我注意到我的日志语句在我的producer.on函数中从不命中。我使用的是nodejs和kafka-node模块。代码可以在下面找到。 本质上,我只是想知道是否有人知道如何做到这一点,为什么当我通过Lambda运行测试时,producer.on函数从未命中?这只是一些测试代码,看看我是否可以让它发送,但如果需要更多的数据或帮助,请让我知道并提前感谢。 exports.handler = as