前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >EMR(弹性MapReduce)入门之kafka实战(十五)

EMR(弹性MapReduce)入门之kafka实战(十五)

原创
作者头像
小司机带你入门EMR
修改2020-02-18 17:50:25
1.5K0
修改2020-02-18 17:50:25
举报
文章被收录于专栏:EMR冲鸭

kafka

介绍

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

消息传递模式是:发布—订阅模式。

Kafka主要设计目标如下:

以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。

高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。

支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。

同时支持离线数据处理和实时数据处理。

Scale out:支持在线水平扩展

优点

解耦、冗余、扩展性、灵活性和峰值的处理能力、可恢复性、顺序保证、缓冲、异步通信

工作原理

消息传递模式:发布—订阅模式

发布订阅模式
发布订阅模式

解释:

在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息

kafka中的术语

  •  broker:中间的kafka cluster,存储消息,是由多个server组成的集群。
  •  topic:kafka给消息提供的分类方式。broker用来存储不同topic的消息数据。

kafka将所有消息组织成多个topic的形式存储,而每个topic又可以拆分成多个partition,每个partition又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。这样,消息就以一个个id的方式,组织起来。

 producer选择一个topic,生产消息,消息会通过分配策略append到某个partition末尾。

 consumer选择一个topic,通过id指定从哪个位置开始消费消息。消费完成之后保留id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费。

id在kafka中称为offset,它的好处是

  1. 消费者可以根据需求,灵活制定offset消费。
  2. 保证了消息不变性,为并发消费提供了线程安全的保证。
  3. 消息访问的并行高效性。
  4. 增加消息系统的可伸缩性。
  5. 保证消息可靠性。
  6. 灵活的持久化策略。
  7. 备份高可用性。
  •  producer:往broker中某个topic里面生产数据。

producer生产消息需要如下参数:

  1. topic:往哪个topic生产消息。
  2. partition:往哪个partition生产消息。
  3. key:根据该key将消息分区到不同partition。
  4. message:消息。
  •  consumer:从broker中某个topic获取数据。

传统消息系统有两种模式:

点对点

 发布订阅

kafka通过consumer group将两种模式统一处理:每个consumer将自己标记consumer group名称,之后系统会将consumer group按名称分组,将消息复制并分发给所有分组,每个分组只有一个consumer能消费这条消息。

  • Partitions

 每个Topics划分为一个或者多个Partition,并且Partition中的每条消息都被标记了一个sequential id ,也就是offset,并且存储的数据是可配置存储时间的 。

实战案例----kafka数据通过flume收集并存储到hbase

1、准备工作

  • 因为任务中需要访问腾讯云消息队列 CKafka,所以需要先创建一个 CKafka 实例,具体见 消息队列 CKafka。
  • 已经开通了腾讯云,并且创建了一个 EMR 集群。在创建 EMR 集群的时候需要在软件配置界面选择 Spark 组件。

2、在 EMR 集群使用 Kafka 工具包

首先需要查看 CKafka 的内网 IP 与端口号。登录消息队列 CKafka 的控制台,选择要使用的 CKafka 实例,在基本消息中查看其内网 IP 为 $kafkaIP,而端口号一般默认为 9092。在 topic 管理界面新建一个topic即可。

3、配置flume

  • 创建flume的配置文件hbase_kafka.properties
代码语言:java
复制
vim hbase_kafka.properties
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hbase_sink
# 以下配置source
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
agent.sources.kafka_source.kafka.topics = kafka_test
# 以下配置sink
agent.sinks.hbase_sink.channel = mem_channel
agent.sinks.hbase_sink.table = foo_table
agent.sinks.hbase_sink.columnFamily = cf
agent.sinks.hbase_sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 以下配置channel
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
  • 创建hbase表
代码语言:javascript
复制
hbase shell
create 'foo_table','cf'
  • 运行flume
代码语言:javascript
复制
./bin/flume-ng agent --conf ./conf/-f hbase_kafka.properties -n agent -Dflume.root.logger=INFO,console
  • 运行kafka的producer
代码语言:javascript
复制
./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
hello
world

4、测试

  1. 在kafka生产者客户端数据信息并回车
  2. 观察hbase表中是否有相应数据

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • kafka
    • 介绍
      • 优点
      • 工作原理
        • 消息传递模式:发布—订阅模式
          • kafka中的术语
          • 实战案例----kafka数据通过flume收集并存储到hbase
          相关产品与服务
          消息队列 CMQ 版
          消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档