前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【python】处理实时数据

【python】处理实时数据

作者头像
读书猿
发布2024-02-05 15:17:17
1880
发布2024-02-05 15:17:17
举报
文章被收录于专栏:无人驾驶感知

前言

  1. 实时语音识别、实时监控检测状态等等。你是不是在处理离线数据时得心应手,而在面对实时的数据处理的时候会不知所措?
  2. 时序在智能制造领域是个非常重要的指标,在线实时检测是优势与趋势。
  3. python如何处理在线数据?简单说下,利用queue、threading多线程处理。(实际情况要比这复杂多得多)
  4. 拿到数据后的处理要根据实际想情况而定,下面我会举个简单的例子说明。

一、实时数据输入

1.1、队列模拟实时数据

由于每个人在通讯过程中,实时获取的数据方式不同。下面构造数据模拟通讯数据实时输入。

代码语言:javascript
复制
import queue

list1 = [i for i in range(100 * 1000)]  # 0到10w 模拟实时输入的数据
q = queue.Queue()  # 构建一个队列
for data in list1:
    q.put(data)
    print(q.get())

1.2、获取实时数据类

代码语言:javascript
复制
import threading


class GetDataThread(threading.Thread):
    def __init__(self):
        super(GetDataThread, self).__init__()
        self.data = [i for i in range(100 * 1000)]

    def run(self):
        pass

二、实时数据处理

2.1、保存实时数据

保存数据是为了离线进行分析。这里把数据保存到数据库,用sqlite3第三方库。当然如果只是简单分析,可以直接写入csv文件

代码语言:javascript
复制
import queue
import sqlite3
import threading

# 连接数据库操作
conn = sqlite3.connect("datebase.db", check_same_thread=False)
cn = conn.cursor()


# 把获取到的数据存进数据库,写入数据 线程类WriteThread。
class WriteThread(threading.Thread):
    def __init__(self):
        super(WriteThread, self).__init__()
        self.queue = queue.Queue()  # 定义一个自身队列

    def run(self):
        while True:
            list_data = self.queue.get()  # 实时获取数据
            # 插入数据库操作。 如果只是简单分析,可以直接写入csv文件
            cn.execute("insert into tableone(time,data) values('{}', '{}')"
                       .format(list_data[0], list_data[1]))
            conn.commit()   # 提交

2.2、实时处理数据

代码语言:javascript
复制
import queue
import numpy as np
import threading


class DataOperationThread(threading.Thread):
    def __init__(self):
        super(DataOperationThread, self).__init__()
        self.queue = queue.Queue()
        self.data_list = []

    # 对实时获取到的数据进行处理
    def run(self):
        while True:
            data = self.queue.get()
            self.data_list.append(data)
            """
            下面就根据实际数据,实际情况进行处理,实际情况可能较为复杂。
            我这里举个简单例子。计算输出每1000条数据的平均值。
            """
            if len(self.data_list) > 1000:
                avg = np.average(self.data_list[:1000])
                print(avg)  # 输出每1000条数据平均值
                self.data_list = self.data_list[1000:]  # 删除计算过的数据,重置列表

三、完整代码

代码语言:javascript
复制
import time
import sqlite3
import queue
import threading
import numpy as np

# 连接数据库操作   数据库要提前构建完成
conn = sqlite3.connect("datebase.db", check_same_thread=False)
cn = conn.cursor()


class GetDataThread(threading.Thread):
    def __init__(self):
        super(GetDataThread, self).__init__()
        # 用csv数据模拟实时获取到的数据,实际情况是根据通讯或者其他方式获取到数据。
        self.data = [i for i in range(100 * 1000)]
        # 调用两个类的线程
        self.thread_one = WriteThread()
        self.thread_two = DataOperationThread()
        self.queue_one = self.thread_one.queue
        self.queue_two = self.thread_two.queue

    def run(self):
        # 开启线程
        self.thread_one.start()
        self.thread_two.start()

        # 把我们数据分别put到两个线程的队列里
        for rows in self.data:
            self.thread_two.queue.put(rows)  # 实时处理的数据
            data = [time.time(), rows]
            self.thread_one.queue.put(data)  # 保存到数据库里的数据 需要当前数据的时间信息
            time.sleep(0.001)  # !!!!注:这里模拟实际通讯延迟 实际过程中不需要这行代码


# 把获取到的数据存进数据库,写入数据线程类WriteThread。
class WriteThread(threading.Thread):
    def __init__(self):
        super(WriteThread, self).__init__()
        self.queue = queue.Queue()  # 定义自身队列

    def run(self):
        while True:
            list_data = self.queue.get()  # 实时获取数据
            print(list_data[0], list_data[1])
            # 插入数据库
            cn.execute("insert into tableone(time,data) values('{}', '{}')".format(list_data[0], list_data[1]))
            conn.commit()


class DataOperationThread(threading.Thread):
    def __init__(self):
        super(DataOperationThread, self).__init__()
        self.queue = queue.Queue()
        self.data_list = []

    # 对实时获取到的数据进行处理
    def run(self):
        while True:
            data = self.queue.get()
            self.data_list.append(data)
            """
            下面就根据实际数据,实际情况进行处理,实际情况可能较为复杂。
            我这里举个简单例子。计算输出每1000条数据的平均值。
            """
            if len(self.data_list) > 1000:
                avg = np.average(self.data_list[:1000])
                print(avg)  # 输出每1000条数据平均值
                self.data_list = self.data_list[1000:]  # 删除计算过的数据,重置列表


if __name__ == '__main__':
    GetDataThread().start()
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-11-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、实时数据输入
    • 1.1、队列模拟实时数据
      • 1.2、获取实时数据类
      • 二、实时数据处理
        • 2.1、保存实时数据
          • 2.2、实时处理数据
          • 三、完整代码
          相关产品与服务
          数据库
          云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档