前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python简单试用MQTT服务器

Python简单试用MQTT服务器

作者头像
py3study
发布2020-01-08 11:02:30
5.1K0
发布2020-01-08 11:02:30
举报
文章被收录于专栏:python3python3

前言

经历过各种问题的磨难终于基本搭建完成了自己的MQTT服务器,接下来我就赶紧写个Python程序测试下.

安装

这里采用paho.mqtt.python编写程序,详情参阅这里 打开powershell,执行pip install paho-mqtt安装模块

程序

代码语言:javascript
复制
# coding=utf-8
import json
import threading

import paho.mqtt.client as mqtt

# 当连接上服务器后回调此函数
import time

from my_lib.code_handle.code_handle import auto_code
from windows_info.read_info import Win_psutil


class MqttClient:
    client = mqtt.Client('tester')

    def __init__(self, host, port):
        self._host = host
        self._port = port
        self.client.on_connect = self._on_connect  # 设置连接上服务器回调函数
        self.client.on_message = self._on_message  # 设置接收到服务器消息回调函数

    def connect(self, username='tester', password='tester'):
        self.client.username_pw_set(username, password)
        self.client.connect(self._host, self._port, 60)  # 连接服务器,端口为1883,维持心跳为60秒

    def publish(self, topic, data):
        self.client.publish(topic, data)

    def loop(self, timeout=None):
        thread = threading.Thread(target=self._loop, args=(timeout,))
        # thread.setDaemon(True)
        thread.start()

    def _loop(self, timeout=None):
        if not timeout:
            self.client.loop_forever()
        else:
            self.client.loop(timeout)

    def _on_connect(self, client, userdata, flags, rc):
        print("\nConnected with result code " + str(rc))
        client.subscribe("test-0")

    def _on_message(self, client, userdata, msg):  # 从服务器接受到消息后回调此函数
        print "\n主题:" + auto_code(str(msg.topic)) + " 消息:" + auto_code(str(msg.payload))

    def _is_json(self, data):
        try:
            json.loads(data)
        except ValueError:
            return False
        return True

    def publish_loop(self):
        pass


if __name__ == '__main__':
	host=None
    client = MqttClient(host, 1883)
    client.connect('tester','tester')
    client.publish('test-0', '我上线啦!')
	client.loop()
    wp = Win_psutil()#自己定义的一个类
    while True:
        data_json=wp.auto_json()#方法返回一个包含CPU和进程信息的JSON字符串
        client.publish('test-0',data_json)
        time.sleep(2)

这里自己封装了类,主要功能是连上服务器订阅默认主题,接收到消息即打印出来. 在主程序中先实例化类,接着使用默认用户名与密码登陆,在主题"test-0上"发布信息,接着定时将打包成JSON信息的数据发布到"test-0"这个主题

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 安装
  • 程序
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档