Skip to content

Commit

Permalink
Improvements in command handling #4 (#549)
Browse files Browse the repository at this point in the history
* partial checkin

* nit

* nit

* idea for initial spike for making input a struct type
does not work

* support ZADD and ZREM using safe struct wrappers for input

* nit

* wip

* wip

* Fixing non-determinism + refactoring HashGet

* dotnet format

* Fixed some API calls

* dotnet format

* wip

* small fix

* Removing unused method

* wip - refactoring ProcessAdminCommands

* Undoing changes to RandomUtils

* Continued refactoring of AdminCommands

* Added "TryGetInt" and "TryGetLong" to parse state api

* dotnet format

* wip

* format

* wip

* wip

* wip

* cleanup

* wip

* wip

* format

* wip

* Fixing build

* cont

* bugfix

* Fixing build

* wip

* wip

* bugfix

* Few small fixes

* format

* bugfix

* Added range validation to SessionParseState read methods

* wip

* merging from latest main

* format

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* clean up

* Fixed object store PERSIST + added missing test

* wip - adding parseState to ObjectInput & implementing RMW AOF recovery, using parseState in ZADD implementation

* Cleaned up ObjectInput and added XML comments

* wip

* wip

* wip

* remove unnecessary serialization during upsert

* Addressing comments

* wip

* wip

* wip

* wip

* wip

* wip

* bugfix

* wip

* wip

* format

* format

* Using ObjectInput in custom object commands

* wip

* some cleanup

* safe parsing for cluster commands

* format

* Fixing build errors

* Test fix

* Fixing merge

* Fixing broken test

* some small fixes

---------

Co-authored-by: Badrish Chandramouli <[email protected]>
  • Loading branch information
TalZaccai and badrishc authored Aug 23, 2024
1 parent 23c4e3e commit af67519
Show file tree
Hide file tree
Showing 73 changed files with 2,639 additions and 3,080 deletions.
5 changes: 5 additions & 0 deletions libs/cluster/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_WORKERS_NOT_INITIALIZED => "ERR workers not initialized"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_CONFIG_EPOCH_NOT_SET => "ERR Node config epoch was not set due to invalid epoch specified"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_NOT_IN_IMPORTING_STATE => "ERR Node not in IMPORTING state"u8;
public static ReadOnlySpan<byte> RESP_ERR_INVALID_SLOT => "ERR Invalid or out of range slot"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_BOOLEAN => "ERR value is not a boolean."u8;

/// <summary>
/// Generic error response strings for <c>MIGRATE</c> command
Expand All @@ -73,5 +76,7 @@ static class CmdStrings
/// Response string templates
/// </summary>
public const string GenericErrWrongNumArgs = "ERR wrong number of arguments for '{0}' command";

public const string GenericErrInvalidPort = "ERR Invalid TCP base port specified: {0}";
}
}
160 changes: 67 additions & 93 deletions libs/cluster/Session/ClusterCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ private List<byte[]> GetKeysInSlot(int slot, int keyCount)
/// <summary>
/// Try to parse slots
/// </summary>
/// <param name="startIdx"></param>
/// <param name="slots"></param>
/// <param name="errorMessage">
/// The ASCII encoded error message if there one of the following conditions is true
/// <list type="bullet">
Expand All @@ -61,63 +63,62 @@ private List<byte[]> GetKeysInSlot(int slot, int keyCount)
/// </list>
/// otherwise <see langword="default" />
/// </param>
/// <param name="range"></param>
/// <returns>A boolean indicating that there was error in parsing of the arguments.</returns>
/// <remarks>
/// The error handling is little special for this method because we need to drain all arguments even in the case of error.
/// <para/>
/// The <paramref name="errorMessage"/> will only have a generic error message set in the event of duplicate or out of range slot.
/// The method will still return <see langword="true" /> in case of such error.
/// </remarks>
private bool TryParseSlots(int count, ref byte* ptr, out HashSet<int> slots, out ReadOnlySpan<byte> errorMessage, bool range)
private bool TryParseSlots(int startIdx, out HashSet<int> slots, out ReadOnlySpan<byte> errorMessage, bool range)
{
slots = [];
errorMessage = default;
var duplicate = false;
var outOfRange = false;
var invalidRange = false;
int slotStart;
int slotEnd;

while (count > 0)
var currTokenIdx = startIdx;
while (currTokenIdx < parseState.Count)
{
int slotStart;
int slotEnd;
if (range)
{
if (!RespReadUtils.ReadIntWithLengthHeader(out slotStart, ref ptr, recvBufferPtr + bytesRead))
return false;
if (!RespReadUtils.ReadIntWithLengthHeader(out slotEnd, ref ptr, recvBufferPtr + bytesRead))
if (!parseState.TryGetInt(currTokenIdx++, out slotStart) ||
!parseState.TryGetInt(currTokenIdx++, out slotEnd))
{
errorMessage = CmdStrings.RESP_ERR_INVALID_SLOT;
return false;
count -= 2;
}
}
else
{
if (!RespReadUtils.ReadIntWithLengthHeader(out slotStart, ref ptr, recvBufferPtr + bytesRead))
if (!parseState.TryGetInt(currTokenIdx++, out slotStart))
{
errorMessage = CmdStrings.RESP_ERR_INVALID_SLOT;
return false;
count--;
}

slotEnd = slotStart;
}

if (duplicate || outOfRange || invalidRange)
continue;

if (slotStart > slotEnd)
{
errorMessage = Encoding.ASCII.GetBytes($"ERR Invalid range {slotStart} > {slotEnd}!");
invalidRange = true;
continue;
return false;
}

if (ClusterConfig.OutOfRange(slotStart) || ClusterConfig.OutOfRange(slotEnd))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SLOT_OUT_OFF_RANGE;
outOfRange = true;
return false;
}

for (var slot = slotStart; slot <= slotEnd && !duplicate; slot++)
for (var slot = slotStart; slot <= slotEnd; slot++)
{
if (!slots.Add(slot))
{
errorMessage = Encoding.ASCII.GetBytes($"ERR Slot {slot} specified multiple times");
duplicate = true;
return false;
}
}
}
Expand All @@ -129,81 +130,54 @@ private bool TryParseSlots(int count, ref byte* ptr, out HashSet<int> slots, out
/// Handle cluster subcommands.
/// </summary>
/// <param name="command">Subcommand to execute.</param>
/// <param name="count">Number of parameters in teh command buffer</param>
/// <param name="invalidParameters">True if number of parameters is invalid</param>
/// <returns>True if command is fully processed, false if more processing is needed.</returns>
private bool ProcessClusterCommands(RespCommand command, int count)
private void ProcessClusterCommands(RespCommand command, out bool invalidParameters)
{
bool result;
bool invalidParameters;

result =
command switch
{
RespCommand.CLUSTER_ADDSLOTS => NetworkClusterAddSlots(count, out invalidParameters),
RespCommand.CLUSTER_ADDSLOTSRANGE => NetworkClusterAddSlotsRange(count, out invalidParameters),
RespCommand.CLUSTER_AOFSYNC => NetworkClusterAOFSync(count, out invalidParameters),
RespCommand.CLUSTER_APPENDLOG => NetworkClusterAppendLog(count, out invalidParameters),
RespCommand.CLUSTER_BANLIST => NetworkClusterBanList(count, out invalidParameters),
RespCommand.CLUSTER_BEGIN_REPLICA_RECOVER => NetworkClusterBeginReplicaRecover(count, out invalidParameters),
RespCommand.CLUSTER_BUMPEPOCH => NetworkClusterBumpEpoch(count, out invalidParameters),
RespCommand.CLUSTER_COUNTKEYSINSLOT => NetworkClusterCountKeysInSlot(count, out invalidParameters),
RespCommand.CLUSTER_DELKEYSINSLOT => NetworkClusterDelKeysInSlot(count, out invalidParameters),
RespCommand.CLUSTER_DELKEYSINSLOTRANGE => NetworkClusterDelKeysInSlotRange(count, out invalidParameters),
RespCommand.CLUSTER_DELSLOTS => NetworkClusterDelSlots(count, out invalidParameters),
RespCommand.CLUSTER_DELSLOTSRANGE => NetworkClusterDelSlotsRange(count, out invalidParameters),
RespCommand.CLUSTER_ENDPOINT => NetworkClusterEndpoint(count, out invalidParameters),
RespCommand.CLUSTER_FAILOVER => NetworkClusterFailover(count, out invalidParameters),
RespCommand.CLUSTER_FAILREPLICATIONOFFSET => NetworkClusterFailReplicationOffset(count, out invalidParameters),
RespCommand.CLUSTER_FAILSTOPWRITES => NetworkClusterFailStopWrites(count, out invalidParameters),
RespCommand.CLUSTER_FORGET => NetworkClusterForget(count, out invalidParameters),
RespCommand.CLUSTER_GOSSIP => NetworkClusterGossip(count, out invalidParameters),
RespCommand.CLUSTER_GETKEYSINSLOT => NetworkClusterGetKeysInSlot(count, out invalidParameters),
RespCommand.CLUSTER_HELP => NetworkClusterHelp(count, out invalidParameters),
RespCommand.CLUSTER_INFO => NetworkClusterInfo(count, out invalidParameters),
RespCommand.CLUSTER_INITIATE_REPLICA_SYNC => NetworkClusterInitiateReplicaSync(count, out invalidParameters),
RespCommand.CLUSTER_KEYSLOT => NetworkClusterKeySlot(count, out invalidParameters),
RespCommand.CLUSTER_MEET => NetworkClusterMeet(count, out invalidParameters),
RespCommand.CLUSTER_MIGRATE => NetworkClusterMigrate(count, out invalidParameters),
RespCommand.CLUSTER_MTASKS => NetworkClusterMTasks(count, out invalidParameters),
RespCommand.CLUSTER_MYID => NetworkClusterMyId(count, out invalidParameters),
RespCommand.CLUSTER_MYPARENTID => NetworkClusterMyParentId(count, out invalidParameters),
RespCommand.CLUSTER_NODES => NetworkClusterNodes(count, out invalidParameters),
RespCommand.CLUSTER_REPLICAS => NetworkClusterReplicas(count, out invalidParameters),
RespCommand.CLUSTER_REPLICATE => NetworkClusterReplicate(count, out invalidParameters),
RespCommand.CLUSTER_RESET => NetworkClusterReset(count, out invalidParameters),
RespCommand.CLUSTER_SEND_CKPT_FILE_SEGMENT => NetworkClusterSendCheckpointFileSegment(count, out invalidParameters),
RespCommand.CLUSTER_SEND_CKPT_METADATA => NetworkClusterSendCheckpointMetadata(count, out invalidParameters),
RespCommand.CLUSTER_SETCONFIGEPOCH => NetworkClusterSetConfigEpoch(count, out invalidParameters),
RespCommand.CLUSTER_SETSLOT => NetworkClusterSetSlot(count, out invalidParameters),
RespCommand.CLUSTER_SETSLOTSRANGE => NetworkClusterSetSlotsRange(count, out invalidParameters),
RespCommand.CLUSTER_SHARDS => NetworkClusterShards(count, out invalidParameters),
RespCommand.CLUSTER_SLOTS => NetworkClusterSlots(count, out invalidParameters),
RespCommand.CLUSTER_SLOTSTATE => NetworkClusterSlotState(count, out invalidParameters),
_ => throw new Exception($"Unexpected cluster subcommand: {command}")
};

if (invalidParameters)
_ = command switch
{
if (!DrainCommands(count))
return false;

// Have to lookup the RESP name now that we're in the failure case
string subCommand;
if (RespCommandsInfo.TryGetRespCommandInfo(command, out var info))
{
subCommand = info.Name.ToLowerInvariant();
}
else
{
subCommand = "unknown";
}

var errorMsg = string.Format(CmdStrings.GenericErrWrongNumArgs, subCommand);
while (!RespWriteUtils.WriteError(errorMsg, ref dcurr, dend))
SendAndReset();
}

return result;
RespCommand.CLUSTER_ADDSLOTS => NetworkClusterAddSlots(out invalidParameters),
RespCommand.CLUSTER_ADDSLOTSRANGE => NetworkClusterAddSlotsRange(out invalidParameters),
RespCommand.CLUSTER_AOFSYNC => NetworkClusterAOFSync(out invalidParameters),
RespCommand.CLUSTER_APPENDLOG => NetworkClusterAppendLog(out invalidParameters),
RespCommand.CLUSTER_BANLIST => NetworkClusterBanList(out invalidParameters),
RespCommand.CLUSTER_BEGIN_REPLICA_RECOVER => NetworkClusterBeginReplicaRecover(out invalidParameters),
RespCommand.CLUSTER_BUMPEPOCH => NetworkClusterBumpEpoch(out invalidParameters),
RespCommand.CLUSTER_COUNTKEYSINSLOT => NetworkClusterCountKeysInSlot(out invalidParameters),
RespCommand.CLUSTER_DELKEYSINSLOT => NetworkClusterDelKeysInSlot(out invalidParameters),
RespCommand.CLUSTER_DELKEYSINSLOTRANGE => NetworkClusterDelKeysInSlotRange(out invalidParameters),
RespCommand.CLUSTER_DELSLOTS => NetworkClusterDelSlots(out invalidParameters),
RespCommand.CLUSTER_DELSLOTSRANGE => NetworkClusterDelSlotsRange(out invalidParameters),
RespCommand.CLUSTER_ENDPOINT => NetworkClusterEndpoint(out invalidParameters),
RespCommand.CLUSTER_FAILOVER => NetworkClusterFailover(out invalidParameters),
RespCommand.CLUSTER_FAILREPLICATIONOFFSET => NetworkClusterFailReplicationOffset(out invalidParameters),
RespCommand.CLUSTER_FAILSTOPWRITES => NetworkClusterFailStopWrites(out invalidParameters),
RespCommand.CLUSTER_FORGET => NetworkClusterForget(out invalidParameters),
RespCommand.CLUSTER_GOSSIP => NetworkClusterGossip(out invalidParameters),
RespCommand.CLUSTER_GETKEYSINSLOT => NetworkClusterGetKeysInSlot(out invalidParameters),
RespCommand.CLUSTER_HELP => NetworkClusterHelp(out invalidParameters),
RespCommand.CLUSTER_INFO => NetworkClusterInfo(out invalidParameters),
RespCommand.CLUSTER_INITIATE_REPLICA_SYNC => NetworkClusterInitiateReplicaSync(out invalidParameters),
RespCommand.CLUSTER_KEYSLOT => NetworkClusterKeySlot(out invalidParameters),
RespCommand.CLUSTER_MEET => NetworkClusterMeet(out invalidParameters),
RespCommand.CLUSTER_MIGRATE => NetworkClusterMigrate(out invalidParameters),
RespCommand.CLUSTER_MTASKS => NetworkClusterMTasks(out invalidParameters),
RespCommand.CLUSTER_MYID => NetworkClusterMyId(out invalidParameters),
RespCommand.CLUSTER_MYPARENTID => NetworkClusterMyParentId(out invalidParameters),
RespCommand.CLUSTER_NODES => NetworkClusterNodes(out invalidParameters),
RespCommand.CLUSTER_REPLICAS => NetworkClusterReplicas(out invalidParameters),
RespCommand.CLUSTER_REPLICATE => NetworkClusterReplicate(out invalidParameters),
RespCommand.CLUSTER_RESET => NetworkClusterReset(out invalidParameters),
RespCommand.CLUSTER_SEND_CKPT_FILE_SEGMENT => NetworkClusterSendCheckpointFileSegment(out invalidParameters),
RespCommand.CLUSTER_SEND_CKPT_METADATA => NetworkClusterSendCheckpointMetadata(out invalidParameters),
RespCommand.CLUSTER_SETCONFIGEPOCH => NetworkClusterSetConfigEpoch(out invalidParameters),
RespCommand.CLUSTER_SETSLOT => NetworkClusterSetSlot(out invalidParameters),
RespCommand.CLUSTER_SETSLOTSRANGE => NetworkClusterSetSlotsRange(out invalidParameters),
RespCommand.CLUSTER_SHARDS => NetworkClusterShards(out invalidParameters),
RespCommand.CLUSTER_SLOTS => NetworkClusterSlots(out invalidParameters),
RespCommand.CLUSTER_SLOTSTATE => NetworkClusterSlotState(out invalidParameters),
_ => throw new Exception($"Unexpected cluster subcommand: {command}")
};
}
}
}
Loading

0 comments on commit af67519

Please sign in to comment.