前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Kudu 2

Apache Kudu 2

作者头像
jasong
发布2022-03-27 16:06:24
1.7K0
发布2022-03-27 16:06:24
举报
文章被收录于专栏:ClickHouseClickHouse

二 性能调优篇

1 kudu性能调优和报错方案解决

代码语言:javascript
复制
报错一:tablet初始化时长很久
解决方案:
    升级版本到kudu1.6.0以上版本 .参考:https://kudu.apache.org/2017/12/08/apache-kudu-1-6-0-released.html
    查看io使用情况 iostat -d -x -k 1 200.(可能是IO瓶颈)
    Recommended maximum number of tablet servers is 100.
    Recommended maximum number of tablets per tablet server is 2000.

添加描述

代码语言:javascript
复制
报错二:rpc连接超时(IO问题)
    RPC can not complete before timeout: KuduRpc(method=CreateTable, tablet=null, attempt=26, DeadlineTracker(timeout=30000, elapsed=29427)
解决方案:session.setTimeoutMillis(60000)
 
报错三:移动tablet,权限不能访问
解决方案:--superuser_acl=*

添加描述

代码语言:javascript
复制
报错四:新增master找不到元数据
解决方案:
    因为master的存储全部在本地磁盘文件,如果额外的添加了一个master,会报错,找不到consensus-meta,也就是master的容错机制,需要对master的元数据数据格式化,
    初始化的时候直接设计好。
    Recommended maximum number of masters is 3.

添加描述

代码语言:javascript
复制
报错五:minidumps文件(存储crash信息)出错
    [New I/O worker #1] WARN org.apache.kudu.client.GetMasterRegistrationReceived - None of the provided masters (hadoop6:7051) is a leader, will retry.
解决方案:
    rm -rf /home/var/lib/kudu/master/log_dir/minidumps
补充:
    minidump文件包含有关崩溃的进程的重要调试信息,包括加载的共享库及其版本,崩溃时运行的线程列表,处理器寄存器的状态和每个线程的堆栈内存副本,
    以及CPU和操作系统版本信息。Minitump可以通过电子邮件发送给Kudu开发人员或附加到JIRA,以帮助Kudu开发人员调试崩溃。
 
报错六:impala操作kudu超时
解决方案:kudu_operation_timeout_ms = 1800000

添加描述

代码语言:javascript
复制
报错七:CDH安装kudu设置master
解决方案:
--master_addresses=hadoop4:7051,hadoop5:7051,hadoop6:7051

添加描述

2 Kudu 参数优化

  1. Kudu Tablet Server Maintenance Threads 解释:Kudu后台对数据进行维护操作,如flush、compaction、inserts、updates、and deletes,一般设置为4,官网建议的是数据目录的3倍 参数:maintenance_manager_num_threads
  2. Kudu Tablet Server Block Cache Capacity Tablet 解释:分配给Kudu Tablet Server块缓存的最大内存量,建议是2-4G 参数:block_cache_capacity_mb
  3. 数据插入都kudu中,使用manual_flush策略
  4. 设置ntp服务器的时间误差不超过20s(默认是10s) 参数:max_clock_sync_error_usec=20000000
  5. Kudu Tablet Server Hard Memory Limit Kudu 解释:写性能,Tablet Server能使用的最大内存量,建议是机器总内存的百分之80,master的内存量建议是2G,Tablet Server在批量写入数据时并非实时写入磁盘, 而是先Cache在内存中,在flush到磁盘。这个值设置过小时,会造成Kudu数据写入性能显著下降。对于写入性能要求比较高的集群,建议设置更大的值 参数:memory_limit_hard_bytes
  6. 建议每个表50columns左右,不能超过300个
  7. kudu的wal只支持单目录,如果快达到极限了,就会初始化tablte失败。所以说在部署集群的时候要单独给wal设置一个单独的目
  8. impala中创建表,底层使用kudu存储(Impala::TableName),通过kudu的client端读取数据,读取不出来。
  9. kudu表如果不新建的情况下,在表中增加字段,对数据是没有影响的,kudu中增加一个字段user_id,之前impala已经和kudu进行关联操作了, impala读取kudu的数据按照之前的所定义的字段读取的。
  10. 设置client长连接过期时间,默认是7天(实际生产环境中设置的是180天) --authn_token_validity_seconds=604800 注意:设置到tserver的配置文件中
  11. tserver宕掉后,5分钟后没有恢复的情况下,该机器上的tablet会移动到其他机器,因为我们通常设置的是3个副本,其中一个副本宕掉,也就是一台机器的tserver出现故障, 实际情况下,还存在一个leader和follower,读写还是能够正常进行的,所以说这个参数很重要,保证数据不会转移。 --follower_unavailable_considered_failed_sec=300
  12. 超过参数时间的历史数据会被清理,如果是base数据不会被清理。而真实运行时数据大小持续累加,没有被清理,默认是900s。 --tablet_history_max_age_sec=900

3 kudu性能测试报告

添加描述

添加描述

添加描述

三 代码篇

1 创建 hash分区 + range分区 两者同时使用 的表、删除表

代码语言:javascript
复制
package src.main.sample;
 
import com.google.common.collect.ImmutableList;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
 
import java.util.ArrayList;
import java.util.List;
 
public class CreateTable {
 
    public static void main(String[] args) {
        String tableName = "bigData";
        KuduClient client = new KuduClient.KuduClientBuilder("192.168.241.128,192.168.241.129,192.168.241.130").defaultAdminOperationTimeoutMs(60000).build();
        KuduSession session = client.newSession();
        // 此处所定义的是rpc连接超时
        session.setTimeoutMillis(60000);
 
        try {
            // 测试,如果table存在的情况下,就删除该表
            if(client.tableExists(tableName)) {
                client.deleteTable(tableName);
                System.out.println("delete the table!");
            }
 
            List<ColumnSchema> columns = new ArrayList();
 
            // 创建列
            columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("user_id", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("start_time", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
 
            // 创建schema
            Schema schema = new Schema(columns);
 
       /*
       创建 hash分区 + range分区 两者同时使用 的表
        addHashPartitions(ImmutableList.of("字段名1","字段名2",...), hash分区数量)  默认使用主键,也可另外指定联合主键
        setRangePartitionColumns(ImmutableList.of("字段名"))
        */
            // id,user_id相当于联合主键,三个条件都满足的情况下,才可以更新数据,否则就是插入数据
            ImmutableList<String> hashKeys = ImmutableList.of("id","user_id");
            CreateTableOptions tableOptions = new CreateTableOptions();
       /*
        创建 hash分区 + range分区 两者同时使用 的表
        addHashPartitions(ImmutableList.of("字段名1","字段名2",...), hash分区数量)  默认使用主键,也可另外指定联合主键
        setRangePartitionColumns(ImmutableList.of("字段名"))
       */
            // 设置hash分区,包括分区数量、副本数目
            tableOptions.addHashPartitions(hashKeys,3); //hash分区数量
            tableOptions.setNumReplicas(3); //副本数目
 
            // 设置range分区
            tableOptions.setRangePartitionColumns(ImmutableList.of("start_time"));
 
       // 设置range分区数量    
            // 规则:range范围为时间戳是1-10,10-20,20-30,30-40,40-50
            int count = 0;
            for(long i = 1 ; i <6 ; i++) {
                PartialRow lower = schema.newPartialRow();
                lower.addLong("start_time",count);
                PartialRow upper = schema.newPartialRow();
                count += 10;
                upper.addLong("start_time", count);
                tableOptions.addRangePartition(lower, upper);
            }
 
            System.out.println("create table is success!");
            // 创建table,并设置partition
            client.createTable(tableName, schema, tableOptions);
 
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
//          client.deleteTable(tableName);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

2 修改表:增加字段、删除字段

代码语言:javascript
复制
package src.main.sample;
 
import org.apache.kudu.Type;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.KuduClient;
 
public class AlterTable {
    public static void main(String[] args) {
        String tableName = "bigData";
        KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").defaultAdminOperationTimeoutMs(60000).build();
        try {
            Object o = 0L;
            // 创建非空的列
            client.alterTable(tableName, new AlterTableOptions().addColumn("device_id", Type.INT64, o));
 
            // 创建列为空
            client.alterTable(tableName, new AlterTableOptions().addNullableColumn("site_id", Type.INT64));
       // 删除字段
//            client.alterTable(tableName, new AlterTableOptions().dropColumn("site_id"));
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3 插入表数据

代码语言:javascript
复制
package src.main.sample;
 
import org.apache.kudu.client.*;
 
public class InsertData {
    public static void main(String[] args) {
        try {
            String tableName = "bigData";
 
            KuduClient client = new KuduClient.KuduClientBuilder("192.168.241.128,192.168.241.129,192.168.241.130").defaultAdminOperationTimeoutMs(60000).build();
            // 获取table
            KuduTable table = client.openTable(tableName);
 
            // 获取一个会话
            KuduSession session = client.newSession();
            session.setTimeoutMillis(60000);
            /**
             * mode形式:
             * SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND 后台自动一次性批处理刷新提交N条数据
             * SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC  每次自动同步刷新提交每条数据
             * SessionConfiguration.FlushMode.MANUAL_FLUSH     手动刷新一次性提交N条数据
             */
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); //mode形式
            session.setMutationBufferSpace(10000);// 缓冲大小,也就是数据的条数
 
            // 插入时,初始时间
            long startTime = System.currentTimeMillis();
 
            int val = 0;
            // 插入数据
            for (int i = 0; i < 60; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                // row.addString("字段名", 字段值)、row.addLong(第几列, 字段值) 
                row.addLong(0, i); //指第一个字段 "id"(hash分区的联合主键之一)
                row.addLong(1, i*100);//指第二个字段 "user_id"(hash分区的联合主键之一)
                row.addLong(2, i);//指第三个字段 "start_time"(range分区字段)
                row.addString(3, "bigData");//指第四个字段 "name"
                session.apply(insert);
                if (val % 10 == 0) {
                    session.flush(); //手动刷新提交
                    val = 0;
                }
                val++;
            }
            session.flush(); //手动刷新提交
            // 插入时结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("the timePeriod executed is : " + (endTime - startTime) + "ms");
            session.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4 三种刷新提交数据的模式

代码语言:javascript
复制
package src.main.sample;
 
import com.google.common.collect.ImmutableList;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
 
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
 
/**
 * 数据刷新策略对比
 */
public class InsertFlushData {
    // 缓冲大小,也就是数据的条数
    private final static int OPERATION_BATCH = 2000;
 
    /**
     * mode形式:
     * SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND 后台自动一次性批处理刷新提交N条数据
     * SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC  每次自动同步刷新提交每条数据
     * SessionConfiguration.FlushMode.MANUAL_FLUSH     手动刷新一次性提交N条数据
     */
 
    // 支持三个模式的测试用例
    public static void insertTestGeneric(KuduSession session, KuduTable table, SessionConfiguration.FlushMode mode, int recordCount) throws Exception {
        //设置 刷新提交模式
        session.setFlushMode(mode);
 
        //当刷新提交模式 不为 AUTO_FLUSH_SYNC(自动同步刷新)时,才设置缓冲大小(数据条数)
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC != mode) {
            // 缓冲大小,也就是数据的条数
            session.setMutationBufferSpace(OPERATION_BATCH);
        }
 
        int commit = 0;
 
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            // row.addString("字段名", 字段值)、row.addLong(第几列, 字段值) 
            row.addString("id", uuid.toString());
            row.addInt("value1", 16);
            row.addLong("value2", 16);
 
            Long gtmMillis;
            /**
             * System.currentTimeMillis() 是从1970-01-01开始算的毫秒数(GMT), kudu API是采用纳秒数, 所以需要时间*1000
             * 另外, 考虑到我们是东8区时间, 所以转成Long型需要再加8个小时, 否则存到Kudu的时间是GTM, 比东8区晚8个小时
             */
            // 第一步: 获取当前时间对应的GTM时区unix毫秒数
            // 第二步: 将timestamp转成对应的GTM时区unix毫秒数
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
            // 将GTM的毫秒数转成东8区的毫秒数量
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
 
            session.apply(insert);
 
            // 对于在MANUAL_FLUSH(手动刷新)模式时,进行 手动刷新提交
            if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode) {
                commit = commit + 1;
                // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
                //如果要提交的数据条数 已经大于 缓冲大小(数据条数)除以2的值的话,则进行一次手动刷新提交
                if (commit > OPERATION_BATCH / 2) {
                    session.flush();//手动刷新提交
                    commit = 0;
                }
            }
        }
 
        // 对于在MANUAL_FLUSH(手动刷新)模式时,进行 手动刷新提交
        // 对于手工提交, 保证完成最后的提交
        if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode && commit > 0) {
            session.flush();//手动刷新提交
        }
 
        // 对于后台自动提交, 必须保证完成最后的提交, 并保证有错误时能抛出异常
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND == mode) {
            session.flush();//手动刷新提交
            RowErrorsAndOverflowStatus error = session.getPendingErrors();
            // 检查错误收集器是否有溢出和是否有行错误
            if (error.isOverflowed() || error.getRowErrors().length > 0) {
                if (error.isOverflowed()) {
                    throw new Exception("kudu overflow exception occurred.");
                }
                StringBuilder errorMessage = new StringBuilder();
                if (error.getRowErrors().length > 0) {
                    for (RowError errorObj : error.getRowErrors()) {
                        errorMessage.append(errorObj.toString());
                        errorMessage.append(";");
                    }
                }
                throw new Exception(errorMessage.toString());
            }
        }
    }
 
    // 支持手动flush的测试用例
    public static void insertTestManualFlush(KuduSession session, KuduTable table, int recordCount) throws Exception {
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
        session.setFlushMode(mode);
        session.setMutationBufferSpace(OPERATION_BATCH);
 
        int commit = 0;
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("value1", 17);
            row.addLong("value2", 17);
            Long gtmMillis;
            /**
             * System.currentTimeMillis() 是从1970-01-01开始算的毫秒数(GMT), kudu API是采用纳秒数, 所以需要时间*1000
             * 另外, 考虑到我们是东8区时间, 所以转成Long型需要再加8个小时, 否则存到Kudu的时间是GTM, 比东8区晚8个小时
             */
            // 第一步: 获取当前时间对应的GTM时区unix毫秒数
            // 第二步: 将timestamp转成对应的GTM时区unix毫秒数
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
            // 将GTM的毫秒数转成东8区的毫秒数量
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
            session.apply(insert);
            // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
            commit = commit + 1;
            //如果要提交的数据条数 已经大于 缓冲大小(数据条数)除以2的值的话,则进行一次手动刷新提交
            if (commit > OPERATION_BATCH / 2) {
                session.flush();//手动刷新提交
                commit = 0;
            }
        }
 
        // 对于手工提交, 保证完成最后的提交
        if (commit > 0) {
            session.flush();//手动刷新提交
        }
    }
 
    // 自动flush的测试案例
    public static void insertTestAutoFlushSync(KuduSession session, KuduTable table, int recordCount) throws Exception {
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
        session.setFlushMode(mode);
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("value1", 18);
            row.addLong("value2", 18);
 
            Long gtmMillis;
 
            /**
             * System.currentTimeMillis() 是从1970-01-01开始算的毫秒数(GMT), kudu API是采用纳秒数, 所以需要时间*1000
             * 另外, 考虑到我们是东8区时间, 所以转成Long型需要再加8个小时, 否则存到Kudu的时间是GTM, 比东8区晚8个小时
             */
            // 第一步: 获取当前时间对应的GTM时区unix毫秒数
            gtmMillis = System.currentTimeMillis();
 
            // 第二步: 将timestamp转成对应的GTM时区unix毫秒数
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
 
            // 将GTM的毫秒数转成东8区的毫秒数量
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
 
            // 对于AUTO_FLUSH_SYNC模式, apply()将立即完成数据写入,但是并不是批处理
            session.apply(insert);
        }
    }
 
    /**
     * 测试案例
     */
    public static void testStrategy() throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").build();
        KuduSession session = client.newSession();
        KuduTable table = client.openTable("bigData2");
 
        SessionConfiguration.FlushMode mode;
        long d1;
        long d2;
        long timeMillis;
        long seconds;
        int recordCount = 200000;
 
        try {
            // 自动刷新策略(默认的刷新策略,同步刷新)
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestAutoFlushSync(session, table, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
 
            // 后台刷新策略(后台批处理刷新)
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestGeneric(session, table, mode, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
 
            // 手动刷新
            mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestManualFlush(session, table, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
 
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!session.isClosed()) {
                session.close();
            }
        }
    }
 
    public static void createTable() {
        String tableName = "bigData2";
        KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").defaultAdminOperationTimeoutMs(60000).build();
        KuduSession session = client.newSession();
        session.setTimeoutMillis(60000);
 
        try {
            // 测试,如果table存在的情况下,就删除该表
            if (client.tableExists(tableName)) {
                client.deleteTable(tableName);
                System.out.println("delete the table is success!");
            }
 
            List<ColumnSchema> columns = new ArrayList();
 
            // 创建列
            columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value1", Type.INT32).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value2", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.INT64).key(true).build());
 
            // 创建schema
            Schema schema = new Schema(columns);
            /*
            创建 hash分区 + range分区 两者同时使用 的表
                addHashPartitions(ImmutableList.of("字段名1","字段名2",...), hash分区数量)  默认使用主键,也可另外指定联合主键
                setRangePartitionColumns(ImmutableList.of("字段名"))
            */
            // id和timestamp 组成 联合主键
            ImmutableList<String> hashKeys = ImmutableList.of("id", "timestamp");
            CreateTableOptions tableOptions = new CreateTableOptions();
 
            // 设置hash分区,包括分区数量、副本数目
            tableOptions.addHashPartitions(hashKeys, 20); //hash分区数量
            tableOptions.setNumReplicas(1);//副本数目
 
            System.out.println("create the table is success! ");
            // 创建table,并设置partition
            client.createTable(tableName, schema, tableOptions);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) {
        try {
            createTable();
            testStrategy();
            /**
             *AUTO_FLUSH_SYNC is start!
              AUTO_FLUSH_SYNC花费毫秒数: 588863
              AUTO_FLUSH_BACKGROUND is start!
              AUTO_FLUSH_BACKGROUND花费毫秒数: 12284
              MANUAL_FLUSH is start!
              MANUAL_FLUSH花费毫秒数: 17231
             */
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}
5 查询表数据
代码语言:javascript
复制
package src.main.sample;
 
import org.apache.kudu.Schema;
import org.apache.kudu.client.*;
 
import java.util.ArrayList;
import java.util.List;
 
public class SelectData {
    public static void main(String[] args) {
        try {
            String tableName = "bigData";
 
            KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").defaultAdminOperationTimeoutMs(60000).build();
 
            // 获取需要查询数据的列
            List<String> projectColumns = new ArrayList<String>();
            projectColumns.add("id");
            projectColumns.add("user_id");
            projectColumns.add("start_time");
            projectColumns.add("name");
 
            KuduTable table = client.openTable(tableName);
 
            // 简单的读取
            KuduScanner scanner = client.newScannerBuilder(table).setProjectedColumnNames(projectColumns).build();
 
            // 根据主键设置读取的上限和下限
//            Schema schema = table.getSchema();
//            PartialRow lower = schema.newPartialRow();
//            lower.addLong("id", 10);
//            lower.addLong("user_id", 10);
//            lower.addLong("start_time", 50);
//            PartialRow upper = schema.newPartialRow();
//            upper.addLong("id", 50);
//            upper.addLong("user_id", 50);
//            upper.addLong("start_time", 50);
 
//            KuduScanner scanner = client.newScannerBuilder(table)
//                    .setProjectedColumnNames(projectColumns)
//                    .lowerBound(lower)
//                    .exclusiveUpperBound(upper)
//                    .build();
 
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                // 15个tablet,每次从tablet中获取的数据的行数
                int numRows = results.getNumRows();
                System.out.println("numRows count is : " + numRows);
                while (results.hasNext()) {
                    RowResult result = results.next();
                    long id = result.getLong(0);
                    long user_id = result.getLong(1);
                    long start_time = result.getLong(2);
                    String name = result.getString(3);
                    System.out.println("id is : " + id + "  ===  user_id is : " + user_id + "  ===  start_time : " + start_time + "  ===  name is : " + name);
                }
                System.out.println("--------------------------------------");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
6 修改表数据
代码语言:javascript
复制
package src.main.sample;
 
import org.apache.kudu.client.*;
 
public class UpsertData {
    public static void main(String[] args) {
        try {
            String tableName = "bigData";
 
            KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").defaultAdminOperationTimeoutMs(60000).build();
            // 获取table
            KuduTable table = client.openTable(tableName);
            /**
             * mode形式:
             * SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND 后台自动一次性批处理刷新提交N条数据
             * SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC  每次自动同步刷新提交每条数据
             * SessionConfiguration.FlushMode.MANUAL_FLUSH     手动刷新一次性提交N条数据
             */
            // 获取一个会话
            KuduSession session = client.newSession();
            session.setTimeoutMillis(60000);
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); //手动刷新一次性提交N条数据
            session.setMutationBufferSpace(10000); // 缓冲大小,也就是数据的条数
 
            // 插入时,初始时间
            long startTime = System.currentTimeMillis();
 
            int val = 0;
            // 在使用 upsert 语句时,当前需要 三个条件(key)都满足的情况下,才可以更新数据,否则就是插入数据
            // 三个条件(key) 分别指的是 hash分区的联合主键id、user_id,还有range分区字段 start_time
            for (int i = 0; i < 60; i++) {
                //upsert into 表名 values (‘xx’,123) 如果指定的values中的主键值 在表中已经存在,则执行update语义,反之,执行insert语义。
                Upsert upsert = table.newUpsert();
                PartialRow row = upsert.getRow();
                row.addLong(0, i); //指第一个字段 "id"(hash分区的联合主键之一)
                row.addLong(1, i*100); //指第二个字段 "user_id"(hash分区的联合主键之一)
                row.addLong(2, i); //指第三个字段 "start_time"(range分区字段)
                row.addString(3, "bigData"+i); //指第四个字段 "name"
                session.apply(upsert);
                if (val % 10 == 0) {
                    session.flush(); //手动刷新提交
                    val = 0;
                }
                val++;
            }
            session.flush(); //手动刷新提交
            // 插入时结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("the timePeriod executed is : " + (endTime - startTime) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
代码语言:javascript
复制
impala命令刷新元数据
    1.impala-shell 命令进入交互界面 执行 invalidate metadata; 命令刷新元数据 
    2.Hue的wen页面中,在impala执行sql的窗口 执行 invalidate metadata; 命令刷新元数据 
    
--------------------------------------------------------------------------
从Impala创建一个新的Kudu表
    从Impala在Kudu中创建新表类似于将现有Kudu表映射到Impala表,除了您需要自己指定模式和分区信息。
    使用以下示例作为指导。Impala首先创建表,然后创建映射。
    Impala 中创建一个新的 Kudu 表
    
        CREATE TABLE my_first_table
        (
          id BIGINT,
          name STRING,
          PRIMARY KEY(id)
        )
        PARTITION BY HASH PARTITIONS 16
        STORED AS KUDU;
 
    在CREATE TABLE语句中,必须首先列出组成主键的列。此外,隐式标记主键列NOT NULL。
    创建新的Kudu表时,您需要指定分发方案。
    请参阅分区表:https://kudu.apache.org/docs/kudu_impala_integration.html#partitioning_tables
    为了为简单起见,上面的表创建示例通过散列 id 列分成 16 个分区。
    有关分区的指导,请参阅 分区规则:https://kudu.apache.org/docs/kudu_impala_integration.html#partitioning_rules_of_thumb
 
    
CREATE TABLE AS SELECT
    您可以使用 CREATE TABLE ... AS SELECT 语句查询 Impala 中的任何其他表或表来创建表。
    以下示例将现有表 old_table 中的所有行导入到 Kudu 表 new_table 中。 
    new_table 中的列的名称和类型 将根据 SELECT 语句的结果集中的列确定。
    请注意,您必须另外指定主键和分区。
 
        CREATE TABLE new_table
        PRIMARY KEY (ts, name)
        PARTITION BY HASH(name) PARTITIONS 8
        STORED AS KUDU
        AS SELECT ts, name, value FROM old_table;
        
--------------------------------------------------------------------------
 
在Impala中查询现有的Kudu表:Impala中创建映射Kudu表的外部映射表
    通过Kudu API或其他集成(如Apache Spark)创建的表在Impala中不会自动显示。
    要查询它们,必须首先在Impala中创建外部表,以将Kudu表映射到Impala数据库:
    
        CREATE EXTERNAL TABLE `bigData` STORED AS KUDU
        TBLPROPERTIES(
            'kudu.table_name' = 'bigData',
            'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051')
            
            
查询 Impala 中现有的 Kudu 表(Impala中创建映射表(外部表)映射Kudu中的表)
    通过 Kudu API 或其他集成(如 Apache Spark )创建的表不会在 Impala 中自动显示。
    要查询它们,您必须先在 Impala 中创建外部表以将 Kudu 表映射到 Impala 数据库中:
 
        CREATE EXTERNAL TABLE my_mapping_table
        STORED AS KUDU
        TBLPROPERTIES (
          'kudu.table_name' = 'my_kudu_table'
        );
 
--------------------------------------------------------------------------
 
内部和外部 Impala 表
    使用 Impala 创建新的 Kudu 表时,可以将表创建为内部表或外部表。
 
Internal ( 内部表 )
    内部表由 Impala 管理,当您从 Impala 中删除时,数据和表确实被删除。当您使用 Impala 创建新表时,通常是内部表。
 
External ( 外部表 )
    外部表(由 CREATE EXTERNAL TABLE 创建)不受 Impala 管理,并且删除此表不会将表从其源位置(此处为 Kudu)丢弃。
    相反,它只会去除 Impala 和 Kudu 之间的映射。这是 Kudu 提供的用于将现有表映射到 Impala 的语法。
            
--------------------------------------------------------------------------
 
Kudu中的分区方法主要有两种:partition by hash 和 partition by range 
    kudu表基于其partition方法被拆分成多个分区,每个分区就是一个tablet,一张kudu表所属的所有tablets均匀分布并存储在tablet servers的磁盘上。
    因此在创建kudu表的时候需要声明该表的partition方法,同时要指定primary key作为partition的依据。
    
基于hash的分区方法的基本原理是:
    基于primary key的hash值将每个row(行)划分到相应的tablet当中,分区的个数即tablet的个数必须在创建表语句中指定,建表语句示例如下:
    注:如果未指定基于某个字段的hash值进行分区,默认以主键的hash值进行分区。
        create table test
        (
            name string,
            age int,
            primary key (name)
        )
        partition by hash (name) partitions 8
        stored as kudu;
 
基于range的分区方法的基本原理是:
    基于指定主键的取值范围将每个row(行)划分到相应的tablet当中,用于range分区的主键以及各个取值范围都必须在建表语句中声明,建表语句示例如下:
    例子:有班级、姓名、年龄三个字段,表中的每个row将会根据其所在的班级划分成四个分区,每个分区就代表一个班级。
        create table test
        (
            classes int,
            name string,
            age int,
            primary key (classes,name)
        )
        partition by range (classes)
        (
            partition value = 1,
            partition value = 2,
            partition value = 3,
            partition value = 4
        )
        stored as kudu;
 
        
kudu表还可以采用基于hash和基于range相结合的分区方式 
    /*
    创建 hash分区 + range分区 两者同时使用 的表
        addHashPartitions(ImmutableList.of("字段名1","字段名2",...), hash分区数量)  默认使用主键,也可另外指定联合主键
        setRangePartitionColumns(ImmutableList.of("字段名"))
    */
    // 设置hash分区,包括分区数量、副本数目
    tableOptions.addHashPartitions(hashKeys,3); //hash分区数量
    tableOptions.setNumReplicas(3); //副本数目
 
    // 设置range分区
    tableOptions.setRangePartitionColumns(ImmutableList.of("start_time"));
            
--------------------------------------------------------------------------
        
kudu表支持3种insert语句:
    1.insert into test values(‘a’,12);
    2.insert into test values(‘a’,12),(‘b’,13),(‘c’,14);
    3.insert into test select * from other_table;
 
update语句
    kudu表的update操作不能更改主键的值,其他与标准sql语法相同。
 
upsert 语句
    对于 upsert into test values (‘a’,12) 
    如果指定的values中的主键值 在表中已经存在,则执行update语义,反之,执行insert语义。
    注意:如果同时存在 主键/联合主键、hash分区字段、range分区字段时,那么便要求三个条件都符合的情况下,才可以更新数据,否则就是插入数据。
    
delete语句
    与标准sql语法相同。
 
--------------------------------------------------------------------------
 
Impala 中创建一个新的 Kudu 表
        create table test
        (
        classes int,
        name string,
        age int,
        primary key (classes,name)
        )
        partition by range (classes)
        (
        partition value = 1,
        partition value = 2,
        partition value = 3,
        partition value = 4
        )
        stored as kudu;
 
        insert into test values(1,"nagisa",16);
        select * from test;
        
kudu webUI页面显示
    impala::default.test
    
Impala中创建映射Kudu表的外部映射表
    CREATE EXTERNAL TABLE `EXTERNAL_test` STORED AS KUDU
    TBLPROPERTIES(
        'kudu.table_name' = 'impala::default.test',
        'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051');
        
    insert into test values(2,"ushio",5);
    select * from EXTERNAL_test;
    
--------------------------------------------------------------------------
 
指定 Tablet Partitioning ( Tablet 分区 )
    表分为每个由一个或多个 tablet servers 提供的 tablets 。理想情况下,tablets 应该相对平等地拆分表的数据。 
    Kudu 目前没有自动(或手动)拆分预先存在的 tablets  的机制。在实现此功能之前,您必须在创建表时指定分区。
    在设计表格架构时,请考虑使用主键,您可以将表拆分成以类似速度增长的分区。
    使用 Impala 创建表时,可以使用 PARTITION BY 子句指定分区:
    注意:Impala 关键字(如 group)在关键字意义上不被使用时,由背面的字符包围。
    
        CREATE TABLE cust_behavior 
        (
              _id BIGINT PRIMARY KEY,
              salary STRING,
              edu_level INT,
              usergender STRING,
              `group` STRING,
              city STRING,
              postcode STRING,
              last_purchase_price FLOAT,
              last_purchase_date BIGINT,
              category STRING,
              sku STRING,
              rating INT,
              fulfilled_date BIGINT
        )
        PARTITION BY RANGE (_id)
        (
            PARTITION VALUES < 1439560049342,
            PARTITION 1439560049342 <= VALUES < 1439566253755,
            PARTITION 1439566253755 <= VALUES < 1439572458168,
            PARTITION 1439572458168 <= VALUES < 1439578662581,
            PARTITION 1439578662581 <= VALUES < 1439584866994,
            PARTITION 1439584866994 <= VALUES < 1439591071407,
            PARTITION 1439591071407 <= VALUES
        )
        STORED AS KUDU;
    
    
如果您有多个主键列,则可以使用元组语法指定分区边界:('va',1),('ab',2)。该表达式必须是有效的 JSON
 
Impala 数据库和 Kudu
    每个 Impala 表都包含在称为数据库的命名空间中。默认数据库称为默认数据库,用户可根据需要创建和删除其他数据库
 
当从 Impala 中创建一个受管 Kudu 表时,相应的 Kudu 表将被命名为 my_database :: table_name
 
不支持 Kudu 表的 Impala 关键字
    创建 Kudu 表时不支持以下 Impala 关键字: - PARTITIONED - LOCATION - ROWFORMAT
    
    
--------------------------------------------------------------------------
 
优化评估 SQL 谓词的性能
    如果您的查询的 WHERE 子句包含与 operators = , <= , '\ , '\' , > = , BETWEEN 或 IN 的比较,则 Kudu 直接评估该条件,只返回相关结果。
    这提供了最佳性能,因为 Kudu 只将相关结果返回给 Impala 。
    对于谓词 != , LIKE 或 Impala 支持的任何其他谓词类型, Kudu 不会直接评估谓词,而是将所有结果返回给 Impala ,并依赖于 Impala 来评估剩余的谓词并相应地过滤结果。
    这可能会导致性能差异,这取决于评估 WHERE 子句之前和之后的结果集的增量。
分区表
    根据主键列上的分区模式将表格划分为 tablets 。每个 tablet 由至少一台 tablet server 提供。
    理想情况下,一张表应该分成多个 tablets 中分布的 tablet servers ,以最大化并行操作。您使用的分区模式的详细信息将完全取决于您存储的数据类型和访问方式。关于 Kudu 模式设计的全面讨论,请参阅 Schema Design。
Kudu 目前没有在创建表之后拆分或合并 tablets 的机制。创建表时,必须为表提供分区模式。在设计表格时,请考虑使用主键,这样您就可以将表格分为以相同速率增长的 tablets 。
您可以使用 Impala 的 PARTITION BY 关键字对表进行分区,该关键字支持 RANGE 或 HASH 分发。分区方案可以包含零个或多个 HASH 定义,后面是可选的 RANGE 定义。 RANGE 定义可以引用一个或多个主键列。基本 和 高级分区 的示例如下所示。

上文

https://cloud.tencent.com/developer/article/1964369

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 二 性能调优篇
    • 1 kudu性能调优和报错方案解决
      • 2 Kudu 参数优化
        • 3 kudu性能测试报告
        • 三 代码篇
          • 1 创建 hash分区 + range分区 两者同时使用 的表、删除表
            • 2 修改表:增加字段、删除字段
              • 3 插入表数据
                • 4 三种刷新提交数据的模式
                • 上文
                相关产品与服务
                数据库
                云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档