Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Added] natsConnection_Reconnect #757

Merged
merged 14 commits into from
May 6, 2024
22 changes: 22 additions & 0 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -3422,6 +3422,28 @@
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsConnection_Reconnect(natsConnection *nc)
{
if (nc == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

natsConn_Lock(nc);
if (natsConn_isClosed(nc))
{
natsConn_Unlock(nc);
return nats_setDefaultError(NATS_CONNECTION_CLOSED);
}

if (!nc->opts->allowReconnect)
return nats_setDefaultError(NATS_INVALID_ARG);

Check warning on line 3439 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L3439

Added line #L3439 was not covered by tests
levb marked this conversation as resolved.
Show resolved Hide resolved

natsSock_Close(nc->sockCtx.fd);

natsConn_Unlock(nc);
return NATS_OK;
}

// Test if connection has been closed.
bool
natsConnection_IsClosed(natsConnection *nc)
Expand Down
12 changes: 12 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -4033,6 +4033,18 @@ stanMsg_Destroy(stanMsg *msg);
NATS_EXTERN natsStatus
natsConnection_Connect(natsConnection **nc, natsOptions *options);

/** \brief Causes the client to drop the connection to the current server and
* perform standard reconnection process.
*
* This means that all subscriptions and consumers should be resubscribed and
* their work resumed after successful reconnect where all reconnect options are
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, actually not all reconnect options if we consider that "allow reconnect" is a reconnect option ;-). So I would argue that either the Go client should be changed, or the spec modified to specifically cover cases of libraries have an option to disable reconnection (maybe not all do that).

* respected.
*
* @param nc the pointer to the #natsConnection object.
*/
natsStatus
natsConnection_Reconnect(natsConnection *nc);

/** \brief Process a read event when using external event loop.
*
* When using an external event loop, and the callback indicating that
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ ReconnectBufSize
RetryOnFailedConnect
NoPartialOnReconnect
ReconnectFailsPendingRequests
ForcedReconnect
ErrOnConnectAndDeadlock
ErrOnMaxPayloadLimit
Auth
Expand Down
85 changes: 85 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -20202,6 +20202,90 @@ test_NoPartialOnReconnect(void)
_stopServer(pid);
}

static void
test_ForcedReconnect(void)
{
natsStatus s;
struct threadArg arg;
natsOptions *opts = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsPid pid = NATS_INVALID_PID;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("unable to setup test");

test("Start server, connect, subscribe: ");
pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true);
CHECK_SERVER_STARTED(pid);
IFOK(s, natsOptions_Create(&opts));
IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg));
IFOK(s, natsConnection_Connect(&nc, opts));
IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo"));
testCond(s == NATS_OK);

test("Send a message to foo: ");
s = natsMsg_Create(&msg, "foo", NULL, "bar", 3);
IFOK(s, natsConnection_PublishMsg(nc, msg));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Receive the message: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_OK) && (msg != NULL));
natsMsg_Destroy(msg);
msg = NULL;

test("Forced reconnect: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_OK);

test("Waiting for reconnect: ");
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.reconnected)
s = natsCondition_TimedWait(arg.c, arg.m, 5000);
arg.reconnected = false;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

test("Send a message to foo: ");
s = natsMsg_Create(&msg, "foo", NULL, "bar", 3);
IFOK(s, natsConnection_PublishMsg(nc, msg));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Receive the message: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_OK) && (msg != NULL));
natsMsg_Destroy(msg);
msg = NULL;

natsConnection_Close(nc);
test("Reconect on a close connection errors: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_CONNECTION_CLOSED);

test("Reconect on a NULL connection errors: ");
s = natsConnection_Reconnect(NULL);
testCond(s == NATS_INVALID_ARG);

test("Reconect on errors if allowReconnect is not set: ");
natsMutex_Lock(nc->mu);
nc->opts->allowReconnect = false;
natsMutex_Unlock(nc->mu);
s = natsConnection_Reconnect(NULL);
testCond(s == NATS_INVALID_ARG);

natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
natsOptions_Destroy(opts);
_destroyDefaultThreadArgs(&arg);
}

static void
_stopServerInThread(void *closure)
{
Expand Down Expand Up @@ -36210,6 +36294,7 @@ static testInfo allTests[] =
{"RetryOnFailedConnect", test_RetryOnFailedConnect},
{"NoPartialOnReconnect", test_NoPartialOnReconnect},
{"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest},
{"ForcedReconnect", test_ForcedReconnect},

{"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock},
{"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},
Expand Down