前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python开发物联网数据分析平台---数据抽取

Python开发物联网数据分析平台---数据抽取

原创
作者头像
MiaoGIS
修改2020-03-17 18:47:29
2.1K0
修改2020-03-17 18:47:29
举报
文章被收录于专栏:Python in AI-IOTPython in AI-IOT

为了更好的使用Python来开发物联网数据分析平台,我们使用pkl文件以日期作为文件名称来存储数据。原来数据是在数据库中的,我们需要定时将最近产生的数据导出为pkl文件。下面两段代码,分别实现按照日期从数据库中导出pkl文件以及定时任务执行前者。

数据库导出

下面数据表T_PRESSURE_DATA201901,T_PRESSURE_DATA201902..按照月份建表

export.py代码如下:

代码语言:python
复制
#-*- coding:utf-8 *-*

from collections import namedtuple
from pydal import DAL, Field
import pandas as pd
import os
dbConStr='mssql3://XX:XXXXXX@XX.XX.XX.XX/XXXXX'

cols=['ID', 'DevID', 'DevData', 'DevUnit', \
      'voltage', 'DevAlarm', 'RecDateTime', \
      'StrInfo', 'ErrorCode', 'dflag', 'AddTime']

PRE=namedtuple('PreData',cols)

sqlTem="""SELECT  * FROM T_PRESSURE_DATA{month}
     where recdatetime BETWEEN '{start}' and '{end}'
     """


dictDevAlarm0={0:"正常",1:"高高报",2:"高报",3:"低报",4:"低低报"}
dictDevAlarm1={'正常': 0, '高高报': 1, '高报': 2, '低报': 3, '低低报': 4}

dictDevUnit0={1:"PRE(Pa)",2:"TEMP(℃)"}
dictDevUnit1={"PRE(Pa)":1,"TEMP(℃)":2} 

 
dictErrorCode0={'0': '无异常', '1': '电压异常', '2': '压力异常',\
              '3': '客户网络异常', '4': '设备液位异常',\
              '5': '气体泄漏异常', '6': '位置异常', \
              '7': '温度异常', '8': '温度传感器通信故障',\
              '9': '催化传感器故障', '10': '流量异常', '11': '井盖异动',\
              '12': '门禁异常', '13': '境内液位异常', '14': '燃气阀门状态', \
              '15': '液位传感器故障'}

dictErrorCode1={'无异常': '0', '电压异常': '1', '压力异常': '2',\
              '客户网络异常': '3', '设备液位异常': '4',\
              '气体泄漏异常': '5', '位置异常': '6',\
              '温度异常': '7', '温度传感器通信故障': '8',\
              '催化传感器故障': '9', '流量异常': '10',\
              '井盖异动': '11', '门禁异常': '12', \
              '境内液位异常': '13', '燃气阀门状态': '14',\
              '液位传感器故障': '15'}
def exportPressure(month,start,end):
    db = DAL(dbConStr)
    sql=sqlTem.format(month=month,start=start,end=end)
    r=db.executesql(sql)
    
    pressureRows=map(lambda x:PRE(*x),r)
    df0=pd.DataFrame(pressureRows)
    df0.DevAlarm=df0.DevAlarm.apply(lambda x:dictDevAlarm1.get(x,'-1'))
    df0.ErrorCode=df0.ErrorCode.apply(lambda x:dictErrorCode1.get(x,'-1'))
    df0.DevUnit=df0.DevUnit.apply(lambda x:dictDevUnit1.get(x,'-1'))
    db.close()
    df0.to_pickle(os.path.join('D:/DataC/data_pressure',start+'.pkl'))
    return df0
   




定时任务

task.py代码如下:

代码语言:python
复制
# -*- coding:utf-8 -*-
import schedule
from datetime import datetime,timedelta

from export import *
global date

def job():
    global date
    yesterday=datetime.now()-timedelta(days=1)
    yesterday=yesterday.replace(hour=0,minute=0,second=0,microsecond=0)
    if(date>yesterday):
        return 
    start=date
    end=date+timedelta(days=1)
    exportPressure(date.strftime('%Y%m'),\
                   start.strftime('%Y-%m-%d'),\
                   end.strftime('%Y-%m-%d'))
                   
    print("I'm working..."+str(start)+"---"+str(end))
    date=end

def job2():
    today=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0)
    yesterday=today-timedelta(days=1)
    

    exportPressure(yesterday.strftime('%Y%m'),\
                   yesterday.strftime('%Y-%m-%d'),\
                   yesterday.strftime('%Y-%m-%d'))
                   
    print("I'm working..."+str(yesterday))
   
#schedule.every(10).seconds.do(job)

schedule.every().day.at("00:30").do(job2)

if __name__=='__main__':
    #date=datetime(2019,9,30)
    while True:
        schedule.run_pending()
        

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据库导出
  • 定时任务
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档