前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python开发物联网数据分析平台---查询方法

Python开发物联网数据分析平台---查询方法

原创
作者头像
MiaoGIS
修改2020-03-17 18:46:20
7920
修改2020-03-17 18:46:20
举报
文章被收录于专栏:Python in AI-IOT

介绍一下平台实现查询所用的queryDF模块。该模块loadData()随着tornado Web程序启动调用一次,读取数据目录下的所有pkl文件,用pandas的DataFrame存储在内存中。

数据存储

数据按照日期存储在pkl文件中,更快的让pandas加载,同时体积更小。

定时程序定期在凌晨将昨天产生的数据提取为pkl文件保存在此目录下。

数据加载

第一次加载读取所有pkl文件到全局变量df中,同时用全局变量maxDate和minDate跟踪当前数据的最新日期。

数据热更新

web服务启动后,每天都会有新的pkl文件出现在数据目录下,初次启动加载的数据保存在全局变量df中,需要往其中动态追加数据。使用refreshData来更新全局变量df,这时候用以跟踪当前数据的最新日期的maxDate和minDate起到了作用。

更新数据的方法已经有了,什么时候调用呢。我们写一个装饰器,让每一个查询函数调用前都去检查更新数据。

queryDF.py代码如下:

代码语言:python
代码运行次数:0
复制
# -*- coding:utf-8 -*-
import numpy as np

import pandas as pd
from datetime import datetime,timedelta
import os
from functools import wraps
import time
 
global baseDir

baseDir="D:/DataC/t"

global df
global maxDate
global minDate

def refreshData_decorator(f): 
    @wraps(f)
    def wrapper(*args,**kwds):
        refreshData()
        print('refreshData')
        return f(*args,**kwds)
    return wrapper

def loadData():
    print('begin load data!')
    global df
    global maxDate
    global minDate
    fileNames=sorted(os.listdir(baseDir))
    dfs=map(lambda x:pd.read_pickle(os.path.join(baseDir,x)),fileNames)

    df=pd.concat(dfs)
    
    df=df.set_index(['DevID', 'RecDateTime'])
    
    maxDate=datetime.strptime(fileNames[-1],'%Y-%m-%d.pkl')
    minDate=datetime.strptime(fileNames[0],'%Y-%m-%d.pkl')
    print('load data completed!')
def refreshData():
    global df
    global maxDate
    global minDate  
    fileNames=sorted(os.listdir(baseDir))
    latestDate=datetime.strptime(fileNames[-1],'%Y-%m-%d.pkl')
    if(maxDate<latestDate):
        days=(latestDate-maxDate).days
        for i in range(days):
            newDate=maxDate+timedelta(i+1)
            newFile=newDate.strftime('%Y-%m-%d.pkl')
            if newFile in fileNames:
                print('append '+newFile)
                df0=pd.read_pickle(os.path.join(baseDir,newFile))
                if(df0.shape[0]==0):
                    print('Empty in '+newFile)
                    continue
                df0=df0.set_index(['DevID', 'RecDateTime'])                           
                df=df.append(df0)     
        maxDate=latestDate
    return df
def getTime(s):
	r="2019-10-02 %s:%s:00"%(s//3600,(s%3600)//60)
	r=pd.datetime.strptime(r,"%Y-%m-%d %H:%M:%S")
	return r
    
def getNames():
    ids=df.index.get_level_values(0).unique()
    return pd.DataFrame(ids)

def getInterpolate(df0):
    seconds=pd.timedelta_range(start='1 day', end='1 day 23:45:00', freq='15min').map(lambda x:x.seconds)
    dfSeconds=pd.DataFrame({'seconds':seconds})
    
    df0['seconds']=df0.index.map(lambda x:x.hour*3600+x.minute*60+x.second)
    df00=df0.append(dfSeconds).set_index('seconds').sort_index()
    df00=df00.interpolate(limit_direction='both').loc[seconds]
    df00['time']=df00.index.map(lambda x:getTime(x))
    df00=df00.reset_index().set_index('time')
    return df00

@refreshData_decorator    
def getData(devID,start,end):
    if devID not in df.index:
        return "{}"
    df0=df.loc[devID]
    df0=df0.sort_index()
    df0=df0.loc[start:end]
    df0.reset_index(inplace=True)
    df0.RecDateTime=df0.RecDateTime.apply(lambda x:pd.datetime.strftime(x,'%Y-%m-%d %H:%M:%S'))
    df0.AddTime=df0.AddTime.apply(lambda x:pd.datetime.strftime(x,'%Y-%m-%d %H:%M:%S'))
    result=df0.to_json()
    return result

@refreshData_decorator
def getId(ID): 
    if ID is None:
        return pd.DataFrame()
    ID=int(ID)
    df0=df.loc[ID].sort_index()
    d1=df0.last('1D')

    d2=df0.last('2D')[:-d1.shape[0]]

    lastWeek=(d1.index[0]-timedelta(days=7)).strftime("%Y-%m-%d")
    d3=df0.loc[lastWeek]
    
    d11=getInterpolate(d1)
    d22=getInterpolate(d2)
    d33=getInterpolate(d3)

    nrange=np.arange(0,1,0.1)

    d4=df0.last('30D')
    
    q1=d1['DevData'].quantile(nrange)
    q2=d4['DevData'].quantile(nrange)


    g1=d1.groupby(lambda x:x.strftime("%H"))['DevData']
    g2=df0.groupby(lambda x:x.strftime("%H"))['DevData']
    
    r={'d1':d11,'d2':d22,'d3':d33,'q1':q1,'q2':q2,'df0':d1,'g1':g1,'g2':g2}
    return r
     
@refreshData_decorator
def getIds(ids):
    df0=df.groupby(level='DevID').count()['ID'].reset_index()
    if ids==[]:
        return df0
    else:
        df0=df0[df0.DevID.isin(ids)]
        return df0

@refreshData_decorator
def getYear(ids):
    df0=df.reset_index(level="DevID").sort_index()
   
    df0=df0[df0.DevID.isin(ids)]
      
    return df0
  
@refreshData_decorator
def getDay(ids):
    df0=df.reset_index(level="DevID").sort_index()
    if ids!=[]:
        df0=df0[df0.DevID.isin(ids)]
    r=df0.last('1D')
    return r

@refreshData_decorator
def getMinutes15(ids):
    df0=df.reset_index(level="DevID").sort_index()
    if ids!=[]:
        df0=df0[df0.DevID.isin(ids)]
    r=df0.last('16min')
    return r
@refreshData_decorator
def getHour(ids):
    df0=df.reset_index(level="DevID").sort_index()
    if ids!=[]:
        df0=df0[df0.DevID.isin(ids)]
    r=df0.last('1H')
    return r

@refreshData_decorator
def getInfo(ids):
    df0=df.reset_index(level="DevID").sort_index()
    if ids!=[]:
        df0=df0[df0.DevID.isin(ids)]
     
    total=len(df0.DevID.unique())
    size=df0.shape[0]
    
    g1=df0.groupby(lambda x:x.strftime("%H"))['DevData']
       
    last1D=df0.last('1D')
     
    last1H=df0.last('1H')
    last1M=df0.last('30D')
    #lastData=df0.last('1D').sort_index().tail(10)
    deltaG=getDeltaG(ids)
    deltaM=getDeltaM(ids,16)
    deltaM=deltaM.set_index("start")
    deltaMG=deltaM.groupby(lambda x:x.strftime('%Y-%m'))['DevID'].count()
    return {'total':total,'size':size,'last1D':last1D,'last1H':last1H,'last1M':last1M,'deltaG':deltaG,'deltaMG':deltaMG,'g1':g1}


@refreshData_decorator
def getDeltaM1():
    df0=df.reset_index(level="DevID").last('30D').reset_index().set_index(["DevID","RecDateTime"])
    ids=df0.index.get_level_values(0).unique()
    r=map(lambda x:getDeltaT(df0.loc[x],x),ids,16)
    r=pd.concat(r)
    r=r.iloc[(r.end-r.start).values.astype(np.int64)>1000000000000][["DevID","start","end"]]
    return r

@refreshData_decorator
def getError(ids):
    df0=df.reset_index(level="DevID").last('30D').reset_index()
    if ids!=[]:
        df0=df0[df0.DevID.isin(ids)]
    dfError=df0.iloc[(df0.ErrorCode.astype(np.int8)!=0).values].sort_index(ascending=False)
    dfAlarm=df0.iloc[(df0.DevAlarm.astype(np.int8)!=0).values].sort_index(ascending=False)
    return {'dfError':dfError,'dfAlarm':dfAlarm}
 
    
def getDelta(d):
    
    j1=d.index[:-1]
    j2=d.index[1:]
    jj=j2-j1
    return jj

def getDeltaT(d,ID,minutes):
    d=d.sort_index()
    i=getDelta(d)
    j=i.values.astype(np.int64)>60000000000*minutes
    r=d.iloc[1:].iloc[j]
    r['end']=r.index
    r['DevID']=ID
    r['start']=d.iloc[:-1].iloc[j].index
    r=r.reset_index()
    return r
def getDeltaM(ids,minutes):
     
    ids0=df.index.get_level_values(0).unique()
    if(ids!=[]):
        ids=np.intersect1d(ids0,ids).astype(np.int16)
    else:
        ids=ids0
    r=map(lambda x:getDeltaT(df.loc[x],x,minutes),ids)
    r=pd.concat(r)
    r=r.iloc[(r.end-r.start).values.astype(np.int64)>60000000000*minutes][["DevID","start","end"]]
    return r

def getDeltaG(ids):
    ids0=df.index.get_level_values(0).unique()
    if(ids!=[]):
        ids=np.intersect1d(ids0,ids).astype(np.int16)
    else:
        ids=ids0
    r=map(lambda x:getDelta(df.loc[x]).value_counts(),ids)
    df0=pd.concat(list(r))
    nrange=np.array([1000000000000,900000000000*4])
    
    g=df0.groupby(nrange.searchsorted(df0.index.values.astype(np.int64))).sum()
    g=g.rename({0:'A',1:'B',2:'C'})
    return g
    
        

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据存储
  • 数据加载
  • 数据热更新
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档