前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高质量编码--使用Pandas和Tornado构建高性能数据查询服务

高质量编码--使用Pandas和Tornado构建高性能数据查询服务

原创
作者头像
MiaoGIS
修改2019-07-15 11:05:54
1.4K0
修改2019-07-15 11:05:54
举报
文章被收录于专栏:Python in AI-IOTPython in AI-IOT

大数情况下,数据保存在数据库中,使用SQL来从数据库中查询数据,但相对于直接从内存中取数据前者显得比较慢和笨重。下面介绍基于csv文件目录存储数据,使用Tornado来作为Web服务器,使用Pandas来高性能查询数据。

效果如下:

看一下数据在CSV中的存储结构

tornado作为web服务器,index路由对应查询页面,devs路由对应取得所有传感器列表(每个传感器由设备ID和传感器类型唯一决定),data路由根据设备ID和传感器类型,以及日期范围来查询数据。当web服务启动时,同时将数据加载到全局变量保存在内存中。

代码语言:python
复制
#-*- coding:utf-8 -*-
import os
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define,options
import json
import datetime
from tools import *

define('port',default=8096,help='run on the given port',type=int)

class cross_originAllowed_Handler(tornado.web.RequestHandler):
    
    def initialize(self):
       
        self.set_header('Access-Control-Allow-Origin','*')
        self.set_header('Access-Control-Allow-Methods','POST,GET')
        self.set_header('Access-Control-Max-Age',1000)
        self.set_header('Access-Control-Allow-Headers','*')

class indexHandler(tornado.web.RequestHandler):
    def get(self):      
        self.render('index.html')   

class devsHandler(cross_originAllowed_Handler):
    def get(self):
        self.write({'result':devs.to_json()})  
              
class dataHandler(cross_originAllowed_Handler):
    def get(self):
        dev=self.get_argument('dev',None)
        startDate=self.get_argument('startDate',None)
        endDate=self.get_argument('endDate',None)
        if not (dev and startDate and endDate):
            self.write({'result':'{}'})
            return     
        data=getByDev2DateRange(int(dev),startDate,endDate)
        self.write({'result':data})

class Application(tornado.web.Application):
    def __init__(self, handlers, **settings):
        global df
        global devs
        df,devs=initDf()
        tornado.web.Application.__init__(self, handlers, **settings)    
        
if __name__=='__main__':
    tornado.options.parse_command_line()
    app=Application(
        handlers=[
                 
                (r'/',indexHandler),
                (r'/index',indexHandler),
                
                 (r'/devs',devsHandler),
                (r'/data',dataHandler),
                 (r"/assets/(.*)", tornado.web.StaticFileHandler, {"path": "static/assets"}),

                (r"/dist/(.*)", tornado.web.StaticFileHandler, {"path": "static/dist"}),
                (r"/css/(.*)", tornado.web.StaticFileHandler, {"path": "static/css"}),
                (r"/js/(.*)", tornado.web.StaticFileHandler, {"path": "static/js"}),
                 
                  ],

        template_path=os.path.join(os.path.curdir,'templates'),static_path=os.path.join(os.path.curdir,'static'),cookie_secret='miaojiyue',debug=True )
         
    http_server=tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()
    

使用pandas将数据加载到dataframe中如下:

下面看一下使用Pandas数据分析工具的具体实现

代码语言:python
复制
#-*-coding:utf-8 -*-
import os
import numpy as np
import pandas as pd
def initData():
    dataDir='D:\wc\data'
    csvs=os.listdir(dataDir)
    dfs=[]
    names=['ID', 'DevID', 'DevData', 'DevUnit', 'voltage',
       'DevAlarm', 'RecDateTime', 'StrInfo', 'ErrorCode',
       'dflag', 'AddTime']

    #遍历文件夹中所有csv文件,将数据拼合到一个dataframe中
    for csv in csvs:
        #由于csv中首行没有存储列名,指定数据对应的列名称
        df0=pd.read_csv(os.path.join(dataDir,csv),names=names)
        print(csv)
        print(df0)
        dfs.append(df0)
    df=pd.concat(dfs)
    df['devId']=df['DevID']
    df['devUnit']=df['DevUnit']
    #根据小数点分隔字符串,将时间格式化到整数秒,并将字符串转为时间格式。
    df.RecDateTime=df.RecDateTime.apply(lambda x:x[:x.find('.')])
    df.RecDateTime=pd.to_datetime(df.RecDateTime)
    df['recDateTime']=df['RecDateTime']
    #为dataframe指定设备ID和接收时间作为索引并根据索引排序
    df=df.set_index(['DevID','RecDateTime']).drop(columns=['ID']).sort_index()
    return df

def initDf():
    print('begin to read data from pkl!')
    global df
    df=pd.read_pickle('data.pkl')
    print('read complete!')
    global df2
    print('begin to read devs!')
    #devs=df.index.map(lambda x:x[0]).unique().to_list()
    #devs=np.load('data.npz')['devs'].tolist()
    #df2=pd.read_excel('data.xls',names=["devId","devName","devUnit"])
    df2=pd.read_pickle('data2.pkl')
    
    print('devs complete!')
    return df,df2

def getByDev2DateRange(dev,startDate,endDate):
    print(dev)
    print(startDate)
    print(endDate)
    if dev not in df2['devId']:
        return '{}'
    dates=pd.date_range(start=startDate, end=endDate)
    dates=dates.map(lambda x:x.strftime('%Y-%m-%d'))
    #使用传感器ID索引过滤数据
    d=df.loc[dev]
    #取得该传感器所有数据的时间列表
    dates2=d.index.map(lambda x:x.strftime('%Y-%m-%d'))
    #用是否包含布尔索引来过滤查询时间范围的数据,也可以使用dates.contains(date2)
    data=d[dates2.isin(dates)]
    #只返回'recDateTime','DevData'两列
    data=data.loc[:,['recDateTime','DevData']]
    data.recDateTime=data.recDateTime.apply(lambda x:x.strftime('%Y-%m-%d %H:%M:%S'))
    data=data.reset_index()
    data=data.drop(columns=['RecDateTime'])
    return data.to_json()

这里有两个dataframe,一个保存传感器基本信息(包含DevID和DevUnit),一个保存历史数据。其中初始化它们时有两种方式,一种是从csv文件中加载,一种是预先将从csv中加载的dataframe使用to_pickle保存到pkl文件中,然后从pkl文件直接加载,后者文件更小而且加载速度更快。

下文将介绍查询数据使用echarts展示的前端代码。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档