【导读:数据是二十一世纪的石油,蕴含巨大价值,这是·情报通·大数据技术系列第[61]篇文章,欢迎阅读和收藏】
1 基本概念
消息系统的组成是生产者、存储系统和消费者,消费者会从存储系统读取生产者写入的消息。Kafka 作为消息系统在主题当中保存消息的信息。生产者向主题写入数据,消费者从主题读取数据。由于 Kafka 的特性是支持分布式,同时也是基于分布式的,所以主题也是可以在多个节点上被分区和覆盖的。Kafka 集成了 Producer/Consumer 链接的 Broker 的客户端工具,但是在消息处理方便,这两者主要用于服务端( Broker )的简单操作,如:创建 Topic 、罗列出已存在的 Topic 、对已有的 Topic 的 Producer/Consumer 测试。跟其他消息系统一样, Kafka 提供了多种不同语言实现的客户端 API ,如 Java 、 Python 、 Ruby 、 Go 等。这些 API 极大的方便用户使用 Kafka 集群。
2 术语解释
Kafka 包含四种核心的 API
1、 Producer API :支持应用将数据流发送到 Kafka 集群的主题。
2、 Consumer API :支持应用从 Kafka 集群的主题中读取数据流。
3、 Streams API :支持数据流从输入主题转化到输出主题
4、 Connect API :支持实现持续的从一些源系统或应用划入 Kafka 或者从 Kafka 推入一些原系统或应用的接口。
3 基于 Application 的生产者消费者实例
1、 创建 Kafka 生产者
创建 Kafka 生产者有三个基本属性:
n Bootstrap.servers: 指定了生产者建立初始连接的 broke 列表
n Key.serializer: 指定了用来序列化键值的类
n Value.serializer: 指定了用来序列化消息记录的类,与 key.serializer 差不多
创建完生产者后,就可以发送消息, Kafka 中有三种发送消息的方式:
n 只发不管结果( fire-and-forget ) : 只调用接口发送消息到 Kafka 服务器,但不管成功写入与否。由于 Kafka 是高可用的,因此大部分情况下消息都会写入,但在异常情况下会丢消息。
n 同步发送( Synchronous send ):调用 send ()方法返回一个 Future 对象,我们可以使用它的 get ()方法来判断消息发送是否成功。
n 异步发送( Asynchronous send ):调用 send ()时提供一个回调方法,当接受到 broker 结果后回调此方法 .
下面是单线程发送的简单代码:
public static void produce() throws Exception {
//topic
String topic = "mytopic";
// 配置
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
// 序列化类型 properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者
KafkaProducer pro = new KafkaProducer(properties);
for (int i=0;i
pro.send(new ProducerRecord(topic, "message: " + i++));
//sleep
Thread.sleep(1000);
}
2、 创建消费者:
创建 Kafka 消费者有两个基本属性:
n Zookeeper.connect: 声明 zookeeper 服务器和端口
n Group.id :指定消费者分组
示例代码如下:
private String topic;
public MyConsumer(String topic){
super();
this.topic = topic;
}
public void run() {
ConsumerConnector consumer = createConsumer();
Map topicCountMap = new HashMap();
topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
Map>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
ConsumerIterator iterator = stream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
}
}
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "127.0.0.1:2181");// 声明 zk
properties.put("group.id", "group5");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的 topic 数据
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
领取专属 10元无门槛券
私享最新 技术干货