
Exploring Flink Sources and Sinks in Practice: Writing RocketMQ Data into HBase

王知无-import_bigdata
Published 2019-12-05 15:23:14

Preface

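We have recently been migrating some of our existing Spark Streaming jobs to Flink Streaming. Custom Sources and Sinks were the first major problem we ran into, so here are some brief notes.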

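Flink supports both stream processing and batch processing. Either way, a job needs somewhere to read data from (input) and somewhere to write results to (output): the former is called a Source and the latter a Sink. Flink ships with the most basic Sources and Sinks (files, sockets, etc.). For common third-party systems there are also ready-made Sources and Sinks, called connectors, such as those for HDFS, Kafka and Elasticsearch. For any source or sink without a built-in implementation, you have to write your own.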

SourceFunction

SourceFunction is the root interface for defining a Flink Source. Its source code is as follows.

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    
    void cancel();

    interface SourceContext<T> {
        void collect(T element);

        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        void emitWatermark(Watermark mark);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

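The SourceFunction interface defines the run() method, which continuously produces source data, so implementations usually write it as a loop controlled by a flag that decides when to stop. The cancel() method breaks that loop and ends data production. Because cancel() is called from a different thread than run(), the flag is typically declared volatile.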
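To make the run()/cancel() contract concrete, here is a minimal JDK-only sketch. It deliberately does not implement Flink's SourceFunction, so it runs without Flink on the classpath: the class name CountingSource is hypothetical, the emit callback stands in for SourceContext.collect(), and the plain lock object stands in for getCheckpointLock(). In a real Flink source the same class would implement SourceFunction<Long>.

```java
import java.util.function.Consumer;

/**
 * JDK-only model of the SourceFunction run()/cancel() contract
 * (hypothetical class, not a Flink API).
 */
class CountingSource {
    // volatile: in Flink, cancel() is called from a different thread than run()
    private volatile boolean running = true;
    // stands in for SourceContext.getCheckpointLock()
    private final Object checkpointLock = new Object();
    // source state that a checkpoint would capture
    private long next = 0;

    /** Emits 0, 1, 2, ... until cancel() clears the flag. */
    void run(Consumer<Long> emit) {
        while (running) {
            // emit the element and advance the state in one critical section
            synchronized (checkpointLock) {
                emit.accept(next);
                next++;
            }
        }
    }

    void cancel() {
        running = false;
    }
}
```

Emitting while holding the lock mirrors the pattern recommended for sources that also keep state: a checkpoint can then never observe an element that was emitted without the matching state update.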

SourceFunction also nests a SourceContext interface, which represents the Source's context and is used to emit data. The first three methods do most of the work:

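  • collect(): emits an element without a custom timestamp. If the program's time characteristic (TimeCharacteristic) is processing time (ProcessingTime), the element has no timestamp; if it is ingestion time (IngestionTime), the element is given the current system time; if it is event time (EventTime), the element initially has no timestamp, but one must be assigned with a TimestampAssigner before any timestamp-dependent operation (such as windowing).
  • collectWithTimestamp(): emits an element with a custom timestamp. This method is essential for event-time programs; under processing time the timestamp is simply ignored, and under ingestion time it is overwritten by the system time.
  • emitWatermark(): emits a watermark; this only has an effect under event time. A watermark with timestamp t declares that no more elements with a timestamp t' <= t will arrive; if one does arrive, it is treated as late data (window operators, for example, drop it by default).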
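The rules in the list above can be summarized as a small JDK-only model. TimeMode and SourceTimestamps are hypothetical names, not Flink classes; a null return stands for "no timestamp", and isLate() encodes the watermark definition given above. In a real job the lateness decision is made by downstream operators such as windows.

```java
/** Hypothetical stand-in for Flink's TimeCharacteristic. */
enum TimeMode { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

/** JDK-only model of the timestamp an element carries after leaving the source. */
final class SourceTimestamps {
    private SourceTimestamps() {}

    /** collect(): no user timestamp supplied; null means "no timestamp". */
    static Long afterCollect(TimeMode mode, long systemNowMillis) {
        // only ingestion time attaches a timestamp (the system time)
        return mode == TimeMode.INGESTION_TIME ? Long.valueOf(systemNowMillis) : null;
    }

    /** collectWithTimestamp(): user timestamp supplied. */
    static Long afterCollectWithTimestamp(TimeMode mode, long userTs, long systemNowMillis) {
        switch (mode) {
            case PROCESSING_TIME:
                return null;            // timestamp ignored
            case INGESTION_TIME:
                return systemNowMillis; // overwritten by system time
            default:
                return userTs;          // event time: kept as-is
        }
    }

    /** A watermark t declares that no element with timestamp <= t should still arrive. */
    static boolean isLate(long elementTs, long currentWatermark) {
        return elementTs <= currentWatermark;
    }
}
```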

SourceFunction also has several variants, for example:

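  • ParallelSourceFunction: the Source can run concurrently with the configured parallelism.
  • RichSourceFunction: based on the rich function RichFunction, so the Source can access the runtime context (RuntimeContext, e.g. information about the task, state and parallelism) and define custom initialization and teardown logic (via open()/close()).
  • RichParallelSourceFunction: a combination of the two.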
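For a parallel source, running with a given parallelism means each instance reads only its share of the input. One common way to split partitions is round-robin by subtask index, sketched below as a hypothetical JDK-only helper. In a RichParallelSourceFunction, the two inputs would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(). The RocketMQSource shown next does not do this itself; it relies on RocketMQ's consumer-group rebalancing instead (see fetchMessageQueuesInBalance() in its snapshotState()).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: round-robin assignment of partitions to parallel subtasks. */
final class PartitionAssigner {
    private PartitionAssigner() {}

    /** Returns the partition ids that subtask `subtaskIndex` (0-based) should read. */
    static List<Integer> assign(int partitionCount, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<Integer> mine = new ArrayList<>();
        // every parallelism-th partition, starting at this subtask's index
        for (int p = subtaskIndex; p < partitionCount; p += parallelism) {
            mine.add(p);
        }
        return mine;
    }
}
```

Every partition goes to exactly one subtask, and no two subtasks read the same partition.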

RocketMQ Source

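Some of our older services use RocketMQ as their message bus. Before reinventing the wheel, we checked the rocketmq-externals project on GitHub and found that a connector already exists (https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink), which saved us the trouble of writing one. Below is the source of the RocketMQSource class. It is long, but well written and easy to follow.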

public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
    implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);

    private transient MQPullConsumerScheduleService pullConsumerScheduleService;
    private DefaultMQPullConsumer consumer;
    private KeyValueDeserializationSchema<OUT> schema;

    private RunningChecker runningChecker;

    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
    private Map<MessageQueue, Long> offsetTable;
    private Map<MessageQueue, Long> restoredOffsets;
    private LinkedMap pendingOffsetsToCommit;

    private Properties props;
    private String topic;
    private String group;

    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    private transient volatile boolean restored;
    private transient boolean enableCheckpoint;

    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
        this.schema = schema;
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.debug("source open....");
        Validate.notEmpty(props, "Consumer properties can not be empty");
        Validate.notNull(schema, "KeyValueDeserializationSchema can not be null");

        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);

        Validate.notEmpty(topic, "Consumer topic can not be empty");
        Validate.notEmpty(group, "Consumer group can not be empty");

        this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();

        if (offsetTable == null) {
            offsetTable = new ConcurrentHashMap<>();
        }
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }
        if (pendingOffsetsToCommit == null) {
            pendingOffsetsToCommit = new LinkedMap();
        }

        runningChecker = new RunningChecker();
        pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
        consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();

        consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
        RocketMQConfig.buildConsumerConfigs(props, consumer);
    }

    @Override
    public void run(SourceContext context) throws Exception {
        LOG.debug("source run....");

        final Object lock = context.getCheckpointLock();

        int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
            RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);

        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
        int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
            RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
        int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE,
            RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);

        pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
        pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
                try {
                    long offset = getMessageQueueOffset(mq);
                    if (offset < 0) {
                        return;
                    }

                    PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
                    boolean found = false;
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messages = pullResult.getMsgFoundList();
                            for (MessageExt msg : messages) {
                                byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
                                byte[] value = msg.getBody();
                                OUT data = schema.deserializeKeyAndValue(key, value);

                                synchronized (lock) {
                                    context.collectWithTimestamp(data, msg.getBornTimestamp());
                                }
                            }
                            found = true;
                            break;
                        case NO_MATCHED_MSG:
                            LOG.debug("No matched message after offset {} for queue {}", offset, mq);
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            LOG.warn("Offset {} is illegal for queue {}", offset, mq);
                            break;
                        default:
                            break;
                    }

                    synchronized (lock) {
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    }

                    if (found) {
                        pullTaskContext.setPullNextDelayTimeMillis(0);
                    } else {
                        pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });

        try {
            pullConsumerScheduleService.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

        runningChecker.setRunning(true);
        awaitTermination();
    }

    private void awaitTermination() throws InterruptedException {
        while (runningChecker.isRunning()) {
            Thread.sleep(50);
        }
    }

    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
        Long offset = offsetTable.get(mq);
        if (restored && offset == null) {
            offset = restoredOffsets.get(mq);
        }
        if (offset == null) {
            offset = consumer.fetchConsumeOffset(mq, false);
            if (offset < 0) {
                String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
                switch (initialOffset) {
                    case CONSUMER_OFFSET_EARLIEST:
                        offset = consumer.minOffset(mq);
                        break;
                    case CONSUMER_OFFSET_LATEST:
                        offset = consumer.maxOffset(mq);
                        break;
                    case CONSUMER_OFFSET_TIMESTAMP:
                        offset = consumer.searchOffset(mq, getLong(props,
                            RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
                        break;
                    default:
                        throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
                }
            }
        }
        offsetTable.put(mq, offset);
        return offsetTable.get(mq);
    }

    private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
        offsetTable.put(mq, offset);
        if (!enableCheckpoint) {
            consumer.updateConsumeOffset(mq, offset);
        }
    }

    @Override
    public void cancel() {
        LOG.debug("cancel ...");
        runningChecker.setRunning(false);

        if (pullConsumerScheduleService != null) {
            pullConsumerScheduleService.shutdown();
        }

        offsetTable.clear();
        restoredOffsets.clear();
        pendingOffsetsToCommit.clear();
    }

    @Override
    public void close() throws Exception {
        LOG.debug("close ...");
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!runningChecker.isRunning()) {
            LOG.debug("snapshotState() called on closed source; returning null.");
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
        }

        unionOffsetStates.clear();

        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());

        Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
        offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));

        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
            currentOffsets.put(entry.getKey(), entry.getValue());
        }

        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        LOG.debug("initialize State ...");

        this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
            OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { })));

        this.restored = context.isRestored();

        if (restored) {
            if (restoredOffsets == null) {
                restoredOffsets = new ConcurrentHashMap<>();
            }
            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
                }
            }
            LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
        } else {
            LOG.info("No restore state for the consumer.");
        }
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        return schema.getProducedType();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!runningChecker.isRunning()) {
            LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
            return;
        }

        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
        if (posInMap == -1) {
            LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
            return;
        }

        Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>)pendingOffsetsToCommit.remove(posInMap);

        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0);
        }

        if (offsets == null || offsets.size() == 0) {
            LOG.debug("Checkpoint state was empty.");
            return;
        }

        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
            consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
        }
    }
}

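In open(), RocketMQSource validates and initializes all configuration and creates a pull-mode RocketMQ consumer (via MQPullConsumerScheduleService). In run(), it registers a pull callback and starts the consumer's pull threads; the callback repeatedly pulls messages, emits each one together with its born timestamp through collectWithTimestamp(), and then updates the queue offset. run() itself then blocks in awaitTermination() until cancel() clears the running flag. Both emitting data and updating offsets are done while holding the checkpoint lock.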
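The role of the checkpoint lock can be shown with a JDK-only model. LockedOffsetTracker is a hypothetical name; a plain Object stands in for getCheckpointLock(), and a String key stands in for MessageQueue. Here one batch of records and its offset update share a single critical section, so a snapshot taken under the same lock always sees offsets that match what has been emitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** JDK-only model of emitting records and advancing offsets under the checkpoint lock. */
class LockedOffsetTracker {
    // stands in for SourceContext.getCheckpointLock()
    private final Object checkpointLock = new Object();
    // queue -> next offset to pull (like offsetTable in RocketMQSource)
    private final Map<String, Long> offsetTable = new HashMap<>();

    /** Emits one pulled batch and records the next begin offset atomically. */
    void emitBatch(String queue, List<String> messages, long nextBeginOffset, Consumer<String> emit) {
        synchronized (checkpointLock) {
            for (String m : messages) {
                emit.accept(m);
            }
            offsetTable.put(queue, nextBeginOffset);
        }
    }

    /** What a snapshot would store: a consistent copy taken under the same lock. */
    Map<String, Long> snapshot() {
        synchronized (checkpointLock) {
            return new HashMap<>(offsetTable);
        }
    }
}
```

Note that RocketMQSource takes the lock separately for each collectWithTimestamp() call and again for putMessageQueueOffset(). A checkpoint that lands between the two records the old offset, so that batch is emitted again after recovery. That is at-least-once behavior, consistent with the AT_LEAST_ONCE mode used in the main program below.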

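Besides extending RichParallelSourceFunction, this Source also implements CheckpointedFunction, which means it supports checkpointing through snapshotState() and initializeState(), as well as CheckpointListener, whose notifyCheckpointComplete() commits offsets back to RocketMQ once a checkpoint completes. Checkpointing is beyond the scope of this article and will be covered later.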
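The offset bookkeeping in snapshotState(), notifyCheckpointComplete() and initializeState() above follows a simple pattern, modeled here without Flink or RocketMQ. CheckpointedOffsets is a hypothetical class: a TreeMap replaces the connector's LinkedMap of pending checkpoints, and a local map stands in for the offsets committed to the broker via updateConsumeOffset(). Restoring keeps the largest offset per queue because union list state hands every subtask the entries of all subtasks.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** JDK-only model of RocketMQSource's offset handling around checkpoints. */
class CheckpointedOffsets {
    private final Map<String, Long> offsetTable = new HashMap<>();
    // checkpoint id -> offsets at snapshot time, ordered by id
    private final TreeMap<Long, Map<String, Long>> pending = new TreeMap<>();
    // stands in for offsets committed to the RocketMQ broker
    private final Map<String, Long> committed = new HashMap<>();

    void advance(String queue, long offset) {
        offsetTable.put(queue, offset);
    }

    /** snapshotState(): remember the current offsets under this checkpoint id. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, new HashMap<>(offsetTable));
    }

    /** notifyCheckpointComplete(): commit this checkpoint, drop it and all older ones. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return; // unknown checkpoint id
        }
        pending.headMap(checkpointId, true).clear();
        committed.putAll(offsets);
    }

    Map<String, Long> committed() {
        return committed;
    }

    int pendingCount() {
        return pending.size();
    }

    /** initializeState(): union state contains every subtask's entries; keep the max per queue. */
    static Map<String, Long> restore(List<? extends Map.Entry<String, Long>> unionState) {
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> e : unionState) {
            restored.merge(e.getKey(), e.getValue(), Long::max);
        }
        return restored;
    }
}
```

With checkpointing disabled, the connector skips all of this and commits each offset immediately in putMessageQueueOffset().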

SinkFunction

SinkFunction is the root interface for custom Sinks. Its source code is as follows.

public interface SinkFunction<IN> extends Function, Serializable {
    @Deprecated
    default void invoke(IN value) throws Exception {}

    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    @Public 
    interface Context<T> {
        long currentProcessingTime();

        long currentWatermark();

        Long timestamp();
    }
}

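Its definition is simpler than SourceFunction's: there is essentially just invoke(), which is called for every record that reaches the sink. The one-argument invoke(value) is deprecated, and the two-argument invoke(value, context) delegates to it by default. SinkFunction also has its own context object, Context, which provides the current processing time, the current watermark and the record's timestamp (null if the record has none). There is also a rich-function variant, RichSinkFunction.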
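The relationship between the two invoke() overloads can be demonstrated with a JDK-only copy of the interface shape. MiniSink, TimestampCollectingSink and LegacySink are hypothetical, and the throws Exception clauses of Flink's real signatures are omitted to keep the example short. A sink that overrides only the old one-argument invoke() still receives records, because the two-argument default forwards to it.

```java
import java.util.ArrayList;
import java.util.List;

/** JDK-only model of SinkFunction's invoke() overloads (not Flink's interface). */
interface MiniSink<IN> {
    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp(); // null when the record carries no timestamp
    }

    default void invoke(IN value) {}

    // the runtime calls this one; by default it forwards to the legacy overload
    default void invoke(IN value, Context context) {
        invoke(value);
    }
}

/** Overrides the two-argument form and reads the record timestamp. */
class TimestampCollectingSink implements MiniSink<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        Long ts = context.timestamp();
        seen.add(value + "@" + (ts == null ? "none" : ts.toString()));
    }
}

/** Overrides only the legacy one-argument form. */
class LegacySink implements MiniSink<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value) {
        seen.add(value);
    }
}
```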

Flink provides a trivial built-in implementation, DiscardingSink. As the name suggests, it discards all the data it receives.

@Public
public class DiscardingSink<T> implements SinkFunction<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(T value) {}
}

HBase Sink

Below is a simple HBase Sink I wrote myself (basic, but it works fine in practice). Some comments are included in the code.

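import com.alibaba.fastjson.JSONObject; // assumed: fastjson (getInteger/getLong/getString)
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// HBaseConnection, EventType and RowKeyUtil are the author's own helper classes (not shown)
public class CalendarHBaseSink extends RichSinkFunction<JSONObject> {
    private static final long serialVersionUID = -6140896663267559061L;

    private static final Logger LOGGER = LoggerFactory.getLogger(CalendarHBaseSink.class);
    private static final byte[] CF_BYTES = Bytes.toBytes("f");
    private static final TableName TABLE_NAME = TableName.valueOf("dw_hbase:calendar_record");
    // Not serializable: obtained in open() on the task side, hence transient
    private transient Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // The HBase connection is wrapped in a singleton:
        // open() runs once per parallel instance, and creating an HBase connection is expensive
        connection = HBaseConnection.get();
        LOGGER.info("Opened CalendarHBaseSink");
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Caution: if the singleton is shared by several parallel instances in one JVM,
        // closing it here affects the others unless HBaseConnection guards against that
        HBaseConnection.close();
        LOGGER.info("Closed CalendarHBaseSink");
    }

    // Records have already been parsed into JSON
    @Override
    public void invoke(JSONObject record, Context context) throws Exception {
        Integer eventType = record.getInteger("eventtype");
        // Map the event type to an HBase column qualifier; keep qualifiers as short as possible
        String qualifier = EventType.getColumnQualifier(eventType);
        if (qualifier == null) {
            return;
        }

        Long uid = record.getLong("uid");
        Integer recordDate = record.getInteger("dateline");
        String data = record.getString("data");
        Long uploadTime = record.getLong("updatetime");

        // try-with-resources: getting a Table from an existing Connection is lightweight.
        // For higher write throughput under heavy load, HBase's BufferedMutator can be used instead.
        try (Table table = connection.getTable(TABLE_NAME)) {
            // Row key built from UID and record date; a well-designed RowKey matters here
            Put put = new Put(Bytes.toBytes(RowKeyUtil.getForCalendar(uid, recordDate)));
            // Write the upload time as the cell's HBase timestamp (x1000, presumably seconds -> ms)
            put.addColumn(CF_BYTES, Bytes.toBytes(qualifier), uploadTime * 1000, Bytes.toBytes(data));
            table.put(put);
        }
    }
}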
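RowKeyUtil.getForCalendar() is not shown, so the following is only one hypothetical layout for a (uid, recordDate) row key, not the author's. It prepends a small hash-derived salt so that consecutive uids spread across regions instead of hotspotting one, and zero-pads both numbers so that HBase's byte-wise ordering matches numeric ordering. The bucket count of 16 is an assumption.

```java
/** Hypothetical row-key builder for (uid, recordDate); not the author's RowKeyUtil. */
final class CalendarRowKeys {
    // assumption: roughly match the number of regions you pre-split into
    private static final int SALT_BUCKETS = 16;

    private CalendarRowKeys() {}

    static String rowKey(long uid, int recordDate) {
        if (uid < 0 || recordDate < 0) {
            throw new IllegalArgumentException("uid and recordDate must be non-negative");
        }
        // deterministic salt derived from the uid itself
        int bucket = Math.floorMod(Long.hashCode(uid), SALT_BUCKETS);
        // 2-digit bucket + 19-digit uid + 10-digit date = 31 characters
        return String.format("%02d%019d%010d", bucket, uid, recordDate);
    }
}
```

Because the salt is derived from the uid, a point lookup for a given (uid, date) can recompute the full key; the trade-off is that scanning all uids in order must query every bucket. The resulting string would be passed to Bytes.toBytes() just like the original call.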

Through carelessness, I ran into two small hiccups while writing this Sink.

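First, when the finished program ran locally there were no error messages at all, yet no data was written. Debugging showed that I had used the wrong JSON field name for the upload time; a NullPointerException was in fact being thrown, but it was not visible during a normal run.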

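Second, checkpoints frequently timed out, and after a while an exception about failing to connect to HBase was thrown. This was because the local hosts file did not correctly map the hostnames of the new HBase cluster; after fixing the hosts file, everything worked.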
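For the first issue, one way to make such mistakes visible is to read required fields through a helper that fails with the field name, instead of letting a null be unboxed (as in uploadTime * 1000) into an anonymous NullPointerException. RequiredFields is hypothetical, and a Map stands in for the JSON object to keep the sketch JDK-only.

```java
import java.util.Map;

/** Hypothetical helper: fail loudly, naming the missing field. */
final class RequiredFields {
    private RequiredFields() {}

    static long requireLong(Map<String, Object> record, String field) {
        Object v = record.get(field);
        // instanceof is false for null, so this covers both "missing" and "wrong type"
        if (!(v instanceof Number)) {
            throw new IllegalArgumentException(
                "missing or non-numeric field '" + field + "' in " + record);
        }
        return ((Number) v).longValue();
    }
}
```

Whether the sink should throw (failing the job and triggering a restart) or log and skip the record is a policy choice: throwing surfaces the problem immediately, while skipping avoids restart loops on a single malformed message.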

Streaming Main Program

Wire the Source and the Sink together, submit the job to a Flink on YARN cluster, and it is ready to run.

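import static org.apache.rocketmq.flink.RocketMQConfig.*;

import java.util.Properties;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.rocketmq.flink.RocketMQSource;

// Class name is illustrative; RocketMQConst and JSONDeserializationSchema are the author's own classes
public class CalendarStreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 minutes; a checkpoint times out after 1 minute
        env.enableCheckpointing(300000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // At-least-once checkpoints are sufficient here
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        // The logic is simple and RocketMQ keeps track of offsets, so FsStateBackend is not required
        // (true = asynchronous snapshots)
        env.setStateBackend(new MemoryStateBackend(true));

        Properties consumerProps = new Properties();
        consumerProps.setProperty(NAME_SERVER_ADDR, RocketMQConst.NAME_SERVER_TEST);
        consumerProps.setProperty(CONSUMER_OFFSET_RESET_TO, "latest");
        consumerProps.setProperty(CONSUMER_TOPIC, "calendar");
        consumerProps.setProperty(CONSUMER_TAG, "*");
        consumerProps.setProperty(CONSUMER_GROUP, "FLINK_STREAM_CALENDAR_TEST_1");

        env.addSource(new RocketMQSource<>(new JSONDeserializationSchema(), consumerProps))
            .name("calendar-rocketmq-source")
            .addSink(new CalendarHBaseSink())
            .name("calendar-hbase-sink");

        env.execute();
    }
}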
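
The JSONDeserializationSchema class used above, which parses RocketMQ messages into JSON, is also simple: it implements the connector's KeyValueDeserializationSchema<JSONObject> interface.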

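Here the default processing time is still used as the time characteristic instead of event time (i.e. the uploadTime field above). The reason is that watermarks in Flink currently work at the operator level, not per key. If event time and watermarks were used directly, timestamps for different (user ID, record date) keys would interfere with each other: user A's perfectly normal data could be misclassified as late because user B's data advanced the watermark. There are two ways to address this: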

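  • Use a KeyedStream with keyed state; see http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-td7288.html. Since our key consists of two fields (UID, date), the state could become enormous, so we are holding off on this for now.
  • Rely on an external store with its own timestamp mechanism. Every HBase cell carries a timestamp; if the table is created with the number of versions set to 1, only the newest value (the one with the largest timestamp, even if writes arrive out of order) is kept. This is clearly the more practical option for us.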
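Both points can be illustrated with small JDK-only models; OperatorWatermark and LatestCell are hypothetical names, not Flink or HBase classes. OperatorWatermark derives a single watermark from the largest timestamp seen across all keys minus a fixed out-of-orderness bound (the same rule as Flink's BoundedOutOfOrdernessTimestampExtractor, but updated on every element for simplicity), so user B's newer record makes user A's record late. LatestCell mimics a column family with VERSIONS set to 1: reads return the value with the largest timestamp, whatever order the writes arrived in.

```java
/** One watermark per operator instance, shared by all keys. */
class OperatorWatermark {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;

    OperatorWatermark(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Returns true if the element is late w.r.t. the watermark in effect before it arrived. */
    boolean onElement(long timestamp) {
        boolean late = maxTimestamp != Long.MIN_VALUE
            && timestamp <= maxTimestamp - maxOutOfOrderness;
        // the watermark only moves forward, driven by the largest timestamp of ANY key
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return late;
    }
}

/** A single HBase-style cell keeping one version: the largest timestamp wins. */
class LatestCell {
    private long ts = Long.MIN_VALUE;
    private String value;

    void put(long timestamp, String v) {
        // older-timestamp writes that arrive later are hidden; ties go to the later write here
        if (value == null || timestamp >= ts) {
            ts = timestamp;
            value = v;
        }
    }

    String get() {
        return value;
    }
}
```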

Source: jianshu/p/97dae75c266c

Author: LittleMagic


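This article is part of the Tencent Cloud self-media sharing program and is shared from the WeChat official account 大数据技术与架构. Originally published 2019-12-03; for infringement concerns, contact cloudcommunity@tencent.com for removal.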
