前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python开发---基于HJ 212协议的简单接收程序

Python开发---基于HJ 212协议的简单接收程序

原创
作者头像
MiaoGIS
修改2022-04-01 16:51:25
2.6K0
修改2022-04-01 16:51:25
举报
文章被收录于专栏:Python in AI-IOTPython in AI-IOT

HJ 212-2017是污染物在线监控(监测)系统数据传输标准的一种。

本标准适用于污染物在线监控(监测)系统、污染物排放过程(工况)自动监控系统与监控中心之间的数据传输,规定了传输的过程及参数命令、交互命令、数据命令和控制命令的格式,给出了代码定义,本标准允许扩展,但扩展内容时不得与本标准中所使用或保留的控制命令相冲突。 本标准还规定了在线监控(监测)仪器仪表和数据采集传输仪之间的数据传输格式,同时给出了代码定义。

传感器设备通过TCP连接,使用HJ 212协议向服务器发送报文数据。

服务器接收程序运行如下:

设备作为TCP Client上传数据,所以服务端接收程序是一个TCP Server程序,接收到报文,解析并存储。代码如下:

代码语言:python
复制
# -*-coding:utf-8 -*-
import socket
import threading
from datetime import datetime
from hjt212 import *

def deleteConnection(item):
    global connectionList
    del connectionList['connection'+item]



class WebSocket(threading.Thread):#继承Thread

    #def __init__(self,conn,index,name,remote,path='/'):
    def __init__(self,conn,name,remote,path='/'):
        threading.Thread.__init__(self)#初始化父类Thread
        self.conn=conn
        #self.index=index
        self.name=name
        
        self.remote=remote
        self.path=path
        
        self.buffer=''
        self.buffer_utf8=''
        self.length_buffer=0

    def run(self):#重载Thread的run
        #print ('Socket %s Start!' % self.index)
        print ('Socket %s Start!'% (self.remote[0]+":"+str(self.remote[1])))
        headers={}
        self.handshaken=False

        while True:
            print(self.name)
            #print(self.handshaken)
                      
            if self.handshaken==False:
                message=self.conn.recv(1024)
                if(len(message)!=0):
		    
                    sourceIP=self.remote[0]+":"+str(self.remote[1])
                    print(u"IP::"+sourceIP+u"\n报文::"+message.hex()+u"\n时间::"+str(datetime.now()))
                    nowStr=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    t=threading.Thread(target=savePack,args=(sourceIP,message,nowStr,self))
                    t.run()
                    
                    #print('Socket %s Start Handshaken with %s!' % (self.index,self.remote))
                    print('Socket  Start Handshaken with %s!' % (self.remote,))
                    #print('Socket %s Handshaken with %s success!' %(self.index,self.remote))
                    
                
                     
                   
                else:
                    self.handshaken = True
            else:
                
                #deleteConnection(str(self.index))
                self.conn.close()
                break #退出线程

class WebSocketServer(object):
    def __init__(self):
        self.socket=None

    def begin(self):
        print ('WebSocketServer Start!')
        self.socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        self.socket.bind(('0.0.0.0',8091))
        self.socket.listen(50)

        #global connectionlist

        #i=0
    
        while True:
            connection,address=self.socket.accept()
            
            ip=address[0]    
            
            newSocket=WebSocket(connection,ip,address)
            newSocket.start() #开始线程,执行run函数
            #global connectionList
            #connectionList['connection'+str(i)]=connection
            #i=i+1


if __name__=='__main__':
    #global connectionList
    #connectionList = {}
    server=WebSocketServer()
    server.begin()

hjt212.py文件中定义了hj212协议数据包的解析和存储:包括CRC校验,报文解析成json字典,最后根据CN编号(2051表示为分钟数据,2061表示为小时数据)分别保存在MongoDB的不同表中。

代码语言:python
复制
# -*-coding:utf-8 -*-

"""
{ $and: [ { "CN": "1" },equals
{ "CN": { $ne: "2" } },doesn't equal
{ "CN": /.*abc.*/i },contains
{ "CN": { $not: /.*abc.*/i } },doesn't contain
{ "CN": /^abc.*/i },starts with
{ "CN": { $not: /^abc.*/i } },doesn't start with
{ "CN": /.*abc$/i },ends with
{ "CN": { $not: /.*abc$/i } },doesn't end with
{ "CN": { $exists: true } }, exists
{ "CN": { $exists: false } },doesn't exist
{ "CN": { $in: [abc] } },in 
{ "CN": { $nin: [abc] } },not in
{ "CN": { $all: [abc] } },array contains all
{ "CN": { $gt: "1" } },>
{ "CN": { $gte: "1" } },>=
{ "CN": { $lt: "1" } },<
{ "CN": { $lte: "1" } },<=
{ "CN": { $gte: "1", $lte: "3" } },<=...<
{ "CN": { $gt: "1", $lte: "3" } },<..<=
{ "CN": { $gte: "1", $lt: "3" } },<..<
{ "CN": { $type: 16 } }, has type 
{ "CN": { $not: { $type: 16 } } ,doesn't have type
{ $text: { $search: "abc", $language: "zhs", $caseSensitive: true } } text index search

{ $or: [ { "CN": "1" }, { "MN": { $gt: "1" } } ] }

{ $or: [ { "CN": "1" }, { "MN": { $gt: "1" } }, { $and: [ { "CN": "1" } ] } ] }

{ $or: [ { "CN": "1" }, { $and: [ { "CN": "1" } ] }, { $and: [ { "CN": "1" } ] } ] }

{ $nor: [ { "CN": "1" }, { $and: [ { "CN": "1" } ] }, { $or: [ { "CN": "1" } ] } ] }

{ $nor: [ { "CN": "1" }, { "CP": { $elemMatch: {"CN": "1" } } }, { $or: [ { "CN": "1" } ] } ] }

{ $nor: [ { "CN": "1" }, { "CP": { $not: { $elemMatch: {"CN": "1" } } } }, { $or: [ { "CN": "1" } ] } ] }


{ "PW": 1, "MN": 0}

{ "PW": 1, "Flag": -1}
"""


dictFactor1={'w01001': 'pH值', 'w01009': '溶解氧', 'w01010': '水温', 'w01014': '电导率', 'w01012': '悬浮物', 'w01018': '化学需氧量', 'w21003': '氨氮', 'w20122': '总铜', 'w21016': '氰化物'}

dictFactor2={'w01001': 'pH值', 'w01009': '溶解氧', 'w01010': '水温', 'w01014': '电导率', 'w01012': '悬浮物', 'w01018': '化学需氧量', 'w21003': '氨氮', 'w20122': '总铜', 'w21016': '氰化物'}
from datetime import datetime
global bs
global data
import binascii
import pymongo
import logging
logger=logging.getLogger('HJT212')
fh=logging.FileHandler('log.txt')
logFormatter=logging.Formatter('%(asctime)s -%(name)s -%(message)s')
fh.setFormatter(logFormatter)
logger.addHandler(fh)
logger.setLevel(logging.ERROR)

mongo=pymongo.MongoClient()
t1=mongo.water.minute
t2=mongo.water.hour
def crc32asii(v):
    return '0x%8x' % (binascii.crc32(v) & 0xffffffff)


def crc2hex(crc): 
    return '%08x' % (binascii.crc32(binascii.a2b_hex(crc)) & 0xffffffff)

def crc16(x, invert):
    a = 0xFFFF
    b = 0xA001
    for byte in x:
        a ^= ord(byte)
        for i in range(8):
            last = a % 2
            a >>= 1
            if last == 1:
                a ^= b
    s = hex(a).upper()
 
    return s[4:6]+s[2:4] if invert == True else s[2:4]+s[4:6]
def calc_crc(string):
    data = bytearray.fromhex(string)
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for i in range(8):
            if ((crc & 1) != 0):
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return hex(((crc & 0xff) << 8) + (crc >> 8))


"""
unsigned int CRC16_Checkout ( unsigned char *puchMsg, unsigned int usDataLen ) 
{ 
unsigned int i,j,crc_reg,check; 
crc_reg = 0xFFFF; 
for(i=0;i<usDataLen;i++) 
{ 
crc_reg = (crc_reg>>8) ^ puchMsg[i]; 
 for(j=0;j<8;j++) 
{ 
 check = crc_reg & 0x0001; 
 crc_reg >>= 1; 
 if(check==0x0001) 
{ 
 crc_reg ^= 0xA001; 
 } 
 } 
} 
return crc_reg; 
}
"""
 
def crc(string):
    data=bytearray.fromhex(string)
    crc_reg=0xFFFF
    for x in data:
        crc_reg=(crc_reg>>8)^x
        for i in range(8):
            check=crc_reg&0x0001
            crc_reg=(crc_reg>>1)
            if(check==0x0001):
                crc_reg=(crc_reg^0xA001)
    return crc_reg

def parsePack(ip,bs,receiveTime):
    #global bs
    #bs=text.strip().split(' ')
    #packHead=bytes.fromhex(''.join(bs[:2]))
    text=bs.hex()
    packHead=bytes.fromhex(text[:4])
    packHead=packHead.decode('ascii')
    print(u"包头:%s"%packHead)
    #dataLength=bytes.fromhex(''.join(bs[2:6]))
    dataLength=bytes.fromhex(text[4:12])
    #dataLength=int(dataLength)
    dataLength=int(dataLength)*2
    print(u"数据段长度:%s"%dataLength)
    global data
    #data=bs[6:dataLength+6]
    data=text[12:dataLength+12]
    #crc1=crc(''.join(data))
    crc1=crc(data)
    print("计算CRC为:%s"%crc1)
    print(text[dataLength+12:dataLength+12+8])
    #crc2=int(bytes.fromhex(''.join(bs[dataLength+6:dataLength+6+4])),16)
    crc2=int(bytes.fromhex(text[dataLength+12:dataLength+12+8]),16)
    print("数据CRC为:%s"%crc2)
    print(u"CRC比对成功"if crc1==crc2 else u"CRC比对失败")
    print(u'数据段为:\n%s'%''.join(data))
    #print(u'数据为:\n%s'%bytes.fromhex(''.join(data)).decode('ascii'))
    print(u'数据为:\n%s'%bytes.fromhex(data).decode('ascii'))
    #dataStr=bytes.fromhex(''.join(data)).decode('ascii')
    dataStr=bytes.fromhex(data).decode('ascii')
    sepNum=dataStr.find('DataTime')
    dataStr1=dataStr[:sepNum]
    
    dataDict1=dict(map(lambda x:(x.split('=')[0],x.split('=')[1]),dataStr1.split(';')))
    print(dataDict1)
    dataStr2=dataStr[sepNum:]
    dateText=dataStr2.split(';')[0].split('=')[1]
    date=datetime.strptime(dateText,'%Y%m%d%H%M%S')
    
    print(u"数据时间为:%s"%date.strftime('%Y-%m-%d %H:%M:%S'))
    dateStr=date.strftime('%Y-%m-%d %H:%M:%S')
    list2=list(map(lambda x:x.split(','),dataStr2.split(';')[1:]))
    print(list2)
    dictValues=dict(map(lambda x:[x[0].split('=')[0],x[0].split('=')[1].strip('&')],list2))
    dictFlags=dict(map(lambda x:[x[1].split('=')[0],x[1].split('=')[1].strip('&')],list2))
    
    dictValues0=dict(map(lambda x:[dictFactor1.get(x[0].split('-')[0]),x[1]],dictValues.items()))
    dictFlags0=dict(map(lambda x:[dictFactor1.get(x[0].split('-')[0]),x[1]],dictFlags.items()))
    print(dictValues0)
    print(dictFlags0)
    #dataFoot=bytes.fromhex(''.join(bs[-2:]))
    dataFoot=bytes.fromhex(text[-4:])
    print(u'包尾为:%s'%dataFoot.decode('ascii'))
    qn=datetime.strptime(dataDict1['QN'][:-3],'%Y%m%d%H%M%S')
    qnStr=qn.strftime('%Y-%m-%d %H:%M:%S')
    dataDict1.update({'ip':ip.split(':')[0],'port':ip.split(':')[1],'DataTime':dateText,'dateStr':dateStr,'qnStr':qnStr,'receiveTime':receiveTime})
    return crc1==crc2,dataDict1,dictValues,dictFlags
    
def savePack(ip,bs,receiveTime,self):
    try:
    #if True:
        success,heads,values,flags=parsePack(ip,bs,receiveTime)
    except Exception as e:
        print("error occurred")
        logger.error(e)
        #self.handshaken = True
        return 
         

    if(success):
        #mongo.insert_one()
        print(heads)
        print(values)
        print(flags)
        values.update(heads)
        values.update(flags)
        CN=int(heads['CN'])
        if(CN==2051):
            insId=t1.insert_one(values)
            print(u'ID:%s,已经保存到分钟数据表'%str(insId.inserted_id))
        elif(CN==2061):
            insId=t2.insert_one(values)
            print(u'ID:%s,已经保存到小时数据表'%str(insId.inserted_id))
        
if __name__=='__main__':
    pass
    

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MongoDB
腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档