首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何注册以通知BLE外设中的更改?

如何注册以通知BLE外设中的更改?
EN

Stack Overflow用户
提问于 2021-09-28 09:31:28
回答 1查看 395关注 0票数 1

我有一个BLE外围设备(Arduino nano),具有单一的服务和单一的特点。该特性持有8位uint,当开关打开时设置为1,当开关关闭时设置为0。该特性支持读取和通知。

使用nRF连接,我可以看到通知部件正在工作,因为当开关状态发生变化时,值将被更新。

但我真正想做的是使用一个覆盆子Pi作为中心设备,使用Adafruit CircuitPython BLE。

按照Adafruit CircuitPython BLE存储库中的示例,我创建了下面的一个简单程序:

observe_ble_switch.py

代码语言:javascript
运行
复制
#!/usr/bin/env python3

import asyncio
import time
from switch_service import SwitchService

from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import Advertisement

ble = BLERadio()

async def main():
    device_name = "My Arduino"
    device_found = False
    ble_device_connection = None

    print("Scanning for %r" % device_name)

    while not device_found:
        print("...")

        for adv in ble.start_scan(Advertisement, timeout=5):
            name = adv.complete_name
            if not name:
                continue

            if name.strip("\x00") == device_name:
                ble.stop_scan()
                device_found = True
                print("%r found!" % name)
                ble_device_connection = ble.connect(adv)
                break

    if ble_device_connection and ble_device_connection.connected:
        print("Connected to %r!" % name)

        if SwitchService in ble_device_connection:
            print("switch service available")
            switch_service = ble_device_connection[SwitchService]

            while ble_device_connection.connected:
                print("status %r" % switch_service.read_status())
                time.sleep(0.3)
        else:
            print("switch service not available")

if __name__ == "__main__":
    asyncio.run(main())

switch_service.py

代码语言:javascript
运行
复制
from adafruit_ble.uuid import VendorUUID
from adafruit_ble import Service
from adafruit_ble.characteristics.int import Uint8Characteristic

class SwitchService(Service):
    """
    """

    uuid = VendorUUID('8158b2fd-94e4-4ff5-a99d-9a7980e998d7')

    switch_characteristic = Uint8Characteristic(
        uuid=VendorUUID("8158b2fe-94e4-4ff5-a99d-9a7980e998d7")
    )

    def __init__(self, service=None):
        super().__init__(service=service)
        self.status = self.switch_characteristic

    def read_status(self):
        return self.status

我遇到的问题是,read_status()总是在程序第一次运行时返回BLE开关的状态。它不会收到关于BLE开关的后续状态变化的通知。我的想法是,我所缺少的是注册到BLE开关,以得到通知的变化。我正在努力寻找例子或参考来做到这一点。

谢谢。

EN

回答 1

Stack Overflow用户

发布于 2021-09-29 10:50:34

感谢ukBaz指出的荒凉。下面的代码使用的是Bleak,使用起来很棒。

代码语言:javascript
运行
复制
import asyncio

from bleak import BleakScanner
from bleak.backends.bluezdbus.client import BleakClientBlueZDBus

device_name = "My Arduino"
switch_status_char_uuid = "8158b2fe-94e4-4ff5-a99d-9a7980e998d7"


def notification_handler(sender, data):
    print("Switch is active: {}".format(bool(data[0])))


async def run():
    client = None
    external_heartbeat_received = False
    device = await BleakScanner.find_device_by_filter(
        lambda d, ad: d.name and d.name.lower() == device_name.lower()
    )

    if device is None:
        print("{} not found".format(device_name))
    else:
        print("{} found".format(device))

    client = BleakClientBlueZDBus(device)

    try:
        timer = 60  # seconds

        while timer != 0 or external_heartbeat_received:
            if not client.is_connected:
                if await client.connect():
                    print("Connected to {}".format(device_name))
                    await client.start_notify(switch_status_char_uuid, notification_handler)
            await asyncio.sleep(1)
            timer -= 1

            # If timer expired and we received a heartbeat, restart timer and carry on.
            if timer == 0:
                if external_heartbeat_received:
                    timer = 60
                    external_heartbeat_received = False
    except:
        print("Connected to {} failed".format(device_name))

    if client is not None and client.is_connected:
        await client.disconnect()
        print("Disconnected from {}".format(device_name))

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69359173

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档