首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
客快物流大数据项目(一):物流项目介绍和内容大纲
2
客快物流大数据项目(二):物流项目详细介绍
3
客快物流大数据项目(三):项目解决方案
4
客快物流大数据项目(四):大数据项目为什么使用Docker
5
客快物流大数据项目(五):Docker介绍
6
客快物流大数据项目(六):Docker与虚拟机的形象比喻及组件介绍
7
客快物流大数据项目(七):Docker总结
8
客快物流大数据项目(八):Docker的安装和启动
9
客快物流大数据项目(九):Docker常用命令
10
客快物流大数据项目(十):Docker容器命令
11
客快物流大数据项目(十一):Docker应用部署
12
客快物流大数据项目(十二):Docker的迁移与备份
13
客快物流大数据项目(十三):Docker镜像
14
客快物流大数据项目(十四):DockerFile介绍与构建过程解析
15
客快物流大数据项目(十五):DockeFile常用命令
16
客快物流大数据项目(十六):使用脚本创建镜像
17
客快物流大数据项目(十七):自定义镜像mycentos
18
客快物流大数据项目(十九):项目环境准备
19
客快物流大数据项目(二十):物流管理系统服务器的数据路径配置和软件下载存放位置
20
客快物流大数据项目(二十一):Docker环境初始化
21
客快物流大数据项目(二十二):Docker环境中安装软件
22
客快物流大数据项目(二十三):OGG介绍
23
客快物流大数据项目(二十四):OGG安装部署
24
客快物流大数据项目(二十五):初始化业务数据
25
客快物流大数据项目(二十六):客户关系管理服务器
26
客快物流大数据项目(二十七):Cloudera Manager简单介绍
27
客快物流大数据项目(二十八):大数据服务器环境准备
28
客快物流大数据项目(二十九):下载CDH的安装包
29
客快物流大数据项目(三十):软件下载后存放位置
30
客快物流大数据项目(三十一):常用工具安装
31
客快物流大数据项目(三十二):安装CDH-6.2.1和初始化CDH服务所需的MySQL库
32
客快物流大数据项目(三十三):安装Server和Agent
33
客快物流大数据项目(三十四):CDH开始安装
34
客快物流大数据项目(三十五):CDH使用注意
35
客快物流大数据项目(三十六):安装ElasticSearch-7.6.1
36
客快物流大数据项目(三十七):安装Kinaba-7.6.1
37
客快物流大数据项目(三十八):安装Azkaban-3.71.0
38
客快物流大数据项目(三十九):Hue安装
39
客快物流大数据项目(四十):ETL实现方案
40
客快物流大数据项目(四十一):Kudu入门介绍
41
客快物流大数据项目(四十二):Java代码操作Kudu
42
客快物流大数据项目(四十三):kudu的分区方式
43
客快物流大数据项目(四十四):Spark操作Kudu创建表
44
客快物流大数据项目(四十五):Spark操作Kudu DML操作
45
客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu
46
客快物流大数据项目(四十七):Spark操作Kudu Native RDD
47
客快物流大数据项目(四十八):Spark操作Kudu 修改表
48
客快物流大数据项目(四十九):开发环境初始化
49
客快物流大数据项目(五十):项目框架初始化
50
客快物流大数据项目(五十一):数据库表分析

客快物流大数据项目(四十二):Java代码操作Kudu

目录

Java代码操作Kudu

一、构建maven工程

二、导入依赖

三、​​​​​​​创建包结构

四、​​​​​​​初始化方法

五、​​​​​​​创建表

六、​​​​​​​插入数据

七、​​​​​​​查询数据

八、修改数据

九、​​​​​​​删除数据

十、​​​​​​​修改表

十一、​​​​​​​删除表

Java代码操作Kudu

一、​​​​​​​构建maven工程

二、导入依赖

代码语言:javascript
复制
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

三、​​​​​​​创建包结构

包名

说明

cn.it

代码所在的包目录

四、​​​​​​​初始化方法

代码语言:javascript
复制
package cn.it;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.junit.Before;

public class TestKudu {
    //定义KuduClient客户端对象
    private static KuduClient kuduClient;
    //定义表名
    private static String tableName = "person";

    /**
     * 初始化方法
     */
    @Before
    public void init() {
        //指定master地址
        String masterAddress = "node2.cn";
        //创建kudu的数据库连接
        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
    }

    //构建表schema的字段信息
    //字段名称   数据类型     是否为主键
    public ColumnSchema newColumn(String name, Type type, boolean isKey) {
        ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
        column.key(isKey);
        return column.build();
    }
}

五、​​​​​​​创建表

代码语言:javascript
复制
/**  使用junit进行测试
 *
 * 创建表
 * @throws KuduException
 */
@Test
public void createTable() throws KuduException {
    //设置表的schema
    List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
    columns.add(newColumn("CompanyId", Type.INT32, true));
    columns.add(newColumn("WorkId", Type.INT32, false));
    columns.add(newColumn("Name", Type.STRING, false));
    columns.add(newColumn("Gender", Type.STRING, false));
    columns.add(newColumn("Photo", Type.STRING, false));
    Schema schema = new Schema(columns);
    //创建表时提供的所有选项
    CreateTableOptions tableOptions = new CreateTableOptions();
    //设置表的副本和分区规则
    LinkedList<String> list = new LinkedList<String>();
    list.add("CompanyId");
    //设置表副本数
    tableOptions.setNumReplicas(1);
    //设置range分区
    //tableOptions.setRangePartitionColumns(list);
    //设置hash分区和分区的数量
    tableOptions.addHashPartitions(list, 3);
    try {
        kuduClient.createTable("person", schema, tableOptions);
    } catch (Exception e) {
        e.printStackTrace();
    }
    kuduClient.close();
}

六、​​​​​​​插入数据

代码语言:javascript
复制
/**
 * 向表中加载数据
 * @throws KuduException
 */
@Test
public void loadData() throws KuduException {
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //创建KuduSession对象 kudu必须通过KuduSession写入数据
    KuduSession kuduSession = kuduClient.newSession();
    //采用flush方式 手动刷新
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
    kuduSession.setMutationBufferSpace(3000);
    //准备数据
    for(int i=1; i<=10; i++){
        Insert insert = kuduTable.newInsert();
        //设置字段的内容
        insert.getRow().addInt("CompanyId",i);
        insert.getRow().addInt("WorkId",i);
        insert.getRow().addString("Name","lisi"+i);
        insert.getRow().addString("Gender","male");
        insert.getRow().addString("Photo","person"+i);
        kuduSession.flush();
        kuduSession.apply(insert);
    }
    kuduSession.close();
    kuduClient.close();
}

七、​​​​​​​查询数据

代码语言:javascript
复制
 /**
 * 查询表数据
 * @throws KuduException
 */
@Test
public void queryData() throws KuduException {
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //获取scanner扫描器
    KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
    KuduScanner scanner = scannerBuilder.build();
    //遍历
    while(scanner.hasMoreRows()){
        RowResultIterator rowResults = scanner.nextRows();
        while (rowResults.hasNext()){
            RowResult result = rowResults.next();
            int companyId = result.getInt("CompanyId");
            int workId = result.getInt("WorkId");
            String name = result.getString("Name");
            String gender = result.getString("Gender");
            String photo = result.getString("Photo");
            System.out.print("companyId:"+companyId+" ");
            System.out.print("workId:"+workId+" ");
            System.out.print("name:"+name+" ");
            System.out.print("gender:"+gender+" ");
            System.out.println("photo:"+photo);
        }
    }
    //关闭
    scanner.close();
    kuduClient.close();
}

八、修改数据

代码语言:javascript
复制
/**
 * 修改数据
 * @throws KuduException
 */
@Test
public void upDATEData() throws KuduException {
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //构建kuduSession对象
    KuduSession kuduSession = kuduClient.newSession();
    //设置刷新数据模式,自动提交
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);

    //更新数据需要获取UpDATE对象
    UpDATE upDATE = kuduTable.newUpDATE();
    //获取row对象
    PartialRow row = upDATE.getRow();
    //设置要更新的数据信息
    row.addInt("CompanyId",1);
    row.addString("Name","kobe");
    //操作这个upDATE对象
    kuduSession.apply(upDATE);
    kuduSession.close();
}

九、​​​​​​​删除数据

代码语言:javascript
复制
/**
 * 删除表中的数据
 */
@Test
public void deleteData() throws KuduException {
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    KuduSession kuduSession = kuduClient.newSession();
    //获取Delete对象
    Delete delete = kuduTable.newDelete();
    //构建要删除的行对象
    PartialRow row = delete.getRow();
    //设置删除数据的条件
    row.addInt("CompanyId",2);
    kuduSession.flush();
    kuduSession.apply(delete);
    kuduSession.close();
    kuduClient.close();
}

十、​​​​​​​修改表

代码语言:javascript
复制
package cn.it.kudu;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

/**
 * 修改表操作
 */
public class AlterTable {
    //定义kudu的客户端对象
    private static KuduClient kuduClient;
    //定义一张表名称
    private static String tableName = "person";

    /**
     * 初始化操作
     */
    @Before
    public void init() {
        //指定kudu的master地址
        String masterAddress = "node2.cn";
        //创建kudu的数据库连接
        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
    }

    /**
     * 添加列
     */
    @Test
    public void alterTableAddColumn() {
        AlterTableOptions alterTableOptions = new AlterTableOptions();
        alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
        try {
            kuduClient.alterTable(tableName, alterTableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
       }
    }

    /**
     * 删除列
     */
    @Test
    public void alterTableDeleteColumn(){
        AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
        try {
            kuduClient.alterTable(tableName, alterTableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
       }
    }

    /**
     * 添加分区列
     */
    @Test
    public void alterTableAddRangePartition(){
        int lowerValue = 110;
        int upperValue = 120;
        try {
            KuduTable kuduTable = kuduClient.openTable(tableName);
            List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
            boolean flag = true;
            for (Partition rangePartition : rangePartitions) {
                int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
                if(startKey == lowerValue){
                    flag = false;
                }
            }
            if(flag) {
                PartialRow lower = kuduTable.getSchema().newPartialRow();
                lower.addInt("Id", lowerValue);
                PartialRow upper = kuduTable.getSchema().newPartialRow();
                upper.addInt("Id", upperValue);
                kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
            }else{
                System.out.println("分区已经存在,不能重复创建!");
            }
        } catch (KuduException e) {
            e.printStackTrace();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    /**
     * 删除表
     * @throws KuduException
     */
    @Test
    public void dropTable() throws KuduException {
        kuduClient.deleteTable(tableName);
    }
}

十一、​​​​​​​删除表

代码语言:javascript
复制
/**
 * 删除表
 */
@Test
public void dropTable() throws KuduException {
    //删除表
    DeleteTableResponse response = kuduClient.deleteTable(tableName);
    //关闭客户端连接
    kuduClient.close();
}
下一篇
举报
领券