This post introduces the queryDF module, which the platform uses to serve queries. Its loadData() function is called once when the Tornado web application starts: it reads every pkl file in the data directory and keeps the contents in memory as a pandas DataFrame.
The data is stored in one pkl file per date; pickle files load into pandas faster and take up less space than text formats.
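After a few days of operation the data directory (D:/DataC/t in the code below) therefore holds one file per day, for example:

2019-09-30.pkl
2019-10-01.pkl
2019-10-02.pkl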
A scheduled job runs in the early morning and exports the previous day's data into a new pkl file in this directory.
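The extraction job itself is outside the scope of this module. As a rough sketch, assuming the previous day's records can be pulled into a DataFrame from some data source (the fetchDay helper here is a hypothetical placeholder), it might look like this:

# nightly export sketch (hypothetical; the real job is not shown here)
import os
import pandas as pd
from datetime import datetime, timedelta

baseDir = "D:/DataC/t"   # the same directory queryDF reads from

def exportYesterday(fetchDay):
    # fetchDay(dateStr) -> DataFrame stands in for the real data source;
    # the frame must carry the columns queryDF expects (DevID, RecDateTime, ...)
    day = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    df = fetchDay(day)
    # one pickle per day, named so loadData/refreshData can parse the date back out
    df.to_pickle(os.path.join(baseDir, day + '.pkl'))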
The initial load reads every pkl file into the global variable df, while the global variables maxDate and minDate track the newest and oldest dates of the loaded data.
After the web service is up, a new pkl file appears in the data directory every day, so the data loaded at startup into the global df needs to be extended dynamically. refreshData() updates the global df, and this is where maxDate, which tracks the newest date already loaded, comes into play.
We now have a way to refresh the data; when should it be called? We write a decorator so that every query function checks for new data before it runs.
The code of queryDF.py is as follows:
# -*- coding:utf-8 -*-
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import os
from functools import wraps

# data directory: one pkl file per day, named like 2019-10-01.pkl
baseDir = "D:/DataC/t"

# module-level state: the in-memory data and the date range it covers
df = None
maxDate = None
minDate = None
def refreshData_decorator(f):
    @wraps(f)
    def wrapper(*args, **kwds):
        refreshData()
        print('refreshData')
        return f(*args, **kwds)
    return wrapper
def loadData():
    """Initial load: read every pkl file in baseDir into the global df."""
    print('begin load data!')
    global df
    global maxDate
    global minDate
    fileNames = sorted(os.listdir(baseDir))
    dfs = map(lambda x: pd.read_pickle(os.path.join(baseDir, x)), fileNames)
    df = pd.concat(dfs)
    df = df.set_index(['DevID', 'RecDateTime'])
    # file names follow %Y-%m-%d.pkl, so the covered date range falls out of a sort
    maxDate = datetime.strptime(fileNames[-1], '%Y-%m-%d.pkl')
    minDate = datetime.strptime(fileNames[0], '%Y-%m-%d.pkl')
    print('load data completed!')
def refreshData():
    """Append any pkl files newer than maxDate to the global df."""
    global df
    global maxDate
    global minDate
    fileNames = sorted(os.listdir(baseDir))
    latestDate = datetime.strptime(fileNames[-1], '%Y-%m-%d.pkl')
    if maxDate < latestDate:
        days = (latestDate - maxDate).days
        for i in range(days):
            newDate = maxDate + timedelta(i + 1)
            newFile = newDate.strftime('%Y-%m-%d.pkl')
            if newFile in fileNames:
                print('append ' + newFile)
                df0 = pd.read_pickle(os.path.join(baseDir, newFile))
                if df0.shape[0] == 0:
                    print('Empty in ' + newFile)
                    continue
                df0 = df0.set_index(['DevID', 'RecDateTime'])
                # DataFrame.append was removed from pandas; pd.concat does the same job
                df = pd.concat([df, df0])
        maxDate = latestDate
    return df
def getTime(s):
    """Convert seconds-of-day into a datetime on a fixed dummy date, for plotting."""
    r = "2019-10-02 %s:%s:00" % (s // 3600, (s % 3600) // 60)
    r = datetime.strptime(r, "%Y-%m-%d %H:%M:%S")
    return r
def getNames():
    ids = df.index.get_level_values(0).unique()
    return pd.DataFrame(ids)
def getInterpolate(df0):
    """Resample one device's readings onto a 15-minute grid by interpolation."""
    # 96 grid points: seconds-of-day at 00:00, 00:15, ..., 23:45
    seconds = pd.timedelta_range(start='1 day', end='1 day 23:45:00', freq='15min').map(lambda x: x.seconds)
    dfSeconds = pd.DataFrame({'seconds': seconds})
    df0['seconds'] = df0.index.map(lambda x: x.hour * 3600 + x.minute * 60 + x.second)
    # mix the grid points into the data, interpolate, then keep only the grid points
    df00 = pd.concat([df0, dfSeconds]).set_index('seconds').sort_index()
    df00 = df00.interpolate(limit_direction='both').loc[seconds]
    df00['time'] = df00.index.map(lambda x: getTime(x))
    df00 = df00.reset_index().set_index('time')
    return df00
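# usage sketch (devID is a hypothetical device ID):
#   getInterpolate(df.loc[devID].last('1D'))
# returns 96 rows, one per 15-minute slot, indexed by the fixed dummy date
# that getTime produces.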
@refreshData_decorator
def getData(devID, start, end):
    if devID not in df.index:
        return "{}"
    df0 = df.loc[devID]
    df0 = df0.sort_index()
    df0 = df0.loc[start:end]
    df0.reset_index(inplace=True)
    df0.RecDateTime = df0.RecDateTime.apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
    df0.AddTime = df0.AddTime.apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
    result = df0.to_json()
    return result
@refreshData_decorator
def getId(ID):
    if ID is None:
        return pd.DataFrame()
    ID = int(ID)
    df0 = df.loc[ID].sort_index()
    d1 = df0.last('1D')                     # the most recent day
    d2 = df0.last('2D')[:-d1.shape[0]]      # the day before that
    lastWeek = (d1.index[0] - timedelta(days=7)).strftime("%Y-%m-%d")
    d3 = df0.loc[lastWeek]                  # the same day one week earlier
    d11 = getInterpolate(d1)
    d22 = getInterpolate(d2)
    d33 = getInterpolate(d3)
    nrange = np.arange(0, 1, 0.1)
    d4 = df0.last('30D')
    q1 = d1['DevData'].quantile(nrange)     # deciles over the last day
    q2 = d4['DevData'].quantile(nrange)     # deciles over the last 30 days
    g1 = d1.groupby(lambda x: x.strftime("%H"))['DevData']    # hourly groups, last day
    g2 = df0.groupby(lambda x: x.strftime("%H"))['DevData']   # hourly groups, all data
    r = {'d1': d11, 'd2': d22, 'd3': d33, 'q1': q1, 'q2': q2, 'df0': d1, 'g1': g1, 'g2': g2}
    return r
@refreshData_decorator
def getIds(ids):
    df0 = df.groupby(level='DevID').count()['ID'].reset_index()
    if ids == []:
        return df0
    else:
        df0 = df0[df0.DevID.isin(ids)]
        return df0
@refreshData_decorator
def getYear(ids):
    df0 = df.reset_index(level="DevID").sort_index()
    df0 = df0[df0.DevID.isin(ids)]
    return df0

@refreshData_decorator
def getDay(ids):
    df0 = df.reset_index(level="DevID").sort_index()
    if ids != []:
        df0 = df0[df0.DevID.isin(ids)]
    r = df0.last('1D')
    return r

@refreshData_decorator
def getMinutes15(ids):
    df0 = df.reset_index(level="DevID").sort_index()
    if ids != []:
        df0 = df0[df0.DevID.isin(ids)]
    r = df0.last('16min')   # a 16-minute window for the nominally 15-minute query
    return r

@refreshData_decorator
def getHour(ids):
    df0 = df.reset_index(level="DevID").sort_index()
    if ids != []:
        df0 = df0[df0.DevID.isin(ids)]
    r = df0.last('1H')
    return r
@refreshData_decorator
def getInfo(ids):
    df0 = df.reset_index(level="DevID").sort_index()
    if ids != []:
        df0 = df0[df0.DevID.isin(ids)]
    total = len(df0.DevID.unique())
    size = df0.shape[0]
    g1 = df0.groupby(lambda x: x.strftime("%H"))['DevData']
    last1D = df0.last('1D')
    last1H = df0.last('1H')
    last1M = df0.last('30D')
    #lastData=df0.last('1D').sort_index().tail(10)
    deltaG = getDeltaG(ids)
    deltaM = getDeltaM(ids, 16)             # reporting gaps longer than 16 minutes
    deltaM = deltaM.set_index("start")
    deltaMG = deltaM.groupby(lambda x: x.strftime('%Y-%m'))['DevID'].count()
    return {'total': total, 'size': size, 'last1D': last1D, 'last1H': last1H,
            'last1M': last1M, 'deltaG': deltaG, 'deltaMG': deltaMG, 'g1': g1}
@refreshData_decorator
def getDeltaM1():
    df0 = df.reset_index(level="DevID").last('30D').reset_index().set_index(["DevID", "RecDateTime"])
    ids = df0.index.get_level_values(0).unique()
    r = map(lambda x: getDeltaT(df0.loc[x], x, 16), ids)
    r = pd.concat(r)
    # keep gaps longer than 1e12 ns, i.e. roughly 16.7 minutes
    r = r.iloc[(r.end - r.start).values.astype(np.int64) > 1000000000000][["DevID", "start", "end"]]
    return r
@refreshData_decorator
def getError(ids):
    df0 = df.reset_index(level="DevID").last('30D').reset_index()
    if ids != []:
        df0 = df0[df0.DevID.isin(ids)]
    dfError = df0.iloc[(df0.ErrorCode.astype(np.int8) != 0).values].sort_index(ascending=False)
    dfAlarm = df0.iloc[(df0.DevAlarm.astype(np.int8) != 0).values].sort_index(ascending=False)
    return {'dfError': dfError, 'dfAlarm': dfAlarm}
def getDelta(d):
    """Time differences between consecutive records."""
    j1 = d.index[:-1]
    j2 = d.index[1:]
    jj = j2 - j1
    return jj

def getDeltaT(d, ID, minutes):
    """Gaps longer than the given number of minutes for one device."""
    d = d.sort_index()
    i = getDelta(d)
    # the deltas are in nanoseconds; 60000000000 ns = 1 minute
    j = i.values.astype(np.int64) > 60000000000 * minutes
    r = d.iloc[1:].iloc[j].copy()
    r['end'] = r.index
    r['DevID'] = ID
    r['start'] = d.iloc[:-1].iloc[j].index
    r = r.reset_index()
    return r

def getDeltaM(ids, minutes):
    """Gaps longer than the given number of minutes across the selected devices."""
    ids0 = df.index.get_level_values(0).unique()
    if ids != []:
        ids = np.intersect1d(ids0, ids).astype(np.int16)
    else:
        ids = ids0
    r = map(lambda x: getDeltaT(df.loc[x], x, minutes), ids)
    r = pd.concat(r)
    r = r.iloc[(r.end - r.start).values.astype(np.int64) > 60000000000 * minutes][["DevID", "start", "end"]]
    return r
def getDeltaG(ids):
    """Distribution of record intervals, bucketed below ~16.7 min, below 1 h, and above."""
    ids0 = df.index.get_level_values(0).unique()
    if ids != []:
        ids = np.intersect1d(ids0, ids).astype(np.int16)
    else:
        ids = ids0
    r = map(lambda x: getDelta(df.loc[x]).value_counts(), ids)
    df0 = pd.concat(list(r))
    # bucket boundaries in nanoseconds: 1000 s (~16.7 min) and 3600 s (1 hour)
    nrange = np.array([1000000000000, 900000000000 * 4])
    g = df0.groupby(nrange.searchsorted(df0.index.values.astype(np.int64))).sum()
    g = g.rename({0: 'A', 1: 'B', 2: 'C'})
    return g
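The Tornado side is not shown in this post; a minimal sketch of the wiring it implies might look like the following. The handler and route names are made up; only the one-time loadData() call at startup and the per-request calls to the decorated query functions follow from the description above:

# hypothetical Tornado wiring for queryDF
import tornado.ioloop
import tornado.web
import queryDF

class DataHandler(tornado.web.RequestHandler):
    def get(self):
        devID = int(self.get_argument('devID'))
        start = self.get_argument('start')   # e.g. '2019-10-01'
        end = self.get_argument('end')       # e.g. '2019-10-02'
        # getData is wrapped by refreshData_decorator, so any newly
        # arrived pkl file is appended before the query runs
        self.write(queryDF.getData(devID, start, end))

def main():
    queryDF.loadData()                       # called once at startup
    app = tornado.web.Application([(r"/data", DataHandler)])
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

if __name__ == '__main__':
    main()

Because every query function goes through refreshData_decorator, the handlers pick up each day's new file automatically, and the check is cheap when nothing new has appeared.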