首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

在你的 Java 程序里调用 Kafka 发起数据流

Kafka 与 Java 的集成最容易实现了。

按照文档上面的例子,稍微改一改就能用。

kafka 是 LinkedIn 的开源产品

源代码托管在 https://github.com/apache/kafka 上面

因此需要下载源代码,才编译成想要的jar,供自己编写的 java 程序调用

当然也可以使用安装包里面的 kafka lib,里面包含了所有的 kafka api.

只要在新建的 java project 中引用安装包里面的 kafka library即可。

找到 examples 文件夹,修改 Demo 文件 /KafkaDemo/AppDemo/KafkaConsumerProducerDemo.java:

因为我们不需要模拟 consumer 的接收情况,所以注释掉 consumer 部分.

public class KafkaConsumerProducerDemo implements KafkaProperties

{

public static void main(String[] args)

{

Producer producerThread = new Producer(KafkaProperties.topic);

producerThread.start();

// Consumer consumerThread = new Consumer(KafkaProperties.topic);

// consumerThread.start();

//

}

}

找到 producer.java , 修改其代码,让它每1秒钟发送一次消息:

public void run() {

int messageNo = 1;

while (true) {

String messageStr = new String("message from Java App: Message_" + messageNo);

producer.send(new KeyedMessage(topic, messageStr));

messageNo++;

try {

this.sleep(1000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

还需要修改 properties.java 文件,使其适应我们本机的 zookeeper, kafka cluster 配置

public interface KafkaProperties

{

final static String zkConnect = "127.0.0.1:2181";

final static String groupId = "group1";

final static String topic = "ConsoleRealTimeMessage";

final static String kafkaServerURL = "localhost";

final static int kafkaServerPort = 9092;

final static int kafkaProducerBufferSize = 64*1024;

final static int connectionTimeOut = 100000;

final static int reconnectInterval = 10000;

final static String topic2 = "topic2";

final static String topic3 = "topic3";

final static String clientId = "SimpleConsumerDemoClient";

}

打开一个 terminal 窗口,接收消息:

>kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ConsoleRealTimeMessage --from-beginning

欢迎关注【有关SQL】

帮忙转发的朋友,可以截图后台私信我,

并申请免费接入我的【知识星球】,

翻阅我平时翻译的作品和自己写书的稿子

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180224G0GGWW00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券