flume RPC 接口开发

创建项目

使用mvn创建项目

mvn archetype:generate -DgroupId=com.youcash.nummy -DartifactId=data-transformation -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

编辑pom.xml,添加相关依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.youcash.nummy</groupId>
    <artifactId>data-transformation</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>data-transformation</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.6.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.6.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.youcash.nummy.App</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

编辑代码

编辑App.java:

package com.youcash.nummy;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class App {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    client.init("192.168.64.5", 41414);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }

}

打包Jar

mvn clean
mvn package

启动flume

编辑配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414


# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume:

./bin/flume-ng agent -n a1 -c conf/ -f conf/rpc.conf -Dflume.root.logger=INFO, console

启动kafka

# 启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

执行Jar包

java -jar data-transformation-1.0-SNAPSHOT-shaded.jar 

然后在kafka的终端即可看到发送的消息。

Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏张善友的专栏

Windows Server AppFabric Beta 2 已经发布

Windows Server AppFabric Beta 2是一个包含完全功能的AppFabric版本(This build represents our “...

1815
来自专栏一个会写诗的程序员的博客

SpringBoot集成日志logback.groovy报错: Groovy classes are not available on the class path. ABORTING INITIAL

SpringBoot集成日志logback.groovy报错: Groovy classes are not available on the class pa...

1063
来自专栏大大的微笑

ActiveMQ几个重要的配置文件

version:5.10,在5.8以后增加了levelDB的方式进行集群配置 ①.wrapper.conf: # -----------------------...

1.1K9
来自专栏Ryan Miao

Spring Boot文档阅读

原因之初 最初习惯百度各种博客教程,然后跟着操作,因为觉得跟着别人走过的路走可以少走很多弯路,省时间。然而,很多博客的内容并不够完整,甚至错误,看多了的博客甚至...

5997
来自专栏北京马哥教育

性能测试与持续集成(JMeter+Jenkins)

目的 将性能测试与持续集成挂接起来 性能测试: JMeter 持续集成: Jenkins JMeter 下载JMeter,官网: http://jmeter.a...

4595
来自专栏别先生

SpringMVC的初始

1:其实一开始对SSH和SSM并不是很熟悉的,对SSH可能熟悉些(Struts,Spring,Hibernate)这三个框架。但是由于框架的更新,和出现了更好的...

2269
来自专栏全栈架构

Spring Boot 与 Kotlin 处理Web表单提交

我们在做web开发的时候,肯定逃不过表单提交,这篇文章通过Spring Boot使用Kotlin 语言 创建和提交一个表单。

762
来自专栏乐沙弥的世界

MySQL PXC 5.7 invalid user‘@MYSQLD_USER@’

最近的PXC 5.7启动的时候收到一个无效用户的提示,invalid user ‘@MYSQLD_USER@’,这个问题还真是头一次碰到,而且这个MYSQLD_...

1273
来自专栏编程坑太多

springboot(13)国际化

1864
来自专栏闵开慧

ERROR security.UserGroupInformation: Priviledge...

"Failed to set permissions of path"问题 参考文献:https://issues.apache.org/jira/browse...

3569

扫码关注云+社区

领取腾讯云代金券