前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Springboot 整合 sqoop

Springboot 整合 sqoop

作者头像
Maynor
发布2021-12-12 10:08:58
4500
发布2021-12-12 10:08:58
举报

Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 :MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。 —来自:百度百科 以上是对sqoop的一个简单说明,具体我就不再多赘述。日常企业开发过程中,我们可能面对增删改查的业务比较多,但是作为一个程序员,我觉得不要局限于此,可能面对业务的场景不同。自然而然的对整个业务技术框架的认知也是有一定的局限性。今天跟大家分享这个Sqoop框架,基于springBoot进行整合。也许能够帮助你在你的简历中锦上添花,希望能够你带来薪资上的变化。 说起sqoop,我们必须要了解它的用途,主要应用于 RDBMS 与 Hadoop ( HDFS / Hive / HBase )数据传输迁移。我们主要通过这个工具主要作为归档数据同步使用辅助企业智能推荐及可视化大屏使用。为什么会用到sqoop,因为它解决了关系数据库与Hadoop之间的数据传输问题。基于它底层MR的本质,具有性能高、易用、灵活的特点。

下面通过实际的案例,对整个整合过程做进一步的详解。

1.首先我们先创建一个简单的springboot工程。具体过程我就不多赘述,可参考以下地址:springboot超详细搭建过程 - 简书

2.引入依赖文件

代码语言:javascript
复制
 <dependencies>
    <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.sqoop/sqoop -->
        <dependency>
            <groupId>org.apache.sqoop</groupId>
            <artifactId>sqoop</artifactId>
            <version>1.4.7</version>
            <classifier>hadoop260</classifier>
        </dependency>


        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>
        <!--hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.8.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.8.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-mapred</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-common</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
    </dependencies>

注意:

3.创建Crotoller类

代码语言:javascript
复制
package com.ienglish.batch.data.sqoop.controller;

/**
 * @program: english-batch-data-sqoop
 *
 * @description:
 *
 * @author: ShiY.WANG
 *
 * @create: 2021-10-15 16:53
 **/

import com.ienglish.batch.data.sqoop.entity.sqoopBean;
import com.ienglish.batch.data.sqoop.service.SqoopService;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/data")
public class SqoopController {
    @Autowired
    private SqoopService sqoopService;

    /**
     * MYSQL数据到HDFS
     * @param connect  jdbc:mysql://127.0.0.1:3306/test
     * @param driver com.mysql.cj.jdbc.Driver
     * @param username /
     * @param password /
     * @param table  /
     * @param m  MR的并行度
     * @param targetdir   hdfs目录
     * @param hdfsAddr  hdfs地址
     * @return
     * @throws Exception
     */
    @PostMapping("/mysqlTohdfs")
    @ResponseBody
    public sqoopBean mysqlTohdfs(String connect, String driver, String username, String password, String table, int m, String targetdir, String hdfsAddr) throws Exception {
        return sqoopService.mysqlTohdfs(connect, driver, username, password, table, m, targetdir, hdfsAddr);
    }

    /**
     *
     * @param jdbc
     * @param driver
     * @param username
     * @param password
     * @param mysqlTable
     * @param hbaseTableName
     * @param columnFamily
     * @param rowkey
     * @param m
     * @return
     * @throws Exception
     */
    @PostMapping("/mysql2hbase")
    @ResponseBody
    public sqoopBean transformMysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception {
        return sqoopService.mysql2Hbase(jdbc, driver, username, password, mysqlTable, hbaseTableName, columnFamily, rowkey, m);
    }


}

3.创建接口类

代码语言:javascript
复制
package com.ienglish.batch.data.sqoop.service;/**
 * @description:
 * Author: wsy
 * Date: 2021/10/15 16:56
 * Content:
 */

import com.ienglish.batch.data.sqoop.entity.sqoopBean;

/**
 * @program: english-batch-data-sqoop
 *
 * @description:
 *
 * @author: ShiY.WANG
 *
 * @create: 2021-10-15 16:56
 **/


public interface SqoopService {
    /**
     * mysql到hdfs
     * @param connect
     * @param driver
     * @param username
     * @param password
     * @param table
     * @param m
     * @param targetdir
     * @param hdfsAddr
     * @return
     * @throws Exception
     */
    public sqoopBean mysqlTohdfs(String connect, String driver, String username, String password, String table, int m, String targetdir, String hdfsAddr) throws Exception;

    /**
     * mysql到hbase
     * @param jdbc
     * @param driver
     * @param username
     * @param password
     * @param mysqlTable
     * @param hbaseTableName
     * @param columnFamily
     * @param rowkey
     * @param m
     * @return
     * @throws Exception
     */
    public sqoopBean mysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception;


}

4.创建实现类

代码语言:javascript
复制
package com.ienglish.batch.data.sqoop.service.serviceimpl;/**
 * @description:
 * Author: wsy
 * Date: 2021/10/15 16:57
 * Content:
 */

/**
 * @program: english-batch-data-sqoop
 *
 * @description:
 *
 * @author: ShiY.WANG
 *
 * @create: 2021-10-15 16:57
 **/

import com.ienglish.batch.data.sqoop.entity.sqoopBean;
import com.ienglish.batch.data.sqoop.service.SqoopService;
import org.apache.hadoop.conf.Configuration;
import org.springframework.stereotype.Service;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;
import java.sql.Timestamp;
import java.util.Date;

@Service
public class SqoopServiceImpl implements SqoopService {

    @Override
    public sqoopBean mysqlTohdfs(String connect, String driver, String username,
                                 String password, String table, int m, String targetdir,
                                 String hdfsAddr) throws Exception {
        String[] args = new String[]{
                "--connect", connect,
                "--driver", driver,
                "-username", username,
                "-password", password,
                "--table", table,
                "-m", String.valueOf(m),
                "--target-dir", targetdir,
        };
        sqoopBean sqoopBean = new sqoopBean();
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");
        Configuration conf = new Configuration();
        conf.set("fs.default.name", hdfsAddr);
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        sqoopBean.setI(Sqoop.runSqoop(sqoop, expandArguments));
        sqoopBean.setTs(new Timestamp(new Date().getTime()));
        return sqoopBean;


    }

    @Override
    public sqoopBean mysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception {
        String[] args = new String[]{
                "--connect", jdbc,
                "--driver", driver,
                "-username", username,
                "-password", password,
                "--table", mysqlTable,
                "--hbase-table", hbaseTableName,
                "--column-family", columnFamily,
                "--hbase-create-table",
                "--hbase-row-key", rowkey,
                "-m", String.valueOf(m),
        };
        sqoopBean sqoopBean = new sqoopBean();
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");
        Configuration conf = new Configuration();
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        sqoopBean.setI(Sqoop.runSqoop(sqoop, expandArguments));
        sqoopBean.setTs(new Timestamp(new Date().getTime()));
        return sqoopBean;
    }
}
代码语言:javascript
复制
以上就是正常的整合配置过程,这只是一个demo,可以根据实际需求进行封装使用。

具体执行结果如下:

在这里插入图片描述
在这里插入图片描述

文件传输成功。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-12-11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档