我对线很陌生。我想在每分钟连续24/7小时后,一次与多个传感器进行通信。
Senario:,我有一个与传感器对话的方法,它包含3个参数
public String perform(String command, String ip, String port)
{
//talk to the sensor and then
returns reply;
}
我有一个包含传感器细节的数据库。
我现在在做什么
while(true)
{
//get sensors from database
//run perform method for all instruments
我正在评估用于流处理的Apache作为Apache的替代/补充。我们通常用星火解决的任务之一是丰富数据。
也就是说,我有来自带有传感器ID的IoT传感器的数据流,还有一组传感器元数据。我想将输入流转换为传感器measure+sensor元数据流。
在Spark中,我可以使用RDD加入DStream。
case calss SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafk
客户端:
import socket
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
msg = b"X"
for i in range(1500):
s.sendto(msg,("<IP>",<PORT>))
服务器:
import socket
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.bind(("",>PORT>))
counter = 0
for i in range(15