diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index ee3af45908..f547963847 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1188,6 +1188,45 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_ return 0; } +static void unchoke_for_stream(Scheduler *sch, SchedulerNode src); + +// Unchoke any filter graphs that are downstream of this node, to prevent it +// from getting stuck trying to push data to a full queue +static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst) +{ + SchFilterGraph *fg; + SchDec *dec; + SchEnc *enc; + switch (dst->type) { + case SCH_NODE_TYPE_DEC: + dec = &sch->dec[dst->idx]; + for (int i = 0; i < dec->nb_dst; i++) + unchoke_downstream(sch, &dec->dst[i]); + break; + case SCH_NODE_TYPE_ENC: + enc = &sch->enc[dst->idx]; + for (int i = 0; i < enc->nb_dst; i++) + unchoke_downstream(sch, &enc->dst[i]); + break; + case SCH_NODE_TYPE_MUX: + // muxers are never choked + break; + case SCH_NODE_TYPE_FILTER_IN: + fg = &sch->filters[dst->idx]; + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + } else { + // ensure that this filter graph is not stuck waiting for + // input from a different upstream demuxer + unchoke_for_stream(sch, fg->inputs[fg->best_input].src); + } + break; + default: + av_assert0(!"Invalid destination node type?"); + break; + } +} + static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) { while (1) { @@ -1195,7 +1234,13 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) // fed directly by a demuxer (i.e. not through a filtergraph) if (src.type == SCH_NODE_TYPE_DEMUX) { + SchDemux *demux = &sch->demux[src.idx]; sch->demux[src.idx].waiter.choked_next = 0; + if (demux->waiter.choked_next == 0) + return; // prevent infinite loop + demux->waiter.choked_next = 0; + for (int i = 0; i < demux->nb_streams; i++) + unchoke_downstream(sch, demux->streams[i].dst); return; }