我正在尝试构建一个事件驱动的应用程序。这是一个要求,一些服务将只使用Kafka事件来存储信息,所以我使用Kafka表。
我有两个服务共享相同的Kafka表,这是一个复杂的业务逻辑,所以我希望确保相同的代码构建表。
我的问题是:同一个Kafka Table的不同实例可以共享同一个客户端id吗?
我已经构建了这个示例,它只运行过一次。但现在我遇到了一些问题。它总是被赋予一个异常:The state store, topic-name, may have migrated to another instance.
我的两个服务都运行在同一台机器上,没有容器。
我正在配置我的Kafka流:
Prope
在我的需求中,我有两个消费者组,一个组(Main)只是获取数据并发送到其他服务器,如果发送到其他服务器失败,那么我需要重新加入(启动)其他消费者组(失败的处理)。 在这种情况下,Main Group将继续读取并重试,并将继续相同的操作,当消息发送成功时,它需要通知其他使用者组(失败的处理)。现在失败的处理应该从第一个失败到最后一个失败的地方开始发送。 public void StartMainstreamHandler() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, S
我对卡夫卡和溪流很陌生。我正在创建一个本地商店,以保存来自特定主题components的所有更新。我不知道我做错了什么。还有其他方法可以从Stream创建存储吗?
我需要在卡夫卡中创建一个主题comp-store吗?
public class MyStream {
final static CountDownLatch latch = new CountDownLatch(1);
private static final String APP_ID = "MyTestApp";
public static void main(String[] arg
我正在访问一个状态存储来查询它,并且不得不用一个try/catch块包装store()语句来重试它,因为有时我会得到这个异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store customers-store because the stream thread is PARTITIONS_REVOKED, not RUNNING
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvid
我有一个本地的码头实例,3个动物园管理员和3个卡夫卡经纪人运行1.0.0,以确保更新我们的环境将如预期的那样工作。我在kafka 1.0.0上创建了一个流处理器,它不消耗也不产生任何主题--它只是简单地初始化自己。
当我试图运行这个处理器时,我会收到以下错误消息:
Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException:
Could not find any available broker.
Check your StreamsConfig set