feat(play): 添加流就绪状态检查和录像回放优化

- 新增 getStreamStatus API 用于获取流就绪状态
- 实现流状态轮询机制,等待录像回放流准备就绪
- 优化 JessibucaPlayer 连接顺序,先建立连接再创建 demuxer
- 添加流就绪状态指示界面,改善用户体验
- 支持通过 Call-ID 处理录像结束通知
- 调整流注册超时时间,录像回放模式支持更长等待时间
- 添加 FLV 流就绪检查功能,确保播放器正确加载
- 优化媒体状态通知处理,支持回放开始和结束事件
This commit is contained in:
2026-04-08 21:58:17 +08:00
parent 6987a6904e
commit 50b8356af8
10 changed files with 328 additions and 29 deletions

3
.gitignore vendored
View File

@@ -522,4 +522,5 @@ frontend/dist
CLAUDE.md
/.claude
/.omc
.omc
.planning

View File

@@ -38,6 +38,11 @@ export const playApi = {
return get<MediaInfoResponse>(`${BASE_URL}/media/${streamId}`)
},
/** 获取流就绪状态 */
getStreamStatus: (streamId: string) => {
return get<StreamStatusResponse>(`${BASE_URL}/stream_status/${streamId}`)
},
/** 查询历史录像 */
queryRecords: (params: RecordQueryParams) => {
return get<RecordListResponse>(`${RECORD_BASE_URL}/list`, params as unknown as Record<string, unknown>)
@@ -100,6 +105,16 @@ export interface MediaInfoResponse {
info?: unknown
}
/** 流状态 */
export interface StreamStatusResponse {
stream_id: string
ready: boolean
stream_active: boolean
flv_ready: boolean
mode: PlayMode
status: string
}
/** 录像查询参数 */
export interface RecordQueryParams {
device_id: string

View File

@@ -440,9 +440,14 @@ async function play(url: string) {
conn = new HttpConnection(url)
}
// 创建 demuxer使用 'avcc' 格式避免崩溃
// 先建立连接
await conn.connect()
console.log('[Jessibuca] 连接已建立')
// 连接成功后创建 demuxer使用 'avcc' 格式避免崩溃
const mode = props.mode === 'live' ? DemuxMode.PUSH : DemuxMode.PULL
demuxer = new FlvDemuxer(conn, mode, 'avcc')
console.log('[Jessibuca] Demuxer Created:', 'FlvDemuxer')
// 监听视频编码配置变化
demuxer.on(DemuxEvent.VIDEO_ENCODER_CONFIG_CHANGED, (vconfig: any) => {
@@ -504,8 +509,6 @@ async function play(url: string) {
}
}
await conn.connect()
// PULL模式处理
if (mode === DemuxMode.PULL) {
demuxer.videoReadable?.pipeTo(

View File

@@ -175,7 +175,7 @@
<div class="lg:tw-col-span-3 tw-bg-gray-900 tw-rounded-lg tw-shadow-lg tw-p-4 tw-border tw-border-gray-700">
<div class="tw-aspect-video tw-bg-black tw-rounded tw-overflow-hidden tw-relative">
<JessibucaPlayer
v-if="currentStream?.flv_url"
v-if="currentStream?.flv_url && streamReady"
ref="playerRef"
:url="currentStream.flv_url"
:mode="currentStream.mode"
@@ -186,7 +186,13 @@
@decoderchange="onDecoderChange"
class="tw-w-full tw-h-full"
/>
<div v-else class="tw-w-full tw-h-full tw-flex tw-items-center tw-justify-center tw-text-gray-400">
<div v-if="playing && !streamReady" class="tw-w-full tw-h-full tw-flex tw-items-center tw-justify-center tw-text-gray-400">
<div class="tw-text-center">
<el-icon size="48" class="tw-mb-4 tw-animate-pulse"><VideoPlay /></el-icon>
<p>正在等待设备推流...</p>
</div>
</div>
<div v-else-if="!currentStream?.flv_url" class="tw-w-full tw-h-full tw-flex tw-items-center tw-justify-center tw-text-gray-400">
<div class="tw-text-center">
<el-icon size="48" class="tw-mb-4 tw-animate-pulse"><VideoPlay /></el-icon>
<p>正在加载视频流...</p>
@@ -313,6 +319,7 @@ const playbackStartTime = ref('')
const playbackEndTime = ref('')
const playing = ref(false)
const queryingRecords = ref(false)
const streamReady = ref(false) // 录像回放模式下流是否就绪
const currentStream = ref<PlayResponse | null>(null)
const currentSession = ref<SessionResponse | null>(null)
const sessions = ref<SessionResponse[]>([])
@@ -477,6 +484,50 @@ async function playRecord(record: RecordItem) {
await startPlay()
}
// 录像回放模式下轮询流状态
let streamStatusPollTimer: ReturnType<typeof setInterval> | null = null
function stopStreamStatusPolling() {
if (streamStatusPollTimer) {
clearInterval(streamStatusPollTimer)
streamStatusPollTimer = null
}
}
async function pollStreamStatus(streamId: string): Promise<boolean> {
try {
const res = await playApi.getStreamStatus(streamId)
return res.data?.ready === true
} catch {
return false
}
}
function startStreamStatusPolling(streamId: string, maxWaitMs: number) {
stopStreamStatusPolling()
const startTime = Date.now()
const pollInterval = 2000 // 每2秒轮询
streamStatusPollTimer = setInterval(async () => {
const elapsed = Date.now() - startTime
if (elapsed >= maxWaitMs) {
stopStreamStatusPolling()
ElMessage.error('等待流就绪超时')
playing.value = false
streamReady.value = false
currentStream.value = null
return
}
const ready = await pollStreamStatus(streamId)
if (ready) {
stopStreamStatusPolling()
streamReady.value = true
ElMessage.success('流已就绪,开始播放')
}
}, pollInterval)
}
async function startPlay() {
if (!deviceId.value || !channelId.value) {
ElMessage.warning('请输入设备ID和通道ID')
@@ -493,6 +544,8 @@ async function startPlay() {
try {
playing.value = true
streamReady.value = playMode.value !== 'playback' // 实时模式直接就绪
const res = playMode.value === 'live'
? await playApi.start(deviceId.value, channelId.value)
: await playApi.startPlayback({
@@ -506,10 +559,18 @@ async function startPlay() {
currentStream.value = res.data
console.log('播放响应:', res.data)
await refreshSessions()
ElMessage.success('播放请求已发送')
// 录像回放模式:启动轮询,等待流就绪
if (playMode.value === 'playback') {
ElMessage.info('正在等待设备推流,请稍候...')
startStreamStatusPolling(res.data.stream_id, 180000) // 最多等待3分钟
} else {
ElMessage.success('播放请求已发送')
}
}
} catch (error) {
playing.value = false
streamReady.value = false
console.error('播放失败:', error)
ElMessage.error('播放失败: ' + (error instanceof Error ? error.message : String(error)))
}
@@ -540,6 +601,8 @@ async function stopPlay() {
ElMessage.error('停止失败')
} finally {
playing.value = false
streamReady.value = false
stopStreamStatusPolling()
currentStream.value = null
currentSession.value = null
await refreshSessions()
@@ -732,6 +795,8 @@ onMounted(() => {
sessions.value = sessions.value.filter(s => s.stream_id !== stoppedStreamId)
if (currentStream.value.stream_id === stoppedStreamId) {
playing.value = false
streamReady.value = false
stopStreamStatusPolling()
currentStream.value = null
currentSession.value = null
}
@@ -742,6 +807,8 @@ onMounted(() => {
onUnmounted(() => {
// 清理下载进度定时器
stopDownloadProgressPolling()
// 清理流状态轮询定时器
stopStreamStatusPolling()
// 取消 WebSocket 订阅
if (unsubscribeWs) {
unsubscribeWs()

View File

@@ -281,6 +281,72 @@ func (h *PlayHandler) ListSessions(c *gin.Context) {
})
}
// StreamStatusResponse 流状态响应
type StreamStatusResponse struct {
StreamId string `json:"stream_id"`
Ready bool `json:"ready"` // 流是否已就绪FLV 可播放)
StreamActive bool `json:"stream_active"` // RTP 流是否存在
FlvReady bool `json:"flv_ready"` // FLV/RTMP 流是否已生成
Mode string `json:"mode"`
Status string `json:"status"`
}
// GetStreamStatus 获取流就绪状态
// @Summary 获取流就绪状态
// @Tags 视频播放
// @Produce json
// @Param stream_id path string true "流 ID"
// @Success 200 {object} Response{data=StreamStatusResponse}
// @Router /api/play/stream_status/{stream_id} [get]
func (h *PlayHandler) GetStreamStatus(c *gin.Context) {
streamId := c.Param("stream_id")
if streamId == "" {
c.JSON(http.StatusBadRequest, Response{
Code: http.StatusBadRequest,
Message: "流 ID 不能为空",
})
return
}
if h.playService == nil {
c.JSON(http.StatusServiceUnavailable, Response{
Code: http.StatusServiceUnavailable,
Message: "播放服务不可用",
})
return
}
session, exists := h.playService.GetSession(streamId)
if !exists {
c.JSON(http.StatusNotFound, Response{
Code: http.StatusNotFound,
Message: "会话不存在",
})
return
}
streamActive := h.playService.IsStreamActive(streamId)
// 检查 FLV/RTMP 流是否已生成(前端播放需要)
flvReady := h.playService.IsFlvStreamReady(streamId)
// 流就绪条件:会话状态正常 + FLV 流已生成
ready := session.Status == service.PlayStatusPlaying && flvReady
c.JSON(http.StatusOK, Response{
Code: http.StatusOK,
Message: "success",
Data: StreamStatusResponse{
StreamId: streamId,
Ready: ready,
StreamActive: streamActive,
FlvReady: flvReady,
Mode: string(session.Mode),
Status: session.Status,
},
})
}
// GetMediaInfo 获取媒体信息
// @Summary 获取流媒体信息
// @Tags 视频播放

View File

@@ -99,6 +99,7 @@ func SetupRouterWithServices(catalogService *service.CatalogService, catalogSubs
play.POST("/stop", playHandler.Stop)
play.POST("/playback", playHandler.PlayBack)
play.GET("/sessions", playHandler.ListSessions)
play.GET("/stream_status/:stream_id", playHandler.GetStreamStatus)
play.GET("/media/:stream_id", playHandler.GetMediaInfo)
}

View File

@@ -326,6 +326,12 @@ func (s *PlayService) startPlay(deviceId, channelId string, mode PlayMode, range
// 启动流注册超时检测
go s.monitorStreamRegistration(streamId)
// 录像回放和下载模式:异步等待设备推流,不阻塞 HTTP 响应
// 前端需轮询流状态,确认就绪后再连接播放器
if mode == PlayModePlayback || mode == PlayModeDownload {
go s.waitForPlaybackStream(streamId)
}
// 统一记录会话已经进入待确认阶段。
// 这里不能把"返回播放地址"误判为 RTP 已经成功,
// 真正成功仍以后续的 on_publish / on_stream_changed / getMediaList 为准。
@@ -340,6 +346,33 @@ func (s *PlayService) startPlay(deviceId, channelId string, mode PlayMode, range
return s.buildPlayResult(session), nil
}
// waitForPlaybackStream 异步等待录像回放流就绪
// 设备收到 INVITE 后需要时间准备录像数据,等待 streamRegistered 信号后更新状态
func (s *PlayService) waitForPlaybackStream(streamId string) {
s.mu.RLock()
session, exists := s.sessions[streamId]
s.mu.RUnlock()
if !exists {
return
}
if session.Mode != PlayModePlayback && session.Mode != PlayModeDownload {
return
}
waitTimeout := 180 * time.Second
log.Info().Str("stream_id", streamId).Str("mode", string(session.Mode)).Dur("wait_timeout", waitTimeout).Msg("录像回放/下载模式,异步等待设备推流...")
select {
case <-session.streamRegistered:
log.Info().Str("stream_id", streamId).Str("mode", string(session.Mode)).Msg("设备推流已就绪(异步)")
case <-time.After(waitTimeout):
log.Warn().Str("stream_id", streamId).Str("mode", string(session.Mode)).Dur("timeout", waitTimeout).Msg("等待设备推流超时(异步)")
_ = s.Stop(streamId)
}
}
// isStreamActive 检查 ZLM 流是否真实存在
func (s *PlayService) isStreamActive(streamId string) bool {
if s.zlm == nil {
@@ -579,7 +612,8 @@ func (s *PlayService) getZlmReaderCount(streamId string) int64 {
}
// monitorStreamRegistration 监控流注册超时
// 如果 10 秒内未收到流注册通知,自动发送 BYE 清理会话
// 实时视频: 10秒内未收到流注册通知自动发送 BYE 清理会话
// 录像回放: 30秒超时设备准备时间较长
func (s *PlayService) monitorStreamRegistration(streamId string) {
s.mu.RLock()
session, exists := s.sessions[streamId]
@@ -589,11 +623,17 @@ func (s *PlayService) monitorStreamRegistration(streamId string) {
return
}
// 根据播放模式设置超时时间
timeout := 10 * time.Second
if session.Mode == PlayModePlayback || session.Mode == PlayModeDownload {
timeout = 180 * time.Second // 录像回放/下载需要更长时间准备
}
select {
case <-session.streamRegistered:
log.Info().Str("stream_id", streamId).Msg("流注册成功")
case <-time.After(10 * time.Second):
log.Warn().Str("stream_id", streamId).Msg("流注册超时,发送 BYE 清理会话")
log.Info().Str("stream_id", streamId).Str("mode", string(session.Mode)).Msg("流注册成功")
case <-time.After(timeout):
log.Warn().Str("stream_id", streamId).Str("mode", string(session.Mode)).Dur("timeout", timeout).Msg("流注册超时,发送 BYE 清理会话")
// 调用 Stop 清理会话,会发送 BYE、关闭 RTP Server、释放 SSRC
if err := s.Stop(streamId); err != nil {
log.Error().Err(err).Str("stream_id", streamId).Msg("清理超时会话失败")
@@ -869,7 +909,7 @@ func (s *PlayService) CheckStreamHealth(timeout time.Duration) {
// 流不存在,检查是否需要触发断流检测
// 如果会话刚开始10秒内可能是正常的推流流程跳过
sessionAge := now.Sub(item.startTime)
if sessionAge < 10*time.Second {
if sessionAge <= 10*time.Second {
continue
}
@@ -880,7 +920,7 @@ func (s *PlayService) CheckStreamHealth(timeout time.Duration) {
// 流不存在超过3秒触发断流检测流程作为 webhook 的兜底机制)
inactiveDuration := now.Sub(item.lastActiveTime)
reconnectTimeout := 3 * time.Second
reconnectTimeout := 30 * time.Second
if inactiveDuration > reconnectTimeout {
log.Warn().
Str("stream_id", item.streamId).
@@ -1169,6 +1209,33 @@ func (s *PlayService) ListSessions() []*PlaySession {
return result
}
// IsStreamActive 导出流活跃状态检查
func (s *PlayService) IsStreamActive(streamId string) bool {
return s.isStreamActive(streamId)
}
// IsFlvStreamReady 检查 FLV/RTMP 流是否已生成(前端播放需要)
func (s *PlayService) IsFlvStreamReady(streamId string) bool {
if s.zlm == nil {
return false
}
// 查询 ZLM 媒体列表,检查是否有 FLV/RTMP 流
mediaList, err := s.zlm.GetMediaList(s.config.AppName, streamId)
if err != nil || mediaList.Code != 0 {
return false
}
// 检查是否有 RTMP 或 FMP4 流FLV 播放需要)
for _, item := range mediaList.Data {
if item.Schema == "rtmp" || item.Schema == "fmp4" {
return true
}
}
return false
}
func (s *PlayService) generateStreamId(deviceId, channelId string, mode PlayMode, rangeStart, rangeEnd *time.Time) string {
if mode == PlayModePlayback && rangeStart != nil && rangeEnd != nil {
return fmt.Sprintf("%s_%s_%d_%d", deviceId, channelId, rangeStart.Unix(), rangeEnd.Unix())
@@ -1817,6 +1884,47 @@ func (s *PlayService) OnMediaStatusReceived(streamId string) error {
return nil
}
// OnMediaStatusReceivedByCallId 通过 Call-ID 处理录像结束
// 用于设备未发送 StreamId 的情况
func (s *PlayService) OnMediaStatusReceivedByCallId(callId string) error {
s.mu.Lock()
defer s.mu.Unlock()
for streamId, session := range s.sessions {
if session.CallID == callId {
// 关闭 RTP Server
_, _ = s.zlm.CloseRtpServer(streamId)
delete(s.sessions, streamId)
log.Info().Str("device_id", session.DeviceId).Str("stream_id", streamId).Str("call_id", callId).Msg("录像结束通过Call-ID已清理会话")
return nil
}
}
log.Warn().Str("call_id", callId).Msg("通过 Call-ID 未找到对应会话")
return nil
}
// CancelRegistrationTimeoutByCallId 通过 Call-ID 取消流注册超时检测
// 设备发送 MediaStatus(121) 表示回放开始,应取消超时
func (s *PlayService) CancelRegistrationTimeoutByCallId(callId string) {
s.mu.RLock()
defer s.mu.RUnlock()
for streamId, session := range s.sessions {
if session.CallID == callId && session.streamRegistered != nil {
select {
case session.streamRegistered <- true:
log.Info().Str("stream_id", streamId).Str("call_id", callId).Msg("收到回放开始通知,已取消流注册超时")
default:
log.Debug().Str("stream_id", streamId).Msg("流注册通知 channel 已满")
}
return
}
}
log.Debug().Str("call_id", callId).Msg("通过 Call-ID 未找到待注册的会话")
}
// ReconnectSession 手动触发会话重连(供 HTTP API 调用)
// 返回新的流 ID 和播放结果
func (s *PlayService) ReconnectSession(streamId string) (*PlayResult, error) {

View File

@@ -499,28 +499,51 @@ func (s *SIPServer) handleMediaStatusMessage(req *sip.Request, body []byte) {
return
}
// 获取 Call-ID 用于关联会话
callId := req.CallID()
callIdStr := ""
if callId != nil {
callIdStr = callId.Value()
}
log.Info().
Str("device_id", notify.DeviceID).
Str("sn", notify.SN).
Int("notify_type", int(notify.NotifyType)).
Str("stream_id", notify.StreamId).
Str("call_id", callIdStr).
Msg("收到媒体状态通知")
// 处理录像结束通知 (NotifyType=121)
if notify.NotifyType == manscdp.MediaStatusNotifyTypeRecordEnd {
log.Info().Str("stream_id", notify.StreamId).Msg("录像结束通知")
if s.playService == nil {
log.Warn().Msg("播放服务未初始化,无法处理 MediaStatus")
return
}
// 清理播放会话
if s.playService != nil {
err := s.playService.OnMediaStatusReceived(notify.StreamId)
if err != nil {
log.Error().Err(err).Str("stream_id", notify.StreamId).Msg("处理 MediaStatus 失败")
} else {
log.Debug().Str("stream_id", notify.StreamId).Msg("录像结束通知已处理")
}
} else {
log.Warn().Str("stream_id", notify.StreamId).Msg("播放服务未初始化,无法处理录像结束通知")
// 根据通知类型处理
switch notify.NotifyType {
case manscdp.MediaStatusNotifyTypePlaybackStart:
// 121: 录像回放开始 - 取消流注册超时检测
log.Info().Str("device_id", notify.DeviceID).Str("call_id", callIdStr).Msg("录像回放开始通知")
// 通过 DeviceID 或 Call-ID 查找会话并取消超时
if notify.StreamId != "" {
s.playService.NotifyStreamRegistered(notify.StreamId)
} else if callIdStr != "" {
// 设备未发送 StreamId通过 Call-ID 取消超时
s.playService.CancelRegistrationTimeoutByCallId(callIdStr)
}
case manscdp.MediaStatusNotifyTypePlaybackEnd:
// 122: 录像回放结束 - 清理会话
log.Info().Str("device_id", notify.DeviceID).Str("stream_id", notify.StreamId).Msg("录像回放结束通知")
if notify.StreamId != "" {
_ = s.playService.OnMediaStatusReceived(notify.StreamId)
} else if callIdStr != "" {
// 设备未发送 StreamId通过 Call-ID 清理
_ = s.playService.OnMediaStatusReceivedByCallId(callIdStr)
}
default:
log.Warn().Int("notify_type", int(notify.NotifyType)).Msg("未知的 MediaStatus 通知类型")
}
}

View File

@@ -99,5 +99,13 @@ func (h *HookHandler) OnPublish(c *gin.Context) {
Bool("auto_close", response.AutoClose).
Msg("[ZLM HOOK] 推流鉴权通过包含demand参数")
// 推流鉴权通过后更新流活跃时间,重置健康检查超时计时
// 鉴权通过表示设备正在推流,但不等于流已就绪
// 真正的流就绪由 on_stream_changed hook 或 MediaStatus(121) 通知
if h.playService != nil {
h.playService.UpdateStreamActiveTime(req.Stream)
log.Info().Str("stream", req.Stream).Msg("[ZLM HOOK] 推流鉴权通过,已更新流活跃时间")
}
c.JSON(200, response)
}

View File

@@ -10,7 +10,14 @@ import (
type MediaStatusNotifyType int
const (
MediaStatusNotifyTypeRecordEnd MediaStatusNotifyType = 121 // 录像结束
// MediaStatusNotifyTypePlaybackStart 录像文件回放开始
// 设备开始播放录像文件,表示媒体流即将就绪
MediaStatusNotifyTypePlaybackStart MediaStatusNotifyType = 121
// MediaStatusNotifyTypePlaybackEnd 录像文件回放结束
// 设备播放录像文件结束,表示媒体流已结束
MediaStatusNotifyTypePlaybackEnd MediaStatusNotifyType = 122
// 兼容旧常量名(已废弃,请使用 PlaybackEnd
MediaStatusNotifyTypeRecordEnd MediaStatusNotifyType = 122
)
// MediaStatusNotify 媒体状态通知
@@ -19,8 +26,8 @@ type MediaStatusNotify struct {
CmdType string `xml:"CmdType"`
SN string `xml:"SN"`
DeviceID string `xml:"DeviceID"`
NotifyType MediaStatusNotifyType `xml:"NotifyType"` // 121=录像结束
StreamId string `xml:"StreamId"` // 流ID
NotifyType MediaStatusNotifyType `xml:"NotifyType"` // 121=回放开始, 122=回放结束
StreamId string `xml:"StreamId,omitempty"` // 流ID(设备可能不发送)
}
// NewMediaStatusNotify 创建一个新的 MediaStatusNotify 实例