From f5ea81b700debe7b9a6ec578b5c2979b6fadeb68 Mon Sep 17 00:00:00 2001 From: msft-paddy14 <97080072+msft-paddy14@users.noreply.github.com> Date: Tue, 14 Jan 2025 13:07:31 +0530 Subject: [PATCH 1/6] Remove meet lock --- libs/cluster/Server/GarnetServerNode.cs | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/libs/cluster/Server/GarnetServerNode.cs b/libs/cluster/Server/GarnetServerNode.cs index 0c8d554fba..19251aeee5 100644 --- a/libs/cluster/Server/GarnetServerNode.cs +++ b/libs/cluster/Server/GarnetServerNode.cs @@ -29,11 +29,6 @@ internal sealed class GarnetServerNode /// </summary> ClusterConfig lastConfig = null; - /// <summary> - /// Gossip with meet command lock - /// </summary> - SingleWriterMultiReaderLock meetLock; - /// <summary> /// Outstanding gossip task if any /// </summary> @@ -202,17 +197,9 @@ private Task Gossip(byte[] configByteArray) /// <returns></returns> public MemoryResult<byte> TryMeet(byte[] configByteArray) { - try - { - _ = meetLock.TryWriteLock(); - UpdateGossipSend(); - var resp = gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token).GetAwaiter().GetResult(); - return resp; - } - finally - { - meetLock.WriteUnlock(); - } + UpdateGossipSend(); + var resp = gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token).GetAwaiter().GetResult(); + return resp; } /// <summary> @@ -273,4 +260,4 @@ public ConnectionInfo GetConnectionInfo() }; } } -} \ No newline at end of file +} From fb5996b07abe49e0a8d9cfcdf98c56c105cfd13d Mon Sep 17 00:00:00 2001 From: msft-paddy14 <97080072+msft-paddy14@users.noreply.github.com> Date: Tue, 14 Jan 2025 13:15:50 +0530 Subject: [PATCH 2/6] Fix newline at end of GarnetServerNode.cs file --- libs/cluster/Server/GarnetServerNode.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/cluster/Server/GarnetServerNode.cs b/libs/cluster/Server/GarnetServerNode.cs index 19251aeee5..6c600331fa 100644 --- a/libs/cluster/Server/GarnetServerNode.cs +++ b/libs/cluster/Server/GarnetServerNode.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT license. using System; From 156319df5a6b25653a894fc95fa55b70d3d8713e Mon Sep 17 00:00:00 2001 From: msft-paddy14 <97080072+msft-paddy14@users.noreply.github.com> Date: Wed, 15 Jan 2025 07:56:25 +0530 Subject: [PATCH 3/6] fix formatting From 2cebec769b1e2ec41f0f4060d78b18338175b9ed Mon Sep 17 00:00:00 2001 From: "Padmanabh Gupta (from Dev Box)" <padgupta@microsoft.com> Date: Wed, 15 Jan 2025 08:36:52 +0530 Subject: [PATCH 4/6] Make all background connection handling async --- .../Server/Failover/ReplicaFailoverSession.cs | 16 ++++++++-------- libs/cluster/Server/GarnetServerNode.cs | 10 +++++----- libs/cluster/Server/Gossip.cs | 16 ++++++++-------- libs/cluster/Server/Migration/MigrationDriver.cs | 8 ++++---- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/libs/cluster/Server/Failover/ReplicaFailoverSession.cs b/libs/cluster/Server/Failover/ReplicaFailoverSession.cs index b7f0e48dec..dcbcc7dda9 100644 --- a/libs/cluster/Server/Failover/ReplicaFailoverSession.cs +++ b/libs/cluster/Server/Failover/ReplicaFailoverSession.cs @@ -36,7 +36,7 @@ internal sealed partial class FailoverSession : IDisposable /// <param name="nodeId">Node-id to use for search the connection array</param> /// <returns></returns> /// <exception cref="GarnetException"></exception> - private GarnetClient GetOrAddConnection(string nodeId) + private async Task<GarnetClient> GetOrAddConnectionAsync(string nodeId) { _ = clusterProvider.clusterManager.clusterConnectionStore.GetConnection(nodeId, out var gsn); @@ -65,7 +65,7 @@ private GarnetClient GetOrAddConnection(string nodeId) throw new GarnetException($"Connection not established to node {nodeId}"); } - gsn.Initialize(); + await gsn.InitializeAsync(); return gsn.Client; } @@ -75,7 +75,7 @@ private GarnetClient GetOrAddConnection(string nodeId) /// </summary> /// <param name="nodeId">Id of node to create connection for</param> /// <returns></returns> - private GarnetClient CreateConnection(string nodeId) + private async Task<GarnetClient> CreateConnectionAsync(string nodeId) { var (address, port) = oldConfig.GetEndpointFromNodeId(nodeId); var client = new GarnetClient( @@ -90,7 +90,7 @@ private GarnetClient CreateConnection(string nodeId) try { if (!client.IsConnected) - client.ReconnectAsync().WaitAsync(failoverTimeout, cts.Token).GetAwaiter().GetResult(); + await client.ReconnectAsync().WaitAsync(failoverTimeout, cts.Token); return client; } @@ -108,9 +108,9 @@ private GarnetClient CreateConnection(string nodeId) /// </summary> /// <param name="nodeId"></param> /// <returns></returns> - private GarnetClient GetConnection(string nodeId) + private Task<GarnetClient> GetConnectionAsync(string nodeId) { - return useGossipConnections ? GetOrAddConnection(nodeId) : CreateConnection(nodeId); + return useGossipConnections ? GetOrAddConnectionAsync(nodeId) : CreateConnectionAsync(nodeId); } /// <summary> @@ -120,7 +120,7 @@ private GarnetClient GetConnection(string nodeId) private async Task<bool> PauseWritesAndWaitForSync() { var primaryId = oldConfig.LocalNodePrimaryId; - var client = GetConnection(primaryId); + var client = await GetConnectionAsync(primaryId); try { if (client == null) @@ -210,7 +210,7 @@ private async Task BroadcastConfigAndRequestAttach(string replicaId, byte[] conf { var oldPrimaryId = oldConfig.LocalNodePrimaryId; var newConfig = clusterProvider.clusterManager.CurrentConfig; - var client = oldPrimaryId.Equals(replicaId) ? primaryClient : GetConnection(replicaId); + var client = oldPrimaryId.Equals(replicaId) ? primaryClient : await GetConnectionAsync(replicaId); try { diff --git a/libs/cluster/Server/GarnetServerNode.cs b/libs/cluster/Server/GarnetServerNode.cs index 6c600331fa..2a873c982e 100644 --- a/libs/cluster/Server/GarnetServerNode.cs +++ b/libs/cluster/Server/GarnetServerNode.cs @@ -90,13 +90,13 @@ public GarnetServerNode(ClusterProvider clusterProvider, string address, int por /// Initialize connection and cancellation tokens. /// Initialization is performed only once /// </summary> - public void Initialize() + public Task InitializeAsync() { // Ensure initialize executes only once - if (Interlocked.CompareExchange(ref initialized, 1, 0) != 0) return; + if (Interlocked.CompareExchange(ref initialized, 1, 0) != 0) return Task.CompletedTask; cts = CancellationTokenSource.CreateLinkedTokenSource(clusterProvider.clusterManager.ctsGossip.Token, internalCts.Token); - gc.ReconnectAsync().WaitAsync(clusterProvider.clusterManager.gossipDelay, cts.Token).GetAwaiter().GetResult(); + return gc.ReconnectAsync().WaitAsync(clusterProvider.clusterManager.gossipDelay, cts.Token); } public void Dispose() @@ -195,10 +195,10 @@ private Task Gossip(byte[] configByteArray) /// </summary> /// <param name="configByteArray"></param> /// <returns></returns> - public MemoryResult<byte> TryMeet(byte[] configByteArray) + public Task<MemoryResult<byte>> TryMeet(byte[] configByteArray) { UpdateGossipSend(); - var resp = gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token).GetAwaiter().GetResult(); + var resp = gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token); return resp; } diff --git a/libs/cluster/Server/Gossip.cs b/libs/cluster/Server/Gossip.cs index ef4bdcac1e..71a27fb160 100644 --- a/libs/cluster/Server/Gossip.cs +++ b/libs/cluster/Server/Gossip.cs @@ -134,7 +134,7 @@ public bool TryMerge(ClusterConfig senderConfig, bool acquireLock = true) /// <param name="address"></param> /// <param name="port"></param> public void RunMeetTask(string address, int port) - => Task.Run(() => TryMeet(address, port)); + => Task.Run(() => TryMeetAsync(address, port)); /// <summary> /// This task will immediately communicate with the new node and try to merge the retrieve configuration to its own. @@ -143,7 +143,7 @@ public void RunMeetTask(string address, int port) /// <param name="address">Address of node to issue meet to</param> /// <param name="port"> Port of node to issue meet to</param> /// <param name="acquireLock">Whether to acquire lock for merging. Default true</param> - public void TryMeet(string address, int port, bool acquireLock = true) + public async Task TryMeetAsync(string address, int port, bool acquireLock = true) { GarnetServerNode gsn = null; var conf = CurrentConfig; @@ -165,10 +165,10 @@ public void TryMeet(string address, int port, bool acquireLock = true) // Initialize GarnetServerNode // Thread-Safe initialization executes only once - gsn.Initialize(); + await gsn.InitializeAsync(); // Send full config in Gossip - resp = gsn.TryMeet(conf.ToByteArray()); + resp = await gsn.TryMeet(conf.ToByteArray()); if (resp.Length > 0) { var other = ClusterConfig.FromByteArray(resp.Span.ToArray()); @@ -203,7 +203,7 @@ public void TryMeet(string address, int port, bool acquireLock = true) /// <summary> /// Main gossip async task /// </summary> - async void GossipMain() + async Task GossipMain() { // Main gossip loop try @@ -211,7 +211,7 @@ async void GossipMain() while (true) { if (ctsGossip.Token.IsCancellationRequested) return; - InitConnections(); + await InitConnections(); // Choose between full broadcast or sample gossip to few nodes if (GossipSamplePercent == 100) @@ -240,7 +240,7 @@ async void GossipMain() } // Initialize connections for nodes that have either been dispose due to banlist (after expiry) or timeout - void InitConnections() + async Task InitConnections() { DisposeBannedWorkerConnections(); @@ -263,7 +263,7 @@ void InitConnections() }; try { - gsn.Initialize(); + await gsn.InitializeAsync(); if (!clusterConnectionStore.AddConnection(gsn)) gsn.Dispose(); } diff --git a/libs/cluster/Server/Migration/MigrationDriver.cs b/libs/cluster/Server/Migration/MigrationDriver.cs index cfd52f5cc4..0d5d12d97a 100644 --- a/libs/cluster/Server/Migration/MigrationDriver.cs +++ b/libs/cluster/Server/Migration/MigrationDriver.cs @@ -7,7 +7,7 @@ namespace Garnet.cluster { - internal sealed unsafe partial class MigrateSession : IDisposable + internal sealed partial class MigrateSession : IDisposable { /// <summary> /// Begin migration task @@ -49,7 +49,7 @@ public bool TryStartMigrationTask(out ReadOnlySpan<byte> errorMessage) /// <summary> /// Migrate slots session background task /// </summary> - private void BeginAsyncMigrationTask() + private async Task BeginAsyncMigrationTask() { var configResumed = true; try @@ -91,7 +91,7 @@ private void BeginAsyncMigrationTask() // Lock config merge to avoid a background epoch bump clusterProvider.clusterManager.SuspendConfigMerge(); configResumed = false; - clusterProvider.clusterManager.TryMeet(_targetAddress, _targetPort, acquireLock: false); + await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false); // Change ownership of slots to target node. if (!TrySetSlotRanges(GetTargetNodeId, MigrateState.NODE)) @@ -112,7 +112,7 @@ private void BeginAsyncMigrationTask() } // Gossip again to ensure that source and target agree on the slot exchange - clusterProvider.clusterManager.TryMeet(_targetAddress, _targetPort, acquireLock: false); + await clusterProvider.clusterManager.TryMeetAsync(_targetAddress, _targetPort, acquireLock: false); // Ensure that config merge resumes clusterProvider.clusterManager.ResumeConfigMerge(); From 364eac52914c3e3a330b19a15a00c928e0e37704 Mon Sep 17 00:00:00 2001 From: "Padmanabh Gupta (from Dev Box)" <padgupta@microsoft.com> Date: Wed, 15 Jan 2025 08:40:32 +0530 Subject: [PATCH 5/6] formatting fix --- libs/cluster/Server/GarnetServerNode.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/cluster/Server/GarnetServerNode.cs b/libs/cluster/Server/GarnetServerNode.cs index 2a873c982e..1c6481f62d 100644 --- a/libs/cluster/Server/GarnetServerNode.cs +++ b/libs/cluster/Server/GarnetServerNode.cs @@ -260,4 +260,4 @@ public ConnectionInfo GetConnectionInfo() }; } } -} +} \ No newline at end of file From e56af599c064ef2cdcdf106aa341187584d39fe8 Mon Sep 17 00:00:00 2001 From: "Padmanabh Gupta (from Dev Box)" <padgupta@microsoft.com> Date: Thu, 16 Jan 2025 10:47:38 +0530 Subject: [PATCH 6/6] address comments for async/await syntax --- libs/cluster/Server/GarnetServerNode.cs | 4 ++-- libs/cluster/Server/Gossip.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libs/cluster/Server/GarnetServerNode.cs b/libs/cluster/Server/GarnetServerNode.cs index 1c6481f62d..dae6afb3e4 100644 --- a/libs/cluster/Server/GarnetServerNode.cs +++ b/libs/cluster/Server/GarnetServerNode.cs @@ -195,10 +195,10 @@ private Task Gossip(byte[] configByteArray) /// </summary> /// <param name="configByteArray"></param> /// <returns></returns> - public Task<MemoryResult<byte>> TryMeet(byte[] configByteArray) + public async Task<MemoryResult<byte>> TryMeetAsync(byte[] configByteArray) { UpdateGossipSend(); - var resp = gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token); + var resp = await gc.GossipWithMeet(configByteArray).WaitAsync(clusterProvider.clusterManager.clusterTimeout, cts.Token); return resp; } diff --git a/libs/cluster/Server/Gossip.cs b/libs/cluster/Server/Gossip.cs index 71a27fb160..e1b1987b60 100644 --- a/libs/cluster/Server/Gossip.cs +++ b/libs/cluster/Server/Gossip.cs @@ -134,7 +134,7 @@ public bool TryMerge(ClusterConfig senderConfig, bool acquireLock = true) /// <param name="address"></param> /// <param name="port"></param> public void RunMeetTask(string address, int port) - => Task.Run(() => TryMeetAsync(address, port)); + => Task.Run(async () => await TryMeetAsync(address, port)); /// <summary> /// This task will immediately communicate with the new node and try to merge the retrieve configuration to its own. @@ -168,7 +168,7 @@ public async Task TryMeetAsync(string address, int port, bool acquireLock = true await gsn.InitializeAsync(); // Send full config in Gossip - resp = await gsn.TryMeet(conf.ToByteArray()); + resp = await gsn.TryMeetAsync(conf.ToByteArray()); if (resp.Length > 0) { var other = ClusterConfig.FromByteArray(resp.Span.ToArray());