我正在用gstreamer在python中构建流应用程序。
应用程序使用tee
元素将数据写入rtmp接收器和文件链接。在理想的环境(本地网络)中,启动和流流工作良好,但如果与流服务器断开,怎么办?我试图弄清楚如何保持管道运行,从而在错误发生后继续写入文件墨水.
我想要存档的东西:
Question(s):
能把我想做的事存档吗?
如何存档(动态管道/探测/额外元素)?
任何解释,例子或指向正确的方向将是非常感谢的。
注:
Gst版本:GStreamer1.3.90(rtmp接收器,faac,x264enc)
操作系统: ubuntu 14.04 LTS
流服务器: wowza 4.x
测试应用程序(代码): 链接
启动后的管道(OK): 链接
Rtmp接收器错误(写入数据失败)后的管道: 链接
Rtmp接收器错误(写入数据失败)后的日志片段: 链接
发布于 2015-01-13 14:56:05
我不知道你用一条管道就能得到多可靠的系统。我建议做的是创建一个两个阶段的过程:
1) audio -> encode -> tee -> filesink
-> shmsink
2) shmsrc -> mux -> rtmpsink
然后为第二个管道创建一个包装脚本。下面是一个示例,说明如何在视频these中使用这些元素。请注意,大写是非常重要的--它们必须足够详细,才能知道共享内存中出现了什么。
gst-发射-1.0视频测试!tee name=t!排队!视频转换!最接近!视频/x-生,width=400,height=400,format=BGRA!Shm接收器等待连接=假套接字-path=/tmp/shr gst-启动-1.0 shmsrc套接字-path=/tmp/shr!width=400,height=400,format=BGRA,framerate=30/1!视频转换!近链
您也可以使用TCP/UDP来尝试这种方法,而不是使用共享内存。我没有安装faac插件,但是管道可能是这样的:
音频-> faac -> rtpmp4apay -> udpsink host=localhost port=1919 port=1919 -> rtpmp4adepay -> mux -> rtmp接收器
发布于 2022-04-21 06:59:45
在错误发生后,我还一直在尝试获得一条管道,以便重新连接到RTMP服务器。原则上,我同意@mpr的答复 (使用与shm接收器/shmsrc对连接的两个管道),但是我无法使它可靠地工作,所以我最终使用了不同的策略。
我使用的是rtmp2sink
,当遇到错误时,它将在管道总线上发布一条消息,然后返回GST_FLOW_FLUSHING,这将导致管道刷新所有内容。这不是我感兴趣的,所以我在GhostPad前面添加了一个rtmp2sink
,它捕获返回值并将其转换回GST_FLOW_OK。此时,我还重置了rtmp2sink
元素,使其重新连接。
这看起来相当可靠,至少对我使用的RTMP服务器来说,我不需要做任何特殊的事情来处理来自编码器的关键帧。
所有这些都用Gstreamer版本1.18.5进行了测试。下面是Python中的一个非常基本的示例,展示了这种方法:
#!/usr/bin/env python3
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
def _handle_message(_, message, loop):
"""handle messages posted to pipeline bus"""
if message.type == Gst.MessageType.EOS:
print("End-of-stream")
loop.quit()
elif message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
if message.src.__class__.__name__ == "GstRtmp2Sink" and err.matches(
Gst.ResourceError.quark(), Gst.ResourceError(err.code)
):
resource_error = Gst.ResourceError(err.code)
print(f"caught {resource_error} from rtmp2sink, ignoring")
else:
print(f"caught error {err} ({debug}), exiting")
loop.quit()
return True
def _wrap_rtmp_sink(rtmpsink: Gst.Element):
"""wrap RTMP sink to make it handle reconnections"""
def _chain(pad: Gst.Pad, _, buffer: Gst.Buffer):
internal_pad = pad.get_internal()
result = internal_pad.push(buffer)
if result == Gst.FlowReturn.FLUSHING or result == Gst.FlowReturn.ERROR:
print(f"Restarting RTMP sink after {result}")
rtmpsink.set_state(Gst.State.NULL)
rtmpsink.set_state(Gst.State.PLAYING)
return Gst.FlowReturn.OK
return result
sinkpad = rtmpsink.get_static_pad("sink")
peer = sinkpad.get_peer()
peer.unlink(sinkpad)
ghost_pad = Gst.GhostPad.new("proxypad", sinkpad)
ghost_pad.set_chain_function_full(_chain)
peer.link(ghost_pad)
ghost_pad.activate_mode(Gst.PadMode.PUSH, True)
# hang on to GhostPad to avoid Python garbage collecting it
rtmpsink._ghost_pad = ghost_pad
def main():
Gst.init(None)
pipeline = Gst.parse_launch(
f"""
videotestsrc
! video/x-raw,width=1280,height=720,framerate=30/1
! avenc_h264_videotoolbox
! h264parse
! flvmux.video
audiotestsrc
! faac
! flvmux.audio
flvmux name=flvmux streamable=true
! queue
! rtmp2sink name=rtmp location=rtmp://10.1.0.10/test/test
"""
)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", _handle_message, loop)
_wrap_rtmp_sink(pipeline.get_by_name("rtmp"))
pipeline.set_state(Gst.State.PLAYING)
loop.run()
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
main()
https://stackoverflow.com/questions/27905606
复制相似问题