diff --git a/src/NATS.Client/Connection.cs b/src/NATS.Client/Connection.cs index fba63dda..9584d3da 100644 --- a/src/NATS.Client/Connection.cs +++ b/src/NATS.Client/Connection.cs @@ -1534,6 +1534,26 @@ private void processDisconnect() return; } + public void Reconnect(ReconnectOptions reconnectOptions = null) + { + if (reconnectOptions != null) + { + if (reconnectOptions.FlushTimeout > 0) + { + try + { + Flush(reconnectOptions.FlushTimeout); + } + catch (Exception) + { + // don't really care since we are going to try to reconnect anyway + } + } + } + lastEx = null; + processReconnect(); + } + // This will process a disconnect when reconnect is allowed. // The lock should not be held on entering this function. private void processReconnect() diff --git a/src/NATS.Client/IConnection.cs b/src/NATS.Client/IConnection.cs index 90f56f14..7faa5c3f 100644 --- a/src/NATS.Client/IConnection.cs +++ b/src/NATS.Client/IConnection.cs @@ -92,6 +92,12 @@ public interface IConnection : IDisposable /// Exception LastError { get; } + /// + /// Manually start reconnect behavior. + /// ReconnectOptions, optional + /// + void Reconnect(ReconnectOptions reconnectOptions = null); + /// /// Publishes to the given . /// diff --git a/src/NATS.Client/ReconnectOptions.cs b/src/NATS.Client/ReconnectOptions.cs new file mode 100644 index 00000000..388c46a2 --- /dev/null +++ b/src/NATS.Client/ReconnectOptions.cs @@ -0,0 +1,46 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Text; + +namespace NATS.Client +{ + /// + /// Options used when calling IConnection.Reconnect + /// + public sealed class ReconnectOptions + { + private int flushTimeout = 0; + + /// + /// Milliseconds. If supplied and at least 1 millisecond, the Reconnect will + /// call IConnection.Flush(timeout) before closing and reconnecting. + /// + public int FlushTimeout + { + get { return flushTimeout; } + set { flushTimeout = value < 1 ? 0 : flushTimeout; } + } + + /// + /// + /// + /// The number of milliseconds to wait for the flush to complete. + /// + public ReconnectOptions WithFlushTimeout(int timeout) + { + FlushTimeout = timeout; + return this; + } + } +} diff --git a/src/Tests/IntegrationTests/TestReconnect.cs b/src/Tests/IntegrationTests/TestReconnect.cs index 7936fd2e..c7578a3e 100644 --- a/src/Tests/IntegrationTests/TestReconnect.cs +++ b/src/Tests/IntegrationTests/TestReconnect.cs @@ -18,7 +18,10 @@ using System.Threading; using System.Threading.Tasks; using NATS.Client; +using NATS.Client.JetStream; +using UnitTests; using Xunit; +using Xunit.Abstractions; namespace IntegrationTests { @@ -675,6 +678,36 @@ public void TestMaxReconnectOnConnect() t.Join(5000); } + + + [Fact] + public void TestForceReconnect() + { + Action OptionsModifier = options => + { + options.IgnoreDiscoveredServers = true; + options.NoRandomize = true; + }; + + Context.RunInJsCluster(OptionsModifier, (c0, c1, c2) => + { + ServerInfo si = c0.ServerInfo; + string connectedServer = si.ServerId; + + c0.Reconnect(null); + for (int x = 0; x < 10; x++) + { + if (c0.State == ConnState.CONNECTED) + { + break; + } + Thread.Sleep(100); + } + + si = c0.ServerInfo; + Assert.NotEqual(connectedServer, si.ServerId); + }); + } } public class TestPublishErrorsDuringReconnect : TestSuite diff --git a/src/Tests/IntegrationTests/TestSuite.cs b/src/Tests/IntegrationTests/TestSuite.cs index 3030ff06..61cfc757 100644 --- a/src/Tests/IntegrationTests/TestSuite.cs +++ b/src/Tests/IntegrationTests/TestSuite.cs @@ -66,7 +66,6 @@ public static class TestSeedPorts public const int DefaultSuiteNormalClusterServers = 4551; //7pc public const int AuthorizationSuite = 11490; //3pc - public const int ReconnectSuite = 11493; //1pc public const int PublishErrorsDuringReconnectSuite = 11494; //1pc public const int ClusterSuite = 11495; //10pc public const int ConnectionSuite = 11505;//2pc @@ -79,8 +78,7 @@ public static class TestSeedPorts public const int RxSuite = 11517; //1pc public const int AsyncAwaitDeadlocksSuite = 11518; //1pc public const int ConnectionIpV6Suite = 11519; //1pc - public const int KvSuite = 11520; //4pc - public const int ConnectionBehaviorSuite = 11524; //2pc + public const int ConnectionBehaviorSuite = 11520; //2pc public static InterlockedInt AutoPort = new InterlockedInt(11550); } @@ -238,7 +236,7 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo, streamWriter.WriteLine("port: " + hubServerInfo.Port); streamWriter.WriteLine("server_name: " + HubDomain); streamWriter.WriteLine("jetstream {"); - streamWriter.WriteLine(" store_dir: " + TestBase.TempConfDir()); + streamWriter.WriteLine(" store_dir: " + TestBase.TempJsStoreDir()); streamWriter.WriteLine(" domain: " + HubDomain); streamWriter.WriteLine("}"); streamWriter.WriteLine("leafnodes {"); @@ -253,7 +251,7 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo, streamWriter.WriteLine("port: " + leafServerInfo.Port); streamWriter.WriteLine("server_name: " + LeafDomain); streamWriter.WriteLine("jetstream {"); - streamWriter.WriteLine(" store_dir: " + TestBase.TempConfDir()); + streamWriter.WriteLine(" store_dir: " + TestBase.TempJsStoreDir()); streamWriter.WriteLine(" domain: " + LeafDomain); streamWriter.WriteLine("}"); streamWriter.WriteLine("leafnodes {"); @@ -281,6 +279,72 @@ public void RunInJsHubLeaf(TestServerInfo hubServerInfo, } } + private static String makeClusterConfFile(string cluster, string serverPrefix, int serverId, TestServerInfo port, TestServerInfo listen, TestServerInfo route1, TestServerInfo route2) { + string confFile = TestBase.TempConfFile(); + StreamWriter streamWriter = File.CreateText(confFile); + streamWriter.WriteLine("port: " + port.Port); + streamWriter.WriteLine("jetstream {"); + streamWriter.WriteLine(" store_dir=" + TestBase.TempJsStoreDir()); + streamWriter.WriteLine("}"); + streamWriter.WriteLine("server_name=" + serverPrefix + serverId); + streamWriter.WriteLine("cluster {"); + streamWriter.WriteLine(" name: " + cluster); + streamWriter.WriteLine(" listen: 127.0.0.1:" + listen.Port); + streamWriter.WriteLine(" routes: ["); + streamWriter.WriteLine(" nats-route://127.0.0.1:" + route1.Port); + streamWriter.WriteLine(" nats-route://127.0.0.1:" + route2.Port); + streamWriter.WriteLine(" ]"); + streamWriter.WriteLine("}"); + streamWriter.Flush(); + streamWriter.Close(); + return confFile; + } + + public void RunInJsCluster(Action optionsModifier, + TestServerInfo info1, TestServerInfo info2, TestServerInfo info3, + TestServerInfo listen1, TestServerInfo listen2, TestServerInfo listen3, + Action test) + { + string unique = Nuid.NextGlobalSequence(); + string cluster = $"clstr{unique}"; + string serverPrefix = $"srvr{unique}"; + + string confFile1 = makeClusterConfFile(cluster, serverPrefix, 1, info1, listen1, listen2, listen3); + string confFile2 = makeClusterConfFile(cluster, serverPrefix, 2, info2, listen2, listen1, listen3); + string confFile3 = makeClusterConfFile(cluster, serverPrefix, 3, info3, listen3, listen1, listen2); + + Action ClusterOptionsModifier = options => + { + options.Servers = new []{ info1.Url.ToString(), info2.Url.ToString(), info3.Url.ToString() }; + NATSServer.QuietOptionsModifier.Invoke(options); + if (optionsModifier != null) + { + optionsModifier.Invoke(options); + } + }; + + using (var srv1 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile1}")) + using (var srv2 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile2}")) + using (var srv3 = NATSServer.CreateJetStreamFast(int.MinValue, $"--config {confFile3}")) + { + using (var c1 = OpenConnection(info1.Port, ClusterOptionsModifier)) + using (var c2 = OpenConnection(info2.Port, ClusterOptionsModifier)) + using (var c3 = OpenConnection(info3.Port, ClusterOptionsModifier)) + { + try + { + test(c1, c2, c3); + } + finally + { + CleanupJs(c1); + CleanupJs(c2); + CleanupJs(c3); + } + } + } + } + public void CleanupJs(IConnection c) { try @@ -436,13 +500,6 @@ public class EncodingSuiteContext : SuiteContext public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort); } - public class ReconnectSuiteContext : SuiteContext - { - private const int SeedPort = TestSeedPorts.ReconnectSuite; - - public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort); - } - public class PublishErrorsDuringReconnectSuiteContext : SuiteContext { private const int SeedPort = TestSeedPorts.PublishErrorsDuringReconnectSuite; @@ -479,25 +536,31 @@ public class JetStreamPublishSuiteContext : OneServerSuiteContext {} public class JetStreamPushAsyncSuiteContext : OneServerSuiteContext {} public class JetStreamPushSyncSuiteContext : OneServerSuiteContext {} public class JetStreamPushSyncQueueSuiteContext : OneServerSuiteContext {} - public class KeyValueSuiteContext : HubLeafSuiteContext {} - public class ObjectStoreSuiteContext : HubLeafSuiteContext {} + public class KeyValueSuiteContext : JsClusterSuiteContext {} + public class ObjectStoreSuiteContext : JsClusterSuiteContext {} + public class ReconnectSuiteContext : JsClusterSuiteContext {} - public class HubLeafSuiteContext : SuiteContext + public class JsClusterSuiteContext : SuiteContext { - private const int SeedPort = TestSeedPorts.KvSuite; - - public readonly TestServerInfo Server1 = new TestServerInfo(SeedPort); - public readonly TestServerInfo Server2 = new TestServerInfo(SeedPort + 1); - public readonly TestServerInfo Server3 = new TestServerInfo(SeedPort + 2); - public readonly TestServerInfo Server4 = new TestServerInfo(SeedPort + 3); - - public void RunInJsServer(Action test) => base.RunInJsServer(Server4, test); - public void RunInJsServer(Func versionCheck, Action test) => base.RunInJsServer(Server4, versionCheck, null, test); - public void RunInServer(Action test) => base.RunInServer(Server4, test); + public readonly TestServerInfo Server1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment()); + public readonly TestServerInfo Cluster0 = new TestServerInfo(TestSeedPorts.AutoPort.Increment()); + public readonly TestServerInfo Cluster1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment()); + public readonly TestServerInfo Cluster2 = new TestServerInfo(TestSeedPorts.AutoPort.Increment()); + public readonly TestServerInfo Listen0 = new TestServerInfo(TestSeedPorts.AutoPort.Increment()); + public readonly TestServerInfo Listen1 = new TestServerInfo(TestSeedPorts.AutoPort.Increment()); + public readonly TestServerInfo Listen2 = new TestServerInfo(TestSeedPorts.AutoPort.Increment()); + + public void RunInJsServer(Action test) => base.RunInJsServer(Server1, test); + public void RunInJsServer(Func versionCheck, Action test) => base.RunInJsServer(Server1, versionCheck, null, test); + public void RunInServer(Action test) => base.RunInServer(Server1, test); public void RunInJsHubLeaf(Action test) => - base.RunInJsHubLeaf(Server1, Server2, Server3, test); + base.RunInJsHubLeaf(Cluster0, Cluster1, Cluster2, test); + public void RunInJsCluster(Action test) => + base.RunInJsCluster(null, Cluster0, Cluster1, Cluster2, Listen0, Listen1, Listen2, test); + public void RunInJsCluster(Action optionsModifier, Action test) => + base.RunInJsCluster(optionsModifier, Cluster0, Cluster1, Cluster2, Listen0, Listen1, Listen2, test); } - + public class OneServerSuiteContext : SuiteContext { public readonly TestServerInfo Server1; diff --git a/src/Tests/UnitTests/TestBase.cs b/src/Tests/UnitTests/TestBase.cs index 03a657c8..b5cabec4 100644 --- a/src/Tests/UnitTests/TestBase.cs +++ b/src/Tests/UnitTests/TestBase.cs @@ -119,6 +119,12 @@ public static string TempConfDir() return Path.GetTempPath().Replace("\\", "\\\\"); // when on windows this is necessary. unix doesn't have backslash } + public static string TempJsStoreDir() + { + DirectoryInfo info = Directory.CreateDirectory(Path.Combine(Path.GetTempPath(), Nuid.NextGlobalSequence())); + return info.FullName.Replace("\\", "\\\\"); // when on windows this is necessary. unix doesn't have backslash + } + public static string TempConfFile() { return Path.GetTempPath() + "nats_net_test" + Guid.NewGuid() + ".conf";