diff --git a/fftools/thread_queue.c b/fftools/thread_queue.c index b035ffe11d..eb33431c98 100644 --- a/fftools/thread_queue.c +++ b/fftools/thread_queue.c @@ -38,6 +38,7 @@ enum { }; struct ThreadQueue { + int choked; int *finished; unsigned int nb_streams; @@ -157,6 +158,9 @@ static int receive_locked(ThreadQueue *tq, int *stream_idx, { unsigned int nb_finished = 0; + if (tq->choked) + return AVERROR(EAGAIN); + while (av_container_fifo_read(tq->fifo, data, 0) >= 0) { unsigned idx; int ret; @@ -230,6 +234,7 @@ void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx) * next time the consumer thread tries to read this stream it will get * an EOF and recv-finished flag will be set */ tq->finished[stream_idx] |= FINISHED_SEND; + tq->choked = 0; pthread_cond_broadcast(&tq->cond); pthread_mutex_unlock(&tq->lock); @@ -249,3 +254,15 @@ void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx) pthread_mutex_unlock(&tq->lock); } + +void tq_choke(ThreadQueue *tq, int choked) +{ + pthread_mutex_lock(&tq->lock); + + int prev_choked = tq->choked; + tq->choked = choked; + if (choked != prev_choked) + pthread_cond_broadcast(&tq->cond); + + pthread_mutex_unlock(&tq->lock); +} diff --git a/fftools/thread_queue.h b/fftools/thread_queue.h index cc01c8a2c9..ad7669f131 100644 --- a/fftools/thread_queue.h +++ b/fftools/thread_queue.h @@ -58,6 +58,15 @@ int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data); */ void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx); +/** + * Prevent further reads from the thread queue until it is unchoked. Threads + * attempting to read from the queue will block, similar to when the queue is + * empty. + * + * @param choked 1 to choke, 0 to unchoke + */ +void tq_choke(ThreadQueue *tq, int choked); + /** * Read the next item from the queue. *