StreamingPro添加Scala script 模块支持

SQL 在解析字符串方面,能力还是有限,因为支持的算子譬如substring,split等有限,且不具备复杂的流程表达能力。我们内部有个通过JSON描述的DSL引擎方便配置化解析,然而也有一定的学习时间成本。

我们当然可以通过SQL的 UDF函数等来完成字符串解析,在streamingpro中也很简单,只要注册下你的UDF函数库即可:

"udf_register": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "sql.udf",
        "params": [
          {
            "analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
          }
        ]
      }
    ]
  }

这样你就可以在SQL中使用MLfunctions里面所有的udf函数了。然而为此专门提供一个jar包也是略显麻烦。

这个时候如果能直接写脚本解析就好了,最好是能支持各种脚本,比如groovy,javascript,python,scala,java等。任何一个会编程的人都可以实现一个比较复杂的解析逻辑。

核心是ScriptCompositor模块:

 {
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": [
              "val Array(a,b)=rawLine.split(\"\t\");",
              "Map(\"a\"->a,\"b\"->b)"
            ]
          }
        ]
      }

如果我想在代码里直接处理所有的列,则如下:

{
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "test2",
            "outputTableName": "test3",
            "useDocMap": true
          },
          {
            "anykey": "val Array(a,b)=doc(\"raw\").toString.split(\"\t\");Map(\"a\"->a,\"b\"->b)"
          }
        ]
}

通过添加useDocMap为true,则你在代码里可以通过doc(doc是个Map[String,Any]) 来获取你想要的任何字段,然后形成一个新的Map。

如果你只要新生成Map里的字段,忽略掉旧的,则设置ignoreOldColumns=true 即可。

你可以把代码放到一个文件里,如下:

{
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": "file:///tmp/raw_process.scala"
          }
        ]
      }

通过inputTableName指定输入的表,outputTableName作为输出结果表。 raw代表inputTableName中你需要解析的字段,然后通过你的scala脚本进行解析。在脚本中 rawLine 是固定的,对应raw字段(其他字段也是一样)的值。脚本只有一个要求,最后的返回结果暂时需要是个Map[String,Any]。

这里,你只是提供了一个map作为返回值,作为一行,然后以outputTableName指定的名字输出,作为下一条SQL的输入,所以StreamingPro需要推测出你的Schema。 数据量大到一定程度,推测Schema的效率就得不到保证,这个时候,你可以通过配置schema来提升性能:

{
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3",
            "schema": "file:///tmp/schema.scala",
            "useDocMap": true
          },
          {
            "raw": "file:///tmp/raw_process.scala"
          }
        ]
      }

schema.scala的内容大致如下:

Some(
StructType(
Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
)

后续roadmap是:

  1. 支持外部脚本,比如放在hdfs或者http服务器上。
  2. 支持java 脚本
  3. 支持javascript脚本
  4. 支持 python 脚本
  5. 支持 ruby脚本
  6. 支持 groovy 脚本

举个案例,从HDFS读取一个文件,并且映射为只有一个raw字段的表,接着通过ScriptCompositor配置的scala代码解析raw字段,展开成a,b两个字段,然后继续用SQL继续处理,最后输出。

{
  "convert_data_parquet": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/hdfsfile",
            "format": "org.apache.spark.sql.execution.datasources.hdfs",
            "fieldName": "raw",
            "outputTableName":"test"
          }
        ]
      },     
      {
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": [
              "val Array(a,b)=rawLine.split(\"\t\");",
              "Map(\"a\"->a,\"b\"->b)"
            ]
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select a,b  from test3 "
          }
        ]
      },
      {
        "name": "batch.outputs",
        "params": [
          {
           "format":"console"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏鹅厂少年的奇妙之旅

Go内存模型

Go语言中内存分配大致有3种模式:Stack、Heap、Fixed Size Segment。

80250
来自专栏运维一切

ceph对象存储折腾记 原

###前言 一直想弄对象存储,以前弄过一次,不是很理解region是个什么东西,后来时间和工作上的原因没有再折腾,这两天闲了下来,再次折腾了一次。我是参考的ce...

17410
来自专栏与神兽党一起成长

解析XML和JSON内容的一点技巧

在没有统一标准的情况下,一个系统对接多个外部系统往往会遇到请求接口响应数据异构的情况,有可能返回的是XML,也有可能返回 JSON。除了返回类型不同,内容结构也...

22920
来自专栏分布式系统和大数据处理

.Net 项目代码风格参考

代码风格没有正确与否,重要的是整齐划一,这是我拟的一份《.Net 项目代码风格参考》,供大家参考。

15020
来自专栏FreeBuf

缓冲区溢出攻击初学者手册(更新版)

说明 之前版本翻译质量不佳,本人赵阳在这里对本文的读者表示深深的歉意。由于本人的疏忽和大意导致您不能很好的读完这篇文章,同时也对原文内容进行了破坏,也对IDF和...

30290
来自专栏后端技术探索

php中常见编码问题

PHP程序设计中中文编码问题曾经困扰很多人,导致这个问题的原因其实很简单,每个国家(或区域)都规定了计算机信息交换用的字符编码集,如美国的扩展 ASCII 码,...

24120
来自专栏同步博客

单例模式

  一种是饿汉式单例,饿汉式单例在单例类被加载时候,就实例化一个对象交给自己的引用;

15230
来自专栏安恒网络空间安全讲武堂

[Web安全]PHP伪协议

[Web安全]PHP伪协议 最近php伪协议的各种神奇妙用好像突然又常常提到了,php中支持的伪协议有下面这么多 复制代码 file:// — 访问本地文件...

58780
来自专栏H2Cloud

C++中消息自动派发之一 About JSON

1. 闲序   游戏服务器之间通信大多采用异步消息通信。而消息打包常用格式有:google protobuff,facebook thrift, 千千万万种自定...

26630
来自专栏大史住在大前端

webpack4.0各个击破(5)—— Module篇

使用webpack对脚本进行合并是非常方便的,因为webpack实现了对各种不同模块规范的兼容处理,对前端开发者来说,理解这种实现方式比学习如何配置webpac...

13720

扫码关注云+社区

领取腾讯云代金券