avformat: rtsp: add functions to allow stored replies

This adds the ability to have a stored reply, needed for the
SET_PARAMETERS command feature to reliably report the reply even
while intermediate packets are read.
This commit is contained in:
Marvin Scholz
2025-10-10 17:53:23 +02:00
parent 5daa45fa01
commit b63f69fccf
2 changed files with 167 additions and 15 deletions

View File

@@ -801,6 +801,7 @@ void ff_rtsp_undo_setup(AVFormatContext *s, int send_packets)
RTSPState *rt = s->priv_data;
int i;
rt->stored_msg.expected_seq = -1;
for (i = 0; i < rt->nb_rtsp_streams; i++) {
RTSPStream *rtsp_st = rt->rtsp_streams[i];
if (!rtsp_st)
@@ -1222,9 +1223,11 @@ int ff_rtsp_skip_packet(AVFormatContext *s)
return 0;
}
int ff_rtsp_read_reply(AVFormatContext *s, RTSPMessageHeader *reply,
unsigned char **content_ptr,
int return_on_interleaved_data, const char *method)
static int ff_rtsp_read_reply_internal(AVFormatContext *s,
RTSPMessageHeader *reply,
unsigned char **content_ptr,
int return_on_interleaved_data,
const char *method)
{
RTSPState *rt = s->priv_data;
char buf[MAX_URL_SIZE], buf1[MAX_URL_SIZE], *q;
@@ -1233,18 +1236,6 @@ int ff_rtsp_read_reply(AVFormatContext *s, RTSPMessageHeader *reply,
int ret, content_length, line_count, request;
unsigned char *content;
// If we returned on pending packet last time,
// do not try to read again, as it would corrupt
// the state due to the already consumed '$'.
if (rt->pending_packet) {
if (return_on_interleaved_data)
return 1;
ret = ff_rtsp_skip_packet(s);
if (ret < 0)
return ret;
}
start:
line_count = 0;
request = 0;
@@ -1386,6 +1377,56 @@ start:
return 0;
}
int ff_rtsp_read_reply(AVFormatContext *s, RTSPMessageHeader *reply,
unsigned char **content_ptr,
int return_on_interleaved_data, const char *method)
{
int ret;
RTSPState *rt = s->priv_data;
// If we returned on pending packet last time,
// do not try to read again, as it would corrupt
// the state due to the already consumed '$'.
if (rt->pending_packet) {
if (return_on_interleaved_data)
return 1;
ret = ff_rtsp_skip_packet(s);
if (ret < 0)
return ret;
}
if (rt->stored_msg.expected_seq != -1) {
RTSPMessageHeader header;
ret = ff_rtsp_read_reply_internal(s, &header,
&rt->stored_msg.body, return_on_interleaved_data, NULL);
if (ret != 0)
return ret;
if (rt->stored_msg.expected_seq == header.seq) {
// Got the expected reply, store it for later
rt->stored_msg.expected_seq = -1;
rt->stored_msg.header = av_calloc(1, sizeof(*rt->stored_msg.header));
if (!rt->stored_msg.header) {
av_freep(&rt->stored_msg.body);
return AVERROR(ENOMEM);
}
memcpy(rt->stored_msg.header, &header, sizeof(header));
} else {
av_log(s, AV_LOG_WARNING, "Unexpected reply with seq %d, expected %d\n",
rt->stored_msg.header->seq, rt->stored_msg.expected_seq);
av_freep(&rt->stored_msg.body);
}
// Do not return here as we still need to read the reply
// the caller was actually wanting to retrieve.
}
return ff_rtsp_read_reply_internal(s, reply, content_ptr,
return_on_interleaved_data, method);
}
/**
* Send a command to the RTSP server without waiting for the reply.
*
@@ -1456,6 +1497,24 @@ int ff_rtsp_send_cmd_with_content_async(AVFormatContext *s,
return 0;
}
int ff_rtsp_send_cmd_with_content_async_stored(AVFormatContext *s,
const char *method, const char *url,
const char *headers,
const unsigned char *send_content,
int send_content_length)
{
RTSPState *rt = s->priv_data;
int ret = ff_rtsp_send_cmd_with_content_async(s, method, url, headers,
send_content, send_content_length);
if (ret < 0)
return ret;
rt->stored_msg.expected_seq = rt->seq;
av_freep(&rt->stored_msg.header);
av_freep(&rt->stored_msg.body);
return 0;
}
int ff_rtsp_send_cmd_async(AVFormatContext *s, const char *method,
const char *url, const char *headers)
{
@@ -1509,6 +1568,33 @@ retry:
return 0;
}
int ff_rtsp_read_reply_async_stored(AVFormatContext *s, RTSPMessageHeader **reply,
unsigned char **content_ptr)
{
RTSPState *rt = s->priv_data;
if (rt->stored_msg.header == NULL) {
if (rt->stored_msg.expected_seq == -1)
return AVERROR(EINVAL); // Reply to be stored was never requested
// Reply pending, tell caller to try again later
return AVERROR(EAGAIN);
}
if (reply)
*reply = rt->stored_msg.header;
else
av_free(rt->stored_msg.header);
if (content_ptr)
*content_ptr = rt->stored_msg.body;
else
av_free(rt->stored_msg.body);
rt->stored_msg.header = NULL;
rt->stored_msg.body = NULL;
return 0;
}
int ff_rtsp_make_setup_request(AVFormatContext *s, const char *host, int port,
int lower_transport, const char *real_challenge)
{
@@ -1796,6 +1882,7 @@ int ff_rtsp_connect(AVFormatContext *s)
struct sockaddr_storage peer;
socklen_t peer_len = sizeof(peer);
rt->stored_msg.expected_seq = -1;
if (rt->rtp_port_max < rt->rtp_port_min) {
av_log(s, AV_LOG_ERROR, "Invalid UDP port range, max port %d less "
"than min port %d\n", rt->rtp_port_max,

View File

@@ -286,6 +286,25 @@ typedef struct RTSPState {
/** The last reply of the server to a RTSP command */
char last_reply[2048]; /* XXX: allocate ? */
/**
* Stored message context
* This is used to store the last reply marked to be
* stored with ::ff_rtsp_send_cmd_with_content_async_stored
* as well as accompanying state to know when to store
* a reply and if a reply has been stored yet.
*/
struct {
/**
* Sequence number of the reply to be stored
* -1 if we are not waiting to store any message
*/
int expected_seq;
/** Last stored reply message from the RTSP server */
RTSPMessageHeader *header;
/** Last stored reply message body from the RTSP server */
unsigned char *body;
} stored_msg;
/** Indicates if a packet is pending to be read (useful for interleaved reads) */
int pending_packet;
@@ -530,6 +549,30 @@ int ff_rtsp_send_cmd_with_content_async(AVFormatContext *s,
const unsigned char *send_content,
int send_content_length);
/**
* Send a command to the RTSP server, storing the reply on future reads
*
* Sends a command to the server, without waiting for the reply and
* marking the request as awaiting a response, which will be stored
* when it is encountered during future read operations and should
* be retrieved with ::ff_rtsp_read_reply_async_stored.
*
* @param s RTSP (de)muxer context
* @param method the method for the request
* @param url the target url for the request
* @param headers extra header lines to include in the request
* @param send_content if non-null, the data to send as request body content
* @param send_content_length the length of the send_content data, or 0 if
* send_content is null
*
* @return zero if success, nonzero otherwise
*/
int ff_rtsp_send_cmd_with_content_async_stored(AVFormatContext *s,
const char *method, const char *url,
const char *headers,
const unsigned char *send_content,
int send_content_length);
/**
* Send a command to the RTSP server and wait for the reply.
*
@@ -590,6 +633,28 @@ int ff_rtsp_read_reply(AVFormatContext *s, RTSPMessageHeader *reply,
unsigned char **content_ptr,
int return_on_interleaved_data, const char *method);
/**
* Retrieve a previously stored RTSP reply message from the server.
*
* Retrieves a reply for a message sent with
* ::ff_rtsp_send_cmd_with_content_async_stored previously.
* If more than one message was received, this function will only
* return the last one and intermediate messages are discarded.
*
* Both reply and content must be ::av_free'd by the caller.
*
* @param s RTSP (de)muxer context
* @param reply Pointer where the RTSP message header will be stored
* @param content_ptr Pointer where the RTSP message body, if any, will
* be stored (length is in reply)
*
* @return 0 on success, AVERROR(EAGAIN) if no reply was received yet,
* other AVERROR for any other errors.
*/
int ff_rtsp_read_reply_async_stored(AVFormatContext *s, RTSPMessageHeader **reply,
unsigned char **content_ptr);
/**
* Skip a RTP/TCP interleaved packet.
*