StreamingPro 再次支持 Structured Streaming

前言

之前已经写过一篇文章,StreamingPro 支持Spark Structured Streaming,不过当时只是玩票性质的,因为对Spark 2.0+ 版本其实也只是尝试性质的,重点还是放在了spark 1.6 系列的。不过时间在推移,Spark 2.0+ 版本还是大势所趋。所以这一版对底层做了很大的重构,StreamingPro目前支持Flink,Spark 1.6+, Spark 2.0+ 三个引擎了。

准备工作

下载streamingpro for spark 2.0的包,然后下载spark 2.1 的安装包。 你也可以在 streamingpro目录 找到spark 1.6+ 或者 flink的版本。最新的大体会按如下格式统一格式了:

streamingpro-spark-0.4.14-SNAPSHOT.jar  适配  spark 1.6+,scala 2.10
streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar  适配  spark 2.0+,scala 2.11
streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar 适配 flink 1.2.0, scala 2.10

测试例子

写一个json文件ss.json,内容如下:

{
  "scalamaptojson": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [
    ],
    "compositor": [
      {
        "name": "ss.sources",
        "params": [
          {
            "format": "socket",
            "outputTable": "test",
            "port":"9999",
            "host":"localhost",
            "path": "-"
          },
          {
            "format": "com.databricks.spark.csv",
            "outputTable": "sample",
            "header":"true",
            "path": "/Users/allwefantasy/streamingpro/sample.csv"
          }
        ]
      },
      {
        "name": "ss.sql",
        "params": [
          {
            "sql": "select city from test left join sample on test.value == sample.name",
            "outputTableName": "test3"
          }
        ]
      },
      {
        "name": "ss.outputs",
        "params": [
          {
            "mode": "append",
            "format": "console",
            "inputTableName": "test3",
            "path": "-"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}

大体是一个socket源,一个sample文件。socket源是流式的,sample文件则是批处理的。sample.csv内容如下:

id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35

然后你在终端执行 nc -lk 9999 就好了。

然后运行spark程序:

SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar    \
-streaming.name test    \
-streaming.platform spark_structrued_streaming \
-streaming.job.file.path file://$SHome/ss.json

在nc 那个终端输入比如eason ,然后回车,马上就可以看到spark终端接受到了数据。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

如何在CDSW上创建Git工程

在CDSW上有多中创建工程的方式(创建一个空的工程、模板工程、本地上传工程或者是创建git工程),前面的都比较好理解,那如何在CDSW上创建Git工程呢?本篇文...

3633
来自专栏LuckQI

Java大数据学习~Hadoop初识一了解其架构

913
来自专栏搜云库

Hadoop-2.7.4 集群快速搭建

Hadoop简介 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力...

3237
来自专栏LanceToBigData

Hadoop(四)HDFS集群详解

前言   前面几篇简单介绍了什么是大数据和Hadoop,也说了怎么搭建最简单的伪分布式和全分布式的hadoop集群。接下来这篇我详细的分享一下HDFS。   H...

5119
来自专栏恰童鞋骚年

Hadoop学习笔记—1.基本介绍与环境配置

说到Hadoop的起源,不得不说到一个传奇的IT公司—全球IT技术的引领者Google。Google(自称)为云计算概念的提出者,在自身多年的搜索引擎业务中构...

961
来自专栏PPV课数据科学社区

Spark的三种集群deploy模式对比

Spark有三种集群部署模式,或者叫做集群管理模式。分别是standalone,YARN和Mesos。这三种模式其实都是master/slave模式。 那么在...

4116
来自专栏我是攻城师

Spark Streaming如何使用checkpoint容错

5877
来自专栏Hadoop实操

如何在CDH集群中加入异构设备

在部署CDH集群或者扩容时,会遇到服务器配置不同(CPU、Memory、DISK等)的情况,那我们应该如何加入异构设备,并进行差异化的配置呢?本篇文章主要讲述如...

5099
来自专栏性能与架构

HDFS 核心原理

HDFS(Hadoop Distribute File System)是一个分布式文件系统 文件系统是操作系统提供的磁盘空间管理服务,只需要我们指定把文件放到...

3267
来自专栏灯塔大数据

每周学点大数据 | No.63 Hadoop MapReduce 实践—环境搭建(下)

NO.63  Hadoop MapReduce 实践—环境搭建(下) Mr. 王:vim 是Linux 下开源的文本编辑器,它的功能非常强大,受到广大编程爱好...

3455

扫码关注云+社区

领取腾讯云代金券