Spark jdbc postgresql数据库连接和写入操作源码解读

概述:Spark postgresql jdbc 数据库连接和写入操作源码解读,详细记录了SparkSQL对数据库的操作,通过java程序,在本地开发和运行。整体为,Spark建立数据库连接,读取数据,将DataFrame数据写入另一个数据库表中。附带完整项目源码(完整项目源码github)。

1.首先在postgreSQL中创建一张测试表,并插入数据。(完整项目源码Github)

1.1. 在postgreSQL中的postgres用户下,创建 products

CREATE TABLE products (
    product_no integer,
    name text,
    price numeric
);

1.2. 在 products 插入数据

INSERT INTO products (product_no, name, price) VALUES
    (1, 'Cheese', 9.99),
    (2, 'Bread', 1.99),
    (3, 'Milk', 2.99);

查看数据库写入结果。

2.编写SPARK程序。(完整项目源码Github

2.1.读取Postgresql某一张表的数据为DataFrame(完整项目源码Github

SparkPostgresqlJdbc.java
Properties connectionProperties = new Properties();


//增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
connectionProperties.put("user","postgres");
connectionProperties.put("password","123456");
connectionProperties.put("driver","org.postgresql.Driver");

//SparkJdbc读取Postgresql的products表内容
Dataset<Row> jdbcDF = spark.read()
        .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");

//显示jdbcDF数据内容
jdbcDF.show();

2.2.写入Postgresql某张表中

//将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
jdbcDF.write().mode("append")
        .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

3.运行程序,并查看结果(如果在IDEA中开发不熟练,可以看我另一篇博文spark (java API) 在Intellij IDEA中开发并运行)。

3.1.直接在intellij IDEA(社区版)中运行。

a.在运行按钮的“Edit Configeration”中的VM option中添加“-Dspark.master=local”

3.2.在终端(Terminal)中运行。

/opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
  --class "SparkPostgresqlJdbc" \
  --master local[4] \
  --driver-class-path /home/xiaolei/.m2/repository/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212.jar \
  target/SparkPostgresqlJdbc-1.0-SNAPSHOT.jar

其中 --driver-class-path 指定下载的postgresql JDBC数据 库驱动路径,命令执行要在项目的根目录中(/home/xiaolei/Data/GS/Spark/SparkPostgresqlJdbc)。

查看Spark写入数据库中的数据

4.以下为项目中主要源码(完整项目源码Github):

4.1.项目配置源码pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wangxiaolei</groupId>
    <artifactId>SparkPostgresqlJdbc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.4.1212</version>
        </dependency>
    </dependencies>
</project>

4.2.java源码SparkPostgresqlJdbc.java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
 * MIT.
 * Author: wangxiaolei(王小雷).
 * Date:17-2-9.
 * Project:SparkPostgresqlJdbc.
 */
public class SparkPostgresqlJdbc {
    public static void main (String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .appName("SparkPostgresqlJdbc")
                .config("spark.some.config.option","some-value")
                .getOrCreate();
    //启动runSparkPostgresqlJdbc程序
        runSparkPostgresqlJdbc(spark);

        spark.stop();

    }

    private static void runSparkPostgresqlJdbc(SparkSession spark){
        //new一个属性
        System.out.println("确保数据库已经开启,并创建了products表和插入了数据");
        Properties connectionProperties = new Properties();


        //增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
        System.out.println("增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)");
        connectionProperties.put("user","postgres");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","org.postgresql.Driver");



        //SparkJdbc读取Postgresql的products表内容
        System.out.println("SparkJdbc读取Postgresql的products表内容");
        Dataset<Row> jdbcDF = spark.read()
                .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");
        //显示jdbcDF数据内容
        jdbcDF.show();



        //将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
        jdbcDF.write().mode("append")
                .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

    }
}

完整项目源码Github

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏NetCore

Identity Service - 解析微软微服务架构eShopOnContainers(二)

接上一篇,众所周知一个网站的用户登录是非常重要,一站式的登录(SSO)也成了大家讨论的热点。微软在这个Demo中,把登录单独拉了出来,形成了一个Service,...

2405
来自专栏乐沙弥的世界

配置共享服务器模式

两者完成相同的任务,即处理所有指定的SQL操作。假定从客户端提交一个任意查询(DQL)到数据库服务器不论是专用模式还是共享

1263
来自专栏乐沙弥的世界

RMAN 配置、监控与管理

一个通道代表一个到设备(磁盘或磁带)的数据流并且在目标数据库或辅助数据库实例上产生一个相应的服务器会话(server session)

991
来自专栏一名合格java开发的自我修养

3.sparkSQL整合Hive

  spark SQL经常需要访问Hive metastore,Spark SQL可以通过Hive metastore获取Hive表的元数据。从Spark 1....

1663
来自专栏王小雷

Hadoop YARN学习监控JVM和实时监控Ganglia、Ambari(5)

Hadoop YARN学习监控JVM和实时监控Ganglia、Ambari(5) 1.0 监控ResourceManager进程Java虚拟机中堆空间的特定部分...

2687
来自专栏生信宝典

Airflow配置和使用

Airflow能做什么 Airflow是一个工作流分配管理系统,通过有向非循环图的方式管理任务流程,设置任务依赖关系和时间调度。 Airflow独立于我们要运行...

2.1K6
来自专栏AILearning

Apache Zeppelin 中 JDBC通用 解释器

概述 JDBC解释器允许您无缝地创建到任何数据源的JDBC连接。 在运行每个语句后,将立即应用插入,更新和升级。 到目前为止,已经通过以下测试: ...

2597
来自专栏Golang语言社区

Golang 语言调用动态库实现OpenGL及windows的API编程

最近晚上没有事情的时候,研究下了开源的walk-master源码,自己简单的分析了下,如果在 import ( "github.com/lxn/win...

30012
来自专栏CSDN技术头条

资源控制在大数据和云计算平台中的应用

本文针对大数据平台中资源控制这个层面来详细介绍资源控制在不同操作系统上的具体技术实现,以及大数据平台和资源控制的集成。

5818
来自专栏自由而无用的灵魂的碎碎念

用IE打开Reporting Service时提示权限不足的解决方法

在IE中打开http://localhost/ReportServer时,经常提示“为用户“*”授予的权限不足,无法执行此操作。 (rsAccessDenied...

1044

扫码关注云+社区