Skip to content

Commit

Permalink
RavenDB-17793 : minor refactoring + cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
aviv committed Feb 5, 2024
1 parent a3ad92e commit 501f713
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ public sealed class AddPrefixedShardingSettingOperation : IMaintenanceOperation
{
private readonly PrefixedShardingSetting _setting;


public AddPrefixedShardingSettingOperation(PrefixedShardingSetting setting)
{
_setting = setting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected PrefixedShardingCommand(DocumentConventions conventions, PrefixedShard

public override HttpRequestMessage CreateRequest(JsonOperationContext ctx, ServerNode node, out string url)
{
url = $"{node.Url}/databases/{node.Database}/admin/sharding/prefixes/{CommandType}";
url = $"{node.Url}/databases/{node.Database}/admin/sharding/prefixed/{CommandType}";

return new HttpRequestMessage
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@ public async Task ExecuteMoveDocuments()
HttpContext.Response.StatusCode = (int)HttpStatusCode.NoContent;
}

[RavenAction("/databases/*/admin/sharding/prefixes/add", "POST", AuthorizationStatus.DatabaseAdmin)]
public Task AddPrefixConfiguration()
[RavenAction("/databases/*/admin/sharding/prefixed/add", "POST", AuthorizationStatus.DatabaseAdmin)]
public Task AddPrefixedShardingSetting()
{
throw new NotSupportedInShardingException("This operation is not available from a specific shard");
}

[RavenAction("/databases/*/admin/sharding/prefixes/delete", "DELETE", AuthorizationStatus.DatabaseAdmin)]
public Task DeletePrefixConfiguration()
[RavenAction("/databases/*/admin/sharding/prefixed/delete", "DELETE", AuthorizationStatus.DatabaseAdmin)]
public Task DeletePrefixedShardingSetting()
{
throw new NotSupportedInShardingException("This operation is not available from a specific shard");

}

[RavenAction("/databases/*/admin/sharding/prefixes/update", "POST", AuthorizationStatus.DatabaseAdmin)]
public Task UpdatePrefixConfiguration()
[RavenAction("/databases/*/admin/sharding/prefixed/update", "POST", AuthorizationStatus.DatabaseAdmin)]
public Task UpdatePrefixedShardingSetting()
{
throw new NotSupportedInShardingException("This operation is not available from a specific shard");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public async Task ExecuteMoveDocuments()
await processor.ExecuteAsync();
}

[RavenShardedAction("/databases/*/admin/sharding/prefixes/add", "POST")]
public async Task AddPrefixConfiguration()
[RavenShardedAction("/databases/*/admin/sharding/prefixed/add", "POST")]
public async Task AddPrefixedShardingSetting()
{
using (ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
Expand All @@ -54,7 +54,7 @@ public async Task AddPrefixConfiguration()
$"There are existing documents in database '{DatabaseName}' that start with '{setting.Prefix}'. " +
"In order to define sharding by prefix, you cannot have any documents in the database that starts with this prefix.");

var cmd = new AddPrefixedSettingCommand(setting, DatabaseName, GetRaftRequestIdFromQuery());
var cmd = new AddPrefixedShardingSettingCommand(setting, DatabaseName, GetRaftRequestIdFromQuery());
var (raftIndex, _) = await ServerStore.SendToLeaderAsync(cmd);

await DatabaseContext.ServerStore.WaitForExecutionOnRelevantNodesAsync(context, shardingConfiguration.Orchestrator.Topology.Members, raftIndex);
Expand All @@ -63,8 +63,8 @@ public async Task AddPrefixConfiguration()
}
}

[RavenShardedAction("/databases/*/admin/sharding/prefixes/delete", "DELETE")]
public async Task DeletePrefixConfiguration()
[RavenShardedAction("/databases/*/admin/sharding/prefixed/delete", "DELETE")]
public async Task DeletePrefixedShardingSetting()
{
using (ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
Expand All @@ -86,7 +86,7 @@ public async Task DeletePrefixConfiguration()
$"There are existing documents in database '{DatabaseName}' that start with '{setting.Prefix}'. " +
"In order to remove a sharding by prefix setting, you cannot have any documents in the database that starts with this prefix.");

var cmd = new DeletePrefixedSettingCommand(setting, DatabaseName, GetRaftRequestIdFromQuery());
var cmd = new DeletePrefixedShardingSettingCommand(setting, DatabaseName, GetRaftRequestIdFromQuery());
var (raftIndex, _) = await ServerStore.SendToLeaderAsync(cmd);

await DatabaseContext.ServerStore.WaitForExecutionOnRelevantNodesAsync(context, shardingConfiguration.Orchestrator.Topology.Members, raftIndex);
Expand All @@ -95,8 +95,8 @@ public async Task DeletePrefixConfiguration()
}
}

[RavenShardedAction("/databases/*/admin/sharding/prefixes/update", "POST")]
public async Task UpdatePrefixConfiguration()
[RavenShardedAction("/databases/*/admin/sharding/prefixed/update", "POST")]
public async Task UpdatePrefixedShardingSetting()
{
using (ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
Expand All @@ -111,69 +111,9 @@ public async Task UpdatePrefixConfiguration()
throw new InvalidDataException($"Prefix '{setting.Prefix}' wasn't found in sharding configuration");

var oldSetting = shardingConfiguration.Prefixed[location];
var removedShards = oldSetting.Shards;
AssertValidShardsDistribution(oldSetting, setting, shardingConfiguration);

foreach (var shard in setting.Shards)
{
if (oldSetting.Shards.Contains(shard))
removedShards.Remove(shard);
else if (shardingConfiguration.Shards.ContainsKey(shard) == false)
throw new InvalidDataException($"Cannot assign shard number {shard} to prefix {setting.Prefix}, " +
$"there's no shard '{shard}' in sharding topology!");
}

if (removedShards.Count > 0)
{
// check that there are no bucket ranges mapped to these shards

int index;
bool found = false;
var prefixBucketRangeStart = oldSetting.BucketRangeStart;

for (index = 0; index < shardingConfiguration.BucketRanges.Count; index++)
{
var range = shardingConfiguration.BucketRanges[index];
if (range.BucketRangeStart < prefixBucketRangeStart)
continue;

if (range.BucketRangeStart == prefixBucketRangeStart)
found = true;

break;
}

if (found)
{
var shards = new List<int>
{
shardingConfiguration.BucketRanges[index++].ShardNumber
};
var nextPrefixedRangeStart = prefixBucketRangeStart + ShardHelper.NumberOfBuckets;
for (; index < shardingConfiguration.BucketRanges.Count; index++)
{
var range = shardingConfiguration.BucketRanges[index];
if (range.BucketRangeStart < nextPrefixedRangeStart)
{
shards.Add(range.ShardNumber);
continue;
}

break;
}

foreach (var shard in removedShards)
{
if (shards.Contains(shard))
throw new InvalidOperationException(
$"Cannot remove shard {shard} from '{setting.Prefix}' settings in {nameof(ShardingConfiguration)}.{nameof(ShardingConfiguration.Prefixed)}. " +
$"There are bucket ranges mapped to this shard. " +
"In order to remove a shard from a Prefixed setting, first you need to migrate all its buckets to another shard.");

}
}
}

var cmd = new UpdatePrefixedSettingCommand(setting, DatabaseName, GetRaftRequestIdFromQuery());
var cmd = new UpdatePrefixedShardingSettingCommand(setting, DatabaseName, GetRaftRequestIdFromQuery());
var (raftIndex, _) = await ServerStore.SendToLeaderAsync(cmd);

await DatabaseContext.ServerStore.WaitForExecutionOnRelevantNodesAsync(context, shardingConfiguration.Orchestrator.Topology.Members, raftIndex);
Expand All @@ -182,6 +122,72 @@ public async Task UpdatePrefixConfiguration()
}
}

private static void AssertValidShardsDistribution(PrefixedShardingSetting oldSetting, PrefixedShardingSetting updatedSetting, ShardingConfiguration configuration)
{
var removedShards = oldSetting.Shards;

foreach (var shard in updatedSetting.Shards)
{
if (oldSetting.Shards.Contains(shard))
removedShards.Remove(shard);
else if (configuration.Shards.ContainsKey(shard) == false)
throw new InvalidDataException($"Cannot assign shard number {shard} to prefix {updatedSetting.Prefix}, " +
$"there's no shard '{shard}' in sharding topology!");
}

if (removedShards.Count <= 0)
return;

// check that there are no bucket ranges mapped to these shards

int index;
bool found = false;
var prefixBucketRangeStart = oldSetting.BucketRangeStart;

for (index = 0; index < configuration.BucketRanges.Count; index++)
{
var range = configuration.BucketRanges[index];
if (range.BucketRangeStart < prefixBucketRangeStart)
continue;

if (range.BucketRangeStart == prefixBucketRangeStart)
found = true;

break;
}

if (found == false)
return;

var shards = new List<int>
{
configuration.BucketRanges[index++].ShardNumber
};

var nextPrefixedRangeStart = prefixBucketRangeStart + ShardHelper.NumberOfBuckets;
for (; index < configuration.BucketRanges.Count; index++)
{
var range = configuration.BucketRanges[index];
if (range.BucketRangeStart < nextPrefixedRangeStart)
{
shards.Add(range.ShardNumber);
continue;
}

break;
}

foreach (var shard in removedShards)
{
if (shards.Contains(shard))
throw new InvalidOperationException(
$"Cannot remove shard {shard} from '{updatedSetting.Prefix}' settings in {nameof(ShardingConfiguration)}.{nameof(ShardingConfiguration.Prefixed)}. " +
$"There are bucket ranges mapped to this shard. " +
"In order to remove a shard from a Prefixed setting, first you need to migrate all its buckets to another shard.");

}
}

private async Task<bool> AssertNoDocumentsStartingWith(TransactionOperationContext context, string prefix, string[] urls, string database = null)
{
using (var requestExecutor = RequestExecutor.CreateForServer(urls, database ?? DatabaseName, ServerStore.Server.Certificate.Certificate, DocumentConventions.DefaultForServer))
Expand Down
2 changes: 1 addition & 1 deletion src/Raven.Server/Rachis/Leader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ private void CheckPromotables()
await _engine.TxMerger.Enqueue(rachisMergedCommand); //wait until 'rachisMergedCommand' is executed (until 'rachisMergedCommand.TaskResult' wont be null).

var t = rachisMergedCommand.TaskResult;
if (await t.WaitWithTimeout(timeout.Add(TimeSpan.FromSeconds(45))) == false)
if (await t.WaitWithTimeout(timeout) == false)
{
throw new TimeoutException($"Waited for {timeout} but the command {command.RaftCommandIndex} was not applied in this time.");
}
Expand Down
9 changes: 5 additions & 4 deletions src/Raven.Server/ServerWide/ClusterCommandsVersionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,13 @@ public sealed class ClusterCommandsVersionManager
[nameof(UpdateQueueSinkCommand)] = 60_000,
[nameof(RemoveQueueSinkProcessStateCommand)] = 60_000,
[nameof(UpdateQueueSinkProcessStateCommand)] = 60_000,

[nameof(EditDataArchivalCommand)] = 60_000,

[nameof(UpdateResponsibleNodeForTasksCommand)] = UpdateResponsibleNodeForTasksCommand.CommandVersion,
[nameof(AddPrefixedSettingCommand)] = 60_001,
[nameof(DeletePrefixedSettingCommand)] = 60_001,
[nameof(UpdatePrefixedSettingCommand)] = 60_001,
[nameof(AddPrefixedShardingSettingCommand)] = 60_001,
[nameof(DeletePrefixedShardingSettingCommand)] = 60_001,
[nameof(UpdatePrefixedShardingSettingCommand)] = 60_001

};

Expand Down
6 changes: 3 additions & 3 deletions src/Raven.Server/ServerWide/ClusterStateMachine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,9 @@ protected override void Apply(ClusterOperationContext context, BlittableJsonRead
case nameof(SourceMigrationSendCompletedCommand):
case nameof(DestinationMigrationConfirmCommand):
case nameof(SourceMigrationCleanupCommand):
case nameof(AddPrefixedSettingCommand):
case nameof(DeletePrefixedSettingCommand):
case nameof(UpdatePrefixedSettingCommand):
case nameof(AddPrefixedShardingSettingCommand):
case nameof(DeletePrefixedShardingSettingCommand):
case nameof(UpdatePrefixedShardingSettingCommand):
UpdateDatabase(context, type, cmd, index, serverStore);
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

namespace Raven.Server.ServerWide.Commands.Sharding
{
public sealed class AddPrefixedSettingCommand : UpdateDatabaseCommand
public sealed class AddPrefixedShardingSettingCommand : UpdateDatabaseCommand
{
public PrefixedShardingSetting Setting;

public AddPrefixedSettingCommand()
public AddPrefixedShardingSettingCommand()
{
}

public AddPrefixedSettingCommand(PrefixedShardingSetting setting, string database, string raftId) : base(database, raftId)
public AddPrefixedShardingSettingCommand(PrefixedShardingSetting setting, string database, string raftId) : base(database, raftId)
{
Setting = setting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

namespace Raven.Server.ServerWide.Commands.Sharding
{
public sealed class DeletePrefixedSettingCommand : UpdateDatabaseCommand
public sealed class DeletePrefixedShardingSettingCommand : UpdateDatabaseCommand
{
public PrefixedShardingSetting Prefix;

public DeletePrefixedSettingCommand()
public DeletePrefixedShardingSettingCommand()
{
}

public DeletePrefixedSettingCommand(PrefixedShardingSetting prefix, string database, string raftId) : base(database, raftId)
public DeletePrefixedShardingSettingCommand(PrefixedShardingSetting prefix, string database, string raftId) : base(database, raftId)
{
Prefix = prefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,7 @@ public override void UpdateDatabaseRecord(DatabaseRecord record, long etag)
}
}

if (string.IsNullOrEmpty(Prefix) == false)
{
// prefixed bucket range
var index = record.Sharding.Prefixed.BinarySearch(new PrefixedShardingSetting(Prefix), PrefixedSettingComparer.Instance);
if (index < 0)
throw new RachisApplyException($"Prefix {Prefix} doesn't exists");

var shards = record.Sharding.Prefixed[index].Shards;
if (shards == null || shards.Contains(DestinationShard) == false)
throw new RachisApplyException($"Destination shard {DestinationShard} doesn't exists");
}

if (record.Sharding.Shards.ContainsKey(DestinationShard) == false)
throw new RachisApplyException($"Destination shard {DestinationShard} doesn't exists");
AssertDestinationShardExists(record.Sharding);

_migration = new ShardBucketMigration
{
Expand Down Expand Up @@ -129,6 +116,24 @@ private void ProcessSubscriptionsForMigration(ClusterOperationContext context, S
}
}

private void AssertDestinationShardExists(ShardingConfiguration shardingConfiguration)
{
if (shardingConfiguration.Shards.ContainsKey(DestinationShard) == false)
throw new RachisApplyException($"Destination shard {DestinationShard} doesn't exists");

if (string.IsNullOrEmpty(Prefix))
return;

// prefixed bucket range
var index = shardingConfiguration.Prefixed.BinarySearch(new PrefixedShardingSetting(Prefix), PrefixedSettingComparer.Instance);
if (index < 0)
throw new RachisApplyException($"Prefix {Prefix} doesn't exists");

var shards = shardingConfiguration.Prefixed[index].Shards;
if (shards == null || shards.Contains(DestinationShard) == false)
throw new RachisApplyException($"Destination shard {DestinationShard} doesn't exists");
}

public override void FillJson(DynamicJsonValue json)
{
json[nameof(SourceShard)] = SourceShard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@

namespace Raven.Server.ServerWide.Commands.Sharding
{
public sealed class UpdatePrefixedSettingCommand : UpdateDatabaseCommand
public sealed class UpdatePrefixedShardingSettingCommand : UpdateDatabaseCommand
{
public PrefixedShardingSetting Setting;

public UpdatePrefixedSettingCommand()
public UpdatePrefixedShardingSettingCommand()
{
}

public UpdatePrefixedSettingCommand(PrefixedShardingSetting setting, string database, string raftId) : base(database, raftId)
public UpdatePrefixedShardingSettingCommand(PrefixedShardingSetting setting, string database, string raftId) : base(database, raftId)
{
Setting = setting;
}
Expand Down
Loading

0 comments on commit 501f713

Please sign in to comment.