KafkaProperties java文档:
/**
* What to do when there is no initial offset in Kafka or if the current offset
* does not exist any more on the server.
*/
private String autoOffsetReset;
我有hello掌声,其中包含application.properties
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset
我的应用程序需要将不同的记录发送到不同的主题。我的应用程序正在使用同样的Kafka集群。由于应用程序使用相同的Kafka集群,创建一个生产者工厂就足够了(如果需要更多的话,请告诉我)。
在我看来,我有两个选择。
对两个主题使用相同的kafkaTemplate,并按以下主题调用send方法(请假定我使用了春季默认的Kafka生产者配置)。在这里,我们需要为每个调用传递主题&对于多个主题,我们使用相同的Kafka模板。
class ProducerService {
@Autowired
private KafkaTemplate<GenericRecord, Gen
我正在尝试处理producer无法向Kafka发送消息的情况:
try {
Future<RecordMetadata> res = producer.send(new ProducerRecord<>(topic, msg.key(), msg));
log.info("Waiting for confirmation from kafka for message : \n {}",msg.toString());
record = res.get(30,TimeUnit.SECONDS);
log.info("
我使用的是kafka-client 1.0.0库中的KafkaProducer,根据文档,Future<RecordMetadata> send(ProducerRecord<K, V> record)方法将立即返回,但实际上是这样,但看起来并非如此。该方法还调用同一个类中的另一个方法doSend (见下面的片段),在这个方法中,它正在等待主题的元数据,我认为这是必要的,因为它与分区等有关。
/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<Re
我试图在处理异常时向DLQ发送消息,但是当我从Spring-boot-kafka-streams-binder使用SendToDlqAndContinue时,我一直收到序列化异常
@EnableBinding(ConsumerStreamsWay.KStreamBinding.class)
public class ConsumerStreamsWay {
@Autowired
private SendToDlqAndContinue dlqHandler;
@StreamListener
public void topic3Processor2(@Input("topic3
我有一个Ingres DB with History表,它记录数据库事件,如插入、更新和删除。我有一个生产者,这将是多线程。这个生产者将读取History表,以找到要选择的表和行,然后将该行添加到Kafka主题中。现在生产者需要确保将事件添加到Kafka主题中,其方式与History表登录的方式相同。因此,使用者读取它们的顺序与它在History表中记录的顺序相同,并在Postgrace上执行它。
我可以将这些数据生成多个生产者。示例
Producer1 has message 1 to 5
producer2 has message 6 to 10
producer3 has message
当我尝试使用使用kafka控制台工具(V0.9.0.1)托管在ec2中的kafka服务器(V0.9.0.1)时,我会得到以下异常。我怎样才能克服这一切?
#./kafka-控制台-消费.from动物园管理员zookeeper1.xx.com:2181 -主题MY_TOPIC -从开始
[2016-04-06 14:34:58,219] WARN Fetching topic metadata with correlation id 0 for topics [Set(MY_TOPIC)] from broker [BrokerEndPoint(1014,kafka3.xx.com,9092)]
我刚开始学习Python和Kafka。这是我尝试开始的第一个示例。
我得到了一个例外:
Traceback (most recent call last):
File "producer.py", line 23, in <module>
main()
File "producer.py", line 18, in main
print_response(producer.send_messages(topic, msg))
File "D:\Setups\Python35-32\lib\site-packages
我正在尝试为python kafka创建一个简单的函数,但是在将一个字符串循环传递给producer.send_messages方法时遇到了问题。
from kafka import SimpleProducer, KafkaClient
import random
import os
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
x = 0
while x!=1:
#producer.send_messages(b'test1',b'st
因此,有了,我尝试创建一个简单的发布者,它将能够发布基于主题的消息(类似于):
using (var context = ZmqContext.Create())
using (var pub = context.CreateSocket(SocketType.PUB))
{
pub.Bind(url);
while (true)
{
pub.Send( // How to send "Message" string on topic "A.message"?
}
}
那么如何在topic/上发送带有A.messa