使用spark与MySQL进行数据交互的方法

在项目中,遇到一个场景是,需要从Hive数据仓库中拉取数据,进行过滤、裁剪或者聚合之后生成中间结果导入MySQL。

对于这样一个极其普通的离线计算场景,有多种技术选型可以实现。例如,sqoop,MR,HSQL。

我们这里使用的spark,优点来说是两个:一是灵活性高,二是代码简洁。

1)灵活性高

相比sqoop和HSQL,spark可以更灵活的控制过滤和裁剪逻辑,甚至你可以通过外部的配置或者参数,来动态的调整spark的计算行为,提供定制化。

2)代码简洁

相比MR来说,代码量上少了很多。也无需实现MySQL客户端。

我抽象了一下需求,做了如下一个demo。

涉及的数据源有两个:Hive&MySQL;计算引擎:spark&spark-sql。我们的demo中分为两个步骤:

1)从Hive中读取数据,交给spark计算,最终输出到MySQL;

2)从MySQL中读取数据,交给spark计算,最终再输出到MySQL另一张表。

1、 数据准备

创建了Hive外部分区表

关于分区和外部表这里不说了。

CREATE EXTERNAL TABLE `gulfstream_test.accounts`(
  `id` string COMMENT '用户id', 
  `order_id` string COMMENT '订单id', 
  `status` bigint COMMENT '用户状态', 
  `count` decimal(16,9) COMMENT '订单数')
COMMENT '用户信息'
PARTITIONED BY ( 
  `year` string, 
  `month` string, 
  `day` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '\t' 
STORED AS INPUTFORMAT 
  'org.autonavi.udf.CustomInputFormat' 
OUTPUTFORMAT 
  'org.autonavi.udf.CustomHiveOutputFormat'
LOCATION
  'hdfs://mycluster-tj/***/acounts'
TBLPROPERTIES (
  'LEVEL'='1', 
  'TTL'='60', 
  'last_modified_by'='yangfan', 
  'last_modified_time'='2017-10-23', 
  'transient_lastDdlTime'='1508746808')

建立分区,并指定分区路径

这里分区使用的年月日三级分区。通过下面的命令将year=2017/month=10/day=23这个Hive分区的数据指向了location=hdfs://mycluster-tj/***/acounts/2017/10/23

hive> alter table gulfstream_test.accounts add partition(year='2017', month='10', day='23') location 'hdfs://mycluster-tj/***/acounts/2017/10/23';

查询一下分区是否建立成功

可以看到分区已经有了。

show partitions gulfstream_test.accounts;
OK
partition
year=2017/month=10/day=23

上传本地测试数据到hdfs

hadoop fs -put a.txt  hdfs://mycluster-tj/***/acounts/2017/10/23

看一下数据,取了前10行,原谅我数据比较假。

[data_monitor@bigdata-arch-client10 target]$ hadoop fs -cat hdfs://mycluster-tj/***/acounts/2017/10/23/a | head -10
0       0       0       0
1       1       1       1
2       2       2       2
3       3       3       3
4       4       4       4
5       5       5       5
6       6       6       6
7       7       7       7
8       8       8       8
9       9       9       9

在Hive中,也查一下前10条,是一样的。只是多了分区字段。

hive (default)> select * from gulfstream_test.accounts where year=2017 and month=10 and day=23 limit 10;
OK
accounts.id     accounts.order_id       accounts.status accounts.count  accounts.year   accounts.month  accounts.day
0       0       0       0       2017    10      23
1       1       1       1       2017    10      23
2       2       2       2       2017    10      23
3       3       3       3       2017    10      23
4       4       4       4       2017    10      23
5       5       5       5       2017    10      23
6       6       6       6       2017    10      23
7       7       7       7       2017    10      23
8       8       8       8       2017    10      23
9       9       9       9       2017    10      23
Time taken: 1.38 seconds, Fetched: 10 row(s)

至此,测试数据准备好了。一共1000000条,1百万。

2、代码

1)POM依赖

可以通过pom依赖来看一下笔者使用的组件版本。

这里就不赘述了。

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>

打包方式

<build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!--这里要替换成jar包main方法所在类 -->
                            <mainClass>com.kangaroo.studio.algorithms.filter.LoadDB</mainClass>

                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2)java spark代码

先贴上代码,再说明

package com.kangaroo.studio.algorithms.filter;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

import java.io.Serializable;
import java.util.Properties;


public class LoadDB implements Serializable {

    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;
    private HiveContext hiveContext;
    private SQLContext sqlContext;

    /*
    *   初始化Load
    *   创建sparkContext, sqlContext, hiveContext
    * */
    public LoadDB() {
        initSparckContext();
        initSQLContext();
        initHiveContext();
    }

    /*
    *   创建sparkContext
    * */
    private void initSparckContext() {
        String warehouseLocation = System.getProperty("user.dir");
        sparkConf = new SparkConf()
                .setAppName("from-to-mysql")
                .set("spark.sql.warehouse.dir", warehouseLocation)
                .setMaster("yarn-client");
        javaSparkContext = new JavaSparkContext(sparkConf);
    }

    /*
    *   创建hiveContext
    *   用于读取Hive中的数据
    * */
    private void initHiveContext() {
        hiveContext = new HiveContext(javaSparkContext);
    }

    /*
    *   创建sqlContext
    *   用于读写MySQL中的数据
    * */
    private void initSQLContext() {
        sqlContext = new SQLContext(javaSparkContext);
    }

    /*
    *   使用spark-sql从hive中读取数据, 然后写入mysql对应表.
    * */
    public void hive2db() {
        String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
        String table = "accounts";
        Properties props = new Properties();
        props.put("user", "root");
        props.put("password", "1234");
        String query = "select * from gulfstream_test.accounts where year=2017 and month=10 and day=23";
        DataFrame rows = hiveContext.sql(query).select("id", "order_id", "status", "count");;
        rows.write().mode(SaveMode.Append).jdbc(url, table, props);
    }

    /*
    *   使用spark-sql从db中读取数据, 处理后再回写到db
    * */
    public void db2db() {
        String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
        String fromTable = "accounts";
        String toTable = "accountsPart";
        Properties props = new Properties();
        props.put("user", "root");
        props.put("password", "1234");
        DataFrame rows = sqlContext.read().jdbc(url, fromTable, props).where("count < 1000");
        rows.write().mode(SaveMode.Append).jdbc(url, toTable, props);
    }


    public static void main(String[] args) {
        LoadDB loadDB = new LoadDB();
        System.out.println(" ---------------------- start hive2db ------------------------");
        loadDB.hive2db();
        System.out.println(" ---------------------- finish hive2db ------------------------");
        System.out.println(" ---------------------- start db2db ------------------------");
        loadDB.db2db();
        System.out.println(" ---------------------- finish db2db ------------------------");
    }
}

说明:

  • hive2db

核心动作是使用hiveContext.sql(query)执行了hiveSQL,过滤出Hive表中year=2017/month=10/day=23分钟的数据,返回一个DataFrame对象。

DataFrame是spark-sql数据处理的核心。对DataFrame的操作推荐这样一篇博客。你可以去使用这些方法,实现复杂的逻辑。

对DataFrame对象,我们使用了select裁剪了其中4列数据(id, order_id, status, count)出来,不过不裁剪的话,会有7列(加上分区的year,month,day)。

然后将数据以SaveMode.Append的方式,写入了mysql中的accounts表。

SaveMode.Append方式,数据会追加,而不会覆盖。如果想覆盖,还有一个常用的SaveMode.Overwrite。推荐这样一篇博客

最终accounts中的数据有1000000条,百万。

  • db2db

db2db从刚刚生成的MySQL表accounts中读取出数据,也是返回了一个dataframe对象,通过执行where过滤除了其中id<1000的数据,这里正好是1000条。

然后写入了accountsPart。最终accountsPart数据应该有1000条。

3)编译和执行

 编译完成后,生成jar包from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar 

使用默认参数提交到yarn队列。

spark-submit --queue=root.zhiliangbu_prod_datamonitor from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar 

 片刻之后,观察输出。已经全部finish了。

4)查看一下结果

我们到mysql中瞅一瞅。

accounts表

有没有注意到,其实不用建立mysql表!这个过程会自动给你创建,相当于if not exists。

细心的你可能已经注意到了,hive里的string类型,到了MySQL中变成了Text。有个兄弟说,如果你手动创建了表,并且字段设置为String会报错,我没有试,只是记录了一下。

CREATE TABLE `accounts` (
  `id` text,
  `order_id` text,
  `status` bigint(20) DEFAULT NULL,
  `count` decimal(16,9) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

简单看一下里面有多少数据。1百万

MariaDB [big_data]> select count(1) from accounts ;    
+----------+
| count(1) |
+----------+
|  1000000 |
+----------+
1 row in set (0.32 sec)

acountsPart表

 CREATE TABLE `accountsPart` (
  `id` text,
  `order_id` text,
  `status` bigint(20) DEFAULT NULL,
  `count` decimal(16,9) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

查看有多少数据,1000条,果然是没有问题的

MariaDB [big_data]> select count(1) from accountsPart;
+----------+
| count(1) |
+----------+
|     1000 |
+----------+
1 row in set (0.00 sec)

到此为止。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android知识点总结

SpringBoot-07-之数据库JPA(CRUD)

1312
来自专栏xingoo, 一个梦想做发明家的程序员

如何在Elasticsearch中安装中文分词器(IK+pinyin)

如果直接使用Elasticsearch的朋友在处理中文内容的搜索时,肯定会遇到很尴尬的问题——中文词语被分成了一个一个的汉字,当用Kibana作图的时候,按照...

4587
来自专栏battcn

一起来学SpringBoot | 第八篇:通用Mapper与分页插件的集成

在一起来学SpringBoot | 第七篇:整合Mybatis一文中,我们介绍了 Mybatis这款优秀的框架,顺便提及了民间大神开发的两款插件( 通用Mapp...

1382
来自专栏GopherCoder

专栏:008:MySQLdb及其银行模拟转账

1944
来自专栏纯洁的微笑

springboot(六):如何优雅的使用mybatis

这两天启动了一个新项目因为项目组成员一直都使用的是mybatis,虽然个人比较喜欢jpa这种极简的模式,但是为了项目保持统一性技术选型还是定了 mybatis。...

54312
来自专栏Java编程技术

MySQL中流式查询使用

MySQL 是目前使用比较广泛的关系型数据库,而从数据库里面根据条件查询数据到内存的情况想必大家在日常项目实践中都有使用。

1422
来自专栏王硕

原 为PostgreSQL添加插件

6275
来自专栏伦少的博客

hive查询报错:java.io.IOException:org.apache.parquet.io.ParquetDecodingException

转载请务必注明原创地址为:https://dongkelun.com/2018/05/20/hiveQueryException/

48117
来自专栏伦少的博客

Spark通过修改DataFrame的schema给表字段添加注释

通过Spark将关系型数据库(以Oracle为例)的表同步的Hive表,要求用Spark建表,有字段注释的也要加上注释。Spark建表,有两种方法:

2433
来自专栏LanceToBigData

MySQL(三)之SQL语句分类、基本操作、三大范式

一、SQL语句的分类   DML(Data Manipulation Langauge,数据操纵/管理语言) (insert,delete,update,se...

2915

扫码关注云+社区

领取腾讯云代金券