Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Reconnect #906

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/NATS.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,26 @@ private void processDisconnect()
return;
}

public void Reconnect(ReconnectOptions reconnectOptions = null)
{
if (reconnectOptions != null)
{
if (reconnectOptions.FlushTimeout > 0)
{
try
{
Flush(reconnectOptions.FlushTimeout);
}
catch (Exception)
{
// don't really care since we are going to try to reconnect anyway
}
}
}
lastEx = null;
processReconnect();
}

// This will process a disconnect when reconnect is allowed.
// The lock should not be held on entering this function.
private void processReconnect()
Expand Down
6 changes: 6 additions & 0 deletions src/NATS.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public interface IConnection : IDisposable
/// </summary>
Exception LastError { get; }

/// <summary>
/// Manually start reconnect behavior.
/// <param name="reconnectOptions">ReconnectOptions, optional</param>
/// </summary>
void Reconnect(ReconnectOptions reconnectOptions = null);

/// <summary>
/// Publishes <paramref name="data"/> to the given <paramref name="subject"/>.
/// </summary>
Expand Down
46 changes: 46 additions & 0 deletions src/NATS.Client/ReconnectOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Text;

namespace NATS.Client
{
/// <summary>
/// Options used when calling IConnection.Reconnect
/// </summary>
public sealed class ReconnectOptions
{
private int flushTimeout = 0;

/// <summary>
/// Milliseconds. If supplied and at least 1 millisecond, the Reconnect will
/// call IConnection.Flush(timeout) before closing and reconnecting.
/// </summary>
public int FlushTimeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document unit (seconds, ms)

{
get { return flushTimeout; }
set { flushTimeout = value < 1 ? 0 : flushTimeout; }
}

/// <summary>
///
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing doc

/// </summary>
/// <param name="timeout">The number of milliseconds to wait for the flush to complete.</param>
/// <returns></returns>
public ReconnectOptions WithFlushTimeout(int timeout)
{
FlushTimeout = timeout;
return this;
}
}
}
33 changes: 33 additions & 0 deletions src/Tests/IntegrationTests/TestReconnect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
using System.Threading;
using System.Threading.Tasks;
using NATS.Client;
using NATS.Client.JetStream;
using UnitTests;
using Xunit;
using Xunit.Abstractions;

namespace IntegrationTests
{
Expand Down Expand Up @@ -675,6 +678,36 @@ public void TestMaxReconnectOnConnect()

t.Join(5000);
}


[Fact]
public void TestForceReconnect()
{
Action<Options> OptionsModifier = options =>
{
options.IgnoreDiscoveredServers = true;
options.NoRandomize = true;
};

Context.RunInJsCluster(OptionsModifier, (c0, c1, c2) =>
{
ServerInfo si = c0.ServerInfo;
string connectedServer = si.ServerId;

c0.Reconnect(null);
for (int x = 0; x < 10; x++)
{
if (c0.State == ConnState.CONNECTED)
{
break;
}
Thread.Sleep(100);
}

si = c0.ServerInfo;
Assert.NotEqual(connectedServer, si.ServerId);
});
}
}

public class TestPublishErrorsDuringReconnect : TestSuite<PublishErrorsDuringReconnectSuiteContext>
Expand Down
117 changes: 90 additions & 27 deletions src/Tests/IntegrationTests/TestSuite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public static class TestSeedPorts
public const int DefaultSuiteNormalClusterServers = 4551; //7pc

public const int AuthorizationSuite = 11490; //3pc
public const int ReconnectSuite = 11493; //1pc
public const int PublishErrorsDuringReconnectSuite = 11494; //1pc
public const int ClusterSuite = 11495; //10pc
public const int ConnectionSuite = 11505;//2pc
Expand All @@ -79,8 +78,7 @@ public static class TestSeedPorts
public const int RxSuite = 11517; //1pc
public const int AsyncAwaitDeadlocksSuite = 11518; //1pc
public const int ConnectionIpV6Suite = 11519; //1pc
public const int KvSuite = 11520; //4pc
public const int ConnectionBehaviorSuite = 11524; //2pc
public const int ConnectionBehaviorSuite = 11520; //2pc

public static InterlockedInt AutoPort = new InterlockedInt(11550);
}
Expand Down Expand Up @@ -238,7 +236,7 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo,
streamWriter.WriteLine("port: " + hubServerInfo.Port);
streamWriter.WriteLine("server_name: " + HubDomain);
streamWriter.WriteLine("jetstream {");
streamWriter.WriteLine(" store_dir: " + TestBase.TempConfDir());
streamWriter.WriteLine(" store_dir: " + TestBase.TempJsStoreDir());
streamWriter.WriteLine(" domain: " + HubDomain);
streamWriter.WriteLine("}");
streamWriter.WriteLine("leafnodes {");
Expand All @@ -253,7 +251,7 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo,
streamWriter.WriteLine("port: " + leafServerInfo.Port);
streamWriter.WriteLine("server_name: " + LeafDomain);
streamWriter.WriteLine("jetstream {");
streamWriter.WriteLine(" store_dir: " + TestBase.TempConfDir());
streamWriter.WriteLine(" store_dir: " + TestBase.TempJsStoreDir());
streamWriter.WriteLine(" domain: " + LeafDomain);
streamWriter.WriteLine("}");
streamWriter.WriteLine("leafnodes {");
Expand Down Expand Up @@ -281,6 +279,72 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo,
}
}

private static String makeClusterConfFile(string cluster, string serverPrefix, int serverId, TestServerInfo port, TestServerInfo listen, TestServerInfo route1, TestServerInfo route2) {
string confFile = TestBase.TempConfFile();
StreamWriter streamWriter = File.CreateText(confFile);
streamWriter.WriteLine("port: " + port.Port);
streamWriter.WriteLine("jetstream {");
streamWriter.WriteLine(" store_dir=" + TestBase.TempJsStoreDir());
streamWriter.WriteLine("}");
streamWriter.WriteLine("server_name=" + serverPrefix + serverId);
streamWriter.WriteLine("cluster {");
streamWriter.WriteLine(" name: " + cluster);
streamWriter.WriteLine(" listen: 127.0.0.1:" + listen.Port);
streamWriter.WriteLine(" routes: [");
streamWriter.WriteLine(" nats-route://127.0.0.1:" + route1.Port);
streamWriter.WriteLine(" nats-route://127.0.0.1:" + route2.Port);
streamWriter.WriteLine(" ]");
streamWriter.WriteLine("}");
streamWriter.Flush();
streamWriter.Close();
return confFile;
}

public void RunInJsCluster(Action<Options> optionsModifier,
TestServerInfo info1, TestServerInfo info2, TestServerInfo info3,
TestServerInfo listen1, TestServerInfo listen2, TestServerInfo listen3,
Action<IConnection, IConnection, IConnection> test)
{
string unique = Nuid.NextGlobalSequence();
string cluster = $"clstr{unique}";
string serverPrefix = $"srvr{unique}";

string confFile1 = makeClusterConfFile(cluster, serverPrefix, 1, info1, listen1, listen2, listen3);
string confFile2 = makeClusterConfFile(cluster, serverPrefix, 2, info2, listen2, listen1, listen3);
string confFile3 = makeClusterConfFile(cluster, serverPrefix, 3, info3, listen3, listen1, listen2);

Action<Options> ClusterOptionsModifier = options =>
{
options.Servers = new []{ info1.Url.ToString(), info2.Url.ToString(), info3.Url.ToString() };
NATSServer.QuietOptionsModifier.Invoke(options);
if (optionsModifier != null)
{
optionsModifier.Invoke(options);
}
};

using (var srv1 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile1}"))
using (var srv2 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile2}"))
using (var srv3 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile3}"))
{
using (var c1 = OpenConnection(info1.Port, ClusterOptionsModifier))
using (var c2 = OpenConnection(info2.Port, ClusterOptionsModifier))
using (var c3 = OpenConnection(info3.Port, ClusterOptionsModifier))
{
try
{
test(c1, c2, c3);
}
finally
{
CleanupJs(c1);
CleanupJs(c2);
CleanupJs(c3);
}
}
}
}

public void CleanupJs(IConnection c)
{
try
Expand Down Expand Up @@ -436,13 +500,6 @@ public class EncodingSuiteContext : SuiteContext
public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort);
}

public class ReconnectSuiteContext : SuiteContext
{
private const int SeedPort = TestSeedPorts.ReconnectSuite;

public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort);
}

public class PublishErrorsDuringReconnectSuiteContext : SuiteContext
{
private const int SeedPort = TestSeedPorts.PublishErrorsDuringReconnectSuite;
Expand Down Expand Up @@ -479,25 +536,31 @@ public class JetStreamPublishSuiteContext : OneServerSuiteContext {}
public class JetStreamPushAsyncSuiteContext : OneServerSuiteContext {}
public class JetStreamPushSyncSuiteContext : OneServerSuiteContext {}
public class JetStreamPushSyncQueueSuiteContext : OneServerSuiteContext {}
public class KeyValueSuiteContext : HubLeafSuiteContext {}
public class ObjectStoreSuiteContext : HubLeafSuiteContext {}
public class KeyValueSuiteContext : JsClusterSuiteContext {}
public class ObjectStoreSuiteContext : JsClusterSuiteContext {}
public class ReconnectSuiteContext : JsClusterSuiteContext {}

public class HubLeafSuiteContext : SuiteContext
public class JsClusterSuiteContext : SuiteContext
{
private const int SeedPort = TestSeedPorts.KvSuite;

public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort);
public readonly TestServerInfo Server2 = new TestServerInfo(SeedPort + 1);
public readonly TestServerInfo Server3 = new TestServerInfo(SeedPort + 2);
public readonly TestServerInfo Server4 = new TestServerInfo(SeedPort + 3);

public void RunInJsServer(Action<IConnection> test) => base.RunInJsServer(Server4, test);
public void RunInJsServer(Func<ServerInfo, bool> versionCheck, Action<IConnection> test) => base.RunInJsServer(Server4, versionCheck, null, test);
public void RunInServer(Action<IConnection> test) => base.RunInServer(Server4, test);
public readonly TestServerInfo Server1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Cluster0 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Cluster1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Cluster2 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Listen0 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Listen1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());
public readonly TestServerInfo Listen2 = new TestServerInfo(TestSeedPorts.AutoPort.Increment());

public void RunInJsServer(Action<IConnection> test) => base.RunInJsServer(Server1, test);
public void RunInJsServer(Func<ServerInfo, bool> versionCheck, Action<IConnection> test) => base.RunInJsServer(Server1, versionCheck, null, test);
public void RunInServer(Action<IConnection> test) => base.RunInServer(Server1, test);
public void RunInJsHubLeaf(Action<IConnection, IConnection> test) =>
base.RunInJsHubLeaf(Server1, Server2, Server3, test);
base.RunInJsHubLeaf(Cluster0, Cluster1, Cluster2, test);
public void RunInJsCluster(Action<IConnection, IConnection, IConnection> test) =>
base.RunInJsCluster(null, Cluster0, Cluster1, Cluster2, Listen0, Listen1, Listen2, test);
public void RunInJsCluster(Action<Options> optionsModifier, Action<IConnection, IConnection, IConnection> test) =>
base.RunInJsCluster(optionsModifier, Cluster0, Cluster1, Cluster2, Listen0, Listen1, Listen2, test);
}

public class OneServerSuiteContext : SuiteContext
{
public readonly TestServerInfo Server1;
Expand Down
6 changes: 6 additions & 0 deletions src/Tests/UnitTests/TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ public static string TempConfDir()
return Path.GetTempPath().Replace("\\", "\\\\"); // when on windows this is necessary. unix doesn't have backslash
}

public static string TempJsStoreDir()
{
DirectoryInfo info = Directory.CreateDirectory(Path.Combine(Path.GetTempPath(), Nuid.NextGlobalSequence()));
return info.FullName.Replace("\\", "\\\\"); // when on windows this is necessary. unix doesn't have backslash
}

public static string TempConfFile()
{
return Path.GetTempPath() + "nats_net_test" + Guid.NewGuid() + ".conf";
Expand Down
Loading