前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flumeng-kafka-plugin

flumeng-kafka-plugin

作者头像
sanmutongzi
发布2020-03-04 14:33:46
3140
发布2020-03-04 14:33:46
举报
文章被收录于专栏:stream process

github 参考地址:https://github.com/beyondj2ee/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin

/* * Copyright (c) 2013.09.06 BeyondJ2EE. * * All right reserved. * * http://beyondj2ee.github.com * * This software is the confidential and proprietary information of BeyondJ2EE * * , Inc. You shall not disclose such Confidential Information and * * shall use it only in accordance with the terms of the license agreement * * you entered into with BeyondJ2EE. * * * * Revision History * * Author Date Description * * =============== ================ ====================================== * * beyondj2ee * */

package org.apache.flume.plugins;

/** * KAFKA Flume Sink (Kafka 0.8 Beta, Flume 1.4). * User: beyondj2ee * Date: 13. 9. 4 * Time: PM 4:32 */

import java.util.Properties;

import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;

import org.apache.commons.lang.StringUtils; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventHelper; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap;

/** * kafka sink. */ public class KafkaSink extends AbstractSink implements Configurable { // - [ constant fields ] ----------------------------------------

/** * The constant logger. */ private static final Logger LOGGER = LoggerFactory .getLogger(KafkaSink.class);

// - [ variable fields ] ---------------------------------------- /** * The Parameters. */ private Properties parameters; /** * The Producer. */ private Producer<String, String> producer; /** * The Context. */ private Context context;

private int i = 100;

// - [ interface methods ] ------------------------------------

/** * Configure void. * * @param context * the context */ @Override public void configure(Context context) {

this.context = context; ImmutableMap<String, String> props = context.getParameters();

parameters = new Properties(); for (String key : props.keySet()) { String value = props.get(key); this.parameters.put(key, value);

LOGGER.info("key is " + key + " value is " + value); } }

/** * Start void. */ @Override public synchronized void start() { super.start(); ProducerConfig config = new ProducerConfig(this.parameters); this.producer = new Producer<String, String>(config); }

/** * Process status. * * @return the status * @throws EventDeliveryException * the event delivery exception */ @Override public Status process() throws EventDeliveryException { Status status = null;

// Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to // do Event event = ch.take();

String partitionKey = (String) parameters .get(KafkaFlumeConstans.PARTITION_KEY_NAME); String encoding = StringUtils.defaultIfEmpty( (String) this.parameters .get(KafkaFlumeConstans.ENCODING_KEY_NAME), KafkaFlumeConstans.DEFAULT_ENCODING); String topic = Preconditions.checkNotNull((String) this.parameters .get(KafkaFlumeConstans.CUSTOME_TOPIC_KEY_NAME), "custom.topic.name is required");

String eventData = new String(event.getBody(), encoding);

KeyedMessage<String, String> data;

// if partition key does'nt exist if (StringUtils.isEmpty(partitionKey)) { data = new KeyedMessage<String, String>(topic, eventData); } else { data = new KeyedMessage<String, String>(topic, partitionKey, eventData); }

// if (LOGGER.isInfoEnabled()) { // LOGGER.info("Send Message to Kafka *************************"); // } if (i == 0) { LOGGER.info("100 message send "); i = 100; } i = i - 1; producer.send(data); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { LOGGER.info("send data error ",t); throw (Error) t; } } finally { txn.close(); } return status; }

/** * Stop void. */ @Override public void stop() { producer.close(); } // - [ protected methods ] -------------------------------------- // - [ public methods ] ----------------------------------------- // - [ private methods ] ---------------------------------------- // - [ static methods ] ----------------------------------------- // - [ getter/setter methods ] ---------------------------------- // - [ main methods ] ------------------------------------------- }

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2014-06-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库一体机 TData
数据库一体机 TData 是融合了高性能计算、热插拔闪存、Infiniband 网络、RDMA 远程直接存取数据的数据库解决方案,为用户提供高可用、易扩展、高性能的数据库服务,适用于 OLAP、 OLTP 以及混合负载等各种应用场景下的极限性能需求,支持 Oracle、SQL Server、MySQL 和 PostgreSQL 等各种主流数据库。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档