前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Streaming中使用HikariCP数据库连接池与MySQL交互

Spark Streaming中使用HikariCP数据库连接池与MySQL交互

原创
作者头像
ZHANGHAO
修改2018-12-24 11:15:04
4.7K1
修改2018-12-24 11:15:04
举报
文章被收录于专栏:张浩的专栏张浩的专栏

在使用Spark Streaming的应用程序时,我们可能需要将计算结果保存到MySQL中,为了高效的与MySQL进行交互,这里我们使用HikariCP这个高效的数据库连接池。

添加依赖

Gradle

Gradle添加HikariCP的依赖,build.gradle文件内容如下所示

代码语言:javascript
复制
plugins {
    id 'idea'
    id 'java'
    id 'scala'
}

group 'io.zhanghao'
version '1.0-SNAPSHOT'

repositories {
    mavenLocal()
    maven {
        url = 'http://maven.aliyun.com/nexus/content/groups/public'
    }
}

dependencies {
    compile 'org.apache.spark:spark-core_2.11:2.4.0'
    compile 'org.apache.spark:spark-streaming_2.11:2.4.0'
    compile 'org.apache.spark:spark-sql_2.11:2.4.0'
    compile 'mysql:mysql-connector-java:8.0.13'
    compile 'com.zaxxer:HikariCP:3.1.0'
    compileOnly 'org.scala-lang:scala-compiler:2.11.8'
}

Maven

使用的构建工具是Maven的话,添加如下依赖到pom.xml即可

代码语言:html
复制
 <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>3.1.0</version>
 </dependency>

创建单例的HikariDataSource

代码语言:java
复制
object DataSourceSingleton {
  @transient private var instance: HikariDataSource = _

  def getDataSourceInstance: HikariDataSource = {
    if (instance == null) {
      try {
        val config = new HikariConfig
        config.setJdbcUrl("jdbc:mysql://localhost:3306/spark")
        config.setUsername("root")
        config.setPassword("123456")
        config.addDataSourceProperty("cachePrepStmts", "true")
        config.addDataSourceProperty("prepStmtCacheSize", "250")
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
        instance = new HikariDataSource(config)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
    instance
  }
}

使用HikariDataSource

foreachPartition的时候,针对每一个Partition来创建一个Connection对象,并在用完连接的时候使用evictConnection(connection)来释放数据库连接。

这里我是统计单词的频率,并插入或更新词频统计的结果到MySQL中。

代码语言:java
复制
result.foreachRDD(rdd => {
  rdd.foreachPartition(partitionRDD => {
    //获取数据库连接池
    val dsPool = DataSourceSingleton.getDataSourceInstance
    //获取数据库连接池
    val connection = dsPool.getConnection
    partitionRDD.foreach(rdd => {
      val wordcountQuery = s"SELECT wordcount FROM wordcount WHERE word='${rdd._1}'"
      val set = connection.createStatement().executeQuery(wordcountQuery)
      var wordcount = 0
      while (set.next()) {
        wordcount = set.getInt("wordcount")
      }
      var wordCountSql = ""
      if (wordcount != 0) {
        wordCountSql = s"UPDATE wordcount SET wordcount=${rdd._2.toInt}+$wordcount WHERE word='${rdd._1}'"
      } else {
        wordCountSql = s"INSERT INTO wordcount(word,wordcount)VALUES('${rdd._1}',${rdd._2.toInt})"
      }
      //执行update或者insert语句
      connection.createStatement().execute(wordCountSql)
    }
    )
    //释放连接
    dsPool.evictConnection(connection)
  })

})

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 添加依赖
    • Gradle
      • Maven
      • 创建单例的HikariDataSource
      • 使用HikariDataSource
      相关产品与服务
      云数据库 SQL Server
      腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档