首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Gstreamer,如何从(rtmp接收器)错误中恢复

Gstreamer,如何从(rtmp接收器)错误中恢复
EN

Stack Overflow用户
提问于 2015-01-12 15:39:58
回答 2查看 2.7K关注 0票数 2

我正在用gstreamer在python中构建流应用程序。

应用程序使用tee元素将数据写入rtmp接收器和文件链接。在理想的环境(本地网络)中,启动和流流工作良好,但如果与流服务器断开,怎么办?我试图弄清楚如何保持管道运行,从而在错误发生后继续写入文件墨水.

我想要存档的东西:

  1. 至少,在发生流部分(rtmp接收器)错误后,我希望保留我的归档文件(文件墨水)。因此,如果出现错误,我们将提供一些备份。
  2. 手动重新连接到流服务器。
  3. 如果可能的话,构建一些机制来检查连接和重新连接流部分(rtmp接收器)。

Question(s):

能把我想做的事存档吗?

如何存档(动态管道/探测/额外元素)?

任何解释,例子或指向正确的方向将是非常感谢的。

注:

Gst版本:GStreamer1.3.90(rtmp接收器,faac,x264enc)

操作系统: ubuntu 14.04 LTS

流服务器: wowza 4.x

测试应用程序(代码): 链接

启动后的管道(OK): 链接

Rtmp接收器错误(写入数据失败)后的管道: 链接

Rtmp接收器错误(写入数据失败)后的日志片段: 链接

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-01-13 14:56:05

我不知道你用一条管道就能得到多可靠的系统。我建议做的是创建一个两个阶段的过程:

代码语言:javascript
运行
复制
1) audio -> encode -> tee -> filesink
                          -> shmsink 

2) shmsrc -> mux -> rtmpsink

然后为第二个管道创建一个包装脚本。下面是一个示例,说明如何在视频these中使用这些元素。请注意,大写是非常重要的--它们必须足够详细,才能知道共享内存中出现了什么。

gst-发射-1.0视频测试!tee name=t!排队!视频转换!最接近!视频/x-生,width=400,height=400,format=BGRA!Shm接收器等待连接=假套接字-path=/tmp/shr gst-启动-1.0 shmsrc套接字-path=/tmp/shr!width=400,height=400,format=BGRA,framerate=30/1!视频转换!近链

您也可以使用TCP/UDP来尝试这种方法,而不是使用共享内存。我没有安装faac插件,但是管道可能是这样的:

音频-> faac -> rtpmp4apay -> udpsink host=localhost port=1919 port=1919 -> rtpmp4adepay -> mux -> rtmp接收器

票数 2
EN

Stack Overflow用户

发布于 2022-04-21 06:59:45

在错误发生后,我还一直在尝试获得一条管道,以便重新连接到RTMP服务器。原则上,我同意@mpr的答复 (使用与shm接收器/shmsrc对连接的两个管道),但是我无法使它可靠地工作,所以我最终使用了不同的策略。

我使用的是rtmp2sink,当遇到错误时,它将在管道总线上发布一条消息,然后返回GST_FLOW_FLUSHING,这将导致管道刷新所有内容。这不是我感兴趣的,所以我在GhostPad前面添加了一个rtmp2sink,它捕获返回值并将其转换回GST_FLOW_OK。此时,我还重置了rtmp2sink元素,使其重新连接。

这看起来相当可靠,至少对我使用的RTMP服务器来说,我不需要做任何特殊的事情来处理来自编码器的关键帧。

所有这些都用Gstreamer版本1.18.5进行了测试。下面是Python中的一个非常基本的示例,展示了这种方法:

代码语言:javascript
运行
复制
#!/usr/bin/env python3
import gi

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

def _handle_message(_, message, loop):
    """handle messages posted to pipeline bus"""
    if message.type == Gst.MessageType.EOS:
        print("End-of-stream")
        loop.quit()
    elif message.type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        if message.src.__class__.__name__ == "GstRtmp2Sink" and err.matches(
            Gst.ResourceError.quark(), Gst.ResourceError(err.code)
        ):
            resource_error = Gst.ResourceError(err.code)
            print(f"caught {resource_error} from rtmp2sink, ignoring")
        else:
            print(f"caught error {err} ({debug}), exiting")
            loop.quit()
    return True

def _wrap_rtmp_sink(rtmpsink: Gst.Element):
    """wrap RTMP sink to make it handle reconnections"""

    def _chain(pad: Gst.Pad, _, buffer: Gst.Buffer):
        internal_pad = pad.get_internal()
        result = internal_pad.push(buffer)
        if result == Gst.FlowReturn.FLUSHING or result == Gst.FlowReturn.ERROR:
            print(f"Restarting RTMP sink after {result}")
            rtmpsink.set_state(Gst.State.NULL)
            rtmpsink.set_state(Gst.State.PLAYING)
            return Gst.FlowReturn.OK

        return result

    sinkpad = rtmpsink.get_static_pad("sink")
    peer = sinkpad.get_peer()
    peer.unlink(sinkpad)

    ghost_pad = Gst.GhostPad.new("proxypad", sinkpad)
    ghost_pad.set_chain_function_full(_chain)
    peer.link(ghost_pad)
    ghost_pad.activate_mode(Gst.PadMode.PUSH, True)

    # hang on to GhostPad to avoid Python garbage collecting it
    rtmpsink._ghost_pad = ghost_pad

def main():
    Gst.init(None)
    pipeline = Gst.parse_launch(
        f"""
        videotestsrc
            ! video/x-raw,width=1280,height=720,framerate=30/1
            ! avenc_h264_videotoolbox
            ! h264parse
            ! flvmux.video

        audiotestsrc
            ! faac
            ! flvmux.audio

        flvmux name=flvmux streamable=true
            ! queue
            ! rtmp2sink name=rtmp location=rtmp://10.1.0.10/test/test
    """
    )

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", _handle_message, loop)

    _wrap_rtmp_sink(pipeline.get_by_name("rtmp"))

    pipeline.set_state(Gst.State.PLAYING)
    loop.run()
    pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    main()
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27905606

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档