专栏首页大数据成神之路17-Flink消费Kafka写入Mysql

17-Flink消费Kafka写入Mysql

戳更多文章:

1-Flink入门

2-本地环境搭建&构建第一个Flink应用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式缓存

7-重启策略

8-Flink中的窗口

9-Flink中的Time

Flink时间戳和水印

Broadcast广播变量

FlinkTable&SQL

Flink实战项目实时热销排行

Flink写入RedisSink

Flink消费Kafka写入Mysql

本文介绍消费Kafka的消息实时写入Mysql

1. maven新增依赖:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>

2.重写RichSinkFunction,实现一个Mysql Sink

public class MysqlSink extends
    RichSinkFunction<Tuple3<Integer, String, Integer>> {

  private Connection connection;
  private PreparedStatement preparedStatement;
  String username = "";
  String password = "";
  String drivername = "";   //配置改成自己的配置
  String dburl = "";

  @Override
  public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
    Class.forName(drivername);
    connection = DriverManager.getConnection(dburl, username, password);
    String sql = "replace into table(id,num,price) values(?,?,?)"; //假设mysql 有3列 id,num,price
    preparedStatement = connection.prepareStatement(sql);
    preparedStatement.setInt(1, value.f0);
    preparedStatement.setString(2, value.f1);
    preparedStatement.setInt(3, value.f2);
    preparedStatement.executeUpdate();
    if (preparedStatement != null) {
      preparedStatement.close();
    }
    if (connection != null) {
      connection.close();
    }
  }
}

3. Flink主类

public class MysqlSinkTest {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

// 1,abc,100 类似这样的数据,当然也可以是很复杂的json数据,去做解析
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
env.getConfig().disableSysoutLogging(); //设置此可以屏蔽掉日记打印情况
env.getConfig().setRestartStrategy(
RestartStrategies.fixedDelayRestart(5, 5000));
env.enableCheckpointing(2000);
DataStream<String> stream = env
.addSource(consumer);
DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream.filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
.map((MapFunction<String, Tuple3<Integer, String, Integer>>) value -> {
String[] args1 = value.split(",");
return new Tuple3<Integer, String, Integer>(Integer
.valueOf(args1[0]), args1[1],Integer
.valueOf(args1[2]));
});
sourceStream.addSink(new MysqlSink());
env.execute("data to mysql start");
}
}

改成自己的本地配置试一试吧!

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-03-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 17-Flink消费Kafka写入Mysql

    王知无
  • 7-Flink的分布式缓存

    Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。

    王知无
  • Flink从入门到放弃-Flink分布式缓存

    在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据:

    王知无
  • 17-Flink消费Kafka写入Mysql

    王知无
  • Java漫谈11

    这次我们接着聊String,这次我们聊聊String类为什么是final的。 之所以聊这个,是因为在知乎上看了一篇帖子,看完后让我对这个点有了认识,在这里跟你分...

    用户1335799
  • 函数式接口,方法和构造函数引用

    如何让现有的函数更友好地支持 Lambda,最好的方法是:增加函数式接口。所谓 “函数式接口”,是指仅仅只包含一个抽象方法,但是可以有多个非抽象方法(也就是之前...

    happyJared
  • 函数式接口(Functional Interfaces)

    Java 语言设计者们投入了大量精力来思考如何使现有的函数友好地支持Lambda。最终采取的方法是:增加函数式接口的概念。“函数式接口”是指仅仅只包含一个抽象方...

    崔笑颜
  • Windows 10 S中的Device Guard详解(上篇)

    本文探讨Windows 10 S(下称Win10S)中的Device Guard(设备保护,下称DG)。我将提取策略,并弄清楚在默认Win10S系统上可以和不可...

    FB客服
  • Shiro入门这篇就够了【Shiro的基础知识、回顾URL拦截】

    前言 本文主要讲解的知识点有以下: 权限管理的基础知识 模型 粗粒度和细粒度的概念 回顾URL拦截的实现 Shiro的介绍与简单入门 一、Shiro基础知识 在...

    Java3y
  • Android开发获取系统中已安装程序信息的方法

    本文实例讲述了Android开发获取系统中已安装程序信息的方法。分享给大家供大家参考,具体如下:

    砸漏

扫码关注云+社区

领取腾讯云代金券