Developing a Serial Port Program Interface with a Web Page: Code Implementation

Original article by MiaoGIS, last modified 2020-09-14 17:48:47
Included in the column: Python in AI-IOT

The corresponding JavaScript code is as follows:

$(function () {

    $('.dropdown-toggle').dropdown();
    $.get('com/ports', function (data) {
        var liTemplate = _.template('<li data-value="<%=device%>"><a href="#"><%=device%></a></li>');
        _(data['result']).each(x => {
            $('[data-role="ulPort"]').append($(liTemplate(x)));

        })

        $("[data-role=ulPort],[data-role=ulBaudrate],[data-role=ulBytesize],[data-role=ulParity],[data-role=ulStopbits]").find('li').click(function (e) {
            console.log($(this));
            $(this).addClass('active').siblings().removeClass('active');
        });
        $("[data-role=open]").click(function () {
            if (!$('[data-role=ulPort] li').hasClass('active')) {
                $('[data-role=info] p').text("Please select a serial port first!");
                return
            }

            var com = _.object($('#init').find('ul[data-role]').map(function (index, item) {
                return [[$(item).data('role').slice(2).toLowerCase(), $(item).find('li.active').data('value')]]
            }));
            console.log(com);
            $.post('com/open', {
                'com': JSON.stringify(com)
            }, function (data) {
                if (data['result']) {
                    $('[data-role=info] p').text("Serial port opened successfully!");
                    $('[data-role="readwrite"]').removeClass('hide');
                }
            });
            console.log('open');
        });
        $("[data-role=close]").click(function () {
            if (!$('[data-role=ulPort] li').hasClass('active')) {
                $('[data-role=info] p').text("Please select a serial port first!");
                return
            }

            $.post('com/close', function (data) {
                if (!data['result']) {
                    $('[data-role=info] p').text("The serial port is closed!");
                    $('[data-role="readwrite"]').addClass('hide');
                }
            });
            console.log('close');
        });
        $("[data-role=clear]").click(function () {
            $('#result').val('');
        })

        $("[data-role=send]").click(function () {
            var cmdText = $("#cmdText").val();

            console.log(cmdText);
            $.post('com/read', {
                cmd: cmdText,

            }, function (data) {
                var hexes = data.result.hexes;
                var text = hexes.map(function (item) {
                    return item
                }).join(' ');


                $('#result').val(text);
                console.info(data);
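            }).fail(function () {
                // .fail() replaces the deprecated jqXHR .error() callback
                alert("Please open the serial port first, or resend the command!")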
            });

            $('#result').val('');
        })

        console.log(data);
    });
    $('body').delegate('[data-role=read]', 'click', function () {
        var $panel = $(this).parents('[data-read]');
        var callback = $panel.data('callback');
        var cmdText = $panel.data('read');
        var params = $panel.find('[data-field]').map((x, y) => _.object(["field", "type", "size"], [$(y).data('field'), $(y).data('type'), $(y).data('size')])).toArray();
        console.log(cmdText);
        $.post('com/read', {
            cmd: cmdText,
            params: JSON.stringify(params)
        }, function (data) {
            if (data.result == 0) {
                alert("A burst of debug bytes arrived and this command failed; please resend it!")
                return;
            }
            var hexes = data.result.hexes;

            var text = hexes.map(function (item) {
                return item
            }).join(' ');
            if (!!callback) {

                window[callback].apply(this, data.result.data);
            }

            data.result.data.forEach((x, y) => {
                $panel.siblings().find('[data-field]').eq(y).data('value', x).val(x)
            });
            $('#result').val(text);
            console.info(data);
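        }).fail(function () {
            // .fail() replaces the deprecated jqXHR .error() callback
            alert("Please open the serial port first, or resend the command!")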
        });
    });


    $('body').delegate('[data-role=write]', 'click', function () {

        var $panel = $(this).parents('[data-write]');
        var cmdText = $panel.data('write');
        var params = $panel.siblings('[data-read]').find('[data-field]').map((x, y) => _.object(["field", "type", "size"], [$(y).data('field'), $(y).data('type'), $(y).data('size')])).toArray();
        console.log(cmdText);
        $.post('com/write', {
            cmd: cmdText,
            params: JSON.stringify(params)
        }, function (data) {
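            // Check the failure marker before reading fields from the result.
            if (data.result == 0) {
                alert("A burst of debug bytes arrived and this command failed; please resend it!")
                return;
            }
            var hexes = data.result.hexes;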
            var text = hexes.map(function (item) {
                return item
            }).join(' ');



            $('#result').val(text);
            console.info(data);
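        }).fail(function () {
            // .fail() replaces the deprecated jqXHR .error() callback
            alert("Please open the serial port first, or resend the command!")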
        });


    });

    function initTabs(a, b, count) {
        var tabsTemplate = _.template($('#tabsTemplate').html());
        $("#tabsContainter").html(tabsTemplate({
            sensors: _.range(1, count + 1)
        }));
        $('#tabsContainter [data-toggle="tab"]').tab();

        var tableTemplate = _.template($('#tableTemplate').html());
        $("#tableContainter").html(tableTemplate({
            sensors: _.range(1, count + 1)
        }));


        var sensors = _.map(_.range(count), function (item, index) {
            return "0000";
        });
        initBordertablepanel(sensors);

        initStripedtable(_.range(1, count + 1));
    }

    function initStripedtable(sensors) {

        var stripedtableTemplate = _.template($('#stripedtableTemplate').html());
        $("#stripedtableContainter").html(stripedtableTemplate({
            sensors: sensors
        }));

    }

    function initBordertablepanel(sensors) {
        sensors = Object.values(arguments).flat();
        var bordertablepanelTemplate = _.template($('#bordertablepanelTemplate').html());
        $("#bordertablepanelContainter").html(bordertablepanelTemplate({
            sensors: sensors
        }));

    }

     function initBordertable(sensors) {
        sensors = Object.values(arguments).flat();
        var bordertableTemplate = _.template($('#bordertableTemplate').html());
        $("#bordertableContainter").html(bordertableTemplate({
            sensors: sensors
        }));

    }
    
    function refreshBordertable(sensors) {
        sensors = Object.values(arguments);
        console.log(sensors);
        $('#bordertableContainter tbody tr').find('td:first').each(function (index) {
            $(this).html(dictKey2Name[sensors[index]]);
        });

    }
    window.dictKey2Name = {
        '0010': 'Temperature',
        '0030': 'Humidity',
        '0016': 'Air pressure',
        '0013': 'Wind speed',
        '0014': 'Wind direction',
        '0011': 'PM25',
        '0032': 'PM10',

        '0019': 'PM100',
        '0012': 'Noise',

    }



    window.initTabs = initTabs;
    window.initBordertable = initBordertable;
    window.refreshBordertable = refreshBordertable;
})

The backend web framework and the serial port operations are written in Python; the web framework is Tornado.
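To make the request format concrete: the page's open button posts a form field `com` holding a JSON string, whose keys are the `data-role` names with the `ul` prefix removed and lowercased (`port`, `baudrate`, `bytesize`, `parity`, `stopbits`). The sketch below shows one way a backend might decode that string. `decode_com_settings` and `DEFAULTS` are hypothetical names, not part of the original code, and the default values are assumptions copied from the `openHandler` fallbacks.

```python
import json

# Hypothetical helper (not in the original code): decode the `com` form
# field posted by the page into serial-port settings.
DEFAULTS = {"port": None, "baudrate": 115200, "bytesize": 8,
            "parity": "N", "stopbits": 1}


def decode_com_settings(raw):
    """Return a settings dict; values missing from the JSON fall back to DEFAULTS."""
    posted = json.loads(raw)
    settings = dict(DEFAULTS)
    for key in DEFAULTS:
        # A key may be missing or null when no item is selected in that list.
        if posted.get(key) is not None:
            settings[key] = posted[key]
    # Numeric settings may arrive as strings, so normalize them.
    settings["baudrate"] = int(settings["baudrate"])
    settings["bytesize"] = int(settings["bytesize"])
    return settings


if __name__ == "__main__":
    print(decode_com_settings('{"port": "COM3", "baudrate": "9600"}'))
```

Keeping the decoding in a plain function like this would make the defaults testable without starting the web server.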

The web server code is as follows:

# -*- coding:utf-8 -*-
import os
import tornado.web
import tornado.httpserver
import tornado.ioloop
import tools
from tornado.options import define,options
import json
define('port',default=9001,type=int,help="input your port")

class indexHandler(tornado.web.RequestHandler):
    def get(self):
        self.render('index.html')
class readHandler(tornado.web.RequestHandler):
    def post(self):
        cmdText=self.get_argument('cmd')
        params=self.get_argument('params',None)
        hexes=tools.writeText(cmdText)
        if not params:
            self.write({'result':{'hexes':hexes}})
            return 
                   
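        # A reply that is empty or does not start with 01 is reported as a
        # failure; return here so only one JSON object is written.
        if not hexes or hexes[0]!='01':
            self.write({'result':0})
            return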
        data=tools.parseText(' '.join(hexes),json.loads(params))
        result={'hexes':hexes,'data':data}
        self.write({'result':result})
# Not implemented yet
class writeHandler(tornado.web.RequestHandler):
    def post(self):
        pass
        

class portsHandler(tornado.web.RequestHandler):
    def get(self):
        result=tools.getPorts()
        self.write({'result':result})
class openHandler(tornado.web.RequestHandler):
    def post(self):
        print(dir(self))
        com=self.get_argument('com')
        com=json.loads(com)
        print(com)
        stopbits=com.get('stopbits',1)
        result=tools.begin(com.get('port',None),\
                           int(com.get('baudrate',115200)),\
                           int(com.get('bytesize',8)),\
                           com.get('parity',"N"),\
                           stopbits)
        # Report whether the port actually opened instead of always returning True.
        self.write({'result':result})
        
class closeHandler(tornado.web.RequestHandler):
    def post(self):       
        result=tools.end()
        self.write({'result':result})

app=tornado.web.Application(
	handlers=[(r'/',indexHandler),
            (r'/index',indexHandler),
            (r'/com/open',openHandler),
            (r'/com/close',closeHandler),
            (r'/com/read',readHandler),
            (r'/com/write',writeHandler),
            (r'/com/ports',portsHandler)
		],static_path=os.path.join(os.curdir,'static'),
        template_path=os.path.join(os.curdir,'template'),debug=True
        )
if __name__=='__main__':
    tools.init()
    print(tools.s2)
    tornado.options.parse_command_line()
    httpserver=tornado.httpserver.HTTPServer(app)
    httpserver.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()
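`readHandler` forwards `params`, a list of `{field, type, size}` entries collected from the page, to `tools.parseText`, which cuts the reply body into one chunk per field. The sketch below illustrates that slicing with the standard library only. `slice_fields` is a hypothetical helper, the field names are made up, and treating each size unit as 2 bytes is an assumption taken from the `*2` factor in `tools.parseText`.

```python
from itertools import accumulate


def slice_fields(body, formats, unit=2):
    """Split a list of hex byte strings into one chunk per field.

    `unit` is the number of bytes per size unit; 2 is an assumption
    chosen to match the `*2` factor in tools.parseText.
    """
    sizes = [f["size"] for f in formats]
    ends = list(accumulate(sizes))  # running end offsets, in size units
    starts = [end - size for end, size in zip(ends, sizes)]
    return {
        f["field"]: body[start * unit:end * unit]
        for f, start, end in zip(formats, starts, ends)
    }


if __name__ == "__main__":
    # Illustrative field definitions; not taken from the original page.
    formats = [{"field": "id", "type": "NUMBERS_HEX", "size": 1},
               {"field": "name", "type": "BYTES", "size": 2}]
    body = "00 2a 54 45 4d 50".split()
    print(slice_fields(body, formats))
```

Here `body` is assumed to be the reply with its 6-byte header already removed; each chunk would then be handed to the parser registered for the field's `type`.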

The serial port operations in tools.py are as follows:

# -*- coding:utf-8 -*-
import os
if os.name == 'nt':  # sys.platform == 'win32':
    # list_ports_winreg wraps serial.tools.list_ports_windows and also reports
    # software-virtualized ports found in the registry.
    import list_ports_winreg as list_ports
elif os.name == 'posix':
    import serial.tools.list_ports_posix as list_ports

import serial
import time
import json
text='01 03 30 08 00 06 0A 4B'
#from serial.tools import list_ports
from datetime import datetime
import numpy as np
testSend1="01 03 10 00 01 7c 7b 41"
 
testReceive1="""
01 03 10 00 02 F8 23 03 33 30 42 30 37 31 38 31 32 30 33 30 31 32 00 00 39 32 36 30 00 32 31 38 32 31 38 2E 32 38 2E 37 31 2E 32 32 30 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 31 38 31 31 31 00 39 32 36 30 00 00 43 4D 4D 54 4D 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 3C 00 78 00 02 00 0A 00 50 00 78 00 3C 00 0A 00 00 00 00 00 00 00 FF 00 00 00 00 00 00 00 00 00 00 00 00 00 01 00 00 00 09 00 01 00 10 00 54 45 4D 50 00 00 00 00 01 00 08 00 E8 03 00 00 00 00 00 00 00 00 00 00 0A 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 00 00 00 00 00 00 02 00 30 00 48 55 4D 00 00 00 00 00 01 00 09 00 E7 03 00 00 00 00 00 00 00 00 00 00 0A 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 00 00 02 00 00 00 12 00 16 00 50 52 45 53 53 00 00 00 01 00 0C 00 B0 04 00 00 00 00 00 00 00 00 00 00 05 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 00 00 00 00 00 00 03 00 13 00 57 53 00 00 00 00 00 00 01 00 0B 00 2C 01 00 00 00 00 00 00 00 00 00 00 05 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 00 00 00 00 00 00 04 00 14 00 57 44 00 00 00 00 00 00 00 00 0A 00 10 0E 00 00 00 00 00 00 00 00 00 00 05 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 00 00 00 00 00 00 05 00 11 00 50 4D 32 35 00 00 00 00 00 00 06 00 E8 03 00 00 00 00 00 00 00 00 00 00 3C 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 06 00 00 00 00 00 06 00 32 00 50 4D 31 30 00 00 00 00 00 00 06 00 E8 03 00 00 00 00 00 00 00 00 00 00 3C 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 06 00 02 00 00 00 14 00 19 00 50 4D 31 30 30 00 00 00 00 00 06 00 20 4E 00 00 00 00 00 00 00 00 00 00 3C 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 06 00 00 00 00 00 18 00 12 00 4E 6F 69 73 
65 00 00 00 01 00 0E 00 20 4E 00 00 00 00 00 00 00 00 00 00 3C 00 00 00 00 00 00 00 00 00 00 00 00 00 02 00 03 80 06 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 24 13
"""

def calc_crc(string):
    data = bytearray.fromhex(string)
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for i in range(8):
            if ((crc & 1) != 0):
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return '{:04x}'.format(((crc & 0xff) << 8) + (crc >> 8))

 
def getPorts():
    ports=list_ports.comports()
    result=list(map(lambda x:{"device":x.device},ports))
    return result


def begin(port,baudrate,bytesize,parity,stopbits):
    if port is None:
        return False
    global s2
    if(s2.is_open):
        s2.close()
    s2=serial.Serial(port=port, baudrate=baudrate, bytesize=bytesize, 
                      parity=parity, stopbits=stopbits, timeout=5)
    return s2.is_open
    
def end():
    global s2
    if(s2.is_open):
        s2.close()
    return s2.is_open
    



def writeText(text):
    #print(text)
    hexs=map(lambda x:int(x,base=16),text.split())
    s2.write(bytes(hexs))
    time.sleep(5)
    text=s2.read_all()
    text=list(map(lambda x:'{0:02x}'.format(x),text))
    #print(text)
    return text

 
def checkText(text):
    hexstrs=text.split()
    length=int(hexstrs[4]+hexstrs[5],16)
    calccode=calc_crc(' '.join(hexstrs[:length+6]))
     
    #checkcode=''.join(hexstrs[length+6:length+8][::-1])
    checkcode=''.join(hexstrs[-2:][::-1])
    # Always return a bool instead of falling through to None on mismatch.
    return int(checkcode,16)==int(calccode,16)


def getBody(text):
    hexstrs=text.split()
    length=int(hexstrs[4]+hexstrs[5],16)
    
    if not checkText(text):
        return (False,)
    body=hexstrs[6:length+6]
    return body



def parseText(text,formats):
    
    c=list(map(lambda x:x['size'],formats))

    a=text.split()[6:6+sum(c)*2] 
    b=enumerate(formats)
    d=np.cumsum(c)
    e=d-c
     
    result=map(lambda x:dictParse[x[1]['type']](a[e[x[0]]*2:x[1]['size']*2+e[x[0]]*2]),b)
    result=list(result)
    return result

def reverseRecord(record,formats):
    result=[]
    bytesArray=map(lambda x:dictReverse[x[1]['type']](record[x[0]]),enumerate(formats))
    for i in bytesArray:
        result=result+i
    return result

def parseTIME_HEX(xs):
    result=map(lambda x:parseNUMBER_HEX(x),xs)
    result= list(result)
    result[0]=result[0]+2000
    result=datetime(*result).strftime("%Y-%m-%d %H:%M:%S")
    return result

def reverseTIME_HEX(text):
    date=datetime.strptime(text,"%Y-%m-%d %H:%M:%S")
    result=[date.year-2000,date.month,date.day,date.hour,date.minute,date.second]
    result=list(map(lambda x:reverseNUMBER_HEX(x),result))
    return result

def parseREVERSE_STR(xs):
    xs.reverse()
    result=''.join(xs)
    return result


def parseSTR(xs):
    
    result=''.join(xs)
    return result
def reverseREVERSE_STR(text):
    result=zip(text[::2],text[1::2])
    result=list(map(lambda x:x[0]+x[1],result))
    result.reverse()
    
    return result

def reverseSTR(text):
    result=zip(text[::2],text[1::2])
    result=list(map(lambda x:x[0]+x[1],result))
     
    
    return result


def parseREVERSE_NUMBER(xs):
    xs.reverse()
    result=''.join(xs)
    result=int(result,16)
    return result

def reverseREVERSE_NUMBER(text):
    result=int(text)
    result='{:04x}'.format(result)
    result=reverseREVERSE_STR(result)
    return result
    
    
def parseBYTES(xs):
    #print(xs)
    result=bytes.fromhex(''.join(xs)).decode().strip('\x00')
    return result
    
def reverseBYTES(text,length):
    result=text.encode().hex()
    result='{0:0<{1}s}'.format(result,length)
    result=reverseREVERSE_STR(result)[::-1]
    return result

def parseNUMBER_HEX(xs):
    return int(xs,16)

def parseNUMBER_HEXS(xs):
    result=map(lambda x:str(parseNUMBER_HEX(x)),xs)
    result=''.join(result)
    return result
def reverseNUMBER_HEX(x):
    return '{:02x}'.format(int(x))

def parseNUMBERS_HEX(xs):
    return int(''.join(xs),16)

def reverseNUMBERS_HEX(text):
    result='{:04x}'.format(int(text))
    result=reverseREVERSE_STR(result)[::-1]
    return result

def reverseNUMBER_HEXS(text):
    result=map(lambda x:'{:02d}'.format(int(x)),text)
    return ''.join(result)

def parseNUMBER_BITS(xs):
    result=int(''.join(xs),16)
    result='{:016b}'.format(result)
    return result

def reverseNUMBER_BITS(text):
    result=int(text,2)
    result='{:04x}'.format(result)
    return result
    
def init():

 
    global s2
    s2=serial.SerialBase()
    global dictParse
    dictParse={"STR":parseSTR,"NUMBER_BITS":parseNUMBER_BITS,
               "NUMBER_HEXS":parseNUMBER_HEXS,"REVERSE_NUMBER":parseREVERSE_NUMBER,
               "TIME_HEX":parseTIME_HEX,"NUMBERS_HEX":parseNUMBERS_HEX,
               'REVERSE_STR':parseREVERSE_STR,'BYTES':parseBYTES}
    global dictReverse
    dictReverse={"STR":reverseSTR,"NUMBER_BITS":reverseNUMBER_BITS,
                 "NUMBER_HEXS":reverseNUMBER_HEXS,"REVERSE_NUMBER":reverseREVERSE_NUMBER,
                 "TIME_HEX":reverseTIME_HEX,"NUMBERS_HEX":reverseNUMBERS_HEX,
                 'REVERSE_STR':reverseREVERSE_STR,'BYTES':reverseBYTES}
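The loop in `calc_crc` above (initial value 0xFFFF, polynomial 0xA001, bitwise right shift) has the same shape as the CRC-16/MODBUS checksum. As a minimal sketch, the same loop can work on `bytes` and return an integer, which is easier to test. `crc16_modbus` and `append_crc` are hypothetical names, and appending the low byte first is an assumption based on standard Modbus RTU framing, not something confirmed for this device.

```python
def crc16_modbus(data: bytes) -> int:
    """CRC-16/MODBUS: init 0xFFFF, reflected polynomial 0xA001, no final XOR."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def append_crc(payload: bytes) -> bytes:
    """Append the CRC low byte first (assumed Modbus RTU byte order)."""
    return payload + crc16_modbus(payload).to_bytes(2, "little")


if __name__ == "__main__":
    frame = append_crc(bytes.fromhex("01 03 30 08 00 06"))
    print(" ".join("{:02x}".format(b) for b in frame))
```

A useful property of this checksum is that running `crc16_modbus` over a frame that already ends with its own low-byte-first CRC yields 0, which gives a one-line validity check for received frames under the same byte-order assumption.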

list_ports_winreg.py is used to detect serial ports that are virtualized by software; its code is as follows:

#! python
#
# Enumerate serial ports on Windows including a human readable description
# and hardware information using winreg.
#
# Using winreg helps find virtual comports

try:
    # Python 3.X
    import winreg
except ImportError:
    # Python 2.7 compatibility
    try:
        import _winreg as winreg
    except ImportError:
        winreg = None

from serial.tools import list_ports_common
from serial.tools import list_ports_windows


SERIAL_REGISTRY_PATH = 'HARDWARE\\DEVICEMAP\\SERIALCOMM'


def regval_to_listport(winport):
    """Convert a windows port from registry key to pyserial's ListPortInfo.
    Args:
        winport (tuple): Windows registry value (description, device, value).
    Returns:
        listport (ListPortInfo): comport device details.
    """
    # Create the ListPortInfo
    description, device, _ = winport
    listport = list_ports_common.ListPortInfo(device)

    # Format the description like other ListPortInfo
    description = description.replace('\\Device\\', '')
    listport.description = "{} ({})".format(description, device)

    return listport
# end regval_to_listport


def winreg_comports():
    """Return windows comports found in the registry.
    See Also:
        list_ports_winreg.comports(), list_ports_winreg.comports_list(),
        list_ports_windows.comports()
    Note:
        This should include virtual comports, and it is significantly faster
        than list_ports_windows.comports(). However, list_ports_windows has far
        more information. comports() contains all list_ports_windows.comports()
        and winreg_comports() that were not found from list_ports_windows.
    Returns:
        comports (list): Sorted list of ListPortInfo.
    """
    try:
        # Get the Serial Coms registry
        key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, SERIAL_REGISTRY_PATH)

        # Get key info - number of values (subkeys, num_vals, last_modified)
        num_values = winreg.QueryInfoKey(key)[1]

        # Make a generator of comports
        for i in range(num_values):        
            # get registry value for the comport
            value = winreg.EnumValue(key, i)
            yield regval_to_listport(value)

        # Close the registry key
        winreg.CloseKey(key)
    except (AttributeError, WindowsError, EnvironmentError):
        # winreg is None or there was a key error
        pass
# end winreg_comports


def comports_list():
    """Return a list of comports found from list_ports_windows and comports
    found in the window registry.
    
    See Also:
        list_ports_winreg.comports(), list_ports_winreg.winreg_comports(),
        list_ports_windows.comports()
    Note:
        This should include virtual comports. This method contains all
        list_ports_windows.comports() and winreg_comports() that were not found
        from list_ports_windows.
    Returns:
        comports (list): List of ListPortInfo comport details.
    """
    comports = list(list_ports_windows.comports())

    comports[len(comports): ] = [li for li in winreg_comports()
                                 if li not in comports]

    return comports
 


def comports():
    """Generator for comports found from list ports windows and comports found
    in the windows registry.
    See Also:
        list_ports_winreg.comports_list(), list_ports_winreg.winreg_comports(),
        list_ports_windows.comports()
    Note:
        This should include virtual comports. This method contains all
        list_ports_windows.comports() and winreg_comports() that were not found
        from list_ports_windows.
    Yields:
        comport (ListPortInfo): Comport details.
    Returns:
        comports (generator): Generator of ListPortInfo comports.
    """
    existing = []
    for comport in list_ports_windows.comports():
        existing.append(comport)
        yield comport

    for li in winreg_comports():
        if li not in existing:
            existing.append(li)
            yield li
 
if __name__ == '__main__':
    
    for port, desc, hwid in sorted(comports()):
        
        print("%s: %s [%s]" % (port, desc, hwid))
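`comports()` above merges two sources and skips registry entries that are already known, relying on `li not in existing` and therefore on how `ListPortInfo` defines equality. The sketch below makes the deduplication key explicit by comparing device names. `Port` and `merge_ports` are illustrative stand-ins, not pyserial APIs, and matching on the device name alone is an assumption.

```python
from collections import namedtuple

# Stand-in for pyserial's ListPortInfo, used only for this illustration.
Port = namedtuple("Port", ["device", "description"])


def merge_ports(primary, secondary):
    """Yield ports from both sources in order, skipping any device name already seen."""
    seen = set()
    for port in list(primary) + list(secondary):
        if port.device not in seen:
            seen.add(port.device)
            yield port


if __name__ == "__main__":
    detected = [Port("COM1", "USB Serial (COM1)")]
    registry = [Port("COM1", "Serial0 (COM1)"), Port("COM9", "VSerial9 (COM9)")]
    for port in merge_ports(detected, registry):
        print(port.device, port.description)
```

Because the first source is consumed first, its richer descriptions win whenever both sources report the same device.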

That is all of the code for this program.

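Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com to request removal.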
