智能字幕接入

最近更新时间:2025-09-24 11:39:42

我的收藏

智能字幕功能简介

智能字幕功能可以将离线视频文件或直播流中的语音信息进行实时语音识别,将其转换成字幕并进行多语言翻译,适用于直播实时字幕、会议字幕、视频转译出海等场景。该功能还支持配置热词库、术语库以增加识别和翻译的准确率。

技术优势

全平台支持:支持处理离线文件、直播流、互动音视频、会议字幕。直播实时同传字幕支持稳态、渐变模式,接入门槛低,无需播放端改造。
准确率高:大模型处理,支持热词、术语库,准确率行业领先。
语种丰富:支持上百种语种,支持多地方言,支持中英文夹杂等混合语种识别。
样式自定义:支持将字幕压制至视频,且字幕样式(字体、字号、颜色、背景、位置等)可自定义,支持页面自定义渲染。

免费体验

1. 打开 体验馆,进入智能字幕体验页,在右侧选择点播文件或直播流,选择原语言和字幕类型,单击开始处理。
2. 等待处理完成后即可查看结果。
说明:
体验馆功能较简单,仅用于体验基础效果,测试完整效果请使用 API 接入。


场景一:处理离线文件

方式一:控制台零代码发起任务

手动发起任务

登录媒体处理控制台,单击创建任务 > 快速创建离线处理任务
1. 指定输入文件
您可以选择腾讯云 COS 存储桶中的视频文件,或提供视频下载 URL。当前字幕生成及翻译功能暂不支持以 AWS S3 为输入文件来源。
2. 处理输入文件
选择创建编排,插入“智能字幕”节点。

您可以选择系统预设模板,或使用自定义参数。详细模板配置指引参考 智能字幕模板自定义热词库

系统预设模板如下表所示:
模板名称/ID
模板能力
中文源视频-生成中文字幕
100
识别源视频中的中文语音,生成中文字幕文件(WebVTT 格式)。
中文源视频-生成英文字幕
121
识别源视频中的中文语音,并翻译成英文,生成英文字幕文件。
中文源视频-生成中英文字幕
122
识别源视频中的中文语音,并翻译成英文,生成中英双语字幕文件。
英文源视频-生成英文字幕
200
识别源视频中的英文语音,生成英语字幕文件。
英文源视频-生成中文字幕
211
识别源视频中的英文语音,并翻译成中文,生成中文字幕文件。
英文源视频-生成中英文字幕
212
识别源视频中的英文语音,并翻译成中文,生成英中双语字幕文件。
3. 指定输出路径
指定输出文件的保存路径。
4. 发起任务
单击创建,发起任务。

通过编排自动触发任务

若您希望实现:在 COS 桶指定路径下上传了新视频文件,自动按照预设参数对新视频文件进行智能字幕处理。您可以:
1. 进入菜单离线编排,单击创建离线服务编排,任务配置选择智能字幕节点,并配置触发 Bucket、触发目录等参数。

2. 创建后,进入离线编排列表,找到刚创建好的编排,在启动处开启按钮即可。后续在触发目录下新增的视频文件,将自动按照该编排预设的流程和参数发起任务,并将处理后的视频文件保存到编排配置的输出路径中。
注意:
启用编排成功后,需要3-5分钟才会生效。


方式二:API 接口调用

调用 ProcessMedia API,通过指定模板 ID 发起任务。示例:
{
"InputInfo": {
"Type": "URL",
"UrlInputInfo": {
"Url": "https://test-1234567.cos.ap-guangzhou.myqcloud.com/video/test.mp4" // 替换成需要处理的视频 URL
}
},
"SmartSubtitlesTask": {
"Definition": 122 //122为预设中文源视频-生成中英文字幕模板ID,可替换为您的自定义智能字幕模板ID
},
"OutputStorage": {
"CosOutputStorage": {
"Bucket": "test-1234567",
"Region": "ap-guangzhou"
},
"Type": "COS"
},
"OutputDir": "/output/",
"Action": "ProcessMedia",
"Version": "2019-06-12"
}
如果您在控制台创建了编排,也可以调用 ProcessMedia API,通过指定服务编排 ID 发起任务。示例:
{
    "InputInfo": {
        "Type": "COS",
        "CosInputInfo": {
            "Bucket": "test-1234567",
            "Region": "ap-guangzhou",
            "Object": "/video/123.mp4"
        }
    },
    "ScheduleId": 12345, //替换为自定义编排ID,12345为填写示例,不具备实际意义
    "Action": "ProcessMedia",
    "Version": "2019-06-12"
}
说明:
如果有设置回调地址,回包参考文档 解析事件通知

将字幕压制到视频(可选能力)

需要调用 ProcessMedia API, 发起转码任务 并通过 SubtitleTemplate 字段指定字幕 vtt 文件。示例:
{
    "MediaProcessTask": {
        "TranscodeTaskSet": [
            {
                "Definition": 100040, //转码模板ID;需要替换为您需要的转码模板
                "OverrideParameter": { //覆盖参数;用于灵活覆盖转码模板中的部分参数
                    "SubtitleTemplate": { //字幕压制配置
                        "Path": "https://test-1234567.cos.ap-nanjing.myqcloud.com/mps_autotest/subtitle/1.vtt",
                        "StreamIndex": 2,
                        "FontType": "simkai.ttf",
                        "FontSize": "10px",
                        "FontColor": "0xFFFFFF",
                        "FontAlpha": 0.9
                    }
                }
            }
        ]
    },
    "InputInfo": { //输入信息
        "Type": "URL",
        "UrlInputInfo": {
            "Url": "https://test-1234567.cos.ap-nanjing.myqcloud.com/mps_autotest/subtitle/123.mkv"
        }
    },
    "OutputStorage": { //输出存储桶
        "Type": "COS",
        "CosOutputStorage": {
            "Bucket": "test-1234567",
            "Region": "ap-nanjing"
        }
    },
    "OutputDir": "/mps_autotest/output2/", //输出路径
    "Action": "ProcessMedia",
    "Version": "2019-06-12"
}

查询任务结果

控制台查询任务

进入控制台 离线任务管理,任务列表中会展示刚发起的任务。

子任务状态为“成功”时,单击查看结果,可以预览字幕样式。

生成的 VTT 字幕文件可以在编排管理 > COS Bucket > 输出 Bucket 中找到。

中文字幕样例:



中英字幕样例:




事件通知回调

在使用 ProcessMedia 发起媒体处理任务时,可以通过 TaskNotifyConfig 参数配置事件回调。当任务处理完成后,会通过配置的回调信息回调任务结果,您可以通过 ParseNotification 解析事件通知结果。

调用接口查询任务结果

调用 DescribeTaskDetail 接口,输入任务 ID (例如:24000022-WorkflowTask-b20a8exxxxxxx1tt110253、24000022-ScheduleTask-774f101xxxxxxx1tt110253)查询任务结果,示例如下:


场景二:处理直播流

在直播流中使用字幕及翻译目前有两种方案:云直播控制台开启字幕功能和通过 MPS 回调文本并压制到直播流。建议使用云直播控制台开启字幕功能,方案介绍如下文。

方式一:云直播控制台开启字幕功能

1. 配置直播字幕功能
1.1 开通 云直播媒体处理
2. 拉字幕流
拉取转码流(在对应的直播流 StreamName 后加上绑定了字幕模板的转码名称_转码模板名称生成转码流地址)时会输出字幕效果,拉流地址拼接规则详见 拼装播放 URL
说明:
目前字幕展示有两种形式:实时动态字幕和延时稳态字幕。实时动态字幕指实时直播中的字幕会根据语音内容逐字动态校正字幕内容,输出的字幕内容会实时变动;延时稳态字幕指系统会按设定的时间延时展示直播,输出的是整句字幕,观看体验更佳。

方式二:通过 MPS 回调文本

暂不支持使用 MPS 控制台发起直播流智能字幕任务,您可以通过 API 发起。且处理直播流,目前需要通过智能识别模板,通过配置语音识别或语音翻译功能实现字幕生成。
使用示例如下,详细 API 文档参考 对直播流发起处理,实时回调包体请参考文档 解析直播流处理结果
{
    "Url": "http://5000-wenzhen.liveplay.myqcloud.com/live/123.flv",
    "AiRecognitionTask": {
        "Definition": 10101 //10101为预设中文字幕模板ID,可替换为您的自定义智能识别模板ID
    },
    "OutputStorage": {
        "CosOutputStorage": {
            "Bucket": "test-1234567",
            "Region": "ap-guangzhou-2"
        },
        "Type": "COS"
    },
    "OutputDir": "/test/output/",
    "TaskNotifyConfig": {
        "NotifyType": "URL",
        "NotifyUrl": "http://xxxx.qq.com/callback/qtatest/?token=xxxxxx"
    },
    "Action": "ProcessLiveStream",
    "Version": "2019-06-12"
}

场景三:通过 WebSocket 处理私有音频流

在视频会议、双工语音等场景可以通过 WebSocket 协议将音频传给识别翻译服务,然后将结果通过 WebSocket 协议返回。支持单识别、识别并翻译、多路实时音频流同时识别并翻译、实时结果字幕支持稳态、渐变模式。协议参考 WebSocket 识别协议
代码示例:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import struct
import time
import os
import signal
import sys
import hashlib
import hmac
import random
from urllib.parse import urlencode, urlunsplit, quote
import websockets
import asyncio
import logging
import json

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AudioPacket:
def __init__(self, format=1, is_end=False, timestamp=0, audio_src_id="123456", ext_data=b'', data=b''):
self.format = format
self.is_end = is_end
self.timestamp = timestamp
self.audio_src_id = audio_src_id
self.ext_data = ext_data
self.data = data

def marshal(self):
"""Serialize audio packet to binary format"""
header = struct.pack(
'>BBQH',
self.format,
1 if self.is_end else 0,
self.timestamp,
len(self.audio_src_id)
)
audio_src_bytes = self.audio_src_id.encode('utf-8')
ext_len = struct.pack('>H', len(self.ext_data))
return header + audio_src_bytes + ext_len + self.ext_data + self.data

def sha256hex(s):
"""Calculate SHA256 hex digest"""
if isinstance(s, str):
s = s.encode('utf-8')
return hashlib.sha256(s).hexdigest()

def hmacsha256(s, key):
"""Calculate HMAC-SHA256"""
if isinstance(s, str):
s = s.encode('utf-8')
if isinstance(key, str):
key = key.encode('utf-8')
return hmac.new(key, s, hashlib.sha256).digest()

def generate_random_number(digits):
"""Generate random number with specified digits"""
low = 10 ** (digits - 1)
high = (10 ** digits) - 1
return random.randint(low, high)

def generate_url_v3(args):
"""Generate WebSocket URL with TC3-HMAC-SHA256 signature"""
query_params = {}
if args.dstLang:
query_params["transSrc"] = args.lang
query_params["transDst"] = args.dstLang
else:
query_params["asrDst"] = args.lang
query_params["fragmentNotify"] = "1" if args.frame else "0"
query_params["timeoutSec"] = str(args.timeout)
timestamp = int(time.time())
expire_timestamp = timestamp + 3600
query_params["timeStamp"] = str(timestamp)
query_params["expired"] = str(expire_timestamp)
query_params["secretId"] = args.secretId
query_params["nonce"] = str(generate_random_number(10))
# Sort keys and build canonical query string
sorted_keys = sorted(query_params.keys())
canonical_query = "&".join(
["{}={}".format(k, quote(query_params[k], safe=''))
for k in sorted_keys]
)
# Build canonical request
path = "/wss/v1/{}".format(args.appid)
http_method = "post"
canonical_uri = path
canonical_headers = "content-type:application/json; charset=utf-8\\nhost:{}\\n".format(args.addr)
signed_headers = "content-type;host"
canonical_request = "{}\\n{}\\n{}\\n{}\\n{}\\n".format(
http_method,
canonical_uri,
canonical_query,
canonical_headers,
signed_headers,
)
# Build string to sign
date = time.strftime("%Y-%m-%d", time.gmtime(timestamp))
credential_scope = "{}/mps/tc3_request".format(date)
hashed_canonical = sha256hex(canonical_request)
algorithm = "TC3-HMAC-SHA256"
string_to_sign = "{}\\n{}\\n{}\\n{}".format(
algorithm,
timestamp,
credential_scope,
hashed_canonical
)
# Calculate signature
secret_date = hmacsha256(date, "TC3" + args.secretKey)
secret_service = hmacsha256("mps", secret_date)
secret_signing = hmacsha256("tc3_request", secret_service)
signature = hmac.new(
secret_signing,
string_to_sign.encode('utf-8'),
hashlib.sha256
).hexdigest()
# Add signature to query params
query_params["signature"] = signature
# Build final URL
scheme = "wss" if args.ssl else "ws"
url = urlunsplit((
scheme,
args.addr,
path,
urlencode(query_params),
""
))
return url

async def receive_messages(websocket, stop_event):
"""Handle incoming WebSocket messages"""
try:
while not stop_event.is_set():
message = await websocket.recv()
if isinstance(message, bytes):
try:
message = message.decode('utf-8')
except UnicodeDecodeError:
message = str(message)
logger.info("Received: %s", message)
except Exception as e:
logger.info("Connection closed: %s", e)

async def run_client():
parser = argparse.ArgumentParser()
parser.add_argument("--addr", default="mps.cloud.tencent.com", help="websocket service address")
parser.add_argument("--file", default="./wx_voice.pcm", help="pcm file path")
parser.add_argument("--appid", default="121313131", help="app id")
parser.add_argument("--lang", default="zh", help="language")
parser.add_argument("--dstLang", default="", help="destination language")
parser.add_argument("--frame", action="store_true", help="enable frame notify")
parser.add_argument("--secretId", default="123456", help="secret id")
parser.add_argument("--secretKey", default="123456", help="secret key")
parser.add_argument("--ssl", action="store_true", help="use SSL")
parser.add_argument("--timeout", type=int, default=10, help="timeout seconds")
parser.add_argument("--wait", type=int, default=700, help="wait seconds after end")
args = parser.parse_args()

url = generate_url_v3(args)
logger.info("Connecting to %s", url)

try:
# Python 3.6 compatible websockets connection
websocket = await websockets.connect(url, ping_timeout=5)

# Handle initial response
initial_msg = await websocket.recv()
try:
result = json.loads(initial_msg)
if result.get("Code", 0) != 0:
logger.error("Handshake failed: %s", result.get("Message", ""))
return
logger.info("TaskId %s handshake success", result.get("TaskId", ""))
except ValueError: # json.JSONDecodeError not available in 3.6
logger.error("Invalid initial message")
return

# Setup signal handler
loop = asyncio.get_event_loop()
stop_event = asyncio.Event()
loop.add_signal_handler(signal.SIGINT, stop_event.set)

# Start receiver
receiver_task = asyncio.ensure_future(receive_messages(websocket, stop_event))

# Audio processing
try:
with open(args.file, "rb") as fd:
PCM_DUR_MS = 40
pcm = bytearray(PCM_DUR_MS * 32)
pkt = AudioPacket(data=pcm)
is_end = False
wait_until = 0

while not stop_event.is_set():
if is_end:
if time.time() > wait_until:
logger.info("Finish")
break
await asyncio.sleep(0.1)
continue

# Read PCM data
n = fd.readinto(pkt.data)
if n < len(pkt.data):
pkt.is_end = True
is_end = True
wait_until = time.time() + args.wait

# Send audio packet
await websocket.send(pkt.marshal())
logger.info("Sent ts %d", pkt.timestamp)
pkt.timestamp += n // 32

await asyncio.sleep(PCM_DUR_MS / 1000)

except IOError: # FileNotFoundError not available in 3.6
logger.error("Open file error: %s", args.file)
return

# Cleanup
await asyncio.wait_for(receiver_task, timeout=1)
await websocket.close()

except Exception as e:
logger.error("Connection error: %s", e)
return

if __name__ == "__main__":
# Python 3.6 compatible asyncio runner
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run_client())
finally:
loop.close()