前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Fluentd-kafka插件用法详解

Fluentd-kafka插件用法详解

作者头像
Fluentd中文网
修改2020-06-11 10:13:17
5.7K0
修改2020-06-11 10:13:17
举报
文章被收录于专栏:Fluentd学习交流Fluentd学习交流

Fluentd支持从kafka订阅数据,同时支持向kafka发布数据。这两项功能集成在一个插件中:fluent-plugin-kafka,我们在下文中分别称之为输入插件和输出插件。

【安装说明】

通过以下命令安装fluent-plugin-kafka:

代码语言:javascript
复制
td-agent-gem install fluent-plugin-kafka

此插件需要Ruby版本不低于2.1,且输入插件要求源kafka版本不低于0.9,输出插件要求目的kafka版本不低于0.8。

如果要使用插件的zookeeper相关参数,需要安装zookeeper gem,可能还需要安装linux开发工具,如ruby-devel、gcc、make等。

【输入插件 - kafka】

插件以“单消费者”模式订阅kafka消息。

单消费者模式是指:每个kafka输入插件独立地订阅kafka消息。

这种模式可以满足极简单的应用场景。其缺点为:

  • 每次只能从一个topic获取消息
  • 如果有多个单消费者进程同时订阅相同的topic,进程之间无法协调如何分配不同的分区
  • 如果多个单消费者进程中某个进程挂掉,其他进程无法从该进程原先订阅位置进行恢复。

单消费者模式下,kafka输入插件配置说明如下:

代码语言:javascript
复制
<source>
  @type kafka

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  topics <listening topics(separate with comma',')>
  format <input text type (text|json|ltsv|msgpack)> :default => json
  message_key <key (Optional, for text format only, default is message)>
  add_prefix <tag prefix (Optional)>
  add_suffix <tag suffix (Optional)>

  # Optionally, you can manage topic offset by using zookeeper
  offset_zookeeper    <zookeer node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
  offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'

  # ruby-kafka consumer options
  max_bytes     (integer) :default => nil (Use default of ruby-kafka)
  max_wait_time (integer) :default => nil (Use default of ruby-kafka)
  min_bytes     (integer) :default => nil (Use default of ruby-kafka)
</source>
  • @type:插件类型,取值为kafka
  • brokers:逗号分隔的broker列表,每个broker需要指定ip和端口
  • topics:逗号分隔的topic列表
  • format:输入消息的格式,有text、json、ltsv、msgpack等几种
  • message_key:消息格式为text时,指定文本中message字段的名称
  • add_prefix:tag增加前缀
  • add_suffix:tag增加后缀

kafka输入插件以topic作为Fluentd内部事件的tag。如果订阅的topic为app_event,输入插件产生的tag就会是app_event。

add_prefix和add_suffix可用于修改tag值。比如:

代码语言:javascript
复制
add_prefix kafka

会将app_event修改为kafka.app_event。

单消费者模式支持单独设置每个topic的读取偏移。

配置说明如下:

代码语言:javascript
复制
<source>
  @type kafka

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  format <input text type (text|json|ltsv|msgpack)>
  <topic>
    topic     <listening topic>
    partition <listening partition: default=0>
    offset    <listening start offset: default=-1>
  </topic>
  <topic>
    topic     <listening topic>
    partition <listening partition: default=0>
    offset    <listening start offset: default=-1>
  </topic>
</source>

【输入插件 - kafka_group】

插件以“消费者组”模式订阅kafka消息。

消费者组模式解决了单消费者模式存在的几个缺点,可以同时启动多个Fluentd进程协同工作。

配置说明如下:

代码语言:javascript
复制
<source>
  @type kafka_group

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  consumer_group <consumer group name, must set>
  topics <listening topics(separate with comma',')>
  format <input text type (text|json|ltsv|msgpack)> :default => json
  message_key <key (Optional, for text format only, default is message)>
  add_prefix <tag prefix (Optional)>
  add_suffix <tag suffix (Optional)>
  retry_emit_limit <Wait retry_emit_limit x 1s when BuffereQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
  time_source <source for message timestamp (now|kafka|record)> :default => now
  time_format <string (Optional when use_record_time is used)>
 
  # ruby-kafka consumer options
  max_bytes               (integer) :default => 1048576
  max_wait_time           (integer) :default => nil (Use default of ruby-kafka)
  min_bytes               (integer) :default => nil (Use default of ruby-kafka)
  offset_commit_interval  (integer) :default => nil (Use default of ruby-kafka)
  offset_commit_threshold (integer) :default => nil (Use default of ruby-kafka)
  fetcher_max_queue_size  (integer) :default => nil (Use default of ruby-kafka)
  start_from_beginning    (bool)    :default => true
</source>
  • @type:插件类型,取值为kafka_group
  • consumer_group:设定消费者组名称,必选
  • time_source:指定日志事件中时间戳来源,可取now、kafka和record
  • time_format:当时间源为record时,设置时间格式以提取其中的时间戳
  • offset_commit_interval:设置offset提交时间间隔,默认10秒
  • offset_commit_threshold:插件可批量处理消息后再提交一次offset,此参数用于设置批量处理的消息数。默认为0,不采用批量提交机制。
  • start_from_beginning:true,从头开始消费topic;false,只消费新消息。默认为true。

【输出插件】

用于向kafka发布消息。

插件类型为kafka2,适用于fluentd v1.0及后续版本。

未来将使用out_kafka替代kafka2。

配置说明如下:

代码语言:javascript
复制
<match app.**>
  @type kafka2

  brokers               <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  topic_key             (string) :default => 'topic'
  partition_key         (string) :default => 'partition'
  partition_key_key     (string) :default => 'partition_key'
  message_key_key       (string) :default => 'message_key'
  default_topic         (string) :default => nil
  default_partition_key (string) :default => nil
  default_message_key   (string) :default => nil
  exclude_topic_key     (bool)   :default => false
  exclude_partition_key (bool)   :default => false
  exclude_partition     (bool)   :default => false
  exclude_message_key   (bool)   :default => false
  get_kafka_client_log  (bool)   :default => false
  headers               (hash)   :default => {}
  headers_from_record   (hash)   :default => {}
  use_default_for_unknown_topic (bool) :default => false

  <format>
    @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  </format>

  # Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section
  <inject>
    tag_key tag
    time_key time
  </inject>

  # See fluentd document for buffer related parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section
  # Buffer chunk key should be same with topic_key. If value is not found in the record, default_topic is used.
  <buffer topic>
    flush_interval 10s
  </buffer>

  # ruby-kafka producer options
  idempotent        (bool)    :default => false
  sasl_over_ssl     (bool)    :default => true
  max_send_retries  (integer) :default => 1
  required_acks     (integer) :default => -1
  ack_timeout       (integer) :default => nil (Use default of ruby-kafka)
  compression_codec (string)  :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
</match>
  • @type:插件类型,取值为kafka2
  • topic_key:设置目的topic取自日志记录中的哪个字段。 比如:topic_key为日志中的category字段,如果该字段的某个值为app,那么消息会被发布到kafka的名称为app的topic中。 需要注意的是,在插件的缓存配置中也需要设置该参数的取值。
代码语言:javascript
复制
topic_key category
<buffer category> # topic_key should be included in buffer chunk key
  # ...
</buffer>

如果你设置了topic_key为category,那么在<buffer>配置中也需要以此作为chunk的类型值。

  • default_topic:默认topic,若未设置topic_key,则topic取此处的值。
  • <format>:设置输出消息格式,支持json、ltsv或其他输出插件
  • required_acks:设置每个请求的ack数,可设置1、2这样的小的数字以提高性能。
  • compression_codec:设置输出消息的压缩方式,支持gzip和snappy。

【输出插件的负载均衡策略】 默认情况下,发布的消息会被随机分配到kafka topic的一个分区。

输出插件支持通过设置default_partition_key或partition_key_key的方式将消息分配到特定的分区中。 具有相同partition值的消息会被分配到同一个分区。

default_partition_key

partition_key_key

消息负载均衡方式

未设置

不存在

随机分配分区

已设置

不存在

分配到default_partition_key指定的分区

未设置

存在

含有partition_key_key字段的消息被分配到该字段指定的分区;其他消息随机分配一个分区

已设置

存在

含有partition_key_key字段的消息被分配到该字段指定的分区;其他消息分配到default_partition_key指定的分区

【常见问题】

【Q】为什么fluent-plugin-kafka无法发送数据到kafka集群? 通常是由于插件使用的ruby-kafka和kafka集群版本不匹配导致的。

解决办法有两个:

  • 升级kafka集群到最新版本,最新版更快更健壮
  • 降级ruby-kafka或fluent-plugin-kafka以适配当前使用的kafka

我们这里只是简单介绍了一些fluent-plugin-kafka插件的使用规则,后续将会根据通过一些示例来进一步了解其用法。

本文系外文翻译,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系外文翻译前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档