Skip to content

Commit

Permalink
Improve Stream Management
Browse files Browse the repository at this point in the history
* Delay the notification of the library user that the connection was
  successful, until SM is reported by the server as enabled.
* Clear the SM queue in case resumption failed.
* Improve some debug statements & comments.

Signed-off-by: Steffen Jaeckel <[email protected]>
  • Loading branch information
sjaeckel committed Jan 29, 2024
1 parent bfd0872 commit 88a750f
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 85 deletions.
177 changes: 111 additions & 66 deletions src/auth.c
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ static void _auth(xmpp_conn_t *conn)
}
}

static void _auth_success(xmpp_conn_t *conn)
static void _stream_negotiation_success(xmpp_conn_t *conn)
{
tls_clear_password_cache(conn);
conn->stream_negotiation_completed = 1;
Expand Down Expand Up @@ -962,6 +962,7 @@ static int _do_bind(xmpp_conn_t *conn, xmpp_stanza_t *bind)
/* send bind request */
iq = xmpp_iq_new(conn->ctx, "set", "_xmpp_bind1");
if (!iq) {
xmpp_stanza_release(bind);
disconnect_mem_error(conn);
return 0;
}
Expand Down Expand Up @@ -1102,7 +1103,7 @@ static int
_handle_bind(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
{
const char *type;
xmpp_stanza_t *iq, *enable, *session, *binding, *jid_stanza;
xmpp_stanza_t *iq, *session, *binding, *jid_stanza, *enable = NULL;

UNUSED(userdata);

Expand All @@ -1125,6 +1126,23 @@ _handle_bind(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
}
}

/* send enable directly after the bind request */
if (conn->sm_state->sm_support && !conn->sm_disable) {
enable = xmpp_stanza_new(conn->ctx);
if (!enable) {
disconnect_mem_error(conn);
return 0;
}
xmpp_stanza_set_name(enable, "enable");
xmpp_stanza_set_ns(enable, XMPP_NS_SM);
if (!conn->sm_state->dont_request_resume)
xmpp_stanza_set_attribute(enable, "resume", "true");
handler_add(conn, _handle_sm, XMPP_NS_SM, NULL, NULL, NULL);
send_stanza(conn, enable, XMPP_QUEUE_SM_STROPHE);
conn->sm_state->sm_sent_nr = 0;
conn->sm_state->sm_enabled = 1;
}

/* establish a session if required */
if (conn->session_required) {
/* setup response handlers */
Expand Down Expand Up @@ -1155,22 +1173,12 @@ _handle_bind(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
send_stanza(conn, iq, XMPP_QUEUE_STROPHE);
}

if (conn->sm_state->sm_support && !conn->sm_disable) {
enable = xmpp_stanza_new(conn->ctx);
if (!enable) {
disconnect_mem_error(conn);
return 0;
}
xmpp_stanza_set_name(enable, "enable");
xmpp_stanza_set_ns(enable, XMPP_NS_SM);
if (!conn->sm_state->dont_request_resume)
xmpp_stanza_set_attribute(enable, "resume", "true");
handler_add(conn, _handle_sm, XMPP_NS_SM, NULL, NULL, NULL);
send_stanza(conn, enable, XMPP_QUEUE_SM_STROPHE);
}

if (!conn->session_required) {
_auth_success(conn);
/* if there's no xmpp session required and we didn't try to enable
* stream-management, we're done here and the stream-negotiation was
* successful
*/
if (!conn->session_required && !enable) {
_stream_negotiation_success(conn);
}
} else {
strophe_error(conn->ctx, "xmpp", "Server sent malformed bind reply.");
Expand Down Expand Up @@ -1207,7 +1215,7 @@ _handle_session(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
} else if (type && strcmp(type, "result") == 0) {
strophe_debug(conn->ctx, "xmpp", "Session establishment successful.");

_auth_success(conn);
_stream_negotiation_success(conn);
} else {
strophe_error(conn->ctx, "xmpp",
"Server sent malformed session reply.");
Expand Down Expand Up @@ -1238,23 +1246,59 @@ static int _handle_missing_legacy(xmpp_conn_t *conn, void *userdata)
return 0;
}

static int _get_h_attribute(xmpp_stanza_t *stanza, unsigned long *ul_h)
{
const char *h = xmpp_stanza_get_attribute(stanza, "h");
if (!h || string_to_ul(h, ul_h)) {
strophe_error(
stanza->ctx, "xmpp",
"SM error: failed parsing 'h', \"%s\" got converted to %llu.",
STR_MAYBE_NULL(h), *ul_h);
return -1;
}
return 0;
}

static void _sm_queue_cleanup(xmpp_conn_t *conn, unsigned long ul_h)
{
xmpp_send_queue_t *e;
while ((e = peek_queue_front(&conn->sm_state->sm_queue))) {
if (e->sm_h > ul_h)
break;
e = pop_queue_front(&conn->sm_state->sm_queue);
strophe_free(conn->ctx, queue_element_free(conn->ctx, e));
}
}

static void _sm_queue_resend(xmpp_conn_t *conn)
{
xmpp_send_queue_t *e;
while ((e = pop_queue_front(&conn->sm_state->sm_queue))) {
/* Re-send what was already sent out and is still in the
* SM queue (i.e. it hasn't been ACK'ed by the server)
*/
strophe_debug_verbose(2, conn->ctx, "conn", "SM_Q_RESEND: %p, h=%lu", e,
e->sm_h);
send_raw(conn, e->data, e->len, e->owner, NULL);
strophe_free(conn->ctx, queue_element_free(conn->ctx, e));
}
}

static int _handle_sm(xmpp_conn_t *const conn,
xmpp_stanza_t *const stanza,
void *const userdata)
{
xmpp_stanza_t *failed_cause;
const char *name, *id, *previd, *resume, *h, *cause;
xmpp_send_queue_t *e;
xmpp_stanza_t *failed_cause, *bind = NULL;
const char *name, *id, *previd, *resume, *cause;
unsigned long ul_h = 0;

UNUSED(userdata);

name = xmpp_stanza_get_name(stanza);
if (!name)
goto LBL_ERR;
goto err_sm;

if (strcmp(name, "enabled") == 0) {
conn->sm_state->sm_enabled = 1;
conn->sm_state->sm_handled_nr = 0;
resume = xmpp_stanza_get_attribute(stanza, "resume");
if (resume && (strcasecmp(resume, "true") || strcmp(resume, "1"))) {
Expand All @@ -1264,28 +1308,29 @@ static int _handle_sm(xmpp_conn_t *const conn,
"SM error: server said it can resume, but "
"didn't provide an ID.");
name = NULL;
goto LBL_ERR;
goto err_sm;
}
conn->sm_state->can_resume = 1;
conn->sm_state->id = strophe_strdup(conn->ctx, id);
}
/* We maybe have stuff in the SM queue if we tried to resume, but the
* server doesn't remember all details of our session, but the `h` was
* still available.
*/
_sm_queue_resend(conn);
_stream_negotiation_success(conn);
} else if (strcmp(name, "resumed") == 0) {
previd = xmpp_stanza_get_attribute(stanza, "previd");
if (!previd || strcmp(previd, conn->sm_state->previd)) {
strophe_error(conn->ctx, "xmpp",
"SM error: previd didn't match, ours is \"%s\".",
conn->sm_state->previd);
name = NULL;
goto LBL_ERR;
goto err_sm;
}
h = xmpp_stanza_get_attribute(stanza, "h");
if (!h || string_to_ul(h, &ul_h)) {
strophe_error(conn->ctx, "xmpp",
"SM error: failed parsing 'h', it got converted "
"to %llu.",
ul_h);
if (_get_h_attribute(stanza, &ul_h)) {
name = NULL;
goto LBL_ERR;
goto err_sm;
}
conn->sm_state->sm_enabled = 1;
conn->sm_state->id = conn->sm_state->previd;
Expand All @@ -1296,54 +1341,53 @@ static int _handle_sm(xmpp_conn_t *const conn,
conn->sm_state->sm_sent_nr = conn->sm_state->sm_queue.head->sm_h;
else
conn->sm_state->sm_sent_nr = ul_h;
while ((e = pop_queue_front(&conn->sm_state->sm_queue))) {
if (e->sm_h >= ul_h) {
/* Re-send what was already sent out and is still in the
* SM queue (i.e. it hasn't been ACK'ed by the server)
*/
send_raw(conn, e->data, e->len, e->owner, NULL);
}
strophe_free(conn->ctx, queue_element_free(conn->ctx, e));
}
_sm_queue_cleanup(conn, ul_h);
_sm_queue_resend(conn);
strophe_debug(conn->ctx, "xmpp", "Session resumed successfully.");
_auth_success(conn);
_stream_negotiation_success(conn);
} else if (strcmp(name, "failed") == 0) {
name = NULL;
conn->sm_state->sm_enabled = 0;

failed_cause =
xmpp_stanza_get_child_by_ns(stanza, XMPP_NS_STANZAS_IETF);
if (!failed_cause)
goto LBL_ERR;
goto err_sm;

cause = xmpp_stanza_get_name(failed_cause);
if (!cause)
goto LBL_ERR;
goto err_sm;

if (!strcmp(cause, "item-not-found") ||
!strcmp(cause, "feature-not-implemented")) {
if (!strcmp(cause, "item-not-found")) {
if (conn->sm_state->resume) {
conn->sm_state->resume = 0;
conn->sm_state->can_resume = 0;
/* remember that the server reports having support
* for resumption, but actually it doesn't ...
/* It's no error if there's no `h` attribute included
* but if there is, it gives a hint at what the server
* already received.
*/
conn->sm_state->dont_request_resume =
!strcmp(cause, "feature-not-implemented");
strophe_free(conn->ctx, conn->sm_state->previd);
conn->sm_state->previd = NULL;
strophe_free(conn->ctx, conn->sm_state->bound_jid);
conn->sm_state->bound_jid = NULL;
_do_bind(conn, conn->sm_state->bind);
conn->sm_state->bind = NULL;
if (!_get_h_attribute(stanza, &ul_h)) {
/* In cases there's no `h` included, drop all elements. */
ul_h = (unsigned long)-1;
}
_sm_queue_cleanup(conn, ul_h);
}
} else if (!strcmp(cause, "feature-not-implemented")) {
conn->sm_state->resume = 0;
conn->sm_state->can_resume = 0;
/* remember that the server reports having support
* for resumption, but actually it doesn't ...
*/
conn->sm_state->dont_request_resume = 1;
}
conn->sm_state->sm_handled_nr = 0;
bind = conn->sm_state->bind;
conn->sm_state->bind = NULL;
reset_sm_state(conn->sm_state);
_do_bind(conn, bind);
} else {
/* unknown stanza received */
name = NULL;
}

LBL_ERR:
err_sm:
if (!name) {
char *err = "Couldn't convert stanza to text!";
char *buf;
Expand All @@ -1362,7 +1406,8 @@ static int _handle_sm(xmpp_conn_t *const conn,
buf);
if (buf != err)
strophe_free(conn->ctx, buf);
conn->sm_state->sm_enabled = 0;
/* Don't disable for <failure> cases, they're no hard errors */
conn->sm_state->sm_enabled = bind != NULL;
}
return 0;
}
Expand Down Expand Up @@ -1395,7 +1440,7 @@ _handle_legacy(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
/* auth succeeded */
strophe_debug(conn->ctx, "xmpp", "Legacy auth succeeded.");

_auth_success(conn);
_stream_negotiation_success(conn);
} else {
strophe_error(conn->ctx, "xmpp",
"Server sent us a legacy authentication "
Expand Down Expand Up @@ -1585,7 +1630,7 @@ int _handle_component_hs_response(xmpp_conn_t *conn,
xmpp_disconnect(conn);
return XMPP_EINT;
} else {
_auth_success(conn);
_stream_negotiation_success(conn);
}

/* We don't need this handler anymore, return 0 so it can be deleted
Expand All @@ -1607,8 +1652,8 @@ int _handle_missing_handshake(xmpp_conn_t *conn, void *userdata)
void auth_handle_open_raw(xmpp_conn_t *conn)
{
handler_reset_timed(conn, 0);
/* user handlers are not called before authentication is completed. */
_auth_success(conn);
/* user handlers are not called before stream negotiation has completed. */
_stream_negotiation_success(conn);
}

void auth_handle_open_stub(xmpp_conn_t *conn)
Expand Down
13 changes: 11 additions & 2 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ void strophe_log_internal(const xmpp_ctx_t *ctx,
const char *fmt,
va_list ap);

#if defined(__OpenBSD__)
#define STR_MAYBE_NULL(p) (p) ? (p) : "(null)"
#else
#define STR_MAYBE_NULL(p) (p)
#endif

/** connection **/

/* opaque connection object */
Expand Down Expand Up @@ -193,10 +199,10 @@ struct _xmpp_sm_t {
int sm_support;
int sm_enabled;
int can_resume, resume, dont_request_resume;
xmpp_queue_t sm_queue;
int r_sent;
uint32_t sm_handled_nr;
uint32_t sm_sent_nr;
xmpp_queue_t sm_queue;
char *id, *previd, *bound_jid;
xmpp_stanza_t *bind;
};
Expand Down Expand Up @@ -276,7 +282,8 @@ struct _xmpp_conn_t {
/* stream open handler */
xmpp_open_handler open_handler;

/* user handlers only get called after authentication */
/* user handlers only get called after the stream negotiation has completed
*/
int stream_negotiation_completed;

/* connection events handler */
Expand Down Expand Up @@ -341,6 +348,7 @@ void handler_add(xmpp_conn_t *conn,
void handler_system_delete_all(xmpp_conn_t *conn);

/* utility functions */
void reset_sm_state(xmpp_sm_state_t *sm_state);
void disconnect_mem_error(xmpp_conn_t *conn);

/* auth functions */
Expand All @@ -351,6 +359,7 @@ void auth_handle_open_stub(xmpp_conn_t *conn);

/* queue functions */
void add_queue_back(xmpp_queue_t *queue, xmpp_send_queue_t *item);
xmpp_send_queue_t *peek_queue_front(xmpp_queue_t *queue);
xmpp_send_queue_t *pop_queue_front(xmpp_queue_t *queue);
char *queue_element_free(xmpp_ctx_t *ctx, xmpp_send_queue_t *e);

Expand Down
Loading

0 comments on commit 88a750f

Please sign in to comment.