前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python开发---使用subprocess从命令行程序获取数据

Python开发---使用subprocess从命令行程序获取数据

原创
作者头像
MiaoGIS
修改2020-11-04 10:38:18
7780
修改2020-11-04 10:38:18
举报
文章被收录于专栏:Python in AI-IOT

发现一个简单的解决不同语言开发的程序之间调用对方函数获取数据的方法,就是使用命令行作为数据流的接口。

下面举例说明。

比如可以用一个C# 开发一个命令行程序。程序使用命令行参数,来选择调用不同函数,同时传入其他函数调用需要的参数,返回结果用JSON格式输出到命令行。

下面的C#命令行程序,根据args[0]来选择调用不同的函数,其他args参数作为调用具体函数的参数。返回结果以JSON格式输出到命令行。

代码语言:c#
复制
private static void Main(string[] args)
{
    helper helper = new helper();
    string str = args[0];
    if (!(str == "1"))
    {
        if (!(str == "2"))
        {
            if (!(str == "3"))
            {
                if (str == "4")
                {
                    string startDate = args[1];
                    string endDate = args[2];
                    Console.WriteLine(JsonConvert.SerializeObject(helper.GetUserLogList(0, 0, startDate, endDate, 0, 2, "", "")));
                }
                return;
            }
            Console.WriteLine(JsonConvert.SerializeObject(helper.GetTargetValueList(0L, "0", DateTime.Now.Year.ToString(), DateTime.Now.Year.ToString())));
            return;
        }
    }
    else
    {
        Console.WriteLine(JsonConvert.SerializeObject(helper.GetRealTimeMonitorList(0L, "0", "F02,F03,F04,F05,F15,F16,F13,F08,F09,F07,F11,F10", 1)));
        return;
    }
    Console.WriteLine(JsonConvert.SerializeObject(helper.GetMinutesDataList(0, 0, 0L, "", "F02,F03,F04,F05,F15,F16,F13,F08,F09,F07,F11,F10", DateTime.Now.ToString("yyyy-MM-dd"), DateTime.Now.ToString("yyyy-MM-dd"))));
}

在Python来调用它,其实就是带上参数来运行上面的命令行程序来调用不同的函数,并获取返回数据。这时候使用的是

代码语言:python
代码运行次数:0
复制
# -*- coding:utf-8 -*-
import json
import subprocess


cmdPath='C#控制台程序的文件路径'
dictFactor1 = {'F02': 'cod', 'F03': 'nh', 'F04': 'p', 'F05': 'n'}
dictFactor2={'水质类别': 'level', 'COD': 'cod', '总磷': 'p', '氨氮': 'nh', '总氮': 'n'}

dictFactor11=dict(zip(dictFactor1.values(),dictFactor1.keys()))
dictFactor22=dict(zip(dictFactor2.values(),dictFactor2.keys()))

dictGrid={"1037068501":"1035","1037068502":"1042","1037068503":"1045",
          "1037068504":"1045","1037068506":"1045","1037068508":"1044",
          "1037068505":"1045","1037068507":"1044"}

global dictStation
global dictTarget




def getFactor1(r):
    d1=dict(map(lambda x:[x["ItemCode"],x["ItemValue"]],r))
    d2=dict(map(lambda x:[x[1],d1.get(x[0])],dictFactor1.items()))
    return d2

def getFactor2(r):
    d1=dict(map(lambda x:[x[1],r.get(x[0])],dictFactor1.items()))
    return d1

def getFactor3(r):
    d1=dict(map(lambda x:[x[1],r.get(x[0]+"Target")],dictFactor1.items()))
    return d1
def getStation():
    p=subprocess.Popen([cmdPath,"1"],
                       stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result,stderr=p.communicate()
    hourData=json.loads(result.decode('gbk').strip())['SiteList']
    global dictStation
    dictStation=dict(map(lambda x:[x['SiteId'],{'SiteName':x['SiteName'],'SiteId':x['SiteId'],\
                  'Longitude':x['Dongitude'],'Latitude':x['Dimensionality'],'lastHour':{},'lastMinute':{}}],hourData))
    return dictStation

def getTarget():
    p=subprocess.Popen([cmdPath,"3"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

    result,stderr=p.communicate()
    targetData=json.loads(result.decode('gbk').strip())
    global dictTarget
    dictTarget=dict(map(lambda x:[x['SiteId'],getFactor3(x)],targetData))
    return dictTarget

def getAlarm(data):
    alarms=[]      
    for i in data:
        for k,v in dictTarget[i['SiteId']].items():
            if(i['factors'][k]==''):
                continue
            value=float(str(i['factors'][k]).replace('-','0'))
            if(value>float(v)):
                alarms.append({'SiteId':i['SiteId'],'RecordTime':i['RecordTime'],'Factor':k,'Value':value,'Target':float(v)})
    return alarms

def getHourAlarm():
    p=subprocess.Popen([cmdPath,"1"],
                       stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result,stderr=p.communicate()
    hourData=json.loads(result.decode('gbk').strip())['SiteList']
    hourData=filter(lambda x:x.get('RecordTime'),hourData)
    hourData=list(map(lambda x:{'SiteName':x['SiteName'],'SiteId':x['SiteId'],'RecordTime':x['RecordTime'],\
                  'Longitude':x['Dongitude'],'Latitude':x['Dimensionality'],\
                  'factors':getFactor1(x['ItemDataList'])},hourData))
    #hourData=list(filter(lambda x :x['SiteId'] in ['37068506'],hourData))

    result=getAlarm(hourData)
    return result

def getMinuteAlarm():
    p=subprocess.Popen([cmdPath,"2"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

    result,stderr=p.communicate()
    minuteData=json.loads(result.decode('gbk').strip())['rows']
    
    minuteData=list(map(lambda x:{'SiteName':x['SiteName'],'SiteId':x['SiteID'],'RecordTime':x['MonitorTime'],\
                            'factors':getFactor2(x)},minuteData))
    #minuteData=list(filter(lambda x :x['SiteId'] in ['37068506'],minuteData))
    result=getAlarm(minuteData)
    return result


def getLogs(start,end):
    p=subprocess.Popen([cmdPath,"4",start,end],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result,stderr=p.communicate()

    result=json.loads(result.decode('gbk').strip())
    return result


            
        
 
    
  
    

通过subprocess来调用可以传入参数的命令行程序并获取返回结果。这样就可以将C#语言的不同的函数包装成了Python语言的不同函数。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档