如何使用Java API访问CDH的Kudu

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


Kudu是Cloudera开源的新型列式存储系统,是Apache Hadoop生态圈的新成员之一,专门为了对快速变化的数据进行快速分析,填补了以往Hadoop存储层的空缺,在前面的文章Fayson介绍了Kudu的安装及与Impala集成使用的文章,本篇文章Fayson主要介绍如何使用Java API操作Kudu。

  • 内容概述

1.环境准备

2.编写Java示例代码及运行

3.KuduMaster查看表

4.Kudu表与Impala集成

  • 测试环境

1.CM和CDH版本为5.14.3

2.Kudu版本为1.6.0

2.环境准备


1.安装Kudu服务,Fayson这里就不在介绍了,可以参考《如何在CDH中安装Kudu&Spark2&Kafka

2.修改Kudu配置,由于Fayson使用的是AWS环境这里跨了网段需要进行配置,如果局域网可以跳过此步

在KuduMaster服务的高级配置” gflagfile 的 Master 高级配置代码段(安全阀)”增加配置

--trusted_subnets=0.0.0.0/0

(可左右滑动)

添加受新人的子网名单,将其设置为0.0.0.0/0则表示允许所有的远程IP未经身份验证/未加密的连接。

如果未配置在使用Java API访问Kudu时报如下错误

W1128 16:56:55.749083 93981 negotiation.cc:318] Unauthorized connection attempt: Server connection negotiation failed: server connection from 100.73.0.57:42533: unauthenticated connections from publicly routable IPs are prohibited. See --trusted_subnets flag for more information

(可左右滑动)

3.编写Java示例


1.创建Kudu的Maven工程

2.工程的pom.xml文件内容如下

<dependencies>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.6.0</version>
    </dependency>
    <!-- For logging messages. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera</groupId>
        <artifactId>generate-data</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

(可左右滑动)

在pom.xml文件中,添加了generatedata工程的依赖。

3.编写KuduExample.java类,内容如下

package com.cloudera;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.util.ArrayList;
import java.util.List;
/**
 * package: com.cloudera
 * describe: 使用API方式访问Kudu数据库
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/5/12
 * creat_time: 上午2:04
 * 公众号:Hadoop实操
 */
public class KuduExample {
    /**
     * 使用Kudu API创建一个Kudu表
     * @param client
     * @param tableName
     */
    public static void createTable(KuduClient client, String tableName) {
        List<ColumnSchema> columns = new ArrayList<>();
        //在添加列时可以指定每一列的压缩格式
        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("sex", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("city", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("occupation", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("tel", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("fixPhoneNum", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("bankName", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("address", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("marriage", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("childNum", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        Schema schema = new Schema(columns);
        CreateTableOptions createTableOptions = new CreateTableOptions();
        List<String> hashKeys = new ArrayList<>();
        hashKeys.add("id");
        int numBuckets = 8;
        createTableOptions.addHashPartitions(hashKeys, numBuckets);
        try {
            if(!client.tableExists(tableName)) {
                client.createTable(tableName, schema, createTableOptions);
            }
            System.out.println("成功创建Kudu表:" + tableName);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * 向指定的Kudu表中upsert数据,数据存在则更新,不存在则新增
     * @param client KuduClient对象
     * @param tableName 表名
     * @param numRows 向表中插入的数据量
     */
    public static void upsert(KuduClient client, String tableName, int numRows ) {
        try {
            KuduTable kuduTable = client.openTable(tableName);
            KuduSession kuduSession = client.newSession();
            //设置Kudu提交数据方式,这里设置的为手动刷新,默认为自动提交
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            for(int i =0; i < numRows; i++) {
                String userInfo_str = RandomUserInfo.getUserInfo("测试数据");
                Upsert upsert = kuduTable.newUpsert();
                PartialRow row = upsert.getRow();
                String[] userInfo = userInfo_str.split(",");
                if(userInfo.length == 11) {
                    row.addString("id", userInfo[0]);
                    row.addString("name", userInfo[1]);
                    row.addString("sex", userInfo[2]);
                    row.addString("city", userInfo[3]);
                    row.addString("occupation", userInfo[4]);
                    row.addString("tel", userInfo[5]);
                    row.addString("fixPhoneNum", userInfo[6]);
                    row.addString("bankName", userInfo[7]);
                    row.addString("address", userInfo[8]);
                    row.addString("marriage", userInfo[9]);
                    row.addString("childNum", userInfo[10]);
                }
                kuduSession.apply(upsert);
            }
            kuduSession.flush();
            kuduSession.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * 查看Kudu表中数据
     * @param client
     * @param tableName
     */
    public static void scanerTable(KuduClient client, String tableName) {
        try {
            KuduTable kuduTable = client.openTable(tableName);
            KuduScanner kuduScanner = client.newScannerBuilder(kuduTable).build();
            while(kuduScanner.hasMoreRows()) {
                RowResultIterator rowResultIterator =kuduScanner.nextRows();
                while (rowResultIterator.hasNext()) {
                    RowResult rowResult = rowResultIterator.next();
                    System.out.println(rowResult.getString("id"));
                }
            }
            kuduScanner.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * 删除表
     * @param client
     * @param tableName
     */
    public static void dropTable(KuduClient client, String tableName) {
        try {
            client.deleteTable(tableName); 
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * 列出Kudu下所有的表
     * @param client
     */
    public static void tableList(KuduClient client) {
        try {
            ListTablesResponse listTablesResponse = client.getTablesList();
            List<String> tblist = listTablesResponse.getTablesList();
            for(String tableName : tblist) {
                System.out.println(tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    private static final String KUDU_MASTER = System.getProperty("kuduMasters", "ip-172-31-5-38.ap-southeast-1.compute.internal:7051,ip-172-31-7-193.ap-southeast-1.compute.internal:7051,ip-172-31-8-230.ap-southeast-1.compute.internal:7051");
    public static void main(String[] args) {
        KuduClient kuduClient = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        String tableName = "user_info";
        //删除Kudu的表
        dropTable(kuduClient, tableName);
        //创建一个Kudu的表
        createTable(kuduClient, tableName);
        //列出Kudu下所有的表
        tableList(kuduClient);
        //向Kudu指定的表中插入数据
        upsert(kuduClient, tableName, 100);
        //扫描Kudu表中数据
        scanerTable(kuduClient, tableName);
        try {
            kuduClient.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}

(可左右滑动)

4.示例代码运行

删除已存在的user_info表

创建一个新的user_info表

查看Kudu中所有的表名

向user_info中插入100条数据

扫描user_info表数据

运行成功

4.KuduMaster查看创建的表


1.登录CM,通过CM访问KuduMaster界面

2.点击上图标注的连接

进入Tables页面

点击连接进入user_info详细界面

5.Impala访问集成


在这里通过Java API创建的Kudu表默认Impala是不能访问的,需要在Impala中执行如下建表语句:

CREATE EXTERNAL TABLE `user_info` STORED AS KUDU
TBLPROPERTIES(
    'kudu.table_name' = 'user_info',
    'kudu.master_addresses' = 'ip-172-31-5-38.ap-southeast-1.compute.internal:7051,ip-172-31-7-193.ap-southeast-1.compute.internal:7051,ip-172-31-8-230.ap-southeast-1.compute.internal:7051')

(可左右滑动)

该建表语句就是上一章节通过访问KuduMaster查看user_info表详细信息获取到的。

1.登录Hue,使用Impala引擎查看,未显示在Kudu下创建的user_info表

2.在Hue执行建表SQL

3.查看user_info表数据

执行Count操作

6.总结


  • 在使用Java API访问Kudu时如果跨了网络则需要增加配置--trusted_subnets=0.0.0.0/0将网络添加到受新人列表
  • 通过Java API接口创建的Kudu表,默认是没有与Impala集成的,需要通过KuduMaster中提供的SQL在Impala中执行。

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/kududemo/src/main/java/com/cloudera/KuduExample.java

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-05-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

如何在CDH未启用认证的情况下安装及使用Sentry

CDH平台中的安全,认证(Kerberos/LDAP)是第一步,授权(Sentry)是第二步。如果要启用授权,必须先启用认证。但在CDH平台中给出了一种测试模式...

2.9K80
来自专栏Hadoop实操

如何在CDH启用Kerberos的情况下安装及使用Sentry(一)

本文档主要讲述如何在启用Kerberos的CDH集群中安装配置及使用Sentry。

1K70
来自专栏Hadoop实操

如何在Redhat7.4安装CDH6.0.0_beta1

79720
来自专栏Hadoop实操

如何在Redhat7.4安装CDH5.15

76530
来自专栏技术专栏

慕课网Spark SQL日志分析 - 2.Spark 实战环境搭建

下载地址: http://spark.apache.org/downloads.html

30410
来自专栏Hadoop实操

如何在Kerberos与非Kerberos的CDH集群BDR不可用时复制数据

本文档描述了在Kerberos与非Kerberos的CDH集群之间BDR不可用的情况下实现数据互导。文档主要讲述

909110
来自专栏Hadoop实操

如何使用Sentry管理Hive外部表(补充)

/extwarehouse/student_hive数据目录不存,在创建外部表时自动生成,且数据目录属主为hive。

45240
来自专栏Hadoop实操

如何启用Oozie的HA

Oozie是基于Hadoop的作业调度工具,工作流引擎,在实际工作中,遇到对数据进行一连串的操作的时候很实用,不需要自己写一些处理代码了,只需要定义好各个act...

1.6K60
来自专栏ytkah

laravel dingo/api添加jwt-auth认证

前面我们学了laravel dingo/api创建简单的api,这样api是开放给所有人的,如何查看和限制api的调用呢?可以用jwt-auth来验证,JSON...

18120
来自专栏蓝天

Hive 1.2.1&Spark&Sqoop安装指南

本文的安装参照《Hive 0.12.0安装指南》,内容来源于官方的:GettingStarted,将Hive 1.2.1安装在Hadoop 2.7.1上。本...

27510

扫码关注云+社区

领取腾讯云代金券