前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ts 视频下载[通俗易懂]

ts 视频下载[通俗易懂]

作者头像
全栈程序员站长
发布2022-09-09 10:26:30
7680
发布2022-09-09 10:26:30
举报
文章被收录于专栏:全栈程序员必看

大家好,又见面了,我是你们的朋友全栈君。

代码语言:javascript
复制
import urllib.request
import requests, os, threading
from Crypto.Cipher import AES
from src.Pacho.moviePa.tsdownload import aes_decode

class m3u8down(object):
    def __init__(self, url, listheaders, dicheaders):
        self.url = url  # 这里的url是index.m3u8地址
        self.headers = listheaders
        self.header = dicheaders
        self.ts_parts = []
        self.down_path = 'D:/workspace/download/Mp4'
        self.tsthreads = []
        self.key = None

    def aes_decode(self, data, key):
        """AES解密
        :param key:  密钥(16.32)一般16的倍数
        :param data:  要解密的数据
        :return:  处理好的数据
        """
        cryptor = AES.new(key, AES.MODE_CBC, key)
        plain_text = cryptor.decrypt(data)
        return plain_text.rstrip(b'\0')  # .decode("utf-8")

    def to_ts(self):
        requests.packages.urllib3.disable_warnings()
        content_all = requests.get(self.url, verify=False, timeout=200).text
        if "#EXTM3U" not in content_all:
            raise BaseException("非M3U8的链接")
        if "EXT-X-VERSION" in content_all:
            file_line = content_all.split("\n")
            # print(file_line)
            self.get_tsurls(self.url, file_line)

    def get_tsurls(self, m3u8url, lines):
        for index, line in enumerate(lines):  # m3u8文件中有ts,获取ts地址并添加索引
            if "EXTINF" in line:  # 找ts地址
                if "/" not in lines[index + 1]:  # 判断.ts是否是路径 'DjbgADY7468014.ts' or '/20181221/.../VRYKBY4319009.ts'
                    ts_url = m3u8url.rsplit("/", 1)[0] + "/" + lines[index + 1]  # 拼出ts片段的URL
                else:
                    ts_url = m3u8url.rsplit("/", 1)[0] + "/" + lines[index + 1].rsplit("/", 1)[-1]  # 拼出ts片段的URL
                self.ts_parts.append(ts_url)
            if "#EXT-X-KEY" in line:
                # #EXT-X-KEY:METHOD=AES-128,URI="encryption.key"
                key_url = m3u8url.rsplit("/", 1)[0] + "/" + line.split('"')[1]
                self.key = requests.get(url=key_url, timeout=120, headers=self.header).content  # 获取秘钥

    def load_ts(self, ts_url, files, count):
        if self.key:
            self.auto_keydown(ts_url, files, self.header, self.key)
        else:
            self.auto_down(ts_url, files, self.headers)
        print('第 %d/%d 个文件下载完成, 下载地址是%s' % (count, len(self.ts_parts), ts_url))
        count += 1


    def auto_down(self, url, filename, headers):  # 下载失败后,自调用从新下载
        try:
            opener = urllib.request.build_opener()  # 创建opener对象
            opener.addheaders = self.headers
            urllib.request.install_opener(opener)  # 将opener对象装入urllib.request
            urllib.request.urlretrieve(url, filename)
        except Exception as ex:
            # print(ex.args, url)
            return self.auto_down(url, filename, headers)

    def auto_keydown(self, url, filename, headers, key):  # 下载失败后,自调用从新下载
        try:
            response = requests.get(url=url, timeout=120, headers=headers)
            with open(filename, 'ab+') as f:
                data = aes_decode(response.content, key)
                f.write(data)
                f.close()
        except Exception as ex:
            # print(ex.args, url)
            return self.auto_down(url, filename, headers, key)

    def threads(self):
        for i in range(len(self.ts_parts)):
            files = self.down_path + '/' + 'tsm{:0>5}.ts'.format(i)
            if os.path.exists(files):  # 判断文件是已下载,且文件大小变为空。是则结束本次循环,继续循环
                sz = os.path.getsize(files)
                if not sz:
                    os.remove(files)  # 删除空文件
                    print("删除空字节视频文件", files.rsplit("/", 1)[-1])
                else:
                    continue
            t = threading.Thread(target=self.load_ts, args=(self.ts_parts[i], files, i))
            self.tsthreads.append(t)

def main():  # This is m3u8 url
    url = 'https://www.XXXXX.com/20200612/jDCLCWyb/1500kb/hls/index.m3u8'

    hd = [('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1941.0 Safari/537.36')]
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36",
        "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive",
        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"} 

    m3u8 = m3u8down(url, hd, headers)
    m3u8.to_ts()
    m3u8.threads()

    for th in m3u8.tsthreads:
        th.start()

    for th in m3u8.tsthreads:
        th.join()

    print("{:-^20}".format("下载结束"))

if __name__ == '__main__':
    main()

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/161206.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档