Skip to content

Commit

Permalink
[Added] natsConnection_Reconnect (#757)
Browse files Browse the repository at this point in the history
* wip

* removed debug log

* removed whitespace

* removed debug log

* Added a test

* Oops forgot to free opts/threadargs from last edit

* Simplifed

* Added trivial cerror hecks to the test, for coverage

* PR feedback

* PR feedback: natsConnection_Reconnect to error unless allowReconnect is set

* PR feedback: oops, too fast

* Team feedback: ignore allowReconnect

* different status on Ubuntu vs MacOS

* Comment for clarity
  • Loading branch information
levb authored May 6, 2024
1 parent b06c1c9 commit ecf49bc
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -3422,6 +3422,25 @@ natsConnection_ConnectTo(natsConnection **newConn, const char *url)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsConnection_Reconnect(natsConnection *nc)
{
if (nc == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

natsConn_Lock(nc);
if (natsConn_isClosed(nc))
{
natsConn_Unlock(nc);
return nats_setDefaultError(NATS_CONNECTION_CLOSED);
}

natsSock_Close(nc->sockCtx.fd);

natsConn_Unlock(nc);
return NATS_OK;
}

// Test if connection has been closed.
bool
natsConnection_IsClosed(natsConnection *nc)
Expand Down
12 changes: 12 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -4033,6 +4033,18 @@ stanMsg_Destroy(stanMsg *msg);
NATS_EXTERN natsStatus
natsConnection_Connect(natsConnection **nc, natsOptions *options);

/** \brief Causes the client to drop the connection to the current server and
* perform standard reconnection process.
*
* This means that all subscriptions and consumers should be resubscribed and
* their work resumed after successful reconnect where all reconnect options are
* respected.
*
* @param nc the pointer to the #natsConnection object.
*/
natsStatus
natsConnection_Reconnect(natsConnection *nc);

/** \brief Process a read event when using external event loop.
*
* When using an external event loop, and the callback indicating that
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ ReconnectBufSize
RetryOnFailedConnect
NoPartialOnReconnect
ReconnectFailsPendingRequests
ForcedReconnect
ErrOnConnectAndDeadlock
ErrOnMaxPayloadLimit
Auth
Expand Down
85 changes: 85 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -20202,6 +20202,90 @@ test_NoPartialOnReconnect(void)
_stopServer(pid);
}

static void
test_ForcedReconnect(void)
{
natsStatus s;
struct threadArg arg;
natsOptions *opts = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsPid pid = NATS_INVALID_PID;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("unable to setup test");

test("Start server, connect, subscribe: ");
pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true);
CHECK_SERVER_STARTED(pid);
IFOK(s, natsOptions_Create(&opts));
IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg));
IFOK(s, natsConnection_Connect(&nc, opts));
IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo"));
testCond(s == NATS_OK);

test("Send a message to foo: ");
IFOK(s, natsConnection_PublishString(nc, "foo", "bar"));
testCond(s == NATS_OK);

test("Receive the message: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_OK) && (msg != NULL));
natsMsg_Destroy(msg);
msg = NULL;

test("Forced reconnect: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_OK);

test("Waiting for reconnect: ");
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.reconnected)
s = natsCondition_TimedWait(arg.c, arg.m, 5000);
arg.reconnected = false;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

test("Send a message to foo: ");
IFOK(s, natsConnection_PublishString(nc, "foo", "bar"));
testCond(s == NATS_OK);

test("Receive the message: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_OK) && (msg != NULL));
natsMsg_Destroy(msg);
msg = NULL;

test("Reconnect again with allowReconnect false, the call succeeds: ");
natsMutex_Lock(nc->mu);
nc->opts->allowReconnect = false;
natsMutex_Unlock(nc->mu);
s = natsConnection_Reconnect(nc);
testCond(s == NATS_OK);

// On MacOS this returns NATS_CONNECTION_CLOSED, on Ubuntu we get a
// NATS_TIMEOUT.
test("But the connection is closed: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond(((s == NATS_CONNECTION_CLOSED) || (s = NATS_TIMEOUT)) && (msg == NULL));

natsConnection_Close(nc);
test("Reconect on a close connection errors: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_CONNECTION_CLOSED);

test("Reconect on a NULL connection errors: ");
s = natsConnection_Reconnect(NULL);
testCond(s == NATS_INVALID_ARG);

natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
natsOptions_Destroy(opts);
_destroyDefaultThreadArgs(&arg);
}

static void
_stopServerInThread(void *closure)
{
Expand Down Expand Up @@ -36210,6 +36294,7 @@ static testInfo allTests[] =
{"RetryOnFailedConnect", test_RetryOnFailedConnect},
{"NoPartialOnReconnect", test_NoPartialOnReconnect},
{"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest},
{"ForcedReconnect", test_ForcedReconnect},

{"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock},
{"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},
Expand Down

0 comments on commit ecf49bc

Please sign in to comment.