说说 MQ之Kafka(二)

来源:Valleylord ,

valleylord.github.io/post/201607-mq-kafka/

Kafka 的工具和编程接口

Kafka 的工具

Kafka 提供的工具还是比较全的,bin/ 目录下的工具有以下一些,

bin/connect-distributed.sh bin/kafka-consumer-offset-checker.sh bin/kafka-replica-verification.sh bin/kafka-verifiable-producer.sh

bin/connect-standalone.sh bin/kafka-consumer-perf-test.sh bin/kafka-run-class.sh bin/zookeeper-security-migration.sh

bin/kafka-acls.sh bin/kafka-mirror-maker.sh bin/kafka-server-start.sh bin/zookeeper-server-start.sh

bin/kafka-configs.sh bin/kafka-preferred-replica-election.sh bin/kafka-server-stop.sh bin/zookeeper-server-stop.sh

bin/kafka-console-consumer.sh bin/kafka-producer-perf-test.sh bin/kafka-simple-consumer-shell.sh bin/zookeeper-shell.sh

bin/kafka-console-producer.sh bin/kafka-reassign-partitions.sh bin/kafka-topics.sh

bin/kafka-consumer-groups.sh bin/kafka-replay-log-producer.sh bin/kafka-verifiable-consumer.sh

我常用的命令有以下几个,

bin/kafka-server-start.sh -daemon config/server.properties &

bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1

bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181

bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1

bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1

bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning

bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1

kafka-server-start.sh 是用于 Kafka 的 Broker 启动的,主要就一个参数 config/server.properties,该文件中的配置项待会再说.还有一个 -daemon 参数,这个是将 Kafka 放在后台用守护进程的方式运行,如果不加这个参数,Kafka 会在运行一段时间后自动退出,据说这个是 0.10.0.0 版本才有的问题 5。kafka-topics.sh 是用于管理 Topic 的工具,我主要用的 --describe、--list、--delete、--create 这4个功能,上述的例子基本是不言自明的,--replication-factor 3、--partitions 2 这两个参数分别表示3个副本(含 Leader),和2个分区。kafka-console-consumer.sh 和 kafka-console-producer.sh 是生产者和消费者的简易终端工具,在调试的时候比较有用,我常用的是 kafka-console-consumer.sh。我没有用 Kafka 自带的 zookeeper,而是用的 zookeeper 官方的发布版本 3.4.8,端口是默认2181,与 Broker 在同一台机器上。

下面说一下 Broker 启动的配置文件 config/server.properties,我在默认配置的基础上,修改了以下一些,

broker.id=0

listeners=PLAINTEXT://192.168.232.23:9092

log.dirs=/tmp/kafka-logs

剩下的工具多数在文档中也有提到。如果看一下这些脚本的话,会发现多数脚本的写法都是一致的,先做一些参数的校验,最后运行 exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@",可见,这些工具都是使用运行 Java Class 的方式调用的。

Kafka 的 Java API

在编程接口方面,官方提供了 Scala 和 Java 的接口,社区提供了更多的其他语言的接口,基本上,无论用什么语言开发,都能找到相应的 API。下面说一下 Java 的 API 接口。

生产者的 API 只有一种,相对比较简单,代码如下,

import java.util.Properties;

public class SimpleProducerDemo {

public static void main(String[] args){

Properties props = new Properties();

props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094");

props.put("zookeeper.connect", "192.168.232.23:2181");

props.put("client.id", "DemoProducer");

KafkaProducer producer = new KafkaProducer(props);

String topic = "topic1";

Boolean isAsync = false;

int messageNo = 1;

while (true) {

String messageStr = "Message_" + String.format("%05d",messageNo);

long startTime = System.currentTimeMillis();

if (isAsync) { // Send asynchronously

producer.send(new ProducerRecord(topic,

messageNo,

messageStr), new DemoCallBack(startTime, messageNo, messageStr));

} else { // Send synchronously

try {

producer.send(new ProducerRecord(topic,

messageNo,

messageStr)).get();

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}

}

try {

Thread.sleep(3);

} catch (InterruptedException e) {

e.printStackTrace();

}

++messageNo;

}

}

}

class DemoCallBack implements Callback {

private final long startTime;

private final int key;

private final String message;

public DemoCallBack(long startTime, int key, String message) {

this.startTime = startTime;

this.key = key;

this.message = message;

}

public void onCompletion(RecordMetadata metadata, Exception exception) {

long elapsedTime = System.currentTimeMillis() - startTime;

if (metadata != null) {

"Send message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() +

" to partition(" + metadata.partition() +

") in " + elapsedTime + " ms");

} else {

exception.printStackTrace();

}

}

}

import java.util.Arrays;

import java.util.Properties;

public class SimpleConsumer {

public static void main(String[] args){

Properties props = new Properties();

props.put("bootstrap.servers", "192.168.232.23:9092");

props.put("group.id", "test");

KafkaConsumer consumer = new KafkaConsumer(props);

consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));

while (true) {

ConsumerRecords records = consumer.poll(100);

for (ConsumerRecord record : records) {

}

}

}

}

消费者还有旧的 API,比如 Consumer 和 SimpleConsumer API,这些都可以从 Kafka 代码的 kafka-example 中找到,上述的两个例子也是改写自 kafka-example。使用新旧 API 在功能上都能满足消息收发的需要,但新 API 只依赖 kafka-clients,打包出来的 jar 包会小很多,以我的测试,新 API 的消费者 jar 包大约有 2M 左右,而旧 API 的消费者 jar 包接近 16M。

其实,Kafka 也提供了按分区订阅,可以一次订阅多个分区 TopicPartition[];也支持手动提交 offset,需要调用 consumer.commitSync。

String[] options = new String[]{

"--create",

"--zookeeper",

"192.168.232.23:2181",

"--partitions",

"2",

"--replication-factor",

"3",

"--topic",

"topic1"

};

TopicCommand.main(options);

但是这样写有一个问题,在执行完 TopicCommand.main(options); 之后,系统会自动退出,原因是执行完指令之后,会调用 System.exit(exitCode); 系统直接退出。这样当然不行,我的办法是,把相关的执行代码挖出来,写一个 TopicUtils 类,如下,

import joptsimple.OptionSpecBuilder;

import kafka.admin.TopicCommand;

import kafka.admin.TopicCommand$;

import kafka.utils.ZkUtils;

import scala.runtime.Nothing$;

public class TopicUtils {

// from: http://blog.csdn.net/changong28/article/details/39325079

// from: http://www.cnblogs.com/davidwang456/p/4313784.html

public static void createTopic(){

String[] options = new String[]{

"--create",

"--zookeeper",

KafkaProperties.ZOOKEEPER_URL,

"--partitions",

"2",

"--replication-factor",

"3",

"--topic",

KafkaProperties.TOPIC

};

// TopicCommand.main(options);

oper(options);

}

public static void listTopic(){

String[] options = new String[]{

"--list",

"--zookeeper",

KafkaProperties.ZOOKEEPER_URL

};

// TopicCommand.main(options);

oper(options);

}

public static void deleteTopic(){

String[] options = new String[]{

"--delete",

"--zookeeper",

KafkaProperties.ZOOKEEPER_URL,

"--topic",

KafkaProperties.TOPIC

};

// TopicCommand.main(options);

oper(options);

}

public static void describeTopic(){

String[] options = new String[]{

"--describe",

"--zookeeper",

KafkaProperties.ZOOKEEPER_URL,

"--topic",

KafkaProperties.TOPIC

};

// TopicCommand.main(options);

oper(options);

}

public static void main(String[] args){

listTopic();

createTopic();

listTopic();

describeTopic();

deleteTopic();

try {

Thread.sleep(3*1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

listTopic();

}

/** copied & modified from kafka.admin.TopicCommand$.main

*

* @param args

*/

public static void oper(String args[]){

try {

TopicCommand$ topicCommand$ = TopicCommand$.MODULE$;

final TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args);

if(args.length == 0) {

throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Create, delete, describe, or change a topic.");

} else {

int actions =0;

OptionSpecBuilder[] optionSpecBuilders = ;

for (OptionSpecBuilder temp:optionSpecBuilders){

if (opts.options().has(temp)) {

actions++;

}

}

if(actions != 1) {

throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Command must include exactly one action: --list, --describe, --create, --alter or --delete");

} else {

opts.checkArgs();

ZkUtils zkUtils = kafka.utils.ZkUtils$.MODULE$.apply((String)opts.options().valueOf(opts.zkConnectOpt()), 30000, 30000, JaasUtils.isZkSecurityEnabled());

byte exitCode = 0;

try {

try {

if(opts.options().has(opts.createOpt())) {

topicCommand$.createTopic(zkUtils, opts);

} else if(opts.options().has(opts.alterOpt())) {

topicCommand$.alterTopic(zkUtils, opts);

} else if(opts.options().has(opts.listOpt())) {

topicCommand$.listTopics(zkUtils, opts);

} else if(opts.options().has(opts.describeOpt())) {

topicCommand$.describeTopic(zkUtils, opts);

} else if(opts.options().has(opts.deleteOpt())) {

topicCommand$.deleteTopic(zkUtils, opts);

}

} catch (final Throwable var12) {

scala.Predef$.MODULE$.println((new StringBuilder()).append("Error while executing topic command : ").append(var12.getMessage()).toString());

exitCode = 1;

return;

}

} finally {

zkUtils.close();

// System.exit(exitCode);

}

}

}

} catch (Nothing$ nothing$) {

nothing$.printStackTrace();

}

}

}

参考文章

系列

【关于投稿】

如果大家有原创好文投稿,请直接给公号发送留言。

① 留言格式:

【投稿】+《 文章标题》+ 文章链接

② 示例:

【投稿】《不要自称是程序员,我十多年的 IT 职场总结》:http://blog.jobbole.com/94148/

③ 最后请附上您的个人简介哈~

看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181030B15NSP00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励