From fd1fd5850de4f3beb240422b67889bb776b26c77 Mon Sep 17 00:00:00 2001 From: Niklas Haas Date: Mon, 8 Sep 2025 19:11:29 +0200 Subject: [PATCH] fftools/ffmpeg_sched: unchoke upstream nodes on recv-closed filter inputs This allows upstream filters to observe EOF on their corresponding outputs and terminate early, which is particularly useful for decoders and demuxers that may then gracefully release their resources. Prevents "leaking" memory for previously used, but now unused, filter inputs. --- fftools/ffmpeg_sched.c | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index dbe337ca16..d08f4a061d 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1368,6 +1368,18 @@ static void schedule_update_locked(Scheduler *sch) } } + // also unchoke any sources feeding into closed filter graph inputs, so + // that they can observe the downstream EOF + for (unsigned i = 0; i < sch->nb_filters; i++) { + SchFilterGraph *fg = &sch->filters[i]; + + for (unsigned j = 0; j < fg->nb_inputs; j++) { + SchFilterIn *fi = &fg->inputs[j]; + if (fi->receive_finished && !fi->send_finished) + unchoke_for_stream(sch, fi->src); + } + } + // make sure to unchoke at least one source, if still available for (unsigned type = 0; !have_unchoked && type < 2; type++) for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { @@ -2478,6 +2490,8 @@ void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx) av_assert0(in_idx < fg->nb_inputs); fi = &fg->inputs[in_idx]; + pthread_mutex_lock(&sch->schedule_lock); + if (!fi->receive_finished) { fi->receive_finished = 1; tq_receive_finish(fg->queue, in_idx); @@ -2485,7 +2499,11 @@ void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx) // close the control stream when all actual inputs are done if (++fg->nb_inputs_finished_receive == fg->nb_inputs) tq_receive_finish(fg->queue, fg->nb_inputs); + + schedule_update_locked(sch); } + + pthread_mutex_unlock(&sch->schedule_lock); } int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)