前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >快速学习-Druid数据摄入

快速学习-Druid数据摄入

作者头像
cwl_java
发布2022-11-30 08:55:42
7150
发布2022-11-30 08:55:42
举报
文章被收录于专栏:cwl_Javacwl_Java

第6章 数据摄入

6.1 数据格式

  1. 摄入规范化数据:JSON、CSV、TSV
  2. 自定义格式
  3. 其他格式

6.2 配置

主要是摄入的规则ingestion Spec 摄入规则主要包含3个部分

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
{
  "dataSchema" : {...},
  "ioConfig" : {...},
  "tuningConfig" : {...}
}

6.2.1 DataSchema

在这里插入图片描述
在这里插入图片描述

1、parser (1)string parser

在这里插入图片描述
在这里插入图片描述

parseSpec两个功能:

  • String Parser用parseSpec判定将要处理rows的数据格式( JSON, CSV, TSV)
  • 所有的Parsers 用parseSpec判定将要处理rows的 timestamp 和 dimensionsAll

format字段默认为tsv格式 JSON ParseSpec

在这里插入图片描述
在这里插入图片描述

CSV ParseSpec

在这里插入图片描述
在这里插入图片描述

TSV ParseSpec

在这里插入图片描述
在这里插入图片描述

TimestampSpec

在这里插入图片描述
在这里插入图片描述

DimensionsSpec

在这里插入图片描述
在这里插入图片描述

2. metricsSpec

在这里插入图片描述
在这里插入图片描述

一些简单的聚合函数: count 、longSum、longMin、longMax、doubleSum、doubleMin、doubleMax

3、GranularitySpec

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
"dataSchema" : {
  "dataSource" : "wikipedia",
  "parser" : {
    "type" : "string",
    "parseSpec" : {
      "format" : "json",
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name" : "commentLength", "type" : "long" },
          { "name" : "deltaBucket", "type" : "long" },
          "flags",
          "diffUrl",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      }
    }
  },
  "metricsSpec" : [],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "day",
    "queryGranularity" : "none",
    "intervals" : ["2016-06-27/2016-06-28"],
    "rollup" : false
  }
}

6.2.2 ioConfig

ioConfig 指明了真正具体的数据源

在这里插入图片描述
在这里插入图片描述

不同的firehose的格式不太一致,以kafka为例

代码语言:javascript
复制
{
    firehose : {
        consumerProps : {
            auto.commit.enable : false
            auto.offset.reset : largest
            fetch.message.max.bytes : 1048586
            group.id : druid-example
            zookeeper.connect : localhost:2181
                       zookeeper.connect.timeout.ms : 15000
                        zookeeper.session.timeout.ms : 15000
                        zookeeper.sync.time.ms : 5000
        },
        feed : wikipedia
        type : kafka-0.8
    }
}

ioConfig 的案例:

代码语言:javascript
复制
"ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "wikipedia-2016-06-27-sampled.json"
      },
      "appendToExisting" : false
    }

6.2.3 tuningConfig

tuningConfig 这部分配置是优化数据输入的过程

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
"tuningConfig" : {
      "type" : "index",
      "targetPartitionSize" : 5000000,
      "maxRowsInMemory" : 25000,
      "forceExtendableShardSpecs" : true
}

6.3 从hadoop加载数据

6.3.1加载数据

批量摄取维基百科样本数据,文件位于quickstart/wikipedia-2016-06-27-sampled.json。使用quickstart/wikipedia-index-hadoop.json 摄取任务文件。

代码语言:javascript
复制
 bin/post-index-task --file quickstart/wikipedia-index-hadoop.json

此命令将启动Druid Hadoop摄取任务。 摄取任务完成后,数据将由历史节点加载,并可在一两分钟内进行查询。

6.3.2查询数据

在这里插入图片描述
在这里插入图片描述

6.4 从kafka加载数据

6.4.1 准备kafka

  1. 启动kafka
代码语言:javascript
复制
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties
  1. 创建wikipedia主题
代码语言:javascript
复制
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 –topic
wikipedia --partitions 1 --replication-factor 1 –create
Created topic "wikipedia".
  1. 查看主题是否创建成功
代码语言:javascript
复制
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
__consumer_offsets
first
wikipedia

6.4.2 启动索引服务

我们将使用Druid的Kafka索引服务从我们新创建的维基百科主题中提取消息。要启动该服务,我们需要通过从Imply目录运行以下命令向

代码语言:javascript
复制
Druid的overlord提交supervisor spec
[atguigu@hadoop102 imply-2.7.10]$ curl -XPOST -H'Content-Type: application/json' -d
 @quickstart/wikipedia-kafka-supervisor.json http://hadoop102:8090/druid/indexer/v1/supervisor

说明: curl是一个利用URL规则在命令行下工作的文件传输工具。它支持文件的上传和下载,所以是综合传输工具。

  • -X 为 HTTP 数据包指定一个方法,比如 PUT、DELETE。默认的方法是 GET 6.4.3
  • -H 为 HTTP 数据包指定 Header 字段内容
  • -d 为 POST 数据包指定要向 HTTP 服务器发送的数据并发送出去,如果的内容以符号 @ 开头,其后的字符串将被解析为文件名,curl 命令会从这个文件中读取数据发送。

6.4.3 加载历史数据

启动kafka生产者生产数据

代码语言:javascript
复制
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 –
-topic wikipedia < /opt/module/imply-2.7.10/quickstart/wikipedia-2016-06-27-sampled.json

说明: < 将文件作为命令输入 可在kafka本地看到相应的数据生成

代码语言:javascript
复制
[atguigu@hadoop103 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu   4096 3月  30 11:16 wikipedia-0
 [atguigu@hadoop103 logs]$ pwd
/opt/module/kafka/logs

将样本事件发布到Kafka的wikipedia主题,然后由Kafka索引服务将其提取到Druid中。你现在准备运行一些查询了!

在这里插入图片描述
在这里插入图片描述

6.4.4 加载实时数据

下载一个帮助应用程序,该应用程序将解析维基媒体的IRC提要中的event,并将这些event发布到我们之前设置的Kafka的wikipedia主题中。

代码语言:javascript
复制
[atguigu@hadoop102 imply-2.7.10]$ curl -O
 https://static.imply.io/quickstart/wikiticker-0.4.tar.gz

说明: -O 在本地保存获取的数据时,使用她们在远程服务器上的文件名进行保存。

代码语言:javascript
复制
[atguigu@hadoop102 imply-2.7.10]$ tar -zxvf wikiticker-0.4.tar.gz
[atguigu@hadoop102 imply-2.7.10]$ cd wikiticker-0.4
现在运行带有参数的wikiticker,指示它将输出写入我们的Kafka主题:
[atguigu@hadoop102 wikiticker-0.4]$ bin/wikiticker -J-Dfile.encoding=UTF-8 -out kafka –
topic Wikipedia

查询多次,对比结果的变化

在这里插入图片描述
在这里插入图片描述

6.4.5 加载自定义kafka主题数据

可以通过编写自定义supervisor spec来加载自己的数据集。要自定义受监督的Kafka索引服务提取,您可以将包含的quickstart/wikipedia-kafka-supervisor.json 规范复制到自己的文件中,根据需要进行编辑,并根据需要创建或关闭管理程序。没有必要自己重启Imply或Druid服务。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-03-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 第6章 数据摄入
    • 6.1 数据格式
      • 6.2 配置
        • 6.2.1 DataSchema
        • 6.2.2 ioConfig
        • 6.2.3 tuningConfig
      • 6.3 从hadoop加载数据
        • 6.3.1加载数据
        • 6.3.2查询数据
      • 6.4 从kafka加载数据
        • 6.4.1 准备kafka
        • 6.4.2 启动索引服务
        • 6.4.3 加载历史数据
        • 6.4.4 加载实时数据
        • 6.4.5 加载自定义kafka主题数据
    相关产品与服务
    文件存储
    文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档