专栏首页python3Python简单试用MQTT服务器

Python简单试用MQTT服务器

前言

经历过各种问题的磨难终于基本搭建完成了自己的MQTT服务器,接下来我就赶紧写个Python程序测试下.

安装

这里采用paho.mqtt.python编写程序,详情参阅这里 打开powershell,执行pip install paho-mqtt安装模块

程序

# coding=utf-8
import json
import threading

import paho.mqtt.client as mqtt

# 当连接上服务器后回调此函数
import time

from my_lib.code_handle.code_handle import auto_code
from windows_info.read_info import Win_psutil


class MqttClient:
    client = mqtt.Client('tester')

    def __init__(self, host, port):
        self._host = host
        self._port = port
        self.client.on_connect = self._on_connect  # 设置连接上服务器回调函数
        self.client.on_message = self._on_message  # 设置接收到服务器消息回调函数

    def connect(self, username='tester', password='tester'):
        self.client.username_pw_set(username, password)
        self.client.connect(self._host, self._port, 60)  # 连接服务器,端口为1883,维持心跳为60秒

    def publish(self, topic, data):
        self.client.publish(topic, data)

    def loop(self, timeout=None):
        thread = threading.Thread(target=self._loop, args=(timeout,))
        # thread.setDaemon(True)
        thread.start()

    def _loop(self, timeout=None):
        if not timeout:
            self.client.loop_forever()
        else:
            self.client.loop(timeout)

    def _on_connect(self, client, userdata, flags, rc):
        print("\nConnected with result code " + str(rc))
        client.subscribe("test-0")

    def _on_message(self, client, userdata, msg):  # 从服务器接受到消息后回调此函数
        print "\n主题:" + auto_code(str(msg.topic)) + " 消息:" + auto_code(str(msg.payload))

    def _is_json(self, data):
        try:
            json.loads(data)
        except ValueError:
            return False
        return True

    def publish_loop(self):
        pass


if __name__ == '__main__':
	host=None
    client = MqttClient(host, 1883)
    client.connect('tester','tester')
    client.publish('test-0', '我上线啦!')
	client.loop()
    wp = Win_psutil()#自己定义的一个类
    while True:
        data_json=wp.auto_json()#方法返回一个包含CPU和进程信息的JSON字符串
        client.publish('test-0',data_json)
        time.sleep(2)

这里自己封装了类,主要功能是连上服务器订阅默认主题,接收到消息即打印出来. 在主程序中先实例化类,接着使用默认用户名与密码登陆,在主题"test-0上"发布信息,接着定时将打包成JSON信息的数据发布到"test-0"这个主题

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • #PY小贴士# 我的python开发环境如何搬到其他电脑上?

    昨天说到了 git,说到了在 git 中不应该上传代码以外的文件。那么就有人问了:

    Crossin先生
  • 数据分析篇 | Pandas 时间序列 - 日期时间索引

    DatetimeIndex 主要用作 Pandas 对象的索引。DatetimeIndex 类为时间序列做了很多优化:

    叫我龙总
  • Python数据分析之利用pymysql操作数据库

    mysql应该说是如今使用最为普遍的数据库了,没有之一,而Python作为最为流行的语言之一,自然少不了与mysql打交道,pymysql就是使用最多的工具库了...

    统计学家
  • 这42个Python小例子,太走心~ [看哭系列]

    除了简单地判断是否匹配之外,正则表达式还有提取子串的强大功能。用()表示的就是要提取的分组(group)。比如:^(\d{3})-(\d{3,8})$分别定义了...

    AI算法与图像处理
  • 简单的语音分类任务入门(需要些深度学习基础)

    上次公众号刚刚讲过使用 python 播放音频与录音的方法,接下来我将介绍一下简单的语音分类处理流程。简单主要是指,第一:数据量比较小,主要是考虑到数据量大,花...

    用户2870857
  • Python数据分析之pandas基本数据结构

    Python数据分析之numpy数组全解析 Python数据分析之Pandas读写外部数据文件

    统计学家
  • 必备技能,conda创建python虚拟环境,完美管理项目

    在尝试各种项目的时候,比较烦人的问题就是环境配置问题,然而更烦人的就是在你做一个个项目的时候,突然发现以前可以正常运行的代码挂了。

    AI算法与图像处理
  • Python数据分析之numpy数组全解析

    numpy是一个在Python中做科学计算的基础库,重在数值计算,也是大部分Python科学计算库的基础库,多用于大型、多维数据上执行数值计算。

    统计学家
  • 文言文不能编程乎?中国大四小哥哥曰:非也

    这就是最近被盛传的:文言文编程语言“wenyan-lang”。GitHub上线5天时间,已经突破了6000星。

    新智元
  • 200 道算法面试题集锦!Python 实现,含华为、BAT 等校招真题!

    春招临近,无论是要找工作的准毕业生,还是身在职场想要提升自己的程序员,提升自己的算法内功心法、提升 Python 编程能力,总是大有裨益的。今天,红色石头发现了...

    用户2769421

扫码关注云+社区

领取腾讯云代金券