最近在招聘要求下突然看到了Apache kudu 于是花了几天时间研究了下,下面简单的给大家介绍下 记得收藏。
在KUDU之前,大数据主要以两种方式存储;
【1】:静态数据
以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。这类存储的局限性是数据无法进行随机的读写。【2】:动态数据 以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。 局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。 从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢? 这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求,如下图所示:
如上图所示,数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:Impala、hive)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景,但他有如下缺点:
从上图可以看出,KUDU 是一个折中的产品,在 HDFS 和 HBase 这两个偏科生中平衡了随机读写和批量分析的性能。从 KUDU 的诞生可以说明一个观点:底层的技术发展很多时候都是上层的业务推动的,脱离业务的技术很可能是空中楼阁。
官方介绍:Kudu 是为 Apache Hadoop 平台开发的列式存储管理器。Kudu 具有 Hadoop 生态系统应用程序的共同技术属性:它在商品硬件上运行,具有水平可扩展性,并支持高可用操作。简单来说:kudu是一个与Hbase类似的列式存储分布式数据库。
HDFS 与HBase的数据存储的缺点目前数据存储有了HDFS与HBase,为什么还要额弄一个kudu呢?HDFS : 使用列式存储格式Apache Parquet , Apache ORC,适合离线分析,不支持单条记录级别的update操作,随机读写能力差HBase :可以进行高效读写,却并不是适合基于SQL的数据分析方向,大批量数据获取的性能差。kudu : 正因为HDFS与HBase有上面这些缺点,kudu较好的解决了HDFS与HBase的这些特点,它不及HDFS批处理快,也不及HBase随机读写能力强,但反过来它比HBase批处理快,而且比HDFS随机读写能力强(适合实时写入或这更新场景频繁的场景).这就是他能解决的问题。
客户端将要读取的数据信息发送给master,master对其进行一定的校验,比如表是否存在,字段是否存在,Master返回元数据信息给clinet,然后client与tserver建立连接,通过metaData找到数据所在的rowSet,首先加载内存里面的数据(MemRowSet与DeltMemStore),然后加载磁盘里面的数据,最后返回最终数据给clinet。
Client首先建立master,获取元数据信息, 然后连接 tserver,查询MemRowSet与DeltMemStroe中是否存在相同primary Key,如果存在,则报错,如果不存在,则将待插入的数据写入WAL日志,然后将数据写入MemRowSet。
每个节点运行
yum -y install ntp
注释一下四行
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
在第一台服务器node01
vim /etc/ntp.conf
添加如下内容
restrict 192.168.100.0 mask 255.255.255.0 notrap nomodify
server node01 prefer
server 127.127.1.0
fudge 127.127.1.0 stratum 10
修改node02和node03
vi /etc/ntp.conf
添加如下内容
server node01 prefer
server 127.127.1.0
fudge 127.127.1.0 stratum 10
启动NTP服务
service ntpd start
chkconfig ntpd on
通过ntpstat 检测服务是否正常
[root@node01 ~]# ntpstat
synchronised to local net (127.127.1.0) at stratum 11
time correct to within 11 ms
polling server every 64 s
每个节点都执行(需要安装包后台回复:kudu 安装包)
yum install -y kudu-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el7.x86_64.rpm
yum install -y kudu-client0-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el7.x86_64.rpm
yum install -y kudu-client-devel-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el7.x86_64.rpm
yum install -y kudu-debuginfo-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el7.x86_64.rpm
yum install -y kudu-master-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el7.x86_64.rpm
yum install -y kudu-tserver-1.4.0+cdh5.12.2+0-1.cdh5.12.2.p0.8.el7.x86_64.rpm
修改配置 /etc/default/kudu-master (三个节点都需要改下)
export FLAGS_log_dir=/var/log/kudu
export FLAGS_rpc_bind_addresses=node01:7051
修改配置 /etc/default/kudu-tserver
export FLAGS_log_dir=/var/log/kudu
export FLAGS_rpc_bind_addresses=node01:7050
启动服务测试
/etc/init.d/kudu-master start (node01)
Started Kudu Master Server (kudu-master): [ 确定 ]
/etc/init.d/kudu-tserver start(node01)
Started Kudu Tablet Server (kudu-tserver): [ 确定 ]
/etc/init.d/kudu-tserver start(node01)
Started Kudu Tablet Server (kudu-tserver): [ 确定 ]
/etc/init.d/kudu-tserver start(node02)
Started Kudu Tablet Server (kudu-tserver): [ 确定 ]
通过浏览器查看 http://node01:8051/tablet-servers
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.4.0</version>
</dependency>
private static ColumnSchema newColumn(String name, Type type, Boolean isKey){
ColumnSchema.ColumnSchemaBuilder column =
new ColumnSchema.ColumnSchemaBuilder(name, type);
column.key(isKey);
return column.build();
}
public static void main(String[] args) {
String masterId = "192.168.100.101";
KuduClient client = new KuduClient.KuduClientBuilder(masterId).defaultAdminOperationTimeoutMs(6000).build();
try {
LinkedList<ColumnSchema> columns = new LinkedList<>();
columns.add(newColumn("id", Type.INT32,true));
columns.add(newColumn("name",Type.STRING,false));
Schema schema = new Schema(columns);
CreateTableOptions options = new CreateTableOptions();
LinkedList<String> parcols = new LinkedList<>();
parcols.add("id");
options.setNumReplicas(1);
options.addHashPartitions(parcols, 3);
client.createTable("test", schema, options);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String masterId = "192.168.100.101";
KuduClient client = new KuduClient.KuduClientBuilder(masterId).defaultAdminOperationTimeoutMs(6000).build();
try {
KuduTable table = client.openTable("test");
KuduSession kuduSession = client.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
kuduSession.setMutationBufferSpace(3000);
for (int i = 0; i < 10; i++) {
Insert insert = table.newInsert();
insert.getRow().addInt("id", i);
insert.getRow().addString("name", String.valueOf("name" + i));
kuduSession.flush();
kuduSession.apply(insert);
}
kuduSession.close();
}catch (Exception e){
}
}
public static void main(String[] args) {
String masterId = "192.168.100.101";
KuduClient client = new KuduClient.KuduClientBuilder(masterId).defaultAdminOperationTimeoutMs(6000).build();
try {
KuduTable table = client.openTable("test");
KuduSession kuduSession = client.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
Update update = table.newUpdate();
PartialRow row = update.getRow();
row.addInt("id", 1);
row.addString("name", "大家好");
kuduSession.apply(update);
kuduSession.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String masterId = "192.168.100.101";
KuduClient client = new KuduClient.KuduClientBuilder(masterId).defaultAdminOperationTimeoutMs(6000).build();
try {
KuduTable table = client.openTable("test");
KuduScanner scanner = client.newScannerBuilder(table).build();
while (scanner.hasMoreRows()) {
for (RowResult rowResult : scanner.nextRows()) {
System.out.println(rowResult.getInt("id") + "\t" + rowResult.getString("name"));
}
}
scanner.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String masterId = "192.168.100.101";
KuduClient client = new KuduClient.KuduClientBuilder(masterId).defaultAdminOperationTimeoutMs(6000).build();
try {
KuduTable table = client.openTable("test");
KuduSession session = client.newSession();
Delete delete = table.newDelete();
delete.getRow().addInt("id", 1);
session.flush();
session.apply(delete);
session.close();
} catch (Exception e) {
e.printStackTrace();
}
}
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
val sparkConf = new SparkConf().setAppName("kudu").setMaster("local")
// 创建spark 环境
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 构建kudu上下文
val kuduContext = new KuduContext("192.168.100.101", spark.sparkContext)
val kuduTableName = "spark_kudu_table_student4"
// 定义schema
var schema = StructType(
StructField("id", StringType, false) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) :: Nil
)
// 定义主键
val kuduTablePrimaryKey = Seq("id")
val createTableOptions = new CreateTableOptions()
createTableOptions.setRangePartitionColumns(Seq("id").asJava)
.setNumReplicas(3)
kuduContext.createTable(kuduTableName, schema, kuduTablePrimaryKey, createTableOptions)
spark.close()
spark.close()
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("kudu").setMaster("local")
// 创建spark 环境
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 构建kudu上下文
val kuduContext = new KuduContext("192.168.100.101", spark.sparkContext)
val kudu_table_name = "spark_kudu_table_student2"
val table = kuduContext.tableExists(kudu_table_name)
if (table) {
kuduContext.deleteTable(kudu_table_name)
}
spark.close()
spark.close()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("kudu").setMaster("local")
// 创建spark 环境
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 构建kudu上下文
val kuduContext = new KuduContext("192.168.100.101", spark.sparkContext)
val value: RDD[Row] = kuduContext.kuduRDD(spark.sparkContext, "spark_kudu_table_student4",Seq("id","name","age"))
value.foreach(println)
spark.close()
spark.close()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("kudu").setMaster("local")
// 创建spark 环境
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 构建kudu上下文
val kuduContext = new KuduContext("192.168.100.101", spark.sparkContext)
import spark.implicits._
val data = spark.read.text("./data/user.txt").map(row => {
val arr = row.get(0).toString.split(",")
(arr(1), arr(0), arr(2).toInt)
}).toDF("id", "name", "age").dropDuplicates("id")
kuduContext.insertRows(data, "spark_kudu_table_student4")
spark.close()
spark.close()
}
大数据老哥
希望这篇文章可以帮到你~ 记得点赞收藏哦