前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark读写MySQL数据

Spark读写MySQL数据

作者头像
每天学Java
发布2020-06-02 10:22:05
2.7K0
发布2020-06-02 10:22:05
举报
文章被收录于专栏:每天学Java每天学Java

文章来源:www.study-java.cn/#/homework

大家可以到网站看相关文章:

导入依赖
代码语言:javascript
复制
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0-preview</version>
        </dependency>

执行的过程中,出现了很多次的jar冲突,我这边和Hadoop-common 以及 hadoop-dfs有依赖冲突,具体的根据自己实际情况去除

代码语言:javascript
复制
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-lang3</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
            <version>2.9.2</version>
        </dependency>
       <dependency>
       <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
                        <exclusions>
                            <exclusion>
                                <groupId>io.netty</groupId>
                                <artifactId>netty-all</artifactId>
                            </exclusion>
                            <exclusion>
                                <groupId>javax.servlet</groupId>
                                <artifactId>servlet-api</artifactId>
                            </exclusion>
                        </exclusions>
                        <version>2.9.2</version>
        </dependency>

其中有一个错误需要将spark移到pom文件的最前面,具体原因未知:信息如下

代码语言:javascript
复制
javax.servlet.FilterRegistration"'s signer information does not match signer information
将 spark 移到最前面就搞定了。。
读取MySQL
代码语言:javascript
复制
public class SparkMySQL {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .master("local[4]")
                .getOrCreate();
        SQLContext sqlContext = new SQLContext(sparkSession);
        //读取mysql数据
        readMySQL(sqlContext);

        //停止SparkContext
        sparkSession.stop();
    }

    public static void readMySQL(SQLContext sqlContext) {
        String url = "jdbc:mysql://IP/DB?useSSL=false";
        String table = "qc_work_order";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "pwd");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        Dataset<Row> jdbcDF = sqlContext.read().jdbc(url, table, connectionProperties)
                .select("*")
                .where("qc_user_id = '12'").limit(100).orderBy(new Column("id").desc());
        jdbcDF.show(50);
    }
}

这里我们可直接执行,控制台会直接输出我们查询的数据。

执行Jar

使用IDEA可以直接在控制台查看查询的数据,我们也可以将Java打包成Jar,通过spark-submit执行

这里要带上驱动路径,不然会报错找不到MySQL的驱动

./spark-submit --class 'package.SparkMySQL' --jar /mysql-connection.jar /SparkMySQL.jar 2>&1

写入MySQL

和读取数据库有很大的不同,写入数据需要创建DataFrame,也就是createDataFrame方法, 其参数有多种形式JavaRDD<Row>,List<Row> rows,RDD<?> rdd等等,我这里选择 List<Row>。该List存储的是每一行的值,structFields变量存储值对应的字段。mode方法指的是操作方式,append会在现在的数据基础上拼接,overwrite则会覆盖,并改变表的结构。

代码语言:javascript
复制
public class SparkMySQL {
    static String url = "jdbc:mysql://IP/DB?useSSL=false";
    static Properties connectionProperties = new Properties();

    static {
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "pwd");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
    }

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .master("local[4]")
                .getOrCreate();
        SQLContext sqlContext = new SQLContext(sparkSession);
        //读取mysql数据
//        readMySQL(sqlContext);
        writeMySQL(sqlContext);
        //停止sparkSession
        sparkSession.stop();
    }

    public static void writeMySQL(SQLContext sqlContext) {
        String table = "qc_user";
        List<Row> list = new ArrayList<>();
        Row row = RowFactory.create(20, "c3p0", "张琰菲", 2);
        list.add(row);
        List structFields = new ArrayList();
        structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("user_id", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("user_name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("rule", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);
        //overwrite 会把字段覆盖掉
        sqlContext.createDataFrame(list, structType).write().mode("append").jdbc(url, table, connectionProperties);
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 每天学Java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导入依赖
  • 读取MySQL
  • 执行Jar
  • 写入MySQL
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档