From e51d73cfa251399cbca7416bff13d6604175acf5 Mon Sep 17 00:00:00 2001 From: aviv Date: Thu, 13 Jul 2023 17:46:58 +0300 Subject: [PATCH] RavenDB-20788 : DeleteBucketAsync : before starting cleanup, wait for DestinationMigrationConfirm command to be applied on all orchestrator nodes --- .../Sharding/ShardedDocumentDatabase.cs | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/Raven.Server/Documents/Sharding/ShardedDocumentDatabase.cs b/src/Raven.Server/Documents/Sharding/ShardedDocumentDatabase.cs index 7959f68c088..a4a003d6f16 100644 --- a/src/Raven.Server/Documents/Sharding/ShardedDocumentDatabase.cs +++ b/src/Raven.Server/Documents/Sharding/ShardedDocumentDatabase.cs @@ -1,9 +1,13 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; using System.Threading.Tasks; +using Raven.Client.Http; using Raven.Client.ServerWide; using Raven.Client.ServerWide.Sharding; using Raven.Server.Config; +using Raven.Server.Documents.Commands; using Raven.Server.Documents.Indexes; using Raven.Server.Documents.Indexes.Sharding; using Raven.Server.Documents.Replication; @@ -16,6 +20,7 @@ using Raven.Server.ServerWide.Context; using Raven.Server.Utils; using Sparrow; +using Sparrow.Json; using Sparrow.Utils; namespace Raven.Server.Documents.Sharding; @@ -139,8 +144,11 @@ public void HandleReshardingChanges() continue; } + Debug.Assert(process.ConfirmationIndex.HasValue, $"invalid ShardBucketMigration for bucket '{process.Bucket}', " + + "got Status = OwnershipTransferred but no ConfirmationIndex"); + // cleanup values - t = DeleteBucketAsync(process.Bucket, process.MigrationIndex, process.LastSourceChangeVector); + t = DeleteBucketAsync(process.Bucket, process.MigrationIndex, process.ConfirmationIndex.Value, process.LastSourceChangeVector); t.ContinueWith(__ => DocumentsMigrator.ExecuteMoveDocumentsAsync()); } @@ -195,7 +203,7 @@ public override void Dispose() } } - public async Task DeleteBucketAsync(int bucket, long migrationIndex, string uptoChangeVector) + public async Task DeleteBucketAsync(int bucket, long migrationIndex, long confirmationIndex, string uptoChangeVector) { if (string.IsNullOrEmpty(uptoChangeVector)) { @@ -203,6 +211,10 @@ public async Task DeleteBucketAsync(int bucket, long migrationIndex, string upto return; } + // before starting cleanup, wait for DestinationMigrationConfirm command + // to be applied on all orchestrator nodes + await WaitForConfirmationIndex(confirmationIndex); + while (true) { var cmd = new DeleteBucketCommand(this, bucket, uptoChangeVector); @@ -226,6 +238,30 @@ public async Task DeleteBucketAsync(int bucket, long migrationIndex, string upto } } + private async Task WaitForConfirmationIndex(long confirmationIndex) + { + var cmd = new WaitForIndexNotificationCommand(new List { confirmationIndex }); + var tasks = new List(ShardingConfiguration.Orchestrator.Topology.Members.Count); + var clusterTopology = ServerStore.GetClusterTopology(); + + using (ServerStore.ContextPool.AllocateOperationContext(out JsonOperationContext context)) + { + foreach (var nodeTag in ShardingConfiguration.Orchestrator.Topology.Members) + { + var chosenNode = new ServerNode + { + ClusterTag = nodeTag, + Database = ShardedDatabaseName, + ServerRole = ServerNode.Role.Member, + Url = clusterTopology.GetUrlFromTag(nodeTag) + }; + tasks.Add(ServerStore.ClusterRequestExecutor.ExecuteAsync(chosenNode, null, context, cmd, token: DatabaseShutdown)); + } + + await Task.WhenAll(tasks); + } + } + public static ShardedDocumentDatabase CastToShardedDocumentDatabase(DocumentDatabase database) => database as ShardedDocumentDatabase ?? throw new ArgumentException($"Database {database.Name} must be sharded!"); public class DeleteBucketCommand : DocumentMergedTransactionCommand