专栏首页python3三、wss连接B站弹幕

三、wss连接B站弹幕

环境

pip install ws4py
from ws4py.client.threadedclient import WebSocketClient

一、websocket协议

  1. 先建立连接 wss://broadcastlv.chat.bilibili.com/sub
  2. 发送登录包 { "uid": 0表示未登录,否则为用户ID, "roomid": 房间ID, "protover": 1, "platform": "web", "clientver": "1.4.0" }
  3. 每隔一段时间发送心跳包
  4. 接收响应 响应由头部和数据组成
  1. 解析响应得到数据
  2. 注册事件分发事件

二、工具层 utils.py

  1. 定时器类 可取消
  2. 事件类 注册事件 (可重复) 分发事件 取消事件
class Timer():
    def __init__(self,delay,fun):
        self.delay,self.f=delay,fun
        self.t=threading.Timer(self.delay,self.fun)
        self.t.start()
    def fun(self):
        if self.f:self.f()
        self.t=threading.Timer(self.delay,self.fun)
        self.t.start()
    def cancel(self):
        self.t.cancel()
        print("threading cancel")

class Event():
    def __init__(self):
        self.map=[]
        self.keys=[]
    def index(self,k):
        i=-1
        for key in self.keys:
            i+=1
            if key==k:return i
        return -1
    def on(self,key,fun):
        i=self.index(key)
        if i==-1:
            self.map.append({"key":key,"funs":[fun]})
            self.keys.append(key)
        else:
            self.map[i]["funs"].append(fun)
    def emit(self,key,data=None):
        i=self.index(key)
        if i==-1:
            print("no regist event:"+str(key))
            return
        for f in self.map[i]["funs"]:f(data)
    def rm(self,key,fun):
        i=self.index(key)
        if i==-1:
            print("no regist event:"+str(key))
            return
        funs=self.map[i]["funs"]
        for j in range(len(funs)):
            if funs[j]==fun:funs[j]=None
        self.map[i]["funs"]=list(filter(None,funs))

三、服务层 DanmuWS.py

import threading
import json
import struct
from ws4py.client.threadedclient import WebSocketClient
from utils import Event,Timer
event=Event()
class DanmuWebSocket(WebSocketClient):
    def __init__(self,info,serveraddress='wss://broadcastlv.chat.bilibili.com/sub'):
        self.serveraddress=serveraddress
        WebSocketClient.__init__(self,serveraddress)
        DanmuWebSocket.event=event
        DanmuWebSocket.headerLength=16
        self.Info=info
    def opened(self):
        self.sendLoginPacket(self.Info['uid'],self.Info['roomid'],self.Info['protover'],self.Info['platform'],self.Info['clientver'])
        self.sendHeartBeatPacket();
        self.heartBeatHandler = Timer(20,self.sendHeartBeatPacket)
        print("opened")
    def delay_close(self):
        dws=DanmuWebSocket(self.Info,self.serveraddress)
        event.emit('reconnect',dws);
    def closed(self, code, reason=None):
        print("Closed", code, reason)
        if hasattr(self,"heartBeatHandler"):self.heartBeatHandler.cancel();
        if code == 1000: return
        threading.Timer(5,self.delay_close).start()
        print("Closed", code, reason)
    def received_message(self, message):
        position,length=0,len(message.data)-1
        while position<length:
            header_pack=struct.unpack(">IHHII",message.data[position:position+16])
            length_pack=header_pack[0]
            operation=header_pack[3]
            if operation==3:
                num=header_pack[1]+position
                num=struct.unpack(">I",message.data[num:num+4])[0]
                event.emit('heartbeat',num)
            elif operation==5:
                data=json.loads(message.data[position+16:position+length_pack])
                event.emit('cmd',data)
                #print("recv:"+data["cmd"])
            else:
                event.emit('login');
            position+=length_pack
    def sendData(self,data, protover = 1, operation = 2, sequence = 1):
        if type(data)==dict:
            data=json.dumps(data).encode()
        elif type(data)==str:
            data=data.encode()
        header=struct.pack(">IHHII",DanmuWebSocket.headerLength+len(data),DanmuWebSocket.headerLength,protover,operation,sequence)
        self.send(header+data)
    def sendLoginPacket(self,uid, roomid, protover = 1, platform = 'web', clientver = '1.4.6'):
        # Uint(4byte) + 00 10 + 00 01 + 00 00 00 07 + 00 00 00 01 + Data 登录数据包
        data = {
            'uid': int(uid),
            'roomid': int(roomid),
            'protover': protover,
            'platform': platform,
            'clientver': clientver
        }
        print("sendLoginPacket")
        data=json.dumps(data)
        data=data.replace(' ','')
        self.sendData(data.encode(),1,7,1)
    def sendHeartBeatPacket(self):
        # Uint(4byte) + 00 10 + 00 01 + 00 00 00 02 + 00 00 00 01 + Data 心跳数据包
        self.sendData(b'[object Object]', 1, 2, 1);
    def bind(self,onreconnect=None,onlogin=None,onheartbeat=None,oncmd=None,onreceive =None):
        if "cmd" in event.keys:return
        if hasattr(onreconnect,"__call__"):event.on("reconnect",onreconnect)
        if hasattr(onlogin,"__call__"):event.on("login",onlogin)
        if hasattr(onheartbeat,"__call__"):event.on("heartbeat",onheartbeat)
        if hasattr(oncmd,"__call__"):event.on("cmd",oncmd)
        if hasattr(onreceive,"__call__"):event.on("receive",onreceive)

四、测试代码

from server import Login
headers={
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0',
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
    'Accept-Encoding': 'gzip, deflate, br',
    'Referer': 'https://live.bilibili.com/',
    'Origin': 'https://live.bilibili.com',
    'Connection': 'keep-alive'
    }
s=session(headers,'cookie.txt')
login=Login(s)
while not login.isLogin():
    login.get_vdcode()
    login.loop_vdcode()
info={
  "uid": login.info['uid'],
  "roomid": 7603080,
  "protover": 1,
  "platform": "web",
  "clientver": "1.4.0"
}
from DanmuWS import DanmuWebSocket
def oncmd(data):
    cmd=data["cmd"]
    if cmd=="SYS_MSG":
        print(data)
    elif cmd=="SPECIAL_GIFT":
        print(data)
    else:
        print(data)
def onlogin(data):
    print("login success")
def onreconnect(dws):
    global ws
    ws=dws
def onheartbeat(num):
    print(num)
try:
    ws = DanmuWebSocket(info,'wss://broadcastlv.chat.bilibili.com/sub')
    ws.connect()
    ws.bind(None,onlogin,onheartbeat,oncmd)
    ws.run_forever()
except:
    ws.close()

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Python写的俄罗斯方块

    简单瞅了下Tkinter,和Canvas配合在一起,还算是简洁的界面开发API。threading.Thread创建新的线程,其多线程机制也算是方便。

    py3study
  • python入门之tkinter实现简

    这里class MyHtmlParser继承了HTMLParser,根据这个模板你可以添加你自己需要从网页上获取的内容了,假设你的网页中有标签p,试一试新增一个...

    py3study
  • PyQt5--QLineEdit

    py3study
  • PKW: xadmin 搭建 + wxPython 聊天小程序(第一期)

    这是 Python Knowledge Weekly(PKW)第一期,之所以做这个东西,主要还是为了激励自己,每周都能有学习输入,同时再把知识点做一个总结输出。...

    周萝卜
  • python实现队列

    队列是一种先进先出的数据类型,它的跟踪原理类似于在超市收银处排队,队列里的的第一个人首先接受服务,新的元素通过入队的方式添加到队列的末尾,而出队就是将队列的头元...

    一墨编程学习
  • pyqt5实现浏览器与下载文件弹框

    本文由腾讯云+社区自动同步,原文地址 https://stackoverflow.club/article/pyqt5_webbrowser_download_...

    羽翰尘
  • PyQt5 动画类--跳舞的火柴人

    PyQt5.QtCore中的 QPropertyAnimation可以实现动画功能。

    用户6021899
  • 商业篇 | 使用python开发性格分析工具卖钱

    如此不均衡的贫富差距,各行业的领导者如何能管理好公司,让员工们即努力产出,又能安于现状呢?每个领导者必学的一门课程就是职场心理学。只有你充分了解员工心理与对应的...

    叫我龙总
  • 手把手教你使用Python打造一个智能搜索淘宝商品,生成操作日志的系统

    随着网购的兴起,使得很多传统店铺转型做线上生意,电子商务的产生极大便利了我们的生活。

    Python进阶者
  • unslider源码分析

    根据Bootstrap中文网的介绍,Unslider一个超小的 jQuery轮播(slider)插件,参照这个汉化版的介绍页面,这个插件有你需要的优点,但是本...

    用户3579639

扫码关注云+社区

领取腾讯云代金券