流式计算常见模块用法说明

说明

StreamingPro有非常多的模块可以直接在配置文件中使用,本文主要针对流式计算中涉及到的模块。

Kafka Compositor

{
   "name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
   "params": [{
                 "topics":"your topic",
                 "metadata.broker.list":"brokers",
                 "auto.offset.reset": "smallest|largest"
             }]
}

参数说明:

Property Name

Meaning

topics

Kafka主题,可以多个,按 逗号分隔

metadata.broker.list

Kafka Broker地址

auto.offset.reset

重头消费还是从最新消费

MockInputStreamCompositor

模拟数据源,主要为了方便测试。

{
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor",
        "params": [{
                      "batch-1":["1","2","3"],
                      "batch-2":["1","2","3"],
                      "batch-3":["1","2","3"],
                      "batch-4":["1","2","3"]
                  }]
}

MockInputStreamFromPathCompositor

模拟数据源,主要为了方便测试。可以接入一个外部文件作为mock数据

{
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamFromPathCompositor",
        "params": [{"path":"file:///tmp/test.txt"}]
}

SingleColumnJSONCompositor

把一条日志转化一个单列的json文件。

{
        "name": "streaming.core.compositor.spark.streaming.transformation.SingleColumnJSONCompositor",
        "params": [{
            "name": "a"
          }]
}

params.name 则是列名,方便后续的sql使用。

ScalaMapToJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",
        "params": [{}]
}

可以把scala Map转化为JSon

JavaMapToJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.JavaMapToJSONCompositor",
        "params": [{}]
}

可以把java Map转化为JSon

FlatJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.FlatJSONCompositor",
        "params": [{"a":"$['store']['book'][0]['title']"}]
}

从JSON里抽取字段,映射到新的列名上。主要是对复杂JSON结构进行扁平化。语法参考该库JsonPath

NginxParserCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.NginxParserCompositor",
        "params": [{"time":0,"url":1}]
}

Nginx 日志解析工具,按位置给列进行命名。

SQLCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
        "params": [
          {
            "sql": "select a, \"5\" as b from test",
            "outputTableName": "test2"
          }
        ]
      }

Property Name

Meaning

sql

sql 语句

outputTableName

输出的表名,方便后续的SQL语句可以衔接

SQLESOutputCompositor

将数据存储到ES中

{
        "name":"streaming.core.compositor.spark.streaming.output.SQLESOutputCompositor",
        "params":[
          {
            "es.nodes":"",
            "es.resource":"",
            "es.mapping.include":"",
            "timeFormat":"yyyyMMdd"
          }
        ]
}

Property Name

Meaning

es.nodes

节点,多个节点用逗号分隔

es.resource

索引名称以及类型名称

....

其他一些elasticsearch-hadoop的配置

SQLPrintOutputCompositor(output)

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLPrintOutputCompositor",
        "params": [{}]
}

把处理结果打印到终端控制台。主要是为了调试使用

JSONTableCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
        "params": [{
            "tableName": "test"
          }]
}

把字符串(JSON格式)的数据注册成一张表。 params.tableName可以让你指定表名。

ConsoleOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.ConsoleOutputCompositor",
        "params": [{ }]
}

控制台打印,非SQL类。

SQLCSVOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLCSVOutputCompositor",
        "params": [{
  "path":"",
  "mode":""
 }]
}

Property Name

Meaning

path

cvs 存储路径

mode

ErrorIfExists 或者Overwrite 或者Append或者Ignore

作为CSV 输出,需要前面是一张表。

SQLParquetOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLParquetOutputCompositor",
        "params": [{
  "path":"",
  "mode":""
 }]
}

Property Name

Meaning

path

parquet 存储路径

mode

ErrorIfExists 或者Overwrite 或者Append或者Ignore

作为parquet 输出,需要前面是一张表。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏me的随笔

使用CodeFirst创建并更新数据库

在使用Entity Framwork的三种方式(ModelFist、DBFirst、CodeFirst)中,CodeFirst方式书写的代码最为干净。

38340
来自专栏程序员的SOD蜜

EF+MySQL乐观锁控制电商并发下单扣减库存,在高并发下的问题

下订单减库存的方式 现在,连农村的大姐都会用手机上淘宝购物了,相信电商对大家已经非常熟悉了,如果熟悉电商开发的同学,就知道在买家下单购买商品的时候,是需要扣减库...

81080
来自专栏GreenLeaves

oracle 表空间tablespace

一、Oracle 表空间的组成 Everoone knows Oracle数据库真正存放数据的是数据文件,Oracle表空间是逻辑上的概念,他在物理上是并不存在...

36880
来自专栏xingoo, 一个梦想做发明家的程序员

Elasticsearch——分页查询From&Size VS scroll

Elasticsearch中数据都存储在分片中,当执行搜索时每个分片独立搜索后,数据再经过整合返回。那么,如果要实现分页查询该怎么办呢? 更多内容参考Ela...

96760
来自专栏蓝天

Google开源html模板库ctemplate的完整使用示例

ctemplate是Google开源的一个C++版本html模板替换库。有了它,在C++代码中操作html模板是一件非常简单和高效的事。通过本文,即可掌握对它的...

29510
来自专栏Hadoop实操

如何使用Scala代码访问Kerberos环境的HDFS

前面Fayson介绍了《如何使用Java API访问HDFS为目录设置配额》,随着开发语言的多样性,也有基于Scala语言进行开发,本篇文章主要介绍如何使用Sc...

419100
来自专栏码农分享

代理模式(Proxy)

转载 https://blog.csdn.net/lovelion/article/details/8228042

13120
来自专栏Django Scrapy

Django安装及简单使用1.3

Django安装及简单使用1.3 代码都在github: URL:https://github.com/njxshr/codes/tree/master/t...

34280
来自专栏java学习

Hibernate学习笔记2

定义hbm.xml映射文件和pojo类时都需要定义主键,Hibernate中定义的主键类型包括:自然主键和代理主键:

11540
来自专栏信安之路

PHP使用了PDO还可能存在sql注入的情况

“用 PDO 来防止 SQL 注入。”大概学过 PHP 的都听说过这句话。代码中出现了 PDO 就行了吗?答案肯定是否定的。接下来给大家介绍几种使用了 PDO ...

49400

扫码关注云+社区

领取腾讯云代金券