前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink实战】玩转Flink里面核心的Sink Operator实战

【Flink实战】玩转Flink里面核心的Sink Operator实战

作者头像
大数据小禅
发布2023-09-14 08:08:28
4530
发布2023-09-14 08:08:28
举报
文章被收录于专栏:YO大数据YO大数据

Flink Sink Operator简介

  • 在Flink中,Sink Operator(也称为Sink Function或Sink)是指负责将DataStream或DataSet的数据发送到外部存储或外部系统的操作符。Sink Operator是Flink的数据输出端,它的作用是将处理过的数据写入目标位置,如数据库、文件系统、消息队列等。
  • Sink Operator通过将数据传输到外部系统来完成最终的数据存储、展示或其他类型的处理。它可以将数据单个地或批量地发送到目标系统,具体取决于Sink操作符的实现。例如,可以将数据写入关系型数据库、NoSQL数据库、消息队列、文件系统等。
  • 在Flink中,可以使用预定义的Sink操作符,如addSink()方法,或自定义Sink函数来实现数据的输出。预定义的Sink操作符可以满足一般的输出需求,而自定义Sink函数可以根据具体的业务逻辑实现特定的输出操作。
  • 自定义Sink函数需要实现SinkFunction接口或RichSinkFunction抽象类,并重写其中的方法。这些方法包括open()、invoke()和close()等,用于初始化和管理连接,以及处理数据发送等操作。

使用Sink Operator时,需要考虑以下几个方面:

  • 目标系统的可用性和容错性:保证目标系统的可用性,并确保在故障发生时能够进行重试或恢复。
  • 写入的一致性:根据需求选择适当的写入一致性级别,如精确一次(exactly-once)或最少一次(at-least-once)语义。
  • 并行度和性能:根据目标系统的特性和可用资源,设置合适的并行度以提高任务并行处理和整体性能。

Flink 核心知识 Sink Operator速览

  • Flink编程模型
在这里插入图片描述
在这里插入图片描述
  • Sink 输出源
    • 预定义
      • print
      • writeAsText (过期)
    • 自定义
      • SinkFunction
      • RichSinkFunction
        • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
    • flink官方提供 Bundle Connector
      • kafka、ES 等
    • Apache Bahir
      • kafka、ES、Redis等

Flink 自定义的Sink 连接Mysql存储商品订单案例实战

自定义

  • SinkFunction
  • RichSinkFunction
    • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等

Flink连接mysql的几种方式(都需要加jdbc驱动)

  • 方式一:自带flink-connector-jdbc 需要加依赖包
代码语言:javascript
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
  • 方式二:自定义sink

保存视频订单到Mysql

代码语言:javascript
复制
CREATE TABLE `video_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `money` int(11) DEFAULT NULL,
  `title` varchar(32) DEFAULT NULL,
  `trade_no` varchar(64) DEFAULT NULL,
  `create_time` date DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • 添加jdbc依赖
代码语言:javascript
复制
<dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
</dependency>
  • 编码
代码语言:javascript
复制
public class MysqlSink extends RichSinkFunction<VideoOrder> {

    private Connection conn = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "xdclass.net");   //url user passwd
        String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
        ps = conn.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    @Override
    public void invoke(VideoOrder videoOrder, Context context) throws Exception {
        //给ps中的?设置具体值
        ps.setInt(1,videoOrder.getUserId());
        ps.setInt(2,videoOrder.getMoney());
        ps.setString(3,videoOrder.getTitle());
        ps.setString(4,videoOrder.getTradeNo());
        ps.setDate(5,new Date(videoOrder.getCreateTime().getTime()));

        ps.executeUpdate();
    }
}
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink Sink Operator简介
  • Flink 核心知识 Sink Operator速览
  • Flink 自定义的Sink 连接Mysql存储商品订单案例实战
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档