前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何使用Java API访问CDH的Kudu

如何使用Java API访问CDH的Kudu

作者头像
Fayson
发布2018-07-12 14:58:29
6K1
发布2018-07-12 14:58:29
举报
文章被收录于专栏:Hadoop实操

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


Kudu是Cloudera开源的新型列式存储系统,是Apache Hadoop生态圈的新成员之一,专门为了对快速变化的数据进行快速分析,填补了以往Hadoop存储层的空缺,在前面的文章Fayson介绍了Kudu的安装及与Impala集成使用的文章,本篇文章Fayson主要介绍如何使用Java API操作Kudu。

  • 内容概述

1.环境准备

2.编写Java示例代码及运行

3.KuduMaster查看表

4.Kudu表与Impala集成

  • 测试环境

1.CM和CDH版本为5.14.3

2.Kudu版本为1.6.0

2.环境准备


1.安装Kudu服务,Fayson这里就不在介绍了,可以参考《如何在CDH中安装Kudu&Spark2&Kafka

2.修改Kudu配置,由于Fayson使用的是AWS环境这里跨了网段需要进行配置,如果局域网可以跳过此步

在KuduMaster服务的高级配置” gflagfile 的 Master 高级配置代码段(安全阀)”增加配置

代码语言:javascript
复制
--trusted_subnets=0.0.0.0/0

(可左右滑动)

添加受新人的子网名单,将其设置为0.0.0.0/0则表示允许所有的远程IP未经身份验证/未加密的连接。

如果未配置在使用Java API访问Kudu时报如下错误

代码语言:javascript
复制
W1128 16:56:55.749083 93981 negotiation.cc:318] Unauthorized connection attempt: Server connection negotiation failed: server connection from 100.73.0.57:42533: unauthenticated connections from publicly routable IPs are prohibited. See --trusted_subnets flag for more information

(可左右滑动)

3.编写Java示例


1.创建Kudu的Maven工程

2.工程的pom.xml文件内容如下

代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.6.0</version>
    </dependency>
    <!-- For logging messages. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera</groupId>
        <artifactId>generate-data</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

(可左右滑动)

在pom.xml文件中,添加了generatedata工程的依赖。

3.编写KuduExample.java类,内容如下

代码语言:javascript
复制
package com.cloudera;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.util.ArrayList;
import java.util.List;
/**
 * package: com.cloudera
 * describe: 使用API方式访问Kudu数据库
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/5/12
 * creat_time: 上午2:04
 * 公众号:Hadoop实操
 */
public class KuduExample {
    /**
     * 使用Kudu API创建一个Kudu表
     * @param client
     * @param tableName
     */
    public static void createTable(KuduClient client, String tableName) {
        List<ColumnSchema> columns = new ArrayList<>();
        //在添加列时可以指定每一列的压缩格式
        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("city", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("occupation", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("tel", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("fixPhoneNum", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("bankName", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("address", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("marriage", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("childNum", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        Schema schema = new Schema(columns);
        CreateTableOptions createTableOptions = new CreateTableOptions();
        List<String> hashKeys = new ArrayList<>();
        hashKeys.add("id");
        int numBuckets = 8;
        createTableOptions.addHashPartitions(hashKeys, numBuckets);
        try {
            if(!client.tableExists(tableName)) {
                client.createTable(tableName, schema, createTableOptions);
            }
            System.out.println("成功创建Kudu表:" + tableName);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * 向指定的Kudu表中upsert数据,数据存在则更新,不存在则新增
     * @param client KuduClient对象
     * @param tableName 表名
     * @param numRows 向表中插入的数据量
     */
    public static void upsert(KuduClient client, String tableName, int numRows ) {
        try {
            KuduTable kuduTable = client.openTable(tableName);
            KuduSession kuduSession = client.newSession();
            //设置Kudu提交数据方式,这里设置的为手动刷新,默认为自动提交
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            for(int i =0; i < numRows; i++) {
                String userInfo_str = RandomUserInfo.getUserInfo("测试数据");
                Upsert upsert = kuduTable.newUpsert();
                PartialRow row = upsert.getRow();
                String[] userInfo = userInfo_str.split(",");
                if(userInfo.length == 11) {
                    row.addString("id", userInfo[0]);
                    row.addString("name", userInfo[1]);
                    row.addString("sex", userInfo[2]);
                    row.addString("city", userInfo[3]);
                    row.addString("occupation", userInfo[4]);
                    row.addString("tel", userInfo[5]);
                    row.addString("fixPhoneNum", userInfo[6]);
                    row.addString("bankName", userInfo[7]);
                    row.addString("address", userInfo[8]);
                    row.addString("marriage", userInfo[9]);
                    row.addString("childNum", userInfo[10]);
                }
                kuduSession.apply(upsert);
            }
            kuduSession.flush();
            kuduSession.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * 查看Kudu表中数据
     * @param client
     * @param tableName
     */
    public static void scanerTable(KuduClient client, String tableName) {
        try {
            KuduTable kuduTable = client.openTable(tableName);
            KuduScanner kuduScanner = client.newScannerBuilder(kuduTable).build();
            while(kuduScanner.hasMoreRows()) {
                RowResultIterator rowResultIterator =kuduScanner.nextRows();
                while (rowResultIterator.hasNext()) {
                    RowResult rowResult = rowResultIterator.next();
                    System.out.println(rowResult.getString("id"));
                }
            }
            kuduScanner.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * 删除表
     * @param client
     * @param tableName
     */
    public static void dropTable(KuduClient client, String tableName) {
        try {
            client.deleteTable(tableName); 
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * 列出Kudu下所有的表
     * @param client
     */
    public static void tableList(KuduClient client) {
        try {
            ListTablesResponse listTablesResponse = client.getTablesList();
            List<String> tblist = listTablesResponse.getTablesList();
            for(String tableName : tblist) {
                System.out.println(tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    private static final String KUDU_MASTER = System.getProperty("kuduMasters", "ip-172-31-5-38.ap-southeast-1.compute.internal:7051,ip-172-31-7-193.ap-southeast-1.compute.internal:7051,ip-172-31-8-230.ap-southeast-1.compute.internal:7051");
    public static void main(String[] args) {
        KuduClient kuduClient = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        String tableName = "user_info";
        //删除Kudu的表
        dropTable(kuduClient, tableName);
        //创建一个Kudu的表
        createTable(kuduClient, tableName);
        //列出Kudu下所有的表
        tableList(kuduClient);
        //向Kudu指定的表中插入数据
        upsert(kuduClient, tableName, 100);
        //扫描Kudu表中数据
        scanerTable(kuduClient, tableName);
        try {
            kuduClient.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}

(可左右滑动)

4.示例代码运行

删除已存在的user_info表

创建一个新的user_info表

查看Kudu中所有的表名

向user_info中插入100条数据

扫描user_info表数据

运行成功

4.KuduMaster查看创建的表


1.登录CM,通过CM访问KuduMaster界面

2.点击上图标注的连接

进入Tables页面

点击连接进入user_info详细界面

5.Impala访问集成


在这里通过Java API创建的Kudu表默认Impala是不能访问的,需要在Impala中执行如下建表语句:

代码语言:javascript
复制
CREATE EXTERNAL TABLE `user_info` STORED AS KUDU
TBLPROPERTIES(
    'kudu.table_name' = 'user_info',
    'kudu.master_addresses' = 'ip-172-31-5-38.ap-southeast-1.compute.internal:7051,ip-172-31-7-193.ap-southeast-1.compute.internal:7051,ip-172-31-8-230.ap-southeast-1.compute.internal:7051')

(可左右滑动)

该建表语句就是上一章节通过访问KuduMaster查看user_info表详细信息获取到的。

1.登录Hue,使用Impala引擎查看,未显示在Kudu下创建的user_info表

2.在Hue执行建表SQL

3.查看user_info表数据

执行Count操作

6.总结


  • 在使用Java API访问Kudu时如果跨了网络则需要增加配置--trusted_subnets=0.0.0.0/0将网络添加到受新人列表
  • 通过Java API接口创建的Kudu表,默认是没有与Impala集成的,需要通过KuduMaster中提供的SQL在Impala中执行。

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/kududemo/src/main/java/com/cloudera/KuduExample.java

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
专用宿主机
专用宿主机(CVM Dedicated Host,CDH)提供用户独享的物理服务器资源,满足您资源独享、资源物理隔离、安全、合规需求。专用宿主机搭载了腾讯云虚拟化系统,购买之后,您可在其上灵活创建、管理多个自定义规格的云服务器实例,自主规划物理资源的使用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档