车联网项目分析的目的
车联网数据分析车辆驾驶信息、车辆行驶信息、车辆车况信息、车辆故障信息、车辆报警信息等。
车联网项目分析的意义
星途车联网项目
通过终端设备采集车辆上的CAN总线实时数据和企业静态hu离线数据,获取当前车辆的位置、行驶高程、车速、油压、电量、行驶里程、告警数据等数据,对其进行业务,车辆行程、故障告警、电子栅栏、百公里油耗、高温报警、低soc告警、疲劳驾驶告警等车联网分析,支撑汽车后服务市场、车企、政府监管部门、车主等服务。
海量存储
海量计算
数据量
原始数据 1kb/报文
5秒上报一条数据,12kb/1分钟 =》 720kb/h => 7200kb/day => 7200 0000kb/1w/day => 7200 0000 * 4/day => 7200 0000 * 4 * 3/day = 823GB/day 存储 / 0.7 * 90 = 103 TB/三个月
物理机 8T 128GB 5w*13=65w
云计算 阿里云
同等配置 5w/year
车联网项目
大数据相关组件
Flink 流处理
kafka 消息队列
HDFS 分布式存储系统
HBase 大表存储
Phoenix 基于SQL的查询
MySQL/mongodb 存储结果数据
Zeppelin 前端SQL图表化
Dolphinscheduler 调度平台
任务调度方式
#!/bin/sh
spark-submit --class com.dfssi.dataplatform.analysis.fuel.stats.ConditionFuelStatisticians
--master yarn
--deploy-mode client
--num-executors 2
--driver-memory 1g
--executor-memory 3g
--executor-cores 2
--jars $(echo ../target/jars/*.jar | tr ' ' ',')
../target/DataMiningAnalysis-0.1-SNAPSHOT.jar
在调度平台中加载 shell 文件
车联网项目框架搭建
实时ETL处理,对 json字符串解析
json字符串
"{\"batteryAlarm\": 0, \"carMode\": 1,\"minVoltageBattery\": 3.89, \"chargeStatus\": 1,\"vin\":\"LS5A3CJC0JF890971\"}"
复杂的 key/value 和 list集合的 json字符串解析示例
package cn.itcast.flink.source.test;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
/**
* Author itcast
* Date 2021/9/19 17:41
* 将json字符串转换成对象,将key-value转换成 HashMap
* 对json字符串进行封装
* 开发步骤:
* 1.定义 json 字符串
* 2.将JSONObject 字符串转换成 HashMap
* 3.将JSONObject转换成 List<HashMap<String,Object>>
* 4.解析 json 字符串
* 5.转换成对象
* 6.打印输出结果
*/
public class JsonPlusParseOptimize {
public static void main(String[] args) {
//1.定义 json 字符串
String json = "{\"batteryAlarm\": 0,\"carMode\": 1,\"minVoltageBattery\": 3.89,\"chargeStatus\": 1,\"vin\": \"LS5A3CJC0JF890971\",\"nevChargeSystemTemperatureDtoList\": [{\"probeTemperatures\": [25, 23, 24, 21, 24, 21, 23, 21, 23, 21, 24, 21, 24, 21, 25, 21],\"chargeTemperatureProbeNum\": 16,\"childSystemNum\": 1}]}";
//2.将JSONObject 字符串转换成 HashMap
JSONObject jsonObject = new JSONObject(json);
HashMap<String,Object> vehicleHashMap = toHashMap(jsonObject);
//3.将JSONObject转换成 List<HashMap<String,Object>>
//读取key,将字符串传递进来
String nevChargeSystemTemperatureDtoStr = vehicleHashMap.get("nevChargeSystemTemperatureDtoList").toString();
List<HashMap<String,Object>> lists = toList(nevChargeSystemTemperatureDtoStr);
//4.解析 json 字符串
//5.转换成对象
System.out.println(Integer.parseInt(vehicleHashMap.getOrDefault("batteryAlarm",-999999).toString()));
//6.打印输出结果
for (HashMap<String, Object> list : lists) {
System.out.println(list.getOrDefault("chargeTemperatureProbeNum",-999999).toString());
}
}
/**
* 将json数组字符串转换成List<HashMap>
* @param value
* @return
*/
private static List<HashMap<String, Object>> toList(String value) {
List<HashMap<String, Object>> lists = new ArrayList<>();
JSONArray objects = new JSONArray(value);
//遍历数组,取出所有的对象并转换成 HashMap
for (Object object : objects) {
String jsonStr = object.toString();
JSONObject jsonObject = new JSONObject(jsonStr);
lists.add(toHashMap(jsonObject));
}
return lists;
}
/**
* 将 JSONObject 转换成 HashMap 对象
* @param jsonObject
* @return
*/
private static HashMap<String, Object> toHashMap(JSONObject jsonObject) {
HashMap<String, Object> kv = new HashMap<String,Object>();
//读取 jsonObject 所有 keys
Set<String> keys = jsonObject.keySet();
for (String key : keys) {
//遍历所有key,得到所有值
kv.put(key,jsonObject.get(key));
}
//将其保存到 kv
return kv;
}
}
具体数据业务分析,流程如下图所示