Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。 消息的发布描述为producer,消息的订阅描述为consumer,将中间的存储阵列称作broker(代理)。kafka是linkedin用于日志处理的分布式消息队列,同时支持离线和在线日志处理。kafka对消息保存时根据Topic进行归类,发送消息者就是Producer,消息接受者就是Consumer,每个kafka实例称为broker。然后三者都通过Zookeeper进行协调。 也即:
1、启动zookeeper的server
2、启动kafka的server
3、Producer生产数据,然后通过zookeeper找到broker,再讲数据push到broker进行保存
4、Consumer通过zookeeper找到broker,然后再主动pull数据
kafka存储是基于硬盘存储的,然而却有着快速的读写效率,一个 67200rpm STAT RAID5 的阵列,线性读写速度是 300MB/sec,如果是随机读写,速度则是 50K/sec。 虽然都知道内存读取速度会明显快于硬盘读写速度,但是kafka却通过线性读写的方式实现快速地读写。
学习kafka一定要理解好Topic,每个Topic被分成多个partition(区)。每条消息在partition中的位置称为offset(偏移量),类型为long型数字。消息即使被消费了,也不会被立即删除,而是根据broker里的设置,保存一定时间后再清除,比如log文件设置存储两天,则两天后,不管消息是否被消费,都清除。
broker也即中间的存储队列。我们将消息的収布(publish)暂时称作 producer,将消息的订阅(subscribe)表述为consumer,将中间的存储阵列称作 broker(代理)。
每个consumer属于一个consumer group。在kafka中,一个partition的消息只会被group中的一个consumer消费;可以认为一个group就是一个“订阅者”。一个Topic中的每个partition只会被一个“订阅者”中的一个consumer消费。
kafka集群几乎不需要维护任何Consumer和Producer的信息。这些信息由Zookeeper保存。发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。
Kafka好处:转自InfoQ:Kafka剖析
vi /etc/selinux/config
对配置文件进行修改,然后按ESC键,:wq保存退出
#SELINUX=enforcing
#SELINUXTYPE=targeted
SELINUX=disabled #增加
:wq! #保存退出
vi /etc/sysconfig/iptables
可以看到配置文件,然后按I键,insert一行
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
然后按ESC键,:wq保存退出
# Firewall configuration written by system-config-firewall
# Manual customization of this file is not recommended.
*filter
:INPUT ACCEPT [0:0]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
-A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
-A INPUT -p icmp -j ACCEPT
-A INPUT -i lo -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 22 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -j REJECT --reject-with icmp-host-prohibited
-A FORWARD -j REJECT --reject-with icmp-host-prohibited
COMMIT
:wq! #保存退出
如果有安装yum的话,一般可以使用yum安装,下面给出网上一篇很不错的jdk安装教程,建议Linux安装的可以去linux公社找找教程 CentOS6安装JDK
cd进入相应文件夹,一般安装到/usr/local/src
cd /usr/local/src
wget下载文件
wget http://archive.apache.org/dist/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
解压
tar -xzvf kafka_2.11-0.8.2.1.tgz
移动到安装目录
mv kafka_2.11-0.8.2.1 /usr/local/kafka
创建Kafka日志文件存放文件夹
mkdir /usr/local/kafka/logs/kafka
cd配置文件目录
cd /usr/local/kafka/config
VI编辑
vi server.properties
修改配置
broker.id=0
port=9092 #端口号
host.name=127.0.0.1 #服务器IP地址,修改为自己的服务器IP
log.dirs=/usr/local/kafka/logs/kafka #日志存放路径,上面创建的目录
zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
:wq! #保存退出
创建一个目录安装
mkdir /usr/local/kafka/zookeeper
创建一个Zookeeper日志存放目录
mkdir /usr/local/kafka/logs/zookeeper
配置文件
cd /usr/local/kafka/config
vi zookeeper.properties
dataDir=/usr/local/kafka/zookeeper #zookeeper数据目录
dataLogDir=/usr/local/kafka/log/zookeeper #zookeeper日志目录
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
:wq! #保存退出
编写kafka的start脚本
cd /usr/local/kafka
使用vi创建脚本
vi kafkastart.sh
加入脚本代码,&符号表示在后台执行,然后:wq保存退出
#!/bin/sh
#启动zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
sleep 3 #等3秒后执行
#启动kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
编写kafka的stop脚本
vi kafkastop.sh
脚本代码如,同样是:wq保存退出
#!/bin/sh
#关闭zookeeper
/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
sleep 3 #等3秒后执行
#关闭kafka
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
添加脚本执行权限
chmod +x kafkastart.sh
chmod +x kafkastop.sh
vi /etc/rc.d/rc.local
设置开机时脚本在后台执行,使用&符号 将如下代码添加到rc.local里,同样使用VI编辑器
sh /usr/local/kafka/kafkastart.sh &
:wq保存退出
sh /usr/local/kafka/kafkastart.sh #启动kafka
usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 test
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --from-beginning