前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark高级操作之json复杂和嵌套数据结构的操作二

Spark高级操作之json复杂和嵌套数据结构的操作二

作者头像
Spark学习技巧
发布2018-01-30 18:56:26
8.5K1
发布2018-01-30 18:56:26
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

一,准备阶段

Json格式里面有map结构和嵌套json也是很合理的。本文将举例说明如何用spark解析包含复杂的嵌套数据结构,map。现实中的例子是,一个设备的检测事件,二氧化碳的安全你浓度,高温数据等,需要实时产生数据,然后及时的告警处理。

1,定义schema

代码语言:js
复制
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("dc_id", StringType)                               // data center where data was posted to Kafka cluster
 .add("source",                                          // info about the source of alarm
 MapType(                                              // define this as a Map(Key->value)
 StringType,
 new StructType()
        .add("description", StringType)
        .add("ip", StringType)
        .add("id", LongType)
        .add("temp", LongType)
        .add("c02_level", LongType)
        .add("geo",
 new StructType()
            .add("lat", DoubleType)
            .add("long", DoubleType)
        )
    )
  )

2,准备数据

代码语言:js
复制
 val dataDS = Seq("""
{
"dc_id": "dc-101",
"source": {
    "sensor-igauge": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp":35,
      "c02_level": 1475,
      "geo": {"lat":38.00, "long":97.00}                        
    },
    "sensor-ipad": {
      "id": 13,
      "ip": "67.185.72.1",
      "description": "Sensor ipad attached to carbon cylinders",
      "temp": 34,
      "c02_level": 1370,
      "geo": {"lat":47.41, "long":-122.00}
    },
    "sensor-inest": {
      "id": 8,
      "ip": "208.109.163.218",
      "description": "Sensor attached to the factory ceilings",
      "temp": 40,
      "c02_level": 1346,
      "geo": {"lat":33.61, "long":-111.89}
    },
    "sensor-istick": {
      "id": 5,
      "ip": "204.116.105.67",
      "description": "Sensor embedded in exhaust pipes in the ceilings",
      "temp": 40,
      "c02_level": 1574,
      "geo": {"lat":35.93, "long":-85.46}
    }
  }
}""").toDS()
 // should only be one item
 dataDS.count()

3,准备处理

代码语言:js
复制
val df = spark.read.schema(schema).json(dataDS.rdd)

查看schema

df.printSchema

二,如何使用explode()

Explode()方法在spark1.3的时候就已经存在了,在这里展示一下如何抽取嵌套的数据结构。在一些场合,会结合explode,to_json,from_json一起使用。

Explode为给定的map的每一个元素创建一个新的行。比如上面准备的数据,source就是一个map结构。Map中的每一个key/value对都会是一个独立的行。

代码语言:js
复制
val explodedDF = df.select($"dc_id", explode($"source"))
explodedDF.printSchema

可以看看操作之后的schema信息

获取内部的 数据

代码语言:js
复制
case class DeviceAlert(dcId: String, deviceType:String, ip:String, deviceId:Long, temp:Long, c02_level: Long, lat: Double, lon: Double)
val notifydevicesDS = explodedDF.select( $"dc_id" as "dcId",
 $"key" as "deviceType",
 'value.getItem("ip") as 'ip,
 'value.getItem("id") as 'deviceId,
 'value.getItem("c02_level") as 'c02_level,
 'value.getItem("temp") as 'temp,
 'value.getItem("geo").getItem("lat") as 'lat,                //note embedded level requires yet another level of fetching.
 'value.getItem("geo").getItem("long") as 'lon)
  .as[DeviceAlert]  // return as a Dataset

查看schema信息

notifydevicesDS.printSchema

三,再复杂一点

在物联网场景里,通畅物联网设备会将很多json 事件数据发给他的收集器。收集器可以是附近的数据中心,也可以是附近的聚合器,也可以是安装在家里的一个设备,它会有规律的周期的将数据通过加密的互联网发给远程的数据中心。说白一点,数据格式更复杂。

我们下面会有三个map的数据格式:恒温计,摄像机,烟雾报警器。

代码语言:js
复制
import org.apache.spark.sql.types._

// a bit longish, nested, and convuloted JSON schema :)
val nestSchema2 = new StructType()
  .add("devices",
 new StructType()
      .add("thermostats", MapType(StringType,
 new StructType()
        .add("device_id", StringType)
        .add("locale", StringType)
        .add("software_version", StringType)
        .add("structure_id", StringType)
        .add("where_name", StringType)
        .add("last_connection", StringType)
        .add("is_online", BooleanType)
        .add("can_cool", BooleanType)
        .add("can_heat", BooleanType)
        .add("is_using_emergency_heat", BooleanType)
        .add("has_fan", BooleanType)
        .add("fan_timer_active", BooleanType)
        .add("fan_timer_timeout", StringType)
        .add("temperature_scale", StringType)
        .add("target_temperature_f", DoubleType)
        .add("target_temperature_high_f", DoubleType)
        .add("target_temperature_low_f", DoubleType)
        .add("eco_temperature_high_f", DoubleType)
        .add("eco_temperature_low_f", DoubleType)
        .add("away_temperature_high_f", DoubleType)
        .add("away_temperature_low_f", DoubleType)
        .add("hvac_mode", StringType)
        .add("humidity", LongType)
        .add("hvac_state", StringType)
        .add("is_locked", StringType)
        .add("locked_temp_min_f", DoubleType)
        .add("locked_temp_max_f", DoubleType)))
      .add("smoke_co_alarms", MapType(StringType,
 new StructType()
        .add("device_id", StringType)
        .add("locale", StringType)
        .add("software_version", StringType)
        .add("structure_id", StringType)
        .add("where_name", StringType)
        .add("last_connection", StringType)
        .add("is_online", BooleanType)
        .add("battery_health", StringType)
        .add("co_alarm_state", StringType)
        .add("smoke_alarm_state", StringType)
        .add("is_manual_test_active", BooleanType)
        .add("last_manual_test_time", StringType)
        .add("ui_color_state", StringType)))
      .add("cameras", MapType(StringType,
 new StructType()
        .add("device_id", StringType)
        .add("software_version", StringType)
        .add("structure_id", StringType)
        .add("where_name", StringType)
        .add("is_online", BooleanType)
        .add("is_streaming", BooleanType)
        .add("is_audio_input_enabled", BooleanType)
        .add("last_is_online_change", StringType)
        .add("is_video_history_enabled", BooleanType)
        .add("web_url", StringType)
        .add("app_url", StringType)
        .add("is_public_share_enabled", BooleanType)
        .add("activity_zones",
 new StructType()
            .add("name", StringType)
            .add("id", LongType))
        .add("last_event", StringType))))

对应的数据

代码语言:js
复制
val nestDataDS2 = Seq("""{
  "devices": {
     "thermostats": {
        "peyiJNo0IldT2YlIVtYaGQ": {
          "device_id": "peyiJNo0IldT2YlIVtYaGQ",
          "locale": "en-US",
          "software_version": "4.0",
          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
          "where_name": "Hallway Upstairs",
          "last_connection": "2016-10-31T23:59:59.000Z",
          "is_online": true,
          "can_cool": true,
          "can_heat": true,
          "is_using_emergency_heat": true,
          "has_fan": true,
          "fan_timer_active": true,
          "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
          "temperature_scale": "F",
          "target_temperature_f": 72,
          "target_temperature_high_f": 80,
          "target_temperature_low_f": 65,
          "eco_temperature_high_f": 80,
          "eco_temperature_low_f": 65,
          "away_temperature_high_f": 80,
          "away_temperature_low_f": 65,
          "hvac_mode": "heat",
          "humidity": 40,
          "hvac_state": "heating",
          "is_locked": true,
          "locked_temp_min_f": 65,
          "locked_temp_max_f": 80
          }
        },
        "smoke_co_alarms": {
          "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
            "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
            "locale": "en-US",
            "software_version": "1.01",
            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            "where_name": "Jane's Room",
            "last_connection": "2016-10-31T23:59:59.000Z",
            "is_online": true,
            "battery_health": "ok",
            "co_alarm_state": "ok",
            "smoke_alarm_state": "ok",
            "is_manual_test_active": true,
            "last_manual_test_time": "2016-10-31T23:59:59.000Z",
            "ui_color_state": "gray"
            }
          },
       "cameras": {
        "awJo6rH0IldT2YlIVtYaGQ": {
          "device_id": "awJo6rH",
          "software_version": "4.0",
          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
          "where_name": "Foyer",
          "is_online": true,
          "is_streaming": true,
          "is_audio_input_enabled": true,
          "last_is_online_change": "2016-12-29T18:42:00.000Z",
          "is_video_history_enabled": true,
          "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
          "app_url": "nestmobile://cameras/device_id?auth=access_token",
          "is_public_share_enabled": true,
          "activity_zones": { "name": "Walkway", "id": 244083 },
          "last_event": "2016-10-31T23:59:59.000Z"
          }
        }
      }
     }""").toDS

通过创建一个简单的dataset,我们可以使用所有的dataset的方法来进行ETL操作,比如from_json(), to_json(), explode() and selectExpr()。

代码语言:js
复制
val nestDF2 = spark                            // spark session 
 .read                             //  get DataFrameReader
 .schema(nestSchema2)             //  use the defined schema above and read format as JSON
 .json(nestDataDS2.rdd)

2,将整个json对象,转化为一个json string

代码语言:js
复制
val stringJsonDF = nestDF2.select(to_json(struct($"*"))).toDF("nestDevice")

3,将三个json object 的map对象抓化为三个单独的map列,然后可以是使用explode方法访问其属性。

代码语言:js
复制
val mapColumnsDF = nestDF2.select($"devices".getItem("smoke_co_alarms").alias ("smoke_alarms"),
 $"devices".getItem("cameras").alias ("cameras"),
 $"devices".getItem("thermostats").alias ("thermostats"))

转化为三个dataframe

代码语言:js
复制
val explodedThermostatsDF = mapColumnsDF.select(explode($"thermostats"))
val explodedCamerasDF = mapColumnsDF.select(explode($"cameras"))
//or you could use the original nestDF2 and use the devices.X notation
val explodedSmokedAlarmsDF =  nestDF2.select(explode($"devices.smoke_co_alarms"))

查看其schema

explodedThermostatsDF.printSchema

访问三个map内部的元素

代码语言:js
复制
val thermostateDF = explodedThermostatsDF.select($"value".getItem("device_id").alias("device_id"),
 $"value".getItem("locale").alias("locale"),
 $"value".getItem("where_name").alias("location"),
 $"value".getItem("last_connection").alias("last_connected"),
 $"value".getItem("humidity").alias("humidity"),
 $"value".getItem("target_temperature_f").alias("target_temperature_f"),
 $"value".getItem("hvac_mode").alias("mode"),
 $"value".getItem("software_version").alias("version"))

val cameraDF = explodedCamerasDF.select($"value".getItem("device_id").alias("device_id"),
 $"value".getItem("where_name").alias("location"),
 $"value".getItem("software_version").alias("version"),
 $"value".getItem("activity_zones").getItem("name").alias("name"),
 $"value".getItem("activity_zones").getItem("id").alias("id"))

val smokedAlarmsDF = explodedSmokedAlarmsDF.select($"value".getItem("device_id").alias("device_id"),
 $"value".getItem("where_name").alias("location"),
 $"value".getItem("software_version").alias("version"),
 $"value".getItem("last_connection").alias("last_connected"),
 $"value".getItem("battery_health").alias("battery_health"))

查看内部数据

cameraDF.show

通过version进行join操作

代码语言:js
复制
val joineDFs = thermostateDF.join(cameraDF, "version")

四,总结

这篇文章的重点是介绍几个好用的工具,去获取复杂的嵌套的json数据格式。一旦你将嵌套数据扁平化之后,再进行访问,就跟普通的数据格式没啥区别了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-08-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档