前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高质量编码-轨迹管理平台(模拟实时位置)

高质量编码-轨迹管理平台(模拟实时位置)

原创
作者头像
MiaoGIS
修改2020-12-09 17:39:17
6140
修改2020-12-09 17:39:17
举报
文章被收录于专栏:Python in AI-IOT

设备终端通过调用http api来实现定位信息的上传,为了实现平台演示,模拟多个设备向平台发送http请求上传最新位置信息。

使用随机位置,设备的轨迹肯定交叉杂乱,所以使用百度地图的公交线路json数据模拟车辆的位置更新,当到起点和终点时掉头。

也可以直接修改某设备对应redis中keepOnline_locator_X和recentCoordinate_locator_X来更新设备最新定位信息,因为web平台中各设备的在线状态和最新位置就是从它们获得。

python 代码如下:

代码语言:python
代码运行次数:0
复制
# -*- coding:utf-8 -*-
import requests
    
import random
import time
from datetime import datetime

from gpsMonitor_Model import *
import redis
import json
def main():
    def initTest():
        f=open('./lines12.js','rb')
        global lines
        lines=json.load(f)[0:50]
        f.close()
        for line in lines:
            line['direction']=1
            line['currentStop']=1

    def emulateRedis():           
        for line in lines[:10]:
            if line['direction']==1:
                if line['currentStop']<len(line['stops']):
                    line['currentStop']=line['currentStop']+1
                else:
                    line['direction']=-1
                    line['currentStop']=line['currentStop']-1
            else:
                if line['currentStop']>1:
                    line['currentStop']=line['currentStop']-1
                else:
                    line['direction']=1
                    line['currentStop']=line['currentStop']+1
                    
            
            currentStop=line['stops'][line['currentStop']-1]
            print line['name'],line['currentStop']
            myRedis.setex('keepOnline_locator_'+str(line['name']),str(line['name']),maxTimeout)
            myRedis.set('recentCoordinate_locator_'+str(line['name']),
                        '|'.join([str(currentStop['lat']),
                                  str(currentStop['lng']),
                                  datetime.strftime(datetime.now(),'%Y-%m-%d %H:%M:%S')]))
    
    def emulateHttpRequsts():
        while True:          
            time.sleep(0.05)
            operationFlag=random.randint(0,100)%9
        
            if operationFlag in [0,1,2,3,4]:
                TestAddCoordinate()

            elif operationFlag in [5,6]:
                None
                #TestAddLocacor()
            else:
                None
                #TestRemoveLocacor()
    def initConsts():
        minTimeout=60
        maxTimeout=60*2
        defaultCoordinate_x=34.730861
        defaultCoordinate_y=113.6101392
        defaultCoordinate_time=datetime(2016, 12, 18, 3, 4, 5)  
       
    def TestAddLocacor():
       
       coordinate=coordinates[random.randint(0,coordinates.count()-1)]
       response=requests.put('http://127.0.0.1:8000/locator',data={'locator_name':'miao','locator_fleed':'1'})
       locatorID=response.content.split()[1]
       response=requests.post('http://127.0.0.1:8000/locator',data={'locator_name':'locator'+str(locatorID),'locator_fleed':'1','itemId':locatorID})
    
       coordinate_locatorID=response.content.split()[-1].strip('>').split('=')[-1]
       requests.put('http://127.0.0.1:8000/coordinate',data={'coordinate_locator':coordinate_locatorID,'coordinate_x':coordinate.coordinate_x,'coordinate_y':coordinate.coordinate_y})

       locator=Locator.get(locatorID)
        
       print 'add locator:'+str(locatorID)                        
    def TestRemoveLocacor():
        locators=Locator.select((Locator.q.locator_fleedID==1)&(Locator.q.id>11))
        if locators.count()>1:
            locator=locators[random.randint(0,locators.count()-1)]
            requests.delete('http://127.0.0.1:8000/locator',data={'itemId':locator.id})
            print 'remove locator'+str(locator.id)
        else:
            print 'remove locator canceled'
    def TestAddCoordinate():
        coordinate=coordinates[random.randint(0,coordinates.count()-1)]
        locators=Locator.select(Locator.q.locator_fleedID==1)
        locator=locators[random.randint(0,locators.count()-1)]
        requests.put('http://127.0.0.1:8000/coordinate',data={'coordinate_locator':locator.id,'coordinate_x':coordinate.coordinate_x,'coordinate_y':coordinate.coordinate_y})
        print 'add coordinate:'+str(coordinate.id)+' on:'+str(locator.id)
##    initConsts()
##    initDatabase()
##    coordinates=Coordinate.select()
##    emulateHttpRequsts()
    maxTimeout=60*5
    myRedis=redis.Redis()
    initTest()
    while True:
        time.sleep(3)
        emulateRedis()
if __name__=='__main__':
    main()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档