flink1.7自定义source实现

flink读取source data

数据的来源是flink程序从中读取输入的地方。我们可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源添加到程序中。 flink附带大量预先实现好的各种读取数据源的函数,也可以通过为非并行源去实现SourceFunction接口或者为并行源实现ParallelSourceFunction接口或扩展RichParallelSourceFunction来编写满足自己业务需要的定制源。

flink预先实现好数据源

下面有几个预定义的流源可以从StreamExecutionEnvironment访问

基于文件

readTextFile(path): 读取文本文件,该文件要符合TextInputFormat规范,逐行读取并作为字符串返回。 readFile(fileInputFormat,path): 根据指定的文件输入格式指定读取文件。 readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo): 这是前两个方法在内部调用的方法。它根据给定的fileInputFormat读取路径中的文件。根据提供的watchType,该源可能会定期监视(每间隔ms)该路径下来到的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理当前路径中的数据后并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步排除文件的处理。

基于套接字

socketTextStream : 从套接字读取。元素可以用分隔符分隔。

基于集合

fromCollection(Collection) : 从Java Java.util.Collection创建一个数据流。集合中的所有元素必须是相同的类型。 fromCollection(Iterator,Class) :从迭代器创建数据流。该类要指定迭代器返回的元素的数据类型。 fromElements(T ...) :根据给定的对象序列创建数据流。所有对象必须是相同的类型。 fromParallelCollection(SplittableIterator,Class) : 并行地从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。 generateSequence(from,to) : 在给定的区间内并行生成数字序列 。

自定义数据原

package com.intsmaze.flink.streaming.source;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

/**
* @Description: 自定义数据源的模板
* @Author: intsmaze
* @Date: 2019/1/4
*/ 
public class CustomSource {

    private static final int BOUND = 100;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Integer, Integer>> inputStream= env.addSource(new RandomFibonacciSource());

        inputStream.map(new InputMap()).print();

        env.execute("Intsmaze Custom Source");
    }


    /**
    * @Description: 
    * @Author: intsmaze
    * @Date: 2019/1/5
    */ 
    private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        private Random rnd = new Random();

        private volatile boolean isRunning = true;
        private int counter = 0;

        /**
        * @Description: 
        * @Param: 
        * @return: 
        * @Author: intsmaze
        * @Date: 2019/1/5
        */ 
        @Override
        public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            while (isRunning && counter < BOUND) {
                int first = rnd.nextInt(BOUND / 2 - 1) + 1;
                int second = rnd.nextInt(BOUND / 2 - 1) + 1;
                ctx.collect(new Tuple2<>(first, second));
                counter++;
                Thread.sleep(50L);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }


    /**
    * @Description: 
    * @Param: 
    * @return: 
    * @Author: intsmaze
    * @Date: 2019/1/5
    */ 
    public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
            Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
                Exception {
            return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0);
        }
    }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java基础系列(五):数组

    在Java中,有一种数据结构叫做数组,它用来存储同一类型的值的集合。通过一个整型下标可以访问数组中的每一个值。例如,如果a是一个整型数组,那么a[i]就是数组中...

    Vi的技术博客
  • Android多线程的使用

    在很多编程语言中,线程都是一个重要的组成部分,多线程的支持可以给程序员更加灵活的程序功能实现代码编写方式,线程一般用于处理一些比较耗时的任务(下载文件、复制或者...

    指点
  • 异常、堆内存溢出、OOM的几种情况

    【情况一】:    java.lang.OutOfMemoryError: Java heap space:这种是java堆内存不够,一个原因是真不够,另一个...

    业余草
  • Java 基础系列(一):基础数据类型

    今天我们来聊一下Java这门语言的数据类型,众所周知,Java是一种强类型语言。在Java中,一共有8种基本类型,其中4种整形,2种浮点类型,1种用于表示Uni...

    Vi的技术博客
  • Java基础专题(三):字符串

    从概念上来讲,Java字符串就是Unicode字符序列。例如,"Java\u2122" 由5个Unicode字符J,a,v,a,和 ™。Java没有内置的字符串...

    Vi的技术博客
  • Java基础系列(四):控制流程

    和其他程序设计语言一样,Java使用条件语句和循环结构确定控制流程,在介绍这些条件语句和循环结构之前,我们先来了解一下块作用域这个概念。

    Vi的技术博客
  • Java基础系列(二):运算符

    计算机的最基本用途之一就是执行数学运算,作为一门计算机语言,Java也提供了一套丰富的运算符来操纵变量。我们可以把运算符分成以下几组:

    Vi的技术博客
  • Spring Boot 2.0 系列(一):快速开始

    Spring Boot可以使我们轻松地创建独立的、生产级的基于Spring的应用程序,由于整合了一些对Spring和第三方库的配置,我们可以快速开始一个应用程序...

    Vi的技术博客
  • Kotlin入门(20)几种常见的对话框

    手机上的App极大地方便了人们的生活,很多业务只需用户拇指一点即可轻松办理,然而这也带来了一定的风险,因为有时候用户并非真的想这么做,只是不小心点了一下而已,如...

    用户4464237
  • Kotlin入门(21)活动页面的跳转处理

    Activity的活动页面跳转是App最常用的功能之一,在前几章的demo源码中便多次见到了,常常是点击界面上的某个按钮,然后跳转到与之对应的下一个页面。对于A...

    用户4464237

扫码关注云+社区

领取腾讯云代金券