flume 1.8.0 开发之RPC

RPC是flume开发中比较核心的部分。

flume开发基础可见:https://cloud.tencent.com/developer/article/1195082

RPC客户端接口

一个RPC客户端接口的实现,包含了支持Flume的RPC方法.用户的程序可以简单地调用Flume SDK客户端的append(Event)或者appendBatch(List<Event>)接口来发送数据,而不用考虑消息交互的细节.用户可以通过使用诸如SimpleEvent类,或者使用EventBuilder的 静态helper方法withBody(),便捷地实现直接提供事件接口所需的事件ARG.

RPC clients - Avro and Thrift

Flume 1.4.0时,Avro成为默认的RPC协议.NettyAvroRpcClient和ThriftRpcClient实现了RpcClient的接口.客户端需要建立一个包含目的Flume agent host和port信息的对象.使用RpcClient来发送数据到客户端.下例展示了如何在程序中使用flume client sdk api.

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    client.init("host.example.org", 41414);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }

}

远端的flume agent需要AvroSource在监听相关端口(或者是ThriftSource).下面是一个正在等待MyApp连接的flume agent示例配置文件.

a1.channels = c1 
a1.sources = r1 
a1.sinks = k1
a1.channels.c1.type = memory
a1.sources.r1.channels = c1 
a1.sources.r1.type = avro

# For using a thrift source set the following instead of the above line. # a1.source.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sinks.k1.channel = c1 
a1.sinks.k1.type = logger

更灵活一点,Flume client实现((NettyAvroRpcClient and ThriftRpcClient) )可以如下配置:

client.type = default (for avro) or thrift (for thrift)
hosts = h1     # default client accepts only 1 host
                # (additional hosts will be ignored)
hosts.h1 = host1.example.org:41414
                # host and port must both be specified
                # (neither has a default)
batch-size = 100        # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)

Secure RPC client - Thrift

Flume 1.6.0时,Thrift source和sink支持基于kerberos的认证.客户端需要使用SecureRpcClientFactory的getThriftInstance方法来实现SecureThriftRpcClient.当你使用SecureRpcClientFactory时,kerberos认证模块需要放在classpath的flume-ng-auth路径下.客户端的主体和密钥表通过properties以参数形式传入.同时目的服务端的Thrift source也需要使用这样处理.用户的数据程序可以参考如下例子来使用SecureRpcClientFactory:

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClient;
import java.nio.charset.Charset;
import java.util.Properties;

public class MyApp {
  public static void main(String[] args) {
    MySecureRpcClientFacade client = new MySecureRpcClientFacade();
    // Initialize client with the remote Flume agent's host, port
    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414));

    // Initialize client with the kerberos authentication related properties
    props.setProperty("kerberos", "true");
    props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
    props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
    props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
    client.init(props);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MySecureRpcClientFacade {
  private RpcClient client;
  private Properties properties;

  public void init(Properties properties) {
    // Setup the RPC connection
    this.properties = properties;
    // Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory
    this.client = SecureRpcClientFactory.getThriftInstance(properties);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = SecureRpcClientFactory.getThriftInstance(properties);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}

远端的ThriftSource需要以kerberos模式启动.下面这个示例Flume agent配置文件用于等待MyApp的连接:

a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
a1.sources.r1.agent-keytab = /tmp/flume.keytab


a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

failover client

这个class包含一个默认的Avro RPC客户端,来提供客户端的故障切换处理能力.它使用了一个以空格分隔的list(<host>:<port>代表flume agent)作为一个故障切换组.目前failoverRPC客户端还不支持thrift.如果与选中的host agent通信出现错误,那么failover client会自动的选取列表中下一个host.例如:

// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);

FailoverRpcClient可以以下列参数更灵活的配置:

client.type = default_failover

hosts = h1 h2 h3                     # at least one is required, but 2 or
                                     # more makes better sense

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

max-attempts = 3                     # Must be >=0 (default: number of hosts
                                     # specified, 3 in this case). A '0'
                                     # value doesn't make much sense because
                                     # it will just cause an append call to
                                     # immmediately fail. A '1' value means
                                     # that the failover client will try only
                                     # once to send the Event, and if it
                                     # fails then there will be no failover
                                     # to a second client, so this value
                                     # causes the failover client to
                                     # degenerate into just a default client.
                                     # It makes sense to set this value to at
                                     # least the number of hosts that you
                                     # specified.

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

负载均衡的RPC client

Flume Client SDK同样支持RPC的负载均衡.它使用了一个以空格分隔的list(<host>:<port>代表flume agent)作为一个负载均衡组.同时支持随机和轮询两种负载均衡策略.你同样可以通过implement loadBalancingRpcClient$HostSelector接口在你自定义的class中实现选取算法.这种情况下,你需要在host-selector属性中填上你自定义类的FQCN(Full Qualified Class Name).目前负载均衡的RPC client同样没有支持thrift.

如果启用了backoff,则客户端将暂时将失败的主机列入黑名单,从而在给定的timeout时间内将它们排除为故障转移的host.当timeout时间到了,如果host仍然没有响应,那么这被认为是顺序故障,并且timeout时间会以指数方式增加,以避免在无响应的主机上长时间等待时卡住.

可以通过设置maxBackoff(以毫秒为单位)来配置最大backoff时间. maxBackoff默认值为30秒(在OrderSelector类中指定,它是两个负载平衡策略的超类). 退避超时将随着连续故障呈指数级增长,直至最大可能的退避超时(最大可能的退避限制为65536秒(约18.2小时)).

// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
//                                            // selection
props.put("backoff", "true"); // Disabled by default.

props.put("maxBackoff", "10000"); // Defaults 0, which effectively
                                  // becomes 30000 ms

// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);

LoadBalancingRpcClient可以以下列参数更灵活的配置:

client.type = default_loadbalance

hosts = h1 h2 h3                     # At least 2 hosts are required

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

backoff = false                      # Specifies whether the client should
                                     # back-off from (i.e. temporarily
                                     # blacklist) a failed host
                                     # (default: false).

maxBackoff = 0                       # Max timeout in millis that a will
                                     # remain inactive due to a previous
                                     # failure with that host (default: 0,
                                     # which effectively becomes 30000)

host-selector = round_robin          # The host selection strategy used
                                     # when load-balancing among hosts
                                     # (default: round_robin).
                                     # Other values are include "random"
                                     # or the FQCN of a custom class
                                     # that implements
                                     # LoadBalancingRpcClient$HostSelector

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

嵌入式的agent

Flume有一个嵌入式式的api,允许用户在他们的应用程序中嵌入.当不使用全部的source、sink、channel时,agent就会很轻量.具体而言,使用的source是一个特殊的嵌入式的source,事件通过EmbeddedAgent对象的put、putall方法来发送数据到source.Avro是唯一支持的sink,而channel只允许是File和Memory Channel.嵌入式的agent同样支持Interceptors.

注意:嵌入式agent依赖hadoop-core.jar.

嵌入式agent的配置类似于完整agent的配置.以下是一份详尽的可选配置列表:

加粗的为必选项.

属性名

默认

描述

source.type

embedded

只能选embedded source.

channel.type

-

memory或者file

channel.*

-

详见MemoryChannel和FileChannel的用户指引

sinks

-

sink名的列表

sink.type

-

必须为avro

sink.*

-

参考AvroSink的用户指引

processor.type

-

failover or load_balance

processor.*

-

参考failover or load_balance的用户指引

source.interceptors

-

空格分隔的interceptors列表

source.interceptors.*

-

每个独立source.interceptors配置

使用案例:

Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port",  "5565");
properties.put("processor.type", "load_balance");
properties.put("source.interceptors", "i1");
properties.put("source.interceptors.i1.type", "static");
properties.put("source.interceptors.i1.key", "key1");
properties.put("source.interceptors.i1.value", "value1");

EmbeddedAgent agent = new EmbeddedAgent("myagent");

agent.configure(properties);
agent.start();

List<Event> events = Lists.newArrayList();

events.add(event);
events.add(event);
events.add(event);
events.add(event);

agent.putAll(events);

...

agent.stop();

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏haifeiWu与他朋友们的专栏

阿里 RPC 框架 DUBBO 初体验

最近研究了一下阿里开源的分布式RPC框架dubbo,楼主写了一个 demo,体验了一下dubbo的功能。

3312
来自专栏偏前端工程师的驿站

Eclipse魔法堂:任务管理器

一、前言                                  Eclipse的任务管理器为我们提供一个方便的入口查看工程代办事宜,并定位到对应的代...

2138
来自专栏Netkiller

PHP 高级编程之多线程

PHP 高级编程之多线程 http://netkiller.github.io/journal/php.thread.html ---- 目录 1. 多线程环境...

4865
来自专栏阿杜的世界

Spring实战5-基于Spring构建Web应用主要内容

写在前面:关于Java Web,首先推荐一篇文章——写给java web一年左右工作经验的人,这篇文章的作者用精练的话语勾勒除了各种Java框架的缘由和最基本的...

1082
来自专栏java思维导图

Spring思维导图,让Spring不再难懂(ioc篇)

写在前面 写过java的都知道:所有的对象都必须创建;或者说:使用对象之前必须先创建。而使用ioc之后,你就可以不再手动创建对象,而是从ioc容器中直接获取对象...

3977
来自专栏java工会

Spring工作原理

内部最核心的就是IOC了,动态注入,让一个对象的创建不用new了,可以自动的生产,这其实就是利用java里的反射,反射其实就是在运行时动态的去创建、...

871
来自专栏java工会

Spring工作原理

内部最核心的就是IOC了,动态注入,让一个对象的创建不用new了,可以自动的生产,这其实就是利用java里的反射,反射其实就是在运行时动态的去创建、...

731
来自专栏玩转JavaEE

使用Spring Boot开发Web项目

按:最近公众号文章主要是整理一些老文章,以个人CSDN上的博客为主,也会穿插一些新的技术点。 ---- 前面两篇博客中我们简单介绍了Spring Boot项目的...

3005
来自专栏LuckQI

SpringBoot~SpringBatch 使用

Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,Spr...

1283
来自专栏JMCui

Netty 系列四(ChannelHandler 和 ChannelPipeline).

    先来整体的介绍一下这篇博文要介绍的几个概念(Channel、ChannelHandler、ChannelPipeline、ChannelHandlerC...

1192

扫码关注云+社区

领取腾讯云代金券