前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink之Datastram3

flink之Datastram3

原创
作者头像
用户11134802
发布2024-06-16 23:07:10
430
发布2024-06-16 23:07:10
举报
文章被收录于专栏:flink基础知识点flink基础知识点

七、Sink输出算子

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

老版本:

Flink1.12以前(当前使用的是flink1.17),Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

代码语言:javascript
复制
stream.addSink(new SinkFunction(…));

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

新版本:

代码语言:javascript
复制
stream.sinkTo(…)

Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir框架(doris也有了适配Flink的API ),也实现了一些其他第三方系统与Flink的连接器。

1、输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

代码语言:javascript
复制
 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

案例:

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 每个目录中,都有 并行度个数的 文件在写入
    env.setParallelism(2);

    // 必须开启checkpoint,否则一直都是 .inprogress
    env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


    DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
            new GeneratorFunction<Long, String>() {
                @Override
                public String map(Long value) throws Exception {
                    return "Number:" + value;
                }
            },
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(1000),
            Types.STRING
    );

    DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

    // 输出到文件系统
    FileSink<String> fieSink = FileSink
            // 输出行式存储的文件,指定路径、指定编码
            .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
            // 输出文件的一些配置: 文件名的前缀、后缀
            .withOutputFileConfig(
                    OutputFileConfig.builder()
                            .withPartPrefix("atguigu-")
                            .withPartSuffix(".log")
                            .build()
            )
            // 按照目录分桶:如下,就是每个小时一个目录
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
            // 文件滚动策略:  1分钟 或 1m
            .withRollingPolicy(
                    DefaultRollingPolicy.builder()
                            .withRolloverInterval(Duration.ofMinutes(1))
                            .withMaxPartSize(new MemorySize(1024*1024))
                            .build()
            )
            .build();


    dataGen.sinkTo(fieSink);

    env.execute();
}

2、输出到kafka

步骤:

(1)添加Kafka 连接器依赖

(2)启动Kafka集群

(3)编写输出到Kafka的代码

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 如果是精准一次,必须开启checkpoint(后续章节介绍)
    env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


    SingleOutputStreamOperator<String> sensorDS = env
            .socketTextStream("hadoop102", 7777);

    /**
     * Kafka Sink:
     * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
     * 1、开启checkpoint(后续介绍)
     * 2、设置事务前缀
     * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
     */
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            // 指定 kafka 的地址和端口
            .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
            // 指定序列化器:指定Topic名称、具体的序列化
            .setRecordSerializer(
                    KafkaRecordSerializationSchema.<String>builder()
                            .setTopic("ws")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build()
            )
            // 写到kafka的一致性级别: 精准一次、至少一次
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            // 如果是精准一次,必须设置 事务的前缀
            .setTransactionalIdPrefix("atguigu-")
            // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
            .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
            .build();


    sensorDS.sinkTo(kafkaSink);


    env.execute();
}

自定义序列化器,实现带key的record:

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    env.setRestartStrategy(RestartStrategies.noRestart());


    SingleOutputStreamOperator<String> sensorDS = env
            .socketTextStream("hadoop102", 7777);


    /**
     * 如果要指定写入kafka的key,可以自定义序列化器:
     * 1、实现 一个接口,重写 序列化 方法
     * 2、指定key,转成 字节数组
     * 3、指定value,转成 字节数组
     * 4、返回一个 ProducerRecord对象,把key、value放进去
     */
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
            .setRecordSerializer(
                    new KafkaRecordSerializationSchema<String>() {

                        @Nullable
                        @Override
                        public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                            String[] datas = element.split(",");
                            byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                            byte[] value = element.getBytes(StandardCharsets.UTF_8);
                            return new ProducerRecord<>("ws", key, value);
                        }
                    }
            )
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("atguigu-")
            .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
            .build();


    sensorDS.sinkTo(kafkaSink);


    env.execute();
}

在这里 要提一嘴 , 当从kafka获取数据的时候,即kafka成为source算子的时候,需要注意空值的传递,此时需要筛选控制

代码语言:javascript
复制
KafkaSource.<String>builder()
        .setBootstrapServers(Constant.KAFKA_BROKERS)
        .setGroupId(groupId)
        .setTopics(topic)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new DeserializationSchema<String>() {
            @Override
            public String deserialize(byte[] message) throws IOException {
                if (message != null) {
                    return new String(message, StandardCharsets.UTF_8);
                }
                return null;
            }

            @Override
            public boolean isEndOfStream(String nextElement) {
                return false;
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return Types.STRING;
            }
        })
        .build();

定义了一个自定义的 DeserializationSchema<String> 实例。在这个实例中:

deserialize(byte[] message) throws IOException 方法用于将字节数组形式的消息反序列化为字符串。它根据字节数组是否为空进行相应处理,将其转换为字符串,使用了指定的 StandardCharsets.UTF_8 字符集。

isEndOfStream(String nextElement) 方法用于判断是否到达流的结尾,这里返回 false 表示未到达。

getProducedType() 方法返回值的类型信息,这里明确为字符串类型。

通过这样的设置,确保了从 Kafka 中读取到的数据能够按照指定的方式正确地进行值的反序列化,以便后续程序进行处理和使用。例如,在后续的流程中,可以方便地将反序列化得到的字符串进行各种操作和分析。

3、输出到MySQL(JDBC)

写入数据的MySQL的测试步骤如下。

(1)添加依赖

(2)启动MySQL,在目标数据库下建对应的表 , 此博客 在test库下建表ws

代码语言:javascript
复制
//ws对应的表结构
CREATE TABLE `ws` (
        `id` varchar(100) NOT NULL,
        `ts` bigint(20) DEFAULT NULL,
        `vc` int(11) DEFAULT NULL,
        PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的代码

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);


    SingleOutputStreamOperator<WaterSensor> sensorDS = env
            .socketTextStream("hadoop102", 7777)
            .map(new WaterSensorMapFunction());


    /**
     * TODO 写入mysql
     * 1、只能用老的sink写法: addsink
     * 2、JDBCSink的4个参数:
     *    第一个参数: 执行的sql,一般就是 insert into
     *    第二个参数: 预编译sql, 对占位符填充值
     *    第三个参数: 执行选项 ---》 攒批、重试
     *    第四个参数: 连接选项 ---》 url、用户名、密码
     */
    SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
            "insert into ws values(?,?,?)",
            new JdbcStatementBuilder<WaterSensor>() {
                @Override
                public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                    //每收到一条WaterSensor,如何去填充占位符
                    preparedStatement.setString(1, waterSensor.getId());
                    preparedStatement.setLong(2, waterSensor.getTs());
                    preparedStatement.setInt(3, waterSensor.getVc());
                }
            },
            JdbcExecutionOptions.builder()
                    .withMaxRetries(3) // 重试次数
                    .withBatchSize(100) // 批次的大小:条数
                    .withBatchIntervalMs(3000) // 批次的时间
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                    .withUsername("root")
                    .withPassword("000000")
                    .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                    .build()
    );


    sensorDS.addSink(jdbcSink);


    env.execute();
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 七、Sink输出算子
    • 1、输出到文件
      • 2、输出到kafka
        • 自定义序列化器,实现带key的record:
      • 3、输出到MySQL(JDBC)
      相关产品与服务
      云数据库 MySQL
      腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档