Skip to content

Commit

Permalink
rtspconnection: protect cancellable by a mutex
Browse files Browse the repository at this point in the history
It is entirely possible for the cancellable to be cancelled (and freed)
in gst_rtsp_connection_flush() while there may be an ongoing read/write
operation.

Nothing prevents gst_rtsp_connection_flush() from waiting for the
outstanding read/writes.

This could lead to a crash like (where cancellable has been freed
within gst_rtsp_connection_flush()):

 #0  0x00007ffff4351096 in g_output_stream_writev (stream=stream@entry=0x7fff30002950, vectors=vectors@entry=0x7ffe2c6afa80, n_vectors=n_vectors@entry=3, bytes_written=bytes_written@entry=0x7ffe2c6af950,  cancellable=cancellable@entry=0x7fff300288a0, error=error@entry=0x7ffe2c6af958) at ../subprojects/glib/gio/goutputstream.c:377
 Kurento#1  0x00007ffff44b2c38 in writev_bytes (stream=0x7fff30002950, vectors=vectors@entry=0x7ffe2c6afa80, n_vectors=n_vectors@entry=3, bytes_written=bytes_written@entry=0x7ffe2c6afb90, block=block@entry=1, cancellable=0x7fff300288a0) at ../subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c:1320
 Kurento#2  0x00007ffff44b583e in gst_rtsp_connection_send_messages_usec (conn=0x7fff30001370, messages=messages@entry=0x7ffe2c6afcc0, n_messages=n_messages@entry=1, timeout=timeout@entry=3000000) at ../subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c:2056
 #3  0x00007ffff44d2669 in gst_rtsp_client_sink_connection_send_messages (sink=0x7fffac0192c0, timeout=3000000, n_messages=1, messages=0x7ffe2c6afcc0, conninfo=0x7fffac019610) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:1929
 #4  gst_rtsp_client_sink_try_send (sink=sink@entry=0x7fffac0192c0, conninfo=conninfo@entry=0x7fffac019610, requests=requests@entry=0x7ffe2c6afcc0, n_requests=n_requests@entry=1, response=response@entry=0x0, code=code@entry=0x0) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:2845
 #5  0x00007ffff44d3077 in do_send_data (buffer=0x7fff38075c60, channel=<optimized out>, context=0x7fffac042640) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:3896
 #6  0x00007ffff4281cc6 in gst_rtsp_stream_transport_send_rtp (trans=trans@entry=0x7fff20061f80, buffer=<optimized out>) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream-transport.c:632
 #7  0x00007ffff4278e9b in push_data (stream=0x7fff40019bf0, is_rtp=<optimized out>, buffer_list=0x0, buffer=<optimized out>, trans=0x7fff20061f80) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2586
 #8  check_transport_backlog (stream=0x7fff40019bf0, trans=0x7fff20061f80) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2645
 #9  0x00007ffff42793b3 in send_tcp_message (idx=<optimized out>, stream=0x7fff40019bf0) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2741
 #10 send_func (stream=0x7fff40019bf0) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2776
 #11 0x00007ffff7d59fad in g_thread_proxy (data=0x7fffbc062920) at ../subprojects/glib/glib/gthread.c:827
 #12 0x00007ffff7a8ce2d in start_thread () from /lib64/libc.so.6
 #13 0x00007ffff7b12620 in clone3 () from /lib64/libc.so.6

Fix by adding a cancellable lock and returning an extra reference used
across all read/write operations.  gst_rtsp_connection_flush() can free
the in-use cancellable and it will no longer affect any in progress
read/write.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2816>
  • Loading branch information
ystreet authored and tp-m committed Jul 30, 2022
1 parent 3907c8d commit 91d0a48
Showing 1 changed file with 79 additions and 18 deletions.
97 changes: 79 additions & 18 deletions subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ struct _GstRTSPConnection
GMutex socket_use_mutex;
gboolean manual_http;
gboolean may_cancel;
GCancellable *cancellable;
GMutex cancellable_mutex;
GCancellable *cancellable; /* protected by cancellable_mutex */

gchar tunnelid[TUNNELID_LEN];
gboolean tunneled;
Expand Down Expand Up @@ -352,6 +353,20 @@ socket_client_event (GSocketClient * client, GSocketClientEvent event,
}
}

/* transfer full */
static GCancellable *
get_cancellable (GstRTSPConnection * conn)
{
GCancellable *cancellable = NULL;

g_mutex_lock (&conn->cancellable_mutex);
if (conn->cancellable)
cancellable = g_object_ref (conn->cancellable);
g_mutex_unlock (&conn->cancellable_mutex);

return cancellable;
}

/**
* gst_rtsp_connection_create:
* @url: a #GstRTSPUrl
Expand All @@ -377,6 +392,7 @@ gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)

newconn->may_cancel = TRUE;
newconn->cancellable = g_cancellable_new ();
g_mutex_init (&newconn->cancellable_mutex);
newconn->client = g_socket_client_new ();

if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
Expand Down Expand Up @@ -839,6 +855,7 @@ setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
gchar *connection_uri = NULL;
gchar *request_uri = NULL;
gchar *host = NULL;
GCancellable *cancellable;

url = conn->url;

Expand Down Expand Up @@ -896,18 +913,23 @@ setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,

connection_uri = get_tunneled_connection_uri_strdup (url, url_port);

cancellable = get_cancellable (conn);

/* connect to the host/port */
if (conn->proxy_host) {
connection = g_socket_client_connect_to_host (conn->client,
conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
conn->proxy_host, conn->proxy_port, cancellable, &error);
request_uri = g_strdup (connection_uri);
} else {
connection = g_socket_client_connect_to_uri (conn->client,
connection_uri, 0, conn->cancellable, &error);
connection_uri, 0, cancellable, &error);
request_uri =
g_strdup_printf ("%s%s%s", url->abspath,
url->query ? "?" : "", url->query ? url->query : "");
}

g_clear_object (&cancellable);

if (connection == NULL)
goto connect_failed;

Expand Down Expand Up @@ -1033,6 +1055,7 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
GstClockTime to;
guint16 url_port;
GstRTSPUrl *url;
GCancellable *cancellable;

g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
Expand All @@ -1052,18 +1075,23 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
connection_uri = gst_rtsp_url_get_request_uri (url);
}

cancellable = get_cancellable (conn);

if (conn->proxy_host) {
connection = g_socket_client_connect_to_host (conn->client,
conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
conn->proxy_host, conn->proxy_port, cancellable, &error);
request_uri = g_strdup (connection_uri);
} else {
connection = g_socket_client_connect_to_uri (conn->client,
connection_uri, url_port, conn->cancellable, &error);
connection_uri, url_port, cancellable, &error);

/* use the relative component of the uri for non-proxy connections */
request_uri = g_strdup_printf ("%s%s%s", url->abspath,
url->query ? "?" : "", url->query ? url->query : "");
}

g_clear_object (&cancellable);

if (connection == NULL)
goto connect_failed;

Expand Down Expand Up @@ -1289,6 +1317,8 @@ write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
/* ERRORS */
error:
{
g_object_unref (cancellable);

if (G_UNLIKELY (r == 0))
return GST_RTSP_EEOF;

Expand Down Expand Up @@ -1422,13 +1452,19 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
if (G_LIKELY (size > (guint) out)) {
gssize r;
gsize count = size - out;
GCancellable *cancellable;

cancellable = conn->may_cancel ? get_cancellable (conn) : NULL;

if (block)
r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
count, conn->may_cancel ? conn->cancellable : NULL, err);
count, cancellable, err);
else
r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
(conn->input_stream), (gchar *) & buffer[out], count,
conn->may_cancel ? conn->cancellable : NULL, err);
cancellable, err);

g_clear_object (&cancellable);

if (G_UNLIKELY (r < 0)) {
if (out == 0) {
Expand Down Expand Up @@ -1732,6 +1768,7 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
{
guint offset;
GstRTSPResult res;
GCancellable *cancellable;

g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
Expand All @@ -1741,9 +1778,10 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,

set_write_socket_timeout (conn, timeout);

cancellable = get_cancellable (conn);
res =
write_bytes (conn->output_stream, data, &offset, size, TRUE,
conn->cancellable);
write_bytes (conn->output_stream, data, &offset, size, TRUE, cancellable);
g_clear_object (&cancellable);

clear_write_socket_timeout (conn);

Expand Down Expand Up @@ -1937,6 +1975,7 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
guint n_vectors, n_memories;
gint i, j, k;
gsize bytes_to_write, bytes_written;
GCancellable *cancellable;

g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
Expand Down Expand Up @@ -2053,9 +2092,11 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
/* write request: this is synchronous */
set_write_socket_timeout (conn, timeout);

cancellable = get_cancellable (conn);
res =
writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
TRUE, conn->cancellable);
TRUE, cancellable);
g_clear_object (&cancellable);

clear_write_socket_timeout (conn);

Expand Down Expand Up @@ -2928,8 +2969,10 @@ gst_rtsp_connection_free (GstRTSPConnection * conn)

res = gst_rtsp_connection_close (conn);

if (conn->cancellable)
g_object_unref (conn->cancellable);
g_mutex_lock (&conn->cancellable_mutex);
g_clear_object (&conn->cancellable);
g_mutex_unlock (&conn->cancellable_mutex);
g_mutex_clear (&conn->cancellable_mutex);
if (conn->client)
g_object_unref (conn->client);
if (conn->tls_database)
Expand Down Expand Up @@ -2975,6 +3018,7 @@ gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
GMainContext *ctx;
GSource *rs, *ws, *ts;
GIOCondition condition;
GCancellable *cancellable;

g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
Expand All @@ -2992,21 +3036,22 @@ gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
g_source_unref (ts);
}

cancellable = get_cancellable (conn);
if (events & GST_RTSP_EV_READ) {
rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
conn->cancellable);
cancellable);
g_source_set_dummy_callback (rs);
g_source_attach (rs, ctx);
g_source_unref (rs);
}

if (events & GST_RTSP_EV_WRITE) {
ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
conn->cancellable);
ws = g_socket_create_source (conn->write_socket, G_IO_OUT, cancellable);
g_source_set_dummy_callback (ws);
g_source_attach (ws, ctx);
g_source_unref (ws);
}
g_clear_object (&cancellable);

/* Returns after handling all pending events */
while (!g_main_context_iteration (ctx, TRUE));
Expand Down Expand Up @@ -3115,10 +3160,14 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

if (flush) {
g_cancellable_cancel (conn->cancellable);
GCancellable *cancellable = get_cancellable (conn);
g_cancellable_cancel (cancellable);
g_clear_object (&cancellable);
} else {
g_mutex_lock (&conn->cancellable_mutex);
g_object_unref (conn->cancellable);
conn->cancellable = g_cancellable_new ();
g_mutex_unlock (&conn->cancellable_mutex);
}

return GST_RTSP_OK;
Expand Down Expand Up @@ -3632,6 +3681,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
}

/* clean up some of the state of conn2 */
g_mutex_lock (&conn2->cancellable_mutex);
g_cancellable_cancel (conn2->cancellable);
conn2->write_socket = conn2->read_socket = NULL;
conn2->socket0 = NULL;
Expand All @@ -3642,6 +3692,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
conn2->control_stream = NULL;
g_object_unref (conn2->cancellable);
conn2->cancellable = NULL;
g_mutex_unlock (&conn2->cancellable_mutex);

/* We make socket0 the write socket and socket1 the read socket. */
conn->write_socket = conn->socket0;
Expand Down Expand Up @@ -4000,6 +4051,7 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
guint n_vectors, n_memories, n_ids, drop_messages;
gint i, j, l, n_mmap;
GstRTSPSerializedMessage *msg;
GCancellable *cancellable;

/* if this connection was already closed, stop now */
if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
Expand Down Expand Up @@ -4125,11 +4177,15 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
}
}

cancellable = get_cancellable (watch->conn);

res =
writev_bytes (watch->conn->output_stream, vectors, n_vectors,
&bytes_written, FALSE, watch->conn->cancellable);
&bytes_written, FALSE, cancellable);
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);

g_clear_object (&cancellable);

/* First unmap all memories here, this simplifies the code below
* as we don't have to skip all memories that were already written
* before */
Expand Down Expand Up @@ -4505,6 +4561,7 @@ gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
{
GstRTSPResult res;
GMainContext *context = NULL;
GCancellable *cancellable;
gint i;

g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
Expand Down Expand Up @@ -4565,11 +4622,15 @@ gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
}
}

cancellable = get_cancellable (watch->conn);

res =
writev_bytes (watch->conn->output_stream, vectors, n_vectors,
&bytes_written, FALSE, watch->conn->cancellable);
&bytes_written, FALSE, cancellable);
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);

g_clear_object (&cancellable);

/* At this point we sent everything we could without blocking or
* error and updated the offsets inside the message accordingly */

Expand Down

0 comments on commit 91d0a48

Please sign in to comment.