前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据Apache Druid(六):Druid流式数据加载

大数据Apache Druid(六):Druid流式数据加载

原创
作者头像
Lansonli
发布2022-08-22 00:44:15
5320
发布2022-08-22 00:44:15
举报
文章被收录于专栏:Lansonli技术博客

Druid流式数据加载

一、​​​​​​​Druid与Kafka整合

1、​​​​​​​使用webui加载Kafka数据

Druid也可以与Kafka整合,直接读取Kafka中某个topic的数据在Druid中进行OLAP分析,步骤如下:

  • 启动Kafka,在Kafka中创建topic
代码语言:javascript
复制
#创建Kafka topic
[root@node1 bin]# ./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic druid-topic  --partitions 3 --replication-factor 3

#向创建的topic中生产一条数据,这里为了方便后面Druid解析数据
[root@node1 bin]# ./kafka-console-producer.sh  --topic druid-topic --broker-list node1:9092,node2:9092,node3:9092
>{"data_dt":"2021-07-01T08:13:23.000Z","uid":"uid001","loc":"北京","item":"衣服","amount":"100"}

  • 进入Druid主页,加载Kafka中数据

进入Druid主页http://node5:8888,点击“Load data”标签:

填写Kafka Server、Topic、点击“Parse data”:

2、​​​​​​​​​​​​​​查询Druid中的数据

点击“Query”编写SQL ,查询DataSource “druid-topic”数据如下:

向Kafka topic druid-topic中继续写入如下数据:

代码语言:javascript
复制
{"data_dt":"2021-07-01T08:20:13.000Z","uid":"uid001","loc":"北京","item":"手机","amount":"200"}
{"data_dt":"2021-07-01T09:24:46.000Z","uid":"uid002","loc":"上海","item":"书籍","amount":"300"}
{"data_dt":"2021-07-01T09:43:42.000Z","uid":"uid002","loc":"上海","item":"书籍","amount":"400"}
{"data_dt":"2021-07-01T09:53:42.000Z","uid":"uid002","loc":"上海","item":"书籍","amount":"500"}
{"data_dt":"2021-07-01T12:19:52.000Z","uid":"uid003","loc":"天津","item":"水果","amount":"600"}
{"data_dt":"2021-07-01T14:53:13.000Z","uid":"uid004","loc":"广州","item":"生鲜","amount":"700"}
{"data_dt":"2021-07-01T15:51:45.000Z","uid":"uid005","loc":"深圳","item":"手机","amount":"800"}
{"data_dt":"2021-07-01T17:21:21.000Z","uid":"uid006","loc":"杭州","item":"电脑","amount":"900"}
{"data_dt":"2021-07-01T20:26:53.000Z","uid":"uid007","loc":"湖南","item":"水果","amount":"1000"}
{"data_dt":"2021-07-01T09:38:11.000Z","uid":"uid008","loc":"山东","item":"书籍","amount":"1100"}

执行聚合查询:select loc,item,sum(amount) as total_amount from "druid-topic" group by loc,item

3、删除Druid数据

删除Druid数据,首先在Ingestion中停止实时接收数据的任务:

然后再DataSource中使所有Segment无效后,再彻底删除对应的数据:

4、​​​​​​​​​​​​​​使用post方式加载Kafka数据

由于前面已经使用Druid加载过当前Kafka“druid-topic”topic的数据,当停止Druid supervisors 中实时读取Kafka topic 任务后,在MySQL 库表“druid.druid_datasource”中会存放当前datasource读取kafka topic的offset信息,如果使用post方式再次提交实时任务生成一样的datasource名称读取相同的Kafka topic时,会获取到该位置的offset信息,所以为了能从头消费Kafka中的数据,我们可以将mysql中“druid.druid_datasource”对应的datasource数据条目删除:

准备json配置,使用postman来提交加载Kafka的任务,配置如下:

代码语言:javascript
复制
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "node1:9092,node2:9092,node3:9092"
      },
      "topic": "druid-topic",
      "inputFormat": {
        "type": "json"
      },
      "useEarliestOffset": true
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "druid-topic",
      "timestampSpec": {
        "column": "data_dt",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": [
          {
            "type": "long",
            "name": "amount"
          },
          "item",
          "loc",
          "uid"
        ]
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    }
  }
}

打开postman,post请求URL:http://node3:8081/druid/indexer/v1/supervisor,在row中写入以上json配置数据提交即可,执行之后可以在Druid页面中看到对应的supervisors和Datasource。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ​Druid流式数据加载
    • 一、​​​​​​​Druid与Kafka整合
      • 1、​​​​​​​使用webui加载Kafka数据
      • 2、​​​​​​​​​​​​​​查询Druid中的数据
      • 3、删除Druid数据
      • 4、​​​​​​​​​​​​​​使用post方式加载Kafka数据
相关产品与服务
大数据处理套件 TBDS
腾讯大数据处理套件(Tencent Big Data Suite,TBDS)依托腾讯多年海量数据处理经验,基于云原生技术和泛 Hadoop 生态开源技术提供的可靠、安全、易用的大数据处理平台。 TBDS可在公有云、私有云、非云化环境,根据不同数据处理需求组合合适的存算分析组件,包括 Hive、Spark、HBase、Flink、Presto、Iceberg、Elasticsearch、StarRocks 等,以快速构建企业级数据湖仓。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档