StreamingPro 支持多输入,多输出配置

前言

最近正好有个需求,就是从不同的数据库以及表里拉出数据,经过一定的处理放到ES里供查询,最好还能放个到parquet里,这样可以支持更复杂的SQL。之前StreamingPro是只能配置一个数据源的,所以做了些改造,方便配置多个数据源,以及多个写出。

最新的下载地址: https://pan.baidu.com/s/1eRO5Wga 依然的,比较大,因为现在他还能支持Thrift JDBC /Rest SQL: 使用StreamingPro 快速构建Spark SQL on CarbonData

输入配置

{
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },

以前用的是 batch.source, 如果你有多个输入源,则需要使用batch.sources 组件。每个源需要配置一个outputTable,也就是说这个源取个名字,方便后面使用。

如果是数据库,则可以这么写:

{
        "name": "batch.sources",
        "params": [
          {
             url:"jdbc:mysql://localhost/test?user=fred&password=secret",
            "dbtable":"table1",
            "driver":"com.mysql...",
            "path": "-",
            "format": "jdbc",
            "outputTable": "test",

          },
          {
            "path": "-",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },

输出

{
        "name": "batch.outputs",
        "params": [
          {
            "format": "json",
            "path": "file:///tmp/kk2",
            "inputTableName": "finalOutputTable"
          },
          {
            "format": "parquet",
            "path": "file:///tmp/kk3",
            "inputTableName": "finalOutputTable"
          }
        ]
      }

我这里同时输出为json以及parquet格式。

一个简单但是涉及点比较多的例子

{
  "convert-multi-csv-to-json": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select city as tp  from test limit 100",
            "outputTableName": "sqlTable"
          }
        ]
      },
      {
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "sqlTable",
            "outputTableName": "scriptTable",
            "useDocMap": true
          },
          {
            "-": "val count = doc(\"tp\").toString.length;Map(\"count\"->count)"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select scriptTable.tp,scriptTable.count,test2.city,test2.name  from scriptTable,test2 limit 100",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "batch.outputs",
        "params": [
          {
            "format": "json",
            "path": "file:///tmp/kk2",
            "inputTableName": "finalOutputTable"
          },
          {
            "format": "parquet",
            "path": "file:///tmp/kk3",
            "inputTableName": "finalOutputTable"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}

在 batch.sql 里你可以引用任何一个源的表,或者之前已经在batch.sql里申明的outputTable, 同理batch.script。 而在batch.outputs里,你则可以将任何一张表写入到MySQL,ES,HDFS等文件存储系统中。

将配置文件保存一下,然后就可以启动了:

SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.8-SNAPSHOT-online-1.6.1.jar    \
-streaming.name test    \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

大数据基础系列之spark的监控体系介绍

目前有好几种监控spark应用程序的工具:web UIs,指标系统和外部监控仪。 一,web界面 1,界面的基本介绍 每一个Spark应用程序都会启动一个spa...

50750
来自专栏Hadoop实操

如何在CDH5.15中安装CDSW1.4

44760
来自专栏Hadoop实操

如何编译Azkaban插件

前面Fayson介绍了《如何编译安装Azkaban服务》,在安装的时候我们再指定配置文件中有配置Web服务和Executor服务的插件路径,说明Azkaban支...

1.3K60
来自专栏java小白

IDEA快捷键生成serialVersionUID

60360
来自专栏Hadoop实操

如何在Kerberos环境下修改启用HA的CDH集群HOSTNAME

Fayson在前面的文章《如何修改Kerberos的CDH集群的HOSTNAME》介绍了修改集群的HOSTNAME,在文章中并未提到集群启用HA的情况,本篇文章...

16610
来自专栏云计算教程系列

如何在Ubuntu 18.04上创建多节点MySQL集群

MySQL Cluster分布式数据库为MySQL数据库管理系统提供高可用性和吞吐量。MySQL Cluster由一个或多个管理节点(ndb_mgmd)组成,这...

25500
来自专栏JetpropelledSnake

SNMP学习笔记之Linux下安装和配置SNMP

  选择一个SNMP版本,比如5.7.1,下载地址如下:http://sourceforge.net/projects/net-snmp/files/net-s...

29330
来自专栏Hadoop实操

如何在CDH中使用HBase的ACLs进行授权

37240
来自专栏PHP在线

MySQL 主从复制的原理和配置

主从复制的原理: 分为同步复制和异步复制,实际复制架构中大部分为异步复制。 复制的基本过程如下: 1).Slave上面的IO进程连接上Master,并请求从指定...

348120
来自专栏我是攻城师

最新版Solr6.2.1安装记录

36550

扫码关注云+社区

领取腾讯云代金券