为了更好的使用Python来开发物联网数据分析平台,我们使用pkl文件以日期作为文件名称来存储数据。原来数据是在数据库中的,我们需要定时将最近产生的数据导出为pkl文件。下面两段代码,分别实现按照日期从数据库中导出pkl文件以及定时任务执行前者。
下面数据表T_PRESSURE_DATA201901,T_PRESSURE_DATA201902..按照月份建表
export.py代码如下:
#-*- coding:utf-8 *-*
from collections import namedtuple
from pydal import DAL, Field
import pandas as pd
import os
dbConStr='mssql3://XX:XXXXXX@XX.XX.XX.XX/XXXXX'
cols=['ID', 'DevID', 'DevData', 'DevUnit', \
'voltage', 'DevAlarm', 'RecDateTime', \
'StrInfo', 'ErrorCode', 'dflag', 'AddTime']
PRE=namedtuple('PreData',cols)
sqlTem="""SELECT * FROM T_PRESSURE_DATA{month}
where recdatetime BETWEEN '{start}' and '{end}'
"""
dictDevAlarm0={0:"正常",1:"高高报",2:"高报",3:"低报",4:"低低报"}
dictDevAlarm1={'正常': 0, '高高报': 1, '高报': 2, '低报': 3, '低低报': 4}
dictDevUnit0={1:"PRE(Pa)",2:"TEMP(℃)"}
dictDevUnit1={"PRE(Pa)":1,"TEMP(℃)":2}
dictErrorCode0={'0': '无异常', '1': '电压异常', '2': '压力异常',\
'3': '客户网络异常', '4': '设备液位异常',\
'5': '气体泄漏异常', '6': '位置异常', \
'7': '温度异常', '8': '温度传感器通信故障',\
'9': '催化传感器故障', '10': '流量异常', '11': '井盖异动',\
'12': '门禁异常', '13': '境内液位异常', '14': '燃气阀门状态', \
'15': '液位传感器故障'}
dictErrorCode1={'无异常': '0', '电压异常': '1', '压力异常': '2',\
'客户网络异常': '3', '设备液位异常': '4',\
'气体泄漏异常': '5', '位置异常': '6',\
'温度异常': '7', '温度传感器通信故障': '8',\
'催化传感器故障': '9', '流量异常': '10',\
'井盖异动': '11', '门禁异常': '12', \
'境内液位异常': '13', '燃气阀门状态': '14',\
'液位传感器故障': '15'}
def exportPressure(month,start,end):
db = DAL(dbConStr)
sql=sqlTem.format(month=month,start=start,end=end)
r=db.executesql(sql)
pressureRows=map(lambda x:PRE(*x),r)
df0=pd.DataFrame(pressureRows)
df0.DevAlarm=df0.DevAlarm.apply(lambda x:dictDevAlarm1.get(x,'-1'))
df0.ErrorCode=df0.ErrorCode.apply(lambda x:dictErrorCode1.get(x,'-1'))
df0.DevUnit=df0.DevUnit.apply(lambda x:dictDevUnit1.get(x,'-1'))
db.close()
df0.to_pickle(os.path.join('D:/DataC/data_pressure',start+'.pkl'))
return df0
task.py代码如下:
# -*- coding:utf-8 -*-
import schedule
from datetime import datetime,timedelta
from export import *
global date
def job():
global date
yesterday=datetime.now()-timedelta(days=1)
yesterday=yesterday.replace(hour=0,minute=0,second=0,microsecond=0)
if(date>yesterday):
return
start=date
end=date+timedelta(days=1)
exportPressure(date.strftime('%Y%m'),\
start.strftime('%Y-%m-%d'),\
end.strftime('%Y-%m-%d'))
print("I'm working..."+str(start)+"---"+str(end))
date=end
def job2():
today=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0)
yesterday=today-timedelta(days=1)
exportPressure(yesterday.strftime('%Y%m'),\
yesterday.strftime('%Y-%m-%d'),\
yesterday.strftime('%Y-%m-%d'))
print("I'm working..."+str(yesterday))
#schedule.every(10).seconds.do(job)
schedule.every().day.at("00:30").do(job2)
if __name__=='__main__':
#date=datetime(2019,9,30)
while True:
schedule.run_pending()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。