Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】

物联网+大数据+机器学习将会是以后的趋势,这里介绍一篇这方面的文章包含源码。 混合机器学习基础架构构建了一个场景,利用Apache Kafka作为可扩展的中枢神经系统。 公共云用于极大规模地训练分析模型(例如,通过Google ML Engine在Google Cloud Platform(GCP)上使用TensorFlow和TPU,预测(即模型推断)在本地Kafka基础设施的执行( 例如,利用Kafka Streams或KSQL进行流分析)。 本文重点介绍内部部署。 创建了一个带有KSQL UDF的Github项目,用于传感器分析。 它利用KSQL的新API功能,使用Java轻松构建UDF / UDAF函数,对传入事件进行连续流处理。 使用案例:Connected Cars - 使用深度学习的实时流分析 从连接设备(本例中的汽车传感器)连续处理数百万个事件:

为此构建了不同的分析模型。 他们在公共云上接受TensorFlow,H2O和Google ML Engine的训练。 模型创建不是此示例的重点。 最终模型已经可以投入生产,可以部署用于实时预测。 模型服务可以通过模型server 完成,也可以本地嵌入到流处理应用程序中。 参阅RPC与流处理的权衡,以获得模型部署和.... 演示:使用MQTT,Kafka和KSQL在Edge进行模型推理 Github项目:深度学习+KSQL UDF 用于流式异常检测MQTT物联网传感器数据 (下载源码:

ksql-udf-deep-learning-mqtt-iot-master.zip (474.64 KB, 下载次数: 0) ) 该项目的重点是通过MQTT将数据提取到Kafka并通过KSQL处理数据:

Confluent MQTT Proxy的一大优势是无需MQTT Broker即可实现物联网方案的简单性。 可以通过MQTT代理将消息直接从MQTT设备转发到Kafka。 这显着降低了工作量和成本。 如果你“只是”想要在Kafka和MQTT设备之间进行通信,这是一个完美的解决方案。 如果你想看到另一部分(与Elasticsearch / Grafana等接收器应用程序集成),请查看Github项目“KSQL for streaming IoT data”。 这实现了通过Kafka Connect和Elastic连接器与ElasticSearch和Grafana的集成。(源码下载:链接: https://pan.baidu.com/s/1FCFgAoF9v1ihp9fyqHeKag 密码: 67sz) KSQL UDF - 源代码 开发UDF非常容易。 只需在UDF类中的一个Java方法中实现该函数:

[Bash shell] 纯文本查看 复制代码

?

@Udf(description = "apply analytic model to sensor input")             public String anomaly(String sensorinput){ "YOUR LOGIC" }

这里是所有代码:

[Java] 纯文本查看 复制代码

?

package com.github.megachucky.kafka.streams.machinelearning;
 
import java.util.Arrays;
 
import hex.genmodel.GenModel;
import hex.genmodel.easy.EasyPredictModelWrapper;
import hex.genmodel.easy.RowData;
import hex.genmodel.easy.exception.PredictException;
import hex.genmodel.easy.prediction.AutoEncoderModelPrediction;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
 
 
@UdfDescription(name = "anomaly", description = "anomaly detection using deep learning")
public class Anomaly {
     
     
    // Model built with H2O R API:
      // anomaly_model <- h2o.deeplearning(x = names(train_ecg),training_frame =
      // train_ecg,activation = "Tanh",autoencoder = TRUE,hidden =
      // c(50,20,50),sparse = TRUE,l1 = 1e-4,epochs = 100)
 
      // Name of the generated H2O model
      private static String modelClassName = "io.confluent.ksql.function.udf.ml"
                                             + ".DeepLearning_model_R_1509973865970_1"; 
     
  @Udf(description = "apply analytic model to sensor input")
  public String anomaly(String sensorinput) {
       
      System.out.println("Kai: DL-UDF starting");
          
      GenModel rawModel;
        try {
            rawModel = (hex.genmodel.GenModel) Class.forName(modelClassName).newInstance();
         
        EasyPredictModelWrapper model = new EasyPredictModelWrapper(rawModel);
         
        // Prepare input sensor data to be in correct data format for the autoencoder model (double[]):
        String[] inputStringArray = sensorinput.split("#");
        double[] doubleValues = Arrays.stream(inputStringArray)
                .mapToDouble(Double::parseDouble)
                .toArray();
         
        RowData row = new RowData();
        int j = 0;
        for (String colName : rawModel.getNames()) {
          row.put(colName, doubleValues[j]);
          j++;
        }
 
        AutoEncoderModelPrediction p = model.predictAutoEncoder(row);
        // System.out.println("original: " + java.util.Arrays.toString(p.original));
        // System.out.println("reconstructedrowData: " + p.reconstructedRowData);
        // System.out.println("reconstructed: " + java.util.Arrays.toString(p.reconstructed));
 
        double sum = 0;
        for (int i = 0; i < p.original.length; i++) {
          sum += (p.original[i] - p.reconstructed[i]) * (p.original[i] - p.reconstructed[i]);
        }
        // Calculate Mean Square Error => High reconstruction error means anomaly
        double mse = sum / p.original.length;
        System.out.println("MSE: " + mse);
 
        String mseString = "" + mse;
 
        return (mseString);
         
        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
            System.out.println(e.toString());
             
        } catch (PredictException e) {
            System.out.println(e.toString());
        }
         
        return null;
       
  }
 
 
}

如何使用Apache Kafka和MQTT Proxy运行演示? 执行演示的所有步骤都在Github项目中描述。 你只需安装Confluent Platform,然后按照以下步骤部署UDF,创建MQTT事件并通过KSQL levera处理它们.... 这里使用Mosquitto生成MQTT消息。 当然,也可以使用任何其他MQTT客户端。 这是开放和标准化协议的巨大好处。

到此结束,文章虽然简短,但是内容确实很丰富,特别项目的源码的阅读,在github上有详细的介绍。为方便阅读,微信点此可查看

https://github.com/kaiwaehner/ksql-udf-deep-learning-mqtt-iot

https://github.com/kaiwaehner/ksql-fork-with-deep-learning-function

原文发布于微信公众号 - About云(wwwaboutyuncom)

原文发表时间:2018-08-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大数据挖掘DT机器学习

机器学习数据采集入门经验分享

在新的一年里,很多人都在思考如何利用机器学习(ML)算法来提高产品或服务的质量。 PredictionIO公司与许多公司合作,部署他们的第一个ML系统和大数据基...

43380
来自专栏大魏分享(微信公众号:david-share)

云时代企业如何建设绿色数据中心(第一篇)

建设绿色数据中心的必要性 中国目前是是全球最大的温室气体排放国,节能减排是“十二五”的重要工作内容。发改委在2013年就明确了首批10个行业的企业温室气...

36760
来自专栏Danny的专栏

UML图——用例图

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

14520
来自专栏奇点大数据

python需要系统学习一下

1、python是脚本语言,作为程序员我觉得至少应该掌握一本通用脚本语言,因为脚本语言与编译语言的开发测试过程不同,可以极大的提高编程效率。

14920
来自专栏从流域到海域

用JAVA的DEA算法衡量社交媒体页面的流行度

原文作者:Vasilis Vryniotis

29860
来自专栏吉浦迅科技

DAY47:阅读read only cache和Time Function

The read-only data cache load function is only supported by devices of compute c...

9110
来自专栏美团技术团队

业务赋能利器之外卖特征档案

应用背景及现状 美团外卖业务自2013年9月启动至今已运营三年时间。截至2016年12月,美团点评整个外卖平台的日订单超过900万。从发展速度和体量上看,外卖业...

43170
来自专栏九彩拼盘的叨叨叨

通过分类来管理

在工作的过程中,发现很多人都没有分类的习惯:电脑桌面放着各种各样一大堆的文件;一个原型图文件夹下无序的放着很多原型图;图标文件下放着一堆文件;写的技术类的长文,...

9140
来自专栏瓜大三哥

FPGA内部资源介绍

39420
来自专栏机器之心

终于!大家心心念念的PyTorch Windows官方支持来了

机器之心整理 参与:机器之心编辑部 五个小时前,PyTorch 官方 GitHub 发布 0.4.0 版本,大家心心念念的 Windows 支持终于来了。 Gi...

31380

扫码关注云+社区

领取腾讯云代金券