Kafka Cluster Installation

①. Kafka relies on ZooKeeper (zk) for cluster management, so a ZooKeeper cluster has to be set up before the Kafka cluster:

https://my.oschina.net/u/2486137/blog/1537389

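②. Download Kafka (the binary release) from the Apache Kafka website.

       Pay attention to which package you download; otherwise startup fails with an error saying the main class kafka.Kafka cannot be found (this typically happens when the source release is downloaded instead of the binary one).

I am using the kafka_2.10 build here (2.10 is the Scala version the release was built against).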

③. Configure the config/server.properties file:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
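# Unique identifier of each broker in the cluster (similar to myid in ZooKeeper). Even if the broker's IP address changes,
# consumers are not affected as long as broker.id stays the same.
broker.id=0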

# Whether topics may be deleted. If false, deleting a topic with the admin tool has no effect in Kafka.
delete.topic.enable=true

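# Whether topics may be created automatically. If false, topics must be created with the command-line tool. The default is true;
# setting it to false and creating topics manually before use is recommended.
# When enabled (true), the following two kinds of request trigger automatic topic creation:
# ①. a producer writes a message to a topic that does not exist
# ②. a consumer reads messages from a topic that does not exist
auto.create.topics.enable=true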

############################# Socket Server Settings #############################
############################# Server-side network settings #############################
# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
# Protocol, host name and port used by the Kafka server, in the following format:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
# Reference example:
#     listeners = PLAINTEXT://your.host.name:9092
# The line below is the default: PLAINTEXT on port 9092.
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
# Number of threads the broker uses to handle network requests; this normally does not need to be changed.
num.network.threads=3

# The number of threads doing disk I/O
# Number of threads the broker uses for disk I/O; it should be at least the number of disks you have.
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
# Socket send buffer, i.e. the SO_SNDBUF tuning parameter; -1 means use the operating system default.
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
# Socket receive buffer, i.e. the SO_RCVBUF tuning parameter; -1 means use the operating system default.
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
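# Maximum size of a socket request, which protects the server against OOM. message.max.bytes must be smaller than
# socket.request.max.bytes; message.max.bytes itself can be overridden by the value given when a topic is created.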
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# Directories that hold the log files; several directories can be given as a comma-separated list.
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
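# Number of partitions per topic (default 1). It applies when a topic is created without an explicit partition count;
# a count given at topic creation overrides it.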
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# Threads used to recover log files at startup and to flush log data to disk at shutdown;
# each data directory gets num.recovery.threads.per.data.dir threads.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################
############################# Settings for flushing log files to disk #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
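# Number of messages accumulated before the log file is synced to disk. Disk I/O is a slow operation,
# yet it is also a necessary means of data durability, so this setting is a trade-off between durability and performance.
# If the value is too large, each fsync takes longer (I/O blocking);
# if it is too small, fsync runs more often,
# which also adds some latency to client requests overall. If the physical server fails, messages that were not yet fsynced are lost.
# In short: a flush of in-memory data to disk is triggered every this many messages.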
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
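# Controlling the disk write timing by message count alone is not enough.
# This setting controls the fsync time interval: if the message count never reaches its threshold but the time since
# the last disk sync does, a flush is triggered as well.
# In short: a flush of in-memory data to disk is triggered every this many milliseconds.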
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
############################# Log retention settings #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
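# Note: there are two kinds of setting below, a time-based policy and a policy based on log size.
# If both are configured, meeting either one triggers log deletion, and deletion always removes the oldest log segments.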
# The minimum age of a log file to be eligible for deletion
# How long messages are kept in Kafka; log data older than 168 hours may be deleted.
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# Once a partition's log grows beyond log.retention.bytes, the oldest segments are deleted.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# Upper limit on the size of a segment log file; once it is exceeded, a new segment file is created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# Every 300000 ms the log cleanup task checks whether any log data can be deleted under the retention policies above.
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
############################# ZooKeeper settings #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Addresses of the ZooKeeper cluster Kafka depends on; several addresses can be given, separated by commas.
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

# Timeout in ms for connecting to zookeeper
# Timeout for connecting to ZooKeeper, in milliseconds.
zookeeper.connection.timeout.ms=6000
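
One detail of the properties format is easy to get wrong: a `#` starts a comment only at the beginning of a line, so text placed after a value becomes part of that value. The sketch below illustrates this with `java.util.Properties`, assuming the broker reads server.properties in that format. `ServerPropertiesCheck` and its methods are hypothetical helpers written for this example, not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: sanity-checks properties text before a broker is started.
final class ServerPropertiesCheck {

    /** Parses properties text with java.util.Properties. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns broker.id, or throws if it is missing or not a plain integer. */
    static int brokerId(Properties props) {
        String raw = props.getProperty("broker.id");
        if (raw == null) {
            throw new IllegalArgumentException("broker.id is missing");
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("broker.id is not an integer: '" + raw + "'", e);
        }
    }

    public static void main(String[] args) {
        // A comment on its own line is ignored by the parser.
        Properties good = parse("# similar to myid in ZooKeeper\nbroker.id=0\n");
        System.out.println("broker.id = " + brokerId(good));

        // A trailing '#' is not a comment: it ends up inside the value.
        Properties bad = parse("broker.id=0 # similar to myid in ZooKeeper\n");
        System.out.println("raw value = '" + bad.getProperty("broker.id") + "'");

        // Whitespace around '=' is tolerated by the format.
        Properties spaced = parse("auto.create.topics.enable =true\n");
        System.out.println("auto.create.topics.enable = " + spaced.getProperty("auto.create.topics.enable"));
    }
}
```

Keeping each explanatory comment on its own line avoids the problem entirely.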

④. Check the startup log:

[2017-09-16 19:22:12,567] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        authorizer.class.name = 
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 3
        broker.id.generation.enable = true
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        default.replication.factor = 1
        delete.topic.enable = false
        fetch.purgatory.purge.interval.requests = 1000
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name = 
        inter.broker.protocol.version = 0.10.1-IV2
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listeners = PLAINTEXT://k1:9092
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /usr/local/kafka_2.10/kafka-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 0.10.1-IV2
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides = 
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 3
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 3
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        producer.purgatory.purge.interval.requests = 1000
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        unclean.leader.election.enable = true
        zookeeper.connect = zk1:2181,zk2:2181,zk3:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2017-09-16 19:22:12,910] INFO starting (kafka.server.KafkaServer)
[2017-09-16 19:22:13,183] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-09-16 19:22:13,183] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-09-16 19:22:13,241] INFO Connecting to zookeeper on zk1:2181,zk2:2181,zk3:2181 (kafka.server.KafkaServer)
[2017-09-16 19:22:15,475] INFO Cluster ID = YkyEXTiPR62G5jdo1v6rKQ (kafka.server.KafkaServer)
[2017-09-16 19:22:15,570] INFO Log directory '/usr/local/kafka_2.10/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2017-09-16 19:22:15,708] INFO Loading logs. (kafka.log.LogManager)
[2017-09-16 19:22:15,723] INFO Logs loading complete in 15 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,676] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,844] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,850] WARN No meta.properties file under dir /usr/local/kafka_2.10/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-09-16 19:22:22,028] INFO Awaiting socket connections on k3:9092. (kafka.network.Acceptor)
[2017-09-16 19:22:22,032] INFO [Socket Server on Broker 3], Started 1 acceptor threads (kafka.network.SocketServer)
[2017-09-16 19:22:22,081] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,092] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,174] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,181] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,186] INFO [ExpirationReaper-3], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,218] INFO [GroupCoordinator 3]: Starting up. (kafka.coordinator.GroupCoordinator)
[2017-09-16 19:22:22,220] INFO [GroupCoordinator 3]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2017-09-16 19:22:22,233] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 8 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-09-16 19:22:22,890] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2017-09-16 19:22:22,992] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-09-16 19:22:23,087] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-09-16 19:22:23,090] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(192.168.1.137,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2017-09-16 19:22:23,092] WARN No meta.properties file under dir /usr/local/kafka_2.10/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-09-16 19:22:23,498] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
[2017-09-16 19:32:22,220] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
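
The startup log reports the effective retention settings: log.retention.hours = 168, log.retention.bytes = -1 (size rule disabled) and a cleanup check every 300000 ms. The sketch below is a simplified model of how the time rule and the size rule combine, with the oldest segments going first. It is not Kafka's implementation; `RetentionSketch` and `Segment` are hypothetical names, and the model ignores the fact that the active segment is never deleted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the two retention rules; hypothetical names, not Kafka's actual code.
final class RetentionSketch {

    /** One log segment: its size in bytes and its age in milliseconds. */
    static final class Segment {
        final long sizeBytes;
        final long ageMs;

        Segment(long sizeBytes, long ageMs) {
            this.sizeBytes = sizeBytes;
            this.ageMs = ageMs;
        }
    }

    /**
     * Counts how many of the oldest segments would be deleted.
     * The list is ordered oldest first; a negative retentionBytes disables the size rule.
     */
    static int countDeletable(List<Segment> segments, long retentionMs, long retentionBytes) {
        long total = 0;
        for (Segment s : segments) {
            total += s.sizeBytes;
        }
        int deleted = 0;
        for (Segment s : segments) {
            boolean tooOld = s.ageMs > retentionMs;
            // Size rule: drop the oldest segment only while what remains stays at or above the limit.
            boolean tooBig = retentionBytes >= 0 && total - s.sizeBytes >= retentionBytes;
            if (!tooOld && !tooBig) {
                break; // deletion stops at the first segment that neither rule covers
            }
            total -= s.sizeBytes;
            deleted++;
        }
        return deleted;
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long gib = 1073741824L; // same value as log.segment.bytes
        List<Segment> segments = new ArrayList<Segment>();
        segments.add(new Segment(gib, 200 * hour)); // older than 168 hours
        segments.add(new Segment(gib, 100 * hour));
        segments.add(new Segment(gib, 10 * hour));

        // Settings from the startup log: 168 hours, size rule disabled.
        System.out.println(countDeletable(segments, 168 * hour, -1));  // prints 1
        // With log.retention.bytes=1073741824, as in the commented-out example in server.properties.
        System.out.println(countDeletable(segments, 168 * hour, gib)); // prints 2
    }
}
```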

⑤. Create a topic:

 bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-1 --partitions 3 --replication-factor 3 --config max.message.bytes=64000 --config flush.messages=1

⑥. View the topic information:

The output shows the topic, its partitions, their replicas and related details.

[root@localhost kafka_2.10]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-1
Topic:test-1	PartitionCount:3	ReplicationFactor:3	Configs:max.message.bytes=64000,flush.messages=1
	Topic: test-1	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: test-1	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
	Topic: test-1	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
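
Each partition line lists the leader broker, the assigned replicas and the in-sync replicas (Isr). When the Isr list is shorter than the Replicas list, the partition is under-replicated. As a rough sketch, the line format shown above can be parsed like this; `DescribeLine` is a hypothetical helper written for this post, and the regular expression only assumes the field layout visible in the output above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading one partition line of the --describe output shown above.
final class DescribeLine {
    private static final Pattern LINE = Pattern.compile(
            "Partition:\\s*(\\d+)\\s+Leader:\\s*(-?\\d+)\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    final int partition;
    final int leader;
    final String[] replicas;
    final String[] isr;

    private DescribeLine(int partition, int leader, String[] replicas, String[] isr) {
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.isr = isr;
    }

    /** Parses a line such as "Topic: test-1  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3". */
    static DescribeLine parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("not a partition line: " + line);
        }
        String isrText = m.group(4);
        String[] isr = isrText.isEmpty() ? new String[0] : isrText.split(",");
        return new DescribeLine(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)),
                m.group(3).split(","), isr);
    }

    /** A partition is under-replicated when fewer replicas are in sync than are assigned. */
    boolean isUnderReplicated() {
        return isr.length < replicas.length;
    }

    public static void main(String[] args) {
        DescribeLine healthy = parse("\tTopic: test-1\tPartition: 0\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2,3");
        System.out.println("partition " + healthy.partition + " under-replicated: " + healthy.isUnderReplicated());

        // Made-up line for illustration: broker 3 has fallen out of the in-sync set.
        DescribeLine lagging = parse("\tTopic: test-1\tPartition: 1\tLeader: 2\tReplicas: 2,3,1\tIsr: 2,1");
        System.out.println("partition " + lagging.partition + " under-replicated: " + lagging.isUnderReplicated());
    }
}
```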

⑦. Java client code:

Producer:

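	import java.util.ArrayList;
	import java.util.List;
	import java.util.Properties;
	import org.apache.kafka.clients.producer.KafkaProducer;
	import org.apache.kafka.clients.producer.ProducerInterceptor;
	import org.apache.kafka.clients.producer.ProducerRecord;
	import org.apache.kafka.common.serialization.IntegerSerializer;
	import org.apache.kafka.common.serialization.StringSerializer;

	// Body of main(). KafkaCfg, KafkaProducerInterceptor and KafkaHandle are my own classes (not shown here).
	// Note: this list is never handed to the producer; interceptors are registered through the "interceptor.classes" property.
	List<ProducerInterceptor<Integer, String>> interceptors = new ArrayList<ProducerInterceptor<Integer, String>>();
	interceptors.add(new KafkaProducerInterceptor());
	Properties props = new Properties();
	props.put("bootstrap.servers", KafkaCfg.BROCKER_LIST);
	props.put("key.serializer", IntegerSerializer.class);
	props.put("value.serializer", StringSerializer.class);
	props.put("compression.type", "gzip");
	KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
	for (int i = 0; i < 100; i++) {
		String content = "hello:" + (i + 1);
		ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test-1", i, content);
		// Asynchronous send; KafkaHandle is the callback invoked once the send completes.
		producer.send(record, new KafkaHandle());
		System.out.println("async message:" + content);
	}
	// Send any records still buffered and release resources before the program exits.
	producer.close();

Console output:
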
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.1/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/repository/org/slf4j/slf4j-log4j12/1.7.1/slf4j-log4j12-1.7.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
722  [main] INFO  o.a.k.c.p.ProducerConfig - ProducerConfig values: 
	interceptor.classes = null
	request.timeout.ms = 30000
	ssl.truststore.password = null
	retry.backoff.ms = 100
	buffer.memory = 33554432
	batch.size = 16384
	ssl.keymanager.algorithm = SunX509
	receive.buffer.bytes = 32768
	ssl.key.password = null
	ssl.cipher.suites = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.service.name = null
	ssl.provider = null
	max.in.flight.requests.per.connection = 5
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	bootstrap.servers = [192.168.1.135:9092, 192.168.1.136:9092, 192.168.1.137:9092]
	client.id = 
	max.request.size = 1048576
	acks = 1
	linger.ms = 0
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	metadata.fetch.timeout.ms = 60000
	ssl.endpoint.identification.algorithm = null
	ssl.keystore.location = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
	ssl.truststore.location = null
	ssl.keystore.password = null
	block.on.buffer.full = false
	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
	metrics.sample.window.ms = 30000
	security.protocol = PLAINTEXT
	metadata.max.age.ms = 300000
	ssl.protocol = TLS
	sasl.kerberos.min.time.before.relogin = 60000
	timeout.ms = 30000
	connections.max.idle.ms = 540000
	ssl.trustmanager.algorithm = PKIX
	metric.reporters = []
	ssl.truststore.type = JKS
	compression.type = gzip
	retries = 0
	max.block.ms = 60000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	send.buffer.bytes = 131072
	reconnect.backoff.ms = 50
	metrics.num.samples = 2
	ssl.keystore.type = JKS

794  [main] INFO  o.a.k.c.p.ProducerConfig - ProducerConfig values: 
	interceptor.classes = null
	request.timeout.ms = 30000
	ssl.truststore.password = null
	retry.backoff.ms = 100
	buffer.memory = 33554432
	batch.size = 16384
	ssl.keymanager.algorithm = SunX509
	receive.buffer.bytes = 32768
	ssl.key.password = null
	ssl.cipher.suites = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.service.name = null
	ssl.provider = null
	max.in.flight.requests.per.connection = 5
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	bootstrap.servers = [192.168.1.135:9092, 192.168.1.136:9092, 192.168.1.137:9092]
	client.id = producer-1
	max.request.size = 1048576
	acks = 1
	linger.ms = 0
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	metadata.fetch.timeout.ms = 60000
	ssl.endpoint.identification.algorithm = null
	ssl.keystore.location = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
	ssl.truststore.location = null
	ssl.keystore.password = null
	block.on.buffer.full = false
	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
	metrics.sample.window.ms = 30000
	security.protocol = PLAINTEXT
	metadata.max.age.ms = 300000
	ssl.protocol = TLS
	sasl.kerberos.min.time.before.relogin = 60000
	timeout.ms = 30000
	connections.max.idle.ms = 540000
	ssl.trustmanager.algorithm = PKIX
	metric.reporters = []
	ssl.truststore.type = JKS
	compression.type = gzip
	retries = 0
	max.block.ms = 60000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	send.buffer.bytes = 131072
	reconnect.backoff.ms = 50
	metrics.num.samples = 2
	ssl.keystore.type = JKS

798  [main] INFO  o.a.k.c.u.AppInfoParser - Kafka version : 0.10.0.1
798  [main] INFO  o.a.k.c.u.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
async message:hello:1
async message:hello:2
async message:hello:3
async message:hello:4
async message:hello:5
async message:hello:6
async message:hello:7
async message:hello:8
async message:hello:9
async message:hello:10
async message:hello:11
async message:hello:12
async message:hello:13
async message:hello:14
async message:hello:15
async message:hello:16
async message:hello:17
async message:hello:18
async message:hello:19
async message:hello:20
async message:hello:21
async message:hello:22
async message:hello:23
async message:hello:24
async message:hello:25
async message:hello:26
async message:hello:27
async message:hello:28
async message:hello:29
async message:hello:30
async message:hello:31
async message:hello:32
async message:hello:33
async message:hello:34
async message:hello:35
async message:hello:36
async message:hello:37
async message:hello:38
async message:hello:39
async message:hello:40
async message:hello:41
async message:hello:42
async message:hello:43
async message:hello:44
async message:hello:45
async message:hello:46
async message:hello:47
async message:hello:48
async message:hello:49
async message:hello:50
async message:hello:51
async message:hello:52
async message:hello:53
async message:hello:54
async message:hello:55
async message:hello:56
async message:hello:57
async message:hello:58
async message:hello:59
async message:hello:60
async message:hello:61
async message:hello:62
async message:hello:63
async message:hello:64
async message:hello:65
async message:hello:66
async message:hello:67
async message:hello:68
async message:hello:69
async message:hello:70
async message:hello:71
async message:hello:72
async message:hello:73
async message:hello:74
async message:hello:75
async message:hello:76
async message:hello:77
async message:hello:78
async message:hello:79
async message:hello:80
async message:hello:81
async message:hello:82
async message:hello:83
async message:hello:84
async message:hello:85
async message:hello:86
async message:hello:87
async message:hello:88
async message:hello:89
async message:hello:90
async message:hello:91
async message:hello:92
async message:hello:93
async message:hello:94
async message:hello:95
async message:hello:96
async message:hello:97
async message:hello:98
async message:hello:99
async message:hello:100
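
The producer gives every record an integer key and lets the default partitioner choose the partition. As far as I understand the Java client of this era, it hashes the serialized key with murmur2, masks off the sign bit and takes the result modulo the partition count. The sketch below reimplements that idea with the standard library only; `KeyPartitionSketch` is a hypothetical name, and the hash routine is my transcription, so treat it as an approximation of the client rather than its source. It does agree with the consumer output further down, where keys 0, 1 and 2 arrive in partitions 1, 0 and 2.

```java
import java.nio.ByteBuffer;

// Sketch modelled on the murmur2-based default partitioner; hypothetical class, standard library only.
final class KeyPartitionSketch {

    /** 32-bit murmur2 hash over the serialized key bytes. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            // Read four bytes as a little-endian int.
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3;
        switch (length % 4) { // mix in the remaining 1 to 3 bytes, if any
            case 3: h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1: h ^= data[tail] & 0xff;
                    h *= m;
                    break;
            default: break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Integer keys are serialized as 4 big-endian bytes, which is what IntegerSerializer produces. */
    static byte[] serialize(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    /** Clears the sign bit of the hash, then maps it onto the available partitions. */
    static int partitionFor(int key, int numPartitions) {
        return (murmur2(serialize(key)) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int key = 0; key < 6; key++) {
            System.out.println("key " + key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the mapping depends only on the key bytes and the partition count, records with the same key always land in the same partition as long as the number of partitions stays unchanged.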

Consumer:

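import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.gson.Gson;

// Body of main(). KafkaCfg is my own class (not shown here).
ExecutorService fixedPool = Executors.newFixedThreadPool(3);
		fixedPool.execute(new Runnable() {
			public void run() {
				Properties props = new Properties();
				props.put("bootstrap.servers", KafkaCfg.BROCKER_LIST);
				props.put("group.id", KafkaCfg.GROUP_ID);
				// KafkaConsumer talks to the brokers directly, so no zookeeper.* settings are needed.
				props.put("enable.auto.commit", "true"); // commit offsets automatically
				props.put("auto.commit.interval.ms", "1000");
				// Allowed values: latest, earliest, none
				props.put("auto.offset.reset", "earliest");
				props.put("key.deserializer", IntegerDeserializer.class);
				props.put("value.deserializer", StringDeserializer.class);
				KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);
				// Several topics can be consumed at once by passing them as a list.
				consumer.subscribe(Arrays.asList(KafkaCfg.TOPIC, KafkaCfg.TOPIC2));
				Gson gson = new Gson();
				while (true) {
					// Wait up to 100 ms for new records.
					ConsumerRecords<Integer, String> records = consumer.poll(100);
					for (ConsumerRecord<Integer, String> record : records) {
						System.out.println("record:" + gson.toJson(record));
					}
				}
			}
		});

Console output:
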
record:{"topic":"test-1","partition":0,"offset":17,"timestamp":1505629339505,"timestampType":"CREATE_TIME","checksum":3084842117,"serializedKeySize":4,"serializedValueSize":7,"key":1,"value":"hello:2"}
record:{"topic":"test-1","partition":0,"offset":18,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2036504617,"serializedKeySize":4,"serializedValueSize":7,"key":7,"value":"hello:8"}
record:{"topic":"test-1","partition":0,"offset":19,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2096183246,"serializedKeySize":4,"serializedValueSize":7,"key":8,"value":"hello:9"}
record:{"topic":"test-1","partition":0,"offset":20,"timestamp":1505629339524,"timestampType":"CREATE_TIME","checksum":1567468433,"serializedKeySize":4,"serializedValueSize":8,"key":14,"value":"hello:15"}
record:{"topic":"test-1","partition":0,"offset":21,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":2250809392,"serializedKeySize":4,"serializedValueSize":8,"key":15,"value":"hello:16"}
record:{"topic":"test-1","partition":0,"offset":22,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":795944797,"serializedKeySize":4,"serializedValueSize":8,"key":17,"value":"hello:18"}
record:{"topic":"test-1","partition":0,"offset":23,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":1596880373,"serializedKeySize":4,"serializedValueSize":8,"key":21,"value":"hello:22"}
record:{"topic":"test-1","partition":0,"offset":24,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":2549012433,"serializedKeySize":4,"serializedValueSize":8,"key":26,"value":"hello:27"}
record:{"topic":"test-1","partition":0,"offset":25,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":3946489373,"serializedKeySize":4,"serializedValueSize":8,"key":30,"value":"hello:31"}
record:{"topic":"test-1","partition":0,"offset":26,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":4171966126,"serializedKeySize":4,"serializedValueSize":8,"key":32,"value":"hello:33"}
record:{"topic":"test-1","partition":0,"offset":27,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":3143199368,"serializedKeySize":4,"serializedValueSize":8,"key":33,"value":"hello:34"}
record:{"topic":"test-1","partition":0,"offset":28,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":889962223,"serializedKeySize":4,"serializedValueSize":8,"key":35,"value":"hello:36"}
record:{"topic":"test-1","partition":0,"offset":29,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":658609139,"serializedKeySize":4,"serializedValueSize":8,"key":38,"value":"hello:39"}
record:{"topic":"test-1","partition":0,"offset":30,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":1769068338,"serializedKeySize":4,"serializedValueSize":8,"key":42,"value":"hello:43"}
record:{"topic":"test-1","partition":0,"offset":31,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":3207409220,"serializedKeySize":4,"serializedValueSize":8,"key":44,"value":"hello:45"}
record:{"topic":"test-1","partition":2,"offset":17,"timestamp":1505629339518,"timestampType":"CREATE_TIME","checksum":3419956930,"serializedKeySize":4,"serializedValueSize":7,"key":2,"value":"hello:3"}
record:{"topic":"test-1","partition":2,"offset":18,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2857189508,"serializedKeySize":4,"serializedValueSize":7,"key":5,"value":"hello:6"}
record:{"topic":"test-1","partition":2,"offset":19,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2866062050,"serializedKeySize":4,"serializedValueSize":7,"key":6,"value":"hello:7"}
record:{"topic":"test-1","partition":2,"offset":20,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":577748521,"serializedKeySize":4,"serializedValueSize":8,"key":12,"value":"hello:13"}
record:{"topic":"test-1","partition":2,"offset":21,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":1649992521,"serializedKeySize":4,"serializedValueSize":8,"key":16,"value":"hello:17"}
record:{"topic":"test-1","partition":0,"offset":32,"timestamp":1505629339533,"timestampType":"CREATE_TIME","checksum":2322283505,"serializedKeySize":4,"serializedValueSize":8,"key":47,"value":"hello:48"}
record:{"topic":"test-1","partition":0,"offset":33,"timestamp":1505629339535,"timestampType":"CREATE_TIME","checksum":2329901557,"serializedKeySize":4,"serializedValueSize":8,"key":48,"value":"hello:49"}
record:{"topic":"test-1","partition":2,"offset":22,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":3854334725,"serializedKeySize":4,"serializedValueSize":8,"key":18,"value":"hello:19"}
record:{"topic":"test-1","partition":2,"offset":23,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":1792756199,"serializedKeySize":4,"serializedValueSize":8,"key":19,"value":"hello:20"}
record:{"topic":"test-1","partition":2,"offset":24,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":2514692525,"serializedKeySize":4,"serializedValueSize":8,"key":22,"value":"hello:23"}
record:{"topic":"test-1","partition":2,"offset":25,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":1562610569,"serializedKeySize":4,"serializedValueSize":8,"key":25,"value":"hello:26"}
record:{"topic":"test-1","partition":2,"offset":26,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":3501401355,"serializedKeySize":4,"serializedValueSize":8,"key":28,"value":"hello:29"}
record:{"topic":"test-1","partition":2,"offset":27,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":2946838050,"serializedKeySize":4,"serializedValueSize":8,"key":31,"value":"hello:32"}
record:{"topic":"test-1","partition":2,"offset":28,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":2695171007,"serializedKeySize":4,"serializedValueSize":8,"key":36,"value":"hello:37"}
record:{"topic":"test-1","partition":2,"offset":29,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":3877831509,"serializedKeySize":4,"serializedValueSize":8,"key":40,"value":"hello:41"}
record:{"topic":"test-1","partition":2,"offset":30,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":2747042666,"serializedKeySize":4,"serializedValueSize":8,"key":41,"value":"hello:42"}
record:{"topic":"test-1","partition":2,"offset":31,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":4222789243,"serializedKeySize":4,"serializedValueSize":8,"key":45,"value":"hello:46"}
record:{"topic":"test-1","partition":2,"offset":32,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":830470691,"serializedKeySize":4,"serializedValueSize":8,"key":46,"value":"hello:47"}
record:{"topic":"test-1","partition":1,"offset":19,"timestamp":1505629339461,"timestampType":"CREATE_TIME","checksum":27654439,"serializedKeySize":4,"serializedValueSize":7,"key":0,"value":"hello:1"}
record:{"topic":"test-1","partition":1,"offset":20,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2877195336,"serializedKeySize":4,"serializedValueSize":7,"key":3,"value":"hello:4"}
record:{"topic":"test-1","partition":1,"offset":21,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2833341777,"serializedKeySize":4,"serializedValueSize":7,"key":4,"value":"hello:5"}
record:{"topic":"test-1","partition":1,"offset":22,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":1116560893,"serializedKeySize":4,"serializedValueSize":8,"key":9,"value":"hello:10"}
record:{"topic":"test-1","partition":1,"offset":23,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2285896101,"serializedKeySize":4,"serializedValueSize":8,"key":10,"value":"hello:11"}
record:{"topic":"test-1","partition":1,"offset":24,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":672893159,"serializedKeySize":4,"serializedValueSize":8,"key":11,"value":"hello:12"}
record:{"topic":"test-1","partition":1,"offset":25,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":1637741071,"serializedKeySize":4,"serializedValueSize":8,"key":13,"value":"hello:14"}
record:{"topic":"test-1","partition":2,"offset":33,"timestamp":1505629339543,"timestampType":"CREATE_TIME","checksum":3620398696,"serializedKeySize":4,"serializedValueSize":8,"key":51,"value":"hello:52"}
record:{"topic":"test-1","partition":2,"offset":34,"timestamp":1505629339545,"timestampType":"CREATE_TIME","checksum":242342934,"serializedKeySize":4,"serializedValueSize":8,"key":52,"value":"hello:53"}
record:{"topic":"test-1","partition":2,"offset":35,"timestamp":1505629339547,"timestampType":"CREATE_TIME","checksum":2840039757,"serializedKeySize":4,"serializedValueSize":8,"key":53,"value":"hello:54"}
record:{"topic":"test-1","partition":1,"offset":26,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":464649674,"serializedKeySize":4,"serializedValueSize":8,"key":20,"value":"hello:21"}
record:{"topic":"test-1","partition":1,"offset":27,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":3591464331,"serializedKeySize":4,"serializedValueSize":8,"key":23,"value":"hello:24"}
record:{"topic":"test-1","partition":1,"offset":28,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":2254864424,"serializedKeySize":4,"serializedValueSize":8,"key":24,"value":"hello:25"}
record:{"topic":"test-1","partition":1,"offset":29,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":3670479813,"serializedKeySize":4,"serializedValueSize":8,"key":27,"value":"hello:28"}
record:{"topic":"test-1","partition":1,"offset":30,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":1843557739,"serializedKeySize":4,"serializedValueSize":8,"key":29,"value":"hello:30"}
record:{"topic":"test-1","partition":1,"offset":31,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":1905538768,"serializedKeySize":4,"serializedValueSize":8,"key":34,"value":"hello:35"}
record:{"topic":"test-1","partition":1,"offset":32,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":3985428395,"serializedKeySize":4,"serializedValueSize":8,"key":37,"value":"hello:38"}
record:{"topic":"test-1","partition":1,"offset":33,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":3427427349,"serializedKeySize":4,"serializedValueSize":8,"key":39,"value":"hello:40"}
record:{"topic":"test-1","partition":1,"offset":34,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":713267988,"serializedKeySize":4,"serializedValueSize":8,"key":43,"value":"hello:44"}
record:{"topic":"test-1","partition":1,"offset":35,"timestamp":1505629339536,"timestampType":"CREATE_TIME","checksum":813675607,"serializedKeySize":4,"serializedValueSize":8,"key":49,"value":"hello:50"}
record:{"topic":"test-1","partition":1,"offset":36,"timestamp":1505629339541,"timestampType":"CREATE_TIME","checksum":2006019882,"serializedKeySize":4,"serializedValueSize":8,"key":50,"value":"hello:51"}
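
In the output above, records from partition 1 and partition 2 are interleaved, but within each partition the offsets increase by exactly 1 from one record to the next. Kafka only orders records within a single partition, not across a topic. The following is a minimal sketch for checking this against the printed lines. The class name `OffsetOrderCheck` and its method are hypothetical helpers, not part of the Kafka API, and the sample lines are trimmed copies of four records from the log above, keeping only the topic, partition, and offset fields.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Kafka): checks printed "record:{...}" lines.
public class OffsetOrderCheck {

    // Extracts the "partition" and "offset" fields from one printed record line.
    private static final Pattern FIELDS =
            Pattern.compile("\"partition\":(\\d+),\"offset\":(\\d+)");

    /**
     * Returns true if, within every partition, each offset equals the
     * previously seen offset of that partition plus 1.
     */
    public static boolean offsetsContiguousPerPartition(List<String> lines) {
        Map<Integer, Long> lastOffset = new HashMap<>();
        for (String line : lines) {
            Matcher m = FIELDS.matcher(line);
            if (!m.find()) {
                continue; // not a record line, skip it
            }
            int partition = Integer.parseInt(m.group(1));
            long offset = Long.parseLong(m.group(2));
            // put() returns the previous offset for this partition, or null.
            Long previous = lastOffset.put(partition, offset);
            if (previous != null && offset != previous + 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Trimmed copies of four lines from the output above, in log order.
        List<String> sample = Arrays.asList(
                "record:{\"topic\":\"test-1\",\"partition\":2,\"offset\":32}",
                "record:{\"topic\":\"test-1\",\"partition\":1,\"offset\":25}",
                "record:{\"topic\":\"test-1\",\"partition\":2,\"offset\":33}",
                "record:{\"topic\":\"test-1\",\"partition\":1,\"offset\":26}");
        System.out.println(offsetsContiguousPerPartition(sample)); // prints "true"
    }
}
```

The check keeps only the last offset seen per partition, so it can be run line by line over a full log without holding the whole log in memory.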
