使用Sink Operator时,需要考虑以下几个方面:
自定义
Flink连接mysql的几种方式(都需要加jdbc驱动)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.12.0</version>
</dependency>
保存视频订单到Mysql
CREATE TABLE `video_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`money` int(11) DEFAULT NULL,
`title` varchar(32) DEFAULT NULL,
`trade_no` varchar(64) DEFAULT NULL,
`create_time` date DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
public class MysqlSink extends RichSinkFunction<VideoOrder> {
private Connection conn = null;
private PreparedStatement ps = null;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "xdclass.net"); //url user passwd
String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
ps = conn.prepareStatement(sql);
}
@Override
public void close() throws Exception {
if (conn != null) {
conn.close();
}
if (ps != null) {
ps.close();
}
}
@Override
public void invoke(VideoOrder videoOrder, Context context) throws Exception {
//给ps中的?设置具体值
ps.setInt(1,videoOrder.getUserId());
ps.setInt(2,videoOrder.getMoney());
ps.setString(3,videoOrder.getTitle());
ps.setString(4,videoOrder.getTradeNo());
ps.setDate(5,new Date(videoOrder.getCreateTime().getTime()));
ps.executeUpdate();
}
}