Skip to content

Commit

Permalink
Implement Reconnect (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jul 3, 2024
1 parent 06c5665 commit 0633fb9
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 27 deletions.
20 changes: 20 additions & 0 deletions src/NATS.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,26 @@ private void processDisconnect()
return;
}

public void Reconnect(ReconnectOptions reconnectOptions = null)
{
if (reconnectOptions != null)
{
if (reconnectOptions.FlushTimeout > 0)
{
try
{
Flush(reconnectOptions.FlushTimeout);
}
catch (Exception)
{
// don't really care since we are going to try to reconnect anyway
}
}
}
lastEx = null;
processReconnect();
}

// This will process a disconnect when reconnect is allowed.
// The lock should not be held on entering this function.
private void processReconnect()
Expand Down
6 changes: 6 additions & 0 deletions src/NATS.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public interface IConnection : IDisposable
/// </summary>
Exception LastError { get; }

/// <summary>
/// Manually start reconnect behavior.
/// <param name="reconnectOptions">ReconnectOptions, optional</param>
/// </summary>
void Reconnect(ReconnectOptions reconnectOptions = null);

/// <summary>
/// Publishes <paramref name="data"/> to the given <paramref name="subject"/>.
/// </summary>
Expand Down
46 changes: 46 additions & 0 deletions src/NATS.Client/ReconnectOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Text;

namespace NATS.Client
{
/// <summary>
/// Options used when calling IConnection.Reconnect
/// </summary>
public sealed class ReconnectOptions
{
private int flushTimeout = 0;

/// <summary>
/// Milliseconds. If supplied and at least 1 millisecond, the Reconnect will
/// call IConnection.Flush(timeout) before closing and reconnecting.
/// </summary>
public int FlushTimeout
{
get { return flushTimeout; }
set { flushTimeout = value < 1 ? 0 : flushTimeout; }
}

/// <summary>
///
/// </summary>
/// <param name="timeout">The number of milliseconds to wait for the flush to complete.</param>
/// <returns></returns>
public ReconnectOptions WithFlushTimeout(int timeout)
{
FlushTimeout = timeout;
return this;
}
}
}
33 changes: 33 additions & 0 deletions src/Tests/IntegrationTests/TestReconnect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
using System.Threading;
using System.Threading.Tasks;
using NATS.Client;
using NATS.Client.JetStream;
using UnitTests;
using Xunit;
using Xunit.Abstractions;

namespace IntegrationTests
{
Expand Down Expand Up @@ -675,6 +678,36 @@ public void TestMaxReconnectOnConnect()

t.Join(5000);
}


[Fact]
public void TestForceReconnect()
{
Action<Options> OptionsModifier = options =>
{
options.IgnoreDiscoveredServers = true;
options.NoRandomize = true;
};

Context.RunInJsCluster(OptionsModifier, (c0, c1, c2) =>
{
ServerInfo si = c0.ServerInfo;
string connectedServer = si.ServerId;

c0.Reconnect(null);
for (int x = 0; x < 10; x++)
{
if (c0.State == ConnState.CONNECTED)
{
break;
}
Thread.Sleep(100);
}

si = c0.ServerInfo;
Assert.NotEqual(connectedServer, si.ServerId);
});
}
}

public class TestPublishErrorsDuringReconnect : TestSuite<PublishErrorsDuringReconnectSuiteContext>
Expand Down
117 changes: 90 additions & 27 deletions src/Tests/IntegrationTests/TestSuite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public static class TestSeedPorts
public const int DefaultSuiteNormalClusterServers = 4551; //7pc

public const int AuthorizationSuite = 11490; //3pc
public const int ReconnectSuite = 11493; //1pc
public const int PublishErrorsDuringReconnectSuite = 11494; //1pc
public const int ClusterSuite = 11495; //10pc
public const int ConnectionSuite = 11505;//2pc
Expand All @@ -79,8 +78,7 @@ public static class TestSeedPorts
public const int RxSuite = 11517; //1pc
public const int AsyncAwaitDeadlocksSuite = 11518; //1pc
public const int ConnectionIpV6Suite = 11519; //1pc
public const int KvSuite = 11520; //4pc
public const int ConnectionBehaviorSuite = 11524; //2pc
public const int ConnectionBehaviorSuite = 11520; //2pc

public static InterlockedInt AutoPort = new InterlockedInt(11550);
}
Expand Down Expand Up @@ -238,7 +236,7 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo,
streamWriter.WriteLine("port: " + hubServerInfo.Port);
streamWriter.WriteLine("server_name: " + HubDomain);
streamWriter.WriteLine("jetstream {");
streamWriter.WriteLine(" store_dir: " + TestBase.TempConfDir());
streamWriter.WriteLine(" store_dir: " + TestBase.TempJsStoreDir());
streamWriter.WriteLine(" domain: " + HubDomain);
streamWriter.WriteLine("}");
streamWriter.WriteLine("leafnodes {");
Expand All @@ -253,7 +251,7 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo,
streamWriter.WriteLine("port: " + leafServerInfo.Port);
streamWriter.WriteLine("server_name: " + LeafDomain);
streamWriter.WriteLine("jetstream {");
streamWriter.WriteLine(" store_dir: " + TestBase.TempConfDir());
streamWriter.WriteLine(" store_dir: " + TestBase.TempJsStoreDir());
streamWriter.WriteLine(" domain: " + LeafDomain);
streamWriter.WriteLine("}");
streamWriter.WriteLine("leafnodes {");
Expand Down Expand Up @@ -281,6 +279,72 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo,
}
}

private static String makeClusterConfFile(string cluster, string serverPrefix, int serverId, TestServerInfo port, TestServerInfo listen, TestServerInfo route1, TestServerInfo route2) {
string confFile = TestBase.TempConfFile();
StreamWriter streamWriter = File.CreateText(confFile);
streamWriter.WriteLine("port: " + port.Port);
streamWriter.WriteLine("jetstream {");
streamWriter.WriteLine(" store_dir=" + TestBase.TempJsStoreDir());
streamWriter.WriteLine("}");
streamWriter.WriteLine("server_name=" + serverPrefix + serverId);
streamWriter.WriteLine("cluster {");
streamWriter.WriteLine(" name: " + cluster);
streamWriter.WriteLine(" listen: 127.0.0.1:" + listen.Port);
streamWriter.WriteLine(" routes: [");
streamWriter.WriteLine(" nats-route://127.0.0.1:" + route1.Port);
streamWriter.WriteLine(" nats-route://127.0.0.1:" + route2.Port);
streamWriter.WriteLine(" ]");
streamWriter.WriteLine("}");
streamWriter.Flush();
streamWriter.Close();
return confFile;
}

public void RunInJsCluster(Action<Options> optionsModifier,
TestServerInfo info1, TestServerInfo info2, TestServerInfo info3,
TestServerInfo listen1, TestServerInfo listen2, TestServerInfo listen3,
Action<IConnection, IConnection, IConnection> test)
{
string unique = Nuid.NextGlobalSequence();
string cluster = $"clstr{unique}";
string serverPrefix = $"srvr{unique}";

string confFile1 = makeClusterConfFile(cluster, serverPrefix, 1, info1, listen1, listen2, listen3);
string confFile2 = makeClusterConfFile(cluster, serverPrefix, 2, info2, listen2, listen1, listen3);
string confFile3 = makeClusterConfFile(cluster, serverPrefix, 3, info3, listen3, listen1, listen2);

Action<Options> ClusterOptionsModifier = options =>
{
options.Servers = new []{ info1.Url.ToString(), info2.Url.ToString(), info3.Url.ToString() };
NATSServer.QuietOptionsModifier.Invoke(options);
if (optionsModifier != null)
{
optionsModifier.Invoke(options);
}
};

using (var srv1 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile1}"))
using (var srv2 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile2}"))
using (var srv3 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile3}"))
{
using (var c1 = OpenConnection(info1.Port, ClusterOptionsModifier))
using (var c2 = OpenConnection(info2.Port, ClusterOptionsModifier))
using (var c3 = OpenConnection(info3.Port, ClusterOptionsModifier))
{
try
{
test(c1, c2, c3);
}
finally
{
CleanupJs(c1);
CleanupJs(c2);
CleanupJs(c3);
}
}
}
}

public void CleanupJs(IConnection c)
{
try
Expand Down Expand Up @@ -436,13 +500,6 @@ public class EncodingSuiteContext : SuiteContext
public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort);
}

public class ReconnectSuiteContext : SuiteContext
{
private const int SeedPort = TestSeedPorts.ReconnectSuite;

public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort);
}

public class PublishErrorsDuringReconnectSuiteContext : SuiteContext
{
private const int SeedPort = TestSeedPorts.PublishErrorsDuringReconnectSuite;
Expand Down Expand Up @@ -479,25 +536,31 @@ public class JetStreamPublishSuiteContext : OneServerSuiteContext {}
public class JetStreamPushAsyncSuiteContext : OneServerSuiteContext {}
public class JetStreamPushSyncSuiteContext : OneServerSuiteContext {}
public class JetStreamPushSyncQueueSuiteContext : OneServerSuiteContext {}
public class KeyValueSuiteContext : HubLeafSuiteContext {}
public class ObjectStoreSuiteContext : HubLeafSuiteContext {}
public class KeyValueSuiteContext : JsClusterSuiteContext {}
public class ObjectStoreSuiteContext : JsClusterSuiteContext {}
public class ReconnectSuiteContext : JsClusterSuiteContext {}

public class HubLeafSuiteContext : SuiteContext
public class JsClusterSuiteContext : SuiteContext
{
private const int SeedPort = TestSeedPorts.KvSuite;

public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort);
public readonly TestServerInfo Server2 = new TestServerInfo(SeedPort + 1);
public readonly TestServerInfo Server3 = new TestServerInfo(SeedPort + 2);
public readonly TestServerInfo Server4 = new TestServerInfo(SeedPort + 3);

public void RunInJsServer(Action<IConnection> test) => base.RunInJsServer(Server4, test);
public void RunInJsServer(Func<ServerInfo, bool> versionCheck, Action<IConnection> test) => base.RunInJsServer(Server4, versionCheck, null, test);
public void RunInServer(Action<IConnection> test) => base.RunInServer(Server4, test);
public readonly TestServerInfo Server1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Cluster0 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Cluster1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Cluster2 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Listen0 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Listen1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Listen2 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());

public void RunInJsServer(Action<IConnection> test) => base.RunInJsServer(Server1, test);
public void RunInJsServer(Func<ServerInfo, bool> versionCheck, Action<IConnection> test) => base.RunInJsServer(Server1, versionCheck, null, test);
public void RunInServer(Action<IConnection> test) => base.RunInServer(Server1, test);
public void RunInJsHubLeaf(Action<IConnection, IConnection> test) =>
base.RunInJsHubLeaf(Server1, Server2, Server3, test);
base.RunInJsHubLeaf(Cluster0, Cluster1, Cluster2, test);
public void RunInJsCluster(Action<IConnection, IConnection, IConnection> test) =>
base.RunInJsCluster(null, Cluster0, Cluster1, Cluster2, Listen0, Listen1, Listen2, test);
public void RunInJsCluster(Action<Options> optionsModifier, Action<IConnection, IConnection, IConnection> test) =>
base.RunInJsCluster(optionsModifier, Cluster0, Cluster1, Cluster2, Listen0, Listen1, Listen2, test);
}

public class OneServerSuiteContext : SuiteContext
{
public readonly TestServerInfo Server1;
Expand Down
6 changes: 6 additions & 0 deletions src/Tests/UnitTests/TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ public static string TempConfDir()
return Path.GetTempPath().Replace("\\", "\\\\"); // when on windows this is necessary. unix doesn't have backslash
}

public static string TempJsStoreDir()
{
DirectoryInfo info = Directory.CreateDirectory(Path.Combine(Path.GetTempPath(), Nuid.NextGlobalSequence()));
return info.FullName.Replace("\\", "\\\\"); // when on windows this is necessary. unix doesn't have backslash
}

public static string TempConfFile()
{
return Path.GetTempPath() + "nats_net_test" + Guid.NewGuid() + ".conf";
Expand Down

0 comments on commit 0633fb9

Please sign in to comment.