Quick Guide: Druid Data Ingestion with Tranquility

Chapter 7: Druid Data Ingestion with Tranquility

In the previous sections we learned how to ingest data through the indexing service. However, the indexing service API is quite low-level and cumbersome to use directly. Tranquility wraps the indexing service API, making it easy to create tasks and handle partitioning, replication, service discovery, and seamless schema changes.

To send data to Druid, add the Tranquility library as a dependency of your program.

Repository: https://github.com/druid-io/tranquility

Tranquility provides two core APIs:

  • Tranquilizer, a high-level API that sends individual messages.
  • Beam, a low-level API that sends batches of data.

7.1 High-Level API

7.1.1 Environment Setup

Maven dependencies:

<dependencies>
    <dependency>
        <groupId>io.druid</groupId>
        <artifactId>tranquility-core_2.11</artifactId>
        <version>0.8.2</version>
    </dependency>
    <dependency>
        <groupId>com.metamx</groupId>
        <artifactId>java-util</artifactId>
        <version>1.3.2</version>
    </dependency>
</dependencies>

Data format configuration (example.json, read from the classpath by the code below):

{
  "dataSources": [
    {
      "spec": {
        "dataSchema": {
          "dataSource": "wikipedia02",
          "parser": {
            "type": "string",
            "parseSpec": {
              "format": "json",
              "timestampSpec": {
                "column": "timestamp",
                "format": "auto"
              },
              "dimensionsSpec": {
                "dimensions": [
                  "page",
                  "language",
                  "user",
                  "unpatrolled",
                  "newPage",
                  "robot",
                  "anonymous",
                  "namespace",
                  "continent",
                  "country",
                  "region",
                  "city"
                ],
                "dimensionExclusions": [],
                "spatialDimensions": []
              }
            }
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "count"
            },
            {
              "type": "doubleSum",
              "name": "added",
              "fieldName": "added"
            },
            {
              "type": "doubleSum",
              "name": "deleted",
              "fieldName": "deleted"
            },
            {
              "type": "doubleSum",
              "name": "delta",
              "fieldName": "delta"
            }
          ],
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "DAY",
            "queryGranularity": "NONE"
          }
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": 100000,
          "intermediatePersistPeriod": "PT10m",
          "windowPeriod": "PT10M"
        }
      }
    }
  ],
  "properties": {
    "zookeeper.connect": "hadoop102:2181",
    "druid.selectors.indexing.serviceName": "druid/overlord",
    "druid.discovery.curator.path": "/druid/discovery",
    "druidBeam.taskLocator": "overlord",
    "druidBeam.overlordPollPeriod": "PT5S"
  }
}
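
Because parseSpec.format is json, each event sent to this dataSource should be a flat JSON object whose keys match the timestamp column, the dimensions, and the metric fieldNames declared above. A hypothetical event (all values are made up for illustration) could look like:

{
  "timestamp": "2020-03-23T08:30:00Z",
  "page": "Main_Page",
  "language": "en",
  "user": "alice",
  "added": 57,
  "deleted": 3,
  "delta": 54
}

Events whose timestamps fall outside the configured windowPeriod (PT10M here) are rejected by Tranquility, which is why the examples below handle MessageDroppedException.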

7.1.2 Writing the Java Code

// The imports below assume the tranquility-core 0.8.x / java-util artifacts declared above
// (Guava and Joda-Time are pulled in transitively).
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import com.metamx.tranquility.config.DataSourceConfig;
import com.metamx.tranquility.config.PropertiesBasedConfig;
import com.metamx.tranquility.config.TranquilityConfig;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.FutureEventListener;
import org.joda.time.DateTime;
import scala.runtime.BoxedUnit;

import java.io.InputStream;
import java.util.Map;

public class JavaExample {
    private static final Logger log = new Logger(JavaExample.class);

    public static void main(String[] args)
    {
        // Read config from "example.json" on the classpath.
        final InputStream configStream = JavaExample.class.getClassLoader().getResourceAsStream("example.json");
        final TranquilityConfig<PropertiesBasedConfig> config = TranquilityConfig.read(configStream);
        final DataSourceConfig<PropertiesBasedConfig> wikipediaConfig = config.getDataSource("wikipedia02");

        // Build a Tranquilizer (high-level sender) for the "wikipedia02" dataSource.
        final Tranquilizer<Map<String, Object>> sender = DruidBeams.fromConfig(wikipediaConfig)
                .buildTranquilizer(wikipediaConfig.tranquilizerBuilder());

        sender.start();

        try {
            // Send 10 sample events.
            for (int i = 0; i < 10; i++) {
                // Build a sample event to send; make sure we use a current date,
                // otherwise the event falls outside windowPeriod and is dropped.
                final Map<String, Object> obj = ImmutableMap.<String, Object>of(
                        "timestamp", new DateTime().toString(),
                        "page", "foo",
                        "added", i
                );

                // Asynchronously send the event to Druid:
                sender.send(obj).addEventListener(
                        new FutureEventListener<BoxedUnit>()
                        {
                            @Override
                            public void onSuccess(BoxedUnit value)
                            {
                                log.info("Sent message: %s", obj);
                            }

                            @Override
                            public void onFailure(Throwable e)
                            {
                                if (e instanceof MessageDroppedException) {
                                    log.warn(e, "Dropped message: %s", obj);
                                } else {
                                    log.error(e, "Failed to send message: %s", obj);
                                }
                            }
                        }
                );
            }
        }
        finally {
            // Wait for pending sends to finish, then shut down.
            sender.flush();
            sender.stop();
        }
    }
}

7.1.3 Writing the Scala Code

// The imports below assume the tranquility-core 0.8.x / java-util artifacts declared above.
import com.metamx.common.scala.Logging
import com.metamx.tranquility.config.{DataSourceConfig, PropertiesBasedConfig, TranquilityConfig}
import com.metamx.tranquility.druid.DruidBeams
import com.metamx.tranquility.tranquilizer.{MessageDroppedException, Tranquilizer}
import com.twitter.util.{Return, Throw}
import org.joda.time.DateTime

import scala.collection.JavaConverters._

object ScalaExample extends Logging
{
  def main(args: Array[String]) {
    // Read config from "example.json" on the classpath.
    val configStream = getClass.getClassLoader.getResourceAsStream("example.json")
    val config: TranquilityConfig[PropertiesBasedConfig] = TranquilityConfig.read(configStream)

    // Use the dataSource defined in example.json above ("wikipedia02").
    val wikipediaConfig: DataSourceConfig[PropertiesBasedConfig] = config.getDataSource("wikipedia02")
    val sender: Tranquilizer[java.util.Map[String, AnyRef]] = DruidBeams
      .fromConfig(wikipediaConfig)
      .buildTranquilizer(wikipediaConfig.tranquilizerBuilder())

    sender.start()

    try {
      // Send 10 sample events.
      for (i <- 0 until 10) {
        val obj = Map[String, AnyRef](
          "timestamp" -> new DateTime().toString,
          "page" -> "foo",
          "added" -> Int.box(i)
        )

        // Asynchronously send the event to Druid:
        sender.send(obj.asJava) respond {
          case Return(_) =>
            log.info("Sent message: %s", obj)

          case Throw(e: MessageDroppedException) =>
            log.warn(e, "Dropped message: %s", obj)

          case Throw(e) =>
            log.error(e, "Failed to send message: %s", obj)
        }
      }
    }
    finally {
      // Wait for pending sends to finish, then shut down.
      sender.flush()
      sender.stop()
    }
  }
}
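
If you prefer sbt over Maven for the Scala example, an equivalent dependency declaration is sketched below; it assumes scalaVersion is set to a 2.11 release so that %% resolves to the _2.11 artifacts shown in 7.1.1:

libraryDependencies ++= Seq(
  "io.druid"   %% "tranquility-core" % "0.8.2",
  "com.metamx" %  "java-util"        % "1.3.2"
)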

7.2 Low-Level API

This section integrates Druid with Spark Streaming using the low-level Beam API.

7.2.1 Environment Setup

<dependencies>
    <dependency>
        <groupId>io.druid</groupId>
        <artifactId>tranquility-core_2.11</artifactId>
        <version>0.8.2</version>
    </dependency>
    <dependency>
        <groupId>com.metamx</groupId>
        <artifactId>java-util</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>io.druid</groupId>
        <artifactId>tranquility-spark_2.11</artifactId>
        <version>0.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>

7.2.2 Writing the Code

// The imports below assume the tranquility artifacts declared above; Curator, Joda-Time and the
// Druid aggregator classes are pulled in transitively.
import com.metamx.common.Granularity
import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning}
import com.metamx.tranquility.druid.{DruidBeams, DruidLocation, DruidRollup, SpecificDruidDimensions}
import com.metamx.tranquility.spark.BeamFactory
import io.druid.granularity.QueryGranularities
import io.druid.query.aggregation.LongSumAggregatorFactory
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.joda.time.{DateTime, Period}

// SimpleEvent is not part of Tranquility; it stands in for your own event type.
// An assumed minimal definition whose fields match the dimensions/metrics configured below:
case class SimpleEvent(timestamp: DateTime, bar: String, baz: Long)

class SimpleEventBeamFactory extends BeamFactory[SimpleEvent]
{
  // Return a singleton, so the same connection is shared across all tasks in the same JVM.
  def makeBeam: Beam[SimpleEvent] = SimpleEventBeamFactory.BeamInstance
}

object SimpleEventBeamFactory
{
  val BeamInstance: Beam[SimpleEvent] = {
    // Tranquility uses ZooKeeper (through the Curator framework) for coordination.
    val curator = CuratorFrameworkFactory.newClient(
      "localhost:2181", // ZooKeeper connect string (e.g. hadoop102:2181 in the cluster configured earlier)
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    val indexService = "druid/overlord"    // Your overlord's druid.service
    val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path
    val dataSource = "foo"
    val dimensions = IndexedSeq("bar")
    val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
    val isRollup = true

    // Expects simpleEvent.timestamp to return a Joda DateTime object.
    DruidBeams
      .builder((simpleEvent: SimpleEvent) => simpleEvent.timestamp)
      .curator(curator)
      .discoveryPath(discoveryPath)
      .location(DruidLocation(indexService, dataSource))
      .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE, isRollup))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.HOUR,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 1
        )
      )
      .buildBeam()
  }
}

// Add this import to your Spark job to be able to propagate events from any RDD to Druid.
import com.metamx.tranquility.spark.BeamRDD._

// Now, given a Spark DStream, you can send events to Druid.
dstream.foreachRDD(rdd => rdd.propagate(new SimpleEventBeamFactory))
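
The snippet above assumes an existing DStream. As a rough sketch of how it could be wired into a complete job (the socket source, 5-second batch interval, comma-separated input format, and the SparkDruidJob name are all illustrative assumptions, not part of Tranquility):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import com.metamx.tranquility.spark.BeamRDD._

object SparkDruidJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("tranquility-spark-example")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Hypothetical source: lines of "bar,baz" read from a socket; any DStream would do.
    val events = ssc.socketTextStream("localhost", 9999).map { line =>
      val Array(bar, baz) = line.split(",")
      SimpleEvent(new DateTime(), bar, baz.toLong)
    }

    // Each micro-batch RDD is propagated to Druid through the BeamFactory defined above.
    events.foreachRDD(rdd => rdd.propagate(new SimpleEventBeamFactory))

    ssc.start()
    ssc.awaitTermination()
  }
}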