Skip to content

Commit

Permalink
Got everythign working
Browse files Browse the repository at this point in the history
  • Loading branch information
hamdaankhalidmsft committed Dec 20, 2024
1 parent e750564 commit 003a660
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 70 deletions.
4 changes: 2 additions & 2 deletions libs/server/API/GarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ public GarnetStatus SET_Conditional(ref SpanByte key, ref RawStringInput input)
=> storageSession.SET_Conditional(ref key, ref input, ref context);

/// <inheritdoc />
public GarnetStatus SET_Conditional(ref SpanByte key, ref RawStringInput input, ref SpanByteAndMemory output, RespCommand cmd)
=> storageSession.SET_Conditional(ref key, ref input, ref output, ref context, cmd);
public GarnetStatus SET_Conditional(ref SpanByte key, ref RawStringInput input, ref SpanByteAndMemory output)
=> storageSession.SET_Conditional(ref key, ref input, ref output, ref context);

/// <inheritdoc />
public GarnetStatus SET(ArgSlice key, Memory<byte> value)
Expand Down
4 changes: 0 additions & 4 deletions libs/server/API/GarnetStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,5 @@ public enum GarnetStatus : byte
/// Wrong type
/// </summary>
WRONGTYPE,
/// <summary>
/// ETAG mismatch result for an etag based command
/// </summary>
ETAGMISMATCH,
}
}
2 changes: 1 addition & 1 deletion libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <summary>
/// SET Conditional
/// </summary>
GarnetStatus SET_Conditional(ref SpanByte key, ref RawStringInput input, ref SpanByteAndMemory output, RespCommand cmd);
GarnetStatus SET_Conditional(ref SpanByte key, ref RawStringInput input, ref SpanByteAndMemory output);

/// <summary>
/// SET
Expand Down
100 changes: 90 additions & 10 deletions libs/server/Resp/BasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,96 @@ private bool NetworkGETIFNOTMATCH<TGarnetApi>(ref TGarnetApi storageApi)
private bool NetworkSETIFMATCH<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
Debug.Assert(parseState.Count == 3);
if (parseState.Count < 3 || parseState.Count > 5)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.SETIFMATCH));
}

var key = parseState.GetArgSliceByRef(0).SpanByte;
// SETIFMATCH Args: KEY VAL ETAG -> [ ((EX || PX) expiration)]
int expiry = 0;
ReadOnlySpan<byte> errorMessage = default;
var expOption = ExpirationOption.None;

var tokenIdx = 3;
Span<byte> nextOpt = default;
var optUpperCased = false;
while (tokenIdx < parseState.Count || optUpperCased)
{
if (!optUpperCased)
{
nextOpt = parseState.GetArgSliceByRef(tokenIdx++).Span;
}

NetworkSET_Conditional(RespCommand.SETIFMATCH, 0, ref key, getValue: true, highPrecision: false, withEtag: true, ref storageApi);
if (nextOpt.SequenceEqual(CmdStrings.EX))
{
// Validate expiry
if (!parseState.TryGetInt(tokenIdx++, out expiry))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
break;
}

if (expOption != ExpirationOption.None)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR;
break;
}

expOption = ExpirationOption.EX;
if (expiry <= 0)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_INVALIDEXP_IN_SET;
break;
}
}
else if (nextOpt.SequenceEqual(CmdStrings.PX))
{
// Validate expiry
if (!parseState.TryGetInt(tokenIdx++, out expiry))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
break;
}

if (expOption != ExpirationOption.None)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR;
break;
}

expOption = ExpirationOption.PX;
if (expiry <= 0)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_INVALIDEXP_IN_SET;
break;
}
}
else
{
if (!optUpperCased)
{
AsciiUtils.ToUpperInPlace(nextOpt);
optUpperCased = true;
continue;
}

errorMessage = CmdStrings.RESP_ERR_GENERIC_UNK_CMD;
break;
}

optUpperCased = false;
}

if (!errorMessage.IsEmpty)
{
while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend))
SendAndReset();
return true;
}

SpanByte key = parseState.GetArgSliceByRef(0).SpanByte;

NetworkSET_Conditional(RespCommand.SETIFMATCH, expiry, ref key, getValue: true, highPrecision: expOption == ExpirationOption.PX, withEtag: true, ref storageApi);

return true;
}
Expand Down Expand Up @@ -797,7 +882,7 @@ private bool NetworkSET_Conditional<TGarnetApi>(RespCommand cmd, int expiry, ref
// anything with getValue or withEtag may choose to write to the buffer in success scenarios
outputBuffer = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
status = storageApi.SET_Conditional(ref key,
ref input, ref outputBuffer, cmd);
ref input, ref outputBuffer);
}
else
{
Expand All @@ -809,13 +894,8 @@ private bool NetworkSET_Conditional<TGarnetApi>(RespCommand cmd, int expiry, ref

switch ((getValue, withEtag, cmd, status))
{
case (_, _, RespCommand.SETIFMATCH, GarnetStatus.ETAGMISMATCH): // write back mismatch error
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ETAGMISMTACH, ref dcurr, dend))
SendAndReset();
break;

// since SET with etag goes down RMW a not found is okay and data is on buffer
case (_, true, RespCommand.SET, GarnetStatus.NOTFOUND):
case (_, true, RespCommand.SET, GarnetStatus.NOTFOUND):
// if getvalue || etag and Status is OK then the response is always on the buffer, getvalue is never used with conditionals
// extra pattern matching on command below for invariant get value cannot be used with EXXX and EXNX
case (true, _, RespCommand.SET or RespCommand.SETIFMATCH or RespCommand.SETKEEPTTL, GarnetStatus.OK):
Expand Down
78 changes: 46 additions & 32 deletions libs/server/Storage/Functions/MainStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ private bool InPlaceUpdaterWorker(ref SpanByte key, ref RawStringInput input, re
var cmd = input.header.cmd;
int etagIgnoredOffset = 0;
int etagIgnoredEnd = -1;
long oldEtag = -1;
long oldEtag = Constants.BaseEtag;
if (recordInfo.ETag)
{
etagIgnoredOffset = Constants.EtagSize;
Expand Down Expand Up @@ -322,27 +322,37 @@ private bool InPlaceUpdaterWorker(ref SpanByte key, ref RawStringInput input, re
case RespCommand.SETIFMATCH:
long etagFromClient = input.parseState.GetLong(1);

// No Etag is the same as having an etag of 0
long prevEtag = recordInfo.ETag ? *(long*)value.ToPointer() : 0;

if (prevEtag != etagFromClient)
if (oldEtag != etagFromClient)
{
// Cancelling the operation and returning false is used to indicate ETAGMISMATCH
rmwInfo.Action = RMWAction.CancelOperation;
return false;
// write back array of the format [etag, value]
var valueToWrite = value.AsReadOnlySpan(etagIgnoredOffset);
var digitsInLenOfValue = NumUtils.NumDigitsInLong(valueToWrite.Length);
// *2\r\n: + <numDigitsInEtag> + \r\n + $ + <digitsInLenOfValue> + \r\n + <valueToWrite.Length> + \r\n
var numDigitsInEtag = NumUtils.NumDigitsInLong(oldEtag);
WriteValAndEtagToDst(4 + 1 + numDigitsInEtag + 2 + 1 + digitsInLenOfValue + 2 + valueToWrite.Length + 2, ref valueToWrite, oldEtag, ref output, writeDirect: false);
return true;
}

// Need Copy update if no space for new value
var inputValue = input.parseState.GetArgSliceByRef(0);
if (value.Length < inputValue.length + Constants.EtagSize)

// retain metadata unless metadata sent
int metadataSize = input.arg1 != 0 ? sizeof(long) : value.MetadataSize;

if (value.Length < inputValue.length + Constants.EtagSize + metadataSize)
return false;

if (input.arg1 != 0)
{
value.ExtraMetadata = input.arg1;
}

recordInfo.SetHasETag();
// Increment the ETag
long newEtag = prevEtag + 1;
long newEtag = oldEtag + 1;

// Adjust value length if user shrinks it, how to get rid of spanbyte infront
value.ShrinkSerializedLength(inputValue.Length + Constants.EtagSize);
value.ShrinkSerializedLength(metadataSize + inputValue.Length + Constants.EtagSize);
rmwInfo.SetUsedValueLength(ref recordInfo, ref value, value.TotalSize);
rmwInfo.ClearExtraValueLength(ref recordInfo, ref value, value.TotalSize);

Expand Down Expand Up @@ -379,7 +389,7 @@ private bool InPlaceUpdaterWorker(ref SpanByte key, ref RawStringInput input, re
ArgSlice setValue = input.parseState.GetArgSliceByRef(0);

// Need CU if no space for new value
int metadataSize = input.arg1 == 0 ? 0 : sizeof(long);
metadataSize = input.arg1 == 0 ? 0 : sizeof(long);
if (setValue.Length + metadataSize > value.Length - nextUpdateEtagOffset)
return false;

Expand Down Expand Up @@ -433,7 +443,6 @@ private bool InPlaceUpdaterWorker(ref SpanByte key, ref RawStringInput input, re
// this is the case where we have withetag option and no etag from before
nextUpdateEtagOffset = Constants.EtagSize;
nextUpdateEtagIgnoredEnd = value.LengthWithoutMetadata;
oldEtag = Constants.BaseEtag;
}

setValue = input.parseState.GetArgSliceByRef(0);
Expand Down Expand Up @@ -765,20 +774,24 @@ public bool NeedCopyUpdate(ref SpanByte key, ref RawStringInput input, ref SpanB
long etagToCheckWith = input.parseState.GetLong(1);
// lack of an etag is the same as having a zero'd etag
long existingEtag;
// No Etag is the same as having an etag of 0
// No Etag is the same as having the base etag
if (rmwInfo.RecordInfo.ETag)
{
existingEtag = *(long*)oldValue.ToPointer();
}
else
{
existingEtag = 0;
existingEtag = Constants.BaseEtag;
}

if (existingEtag != etagToCheckWith)
{
// cancellation and return false indicates ETag mismatch
rmwInfo.Action = RMWAction.CancelOperation;
// write back array of the format [etag, value]
var valueToWrite = oldValue.AsReadOnlySpan(etagIgnoredOffset);
var digitsInLenOfValue = NumUtils.NumDigitsInLong(valueToWrite.Length);
// *2\r\n: + <numDigitsInEtag> + \r\n + $ + <digitsInLenOfValue> + \r\n <valueToWrite.Length> + \r\n
var numDigitsInEtag = NumUtils.NumDigitsInLong(existingEtag);
WriteValAndEtagToDst(4 + 1 + numDigitsInEtag + 2 + 1 + digitsInLenOfValue + 2 + valueToWrite.Length + 2, ref valueToWrite, existingEtag, ref output, writeDirect: false);
return false;
}

Expand Down Expand Up @@ -848,7 +861,7 @@ public bool CopyUpdater(ref SpanByte key, ref RawStringInput input, ref SpanByte
bool shouldUpdateEtag = true;
int etagIgnoredOffset = 0;
int etagIgnoredEnd = -1;
long oldEtag = -1;
long oldEtag = Constants.BaseEtag;
if (recordInfo.ETag)
{
etagIgnoredEnd = oldValue.LengthWithoutMetadata;
Expand All @@ -861,24 +874,27 @@ public bool CopyUpdater(ref SpanByte key, ref RawStringInput input, ref SpanByte
case RespCommand.SETIFMATCH:
shouldUpdateEtag = false;

// No Etag is the same as having an etag of 0
if (!recordInfo.ETag)
{
oldEtag = 0;
}

*(long*)newValue.ToPointer() = oldEtag + 1;

// Copy input to value
Span<byte> dest = newValue.AsSpan(Constants.EtagSize);
ReadOnlySpan<byte> src = input.parseState.GetArgSliceByRef(0).ReadOnlySpan;

Debug.Assert(src.Length + Constants.EtagSize + oldValue.MetadataSize == newValue.Length);
// retain metadata unless metadata sent
int metadataSize = input.arg1 != 0 ? sizeof(long) : oldValue.MetadataSize;

Debug.Assert(src.Length + Constants.EtagSize + metadataSize == newValue.Length);

// retain metadata
newValue.ExtraMetadata = oldValue.ExtraMetadata;
src.CopyTo(dest);

newValue.ExtraMetadata = oldValue.ExtraMetadata;
if (input.arg1 != 0)
{
newValue.ExtraMetadata = input.arg1;
}

*(long*)newValue.ToPointer() = oldEtag + 1;

recordInfo.SetHasETag();

// Write Etag and Val back to Client
CopyRespToWithInput(ref input, ref newValue, ref output, isFromPending: false, 0, -1, hasEtagInVal: true);
break;
Expand All @@ -900,7 +916,6 @@ public bool CopyUpdater(ref SpanByte key, ref RawStringInput input, ref SpanByte
// this is the case where we have withetag option and no etag from before
nextUpdateEtagOffset = Constants.EtagSize;
nextUpdateEtagIgnoredEnd = oldValue.LengthWithoutMetadata;
oldEtag = 0;
recordInfo.SetHasETag();
}

Expand All @@ -914,7 +929,7 @@ public bool CopyUpdater(ref SpanByte key, ref RawStringInput input, ref SpanByte

// Copy input to value
var newInputValue = input.parseState.GetArgSliceByRef(0).ReadOnlySpan;
int metadataSize = input.arg1 == 0 ? 0 : sizeof(long);
metadataSize = input.arg1 == 0 ? 0 : sizeof(long);

// new value when allocated should have 8 bytes more if the previous record had etag and the cmd was not SETEXXX
Debug.Assert(newInputValue.Length + metadataSize + nextUpdateEtagOffset == newValue.Length);
Expand Down Expand Up @@ -947,7 +962,6 @@ public bool CopyUpdater(ref SpanByte key, ref RawStringInput input, ref SpanByte
// this is the case where we have withetag option and no etag from before
nextUpdateEtagOffset = Constants.EtagSize;
nextUpdateEtagIgnoredEnd = oldValue.LengthWithoutMetadata;
oldEtag = 0;
recordInfo.SetHasETag();
}

Expand Down
6 changes: 3 additions & 3 deletions libs/server/Storage/Functions/MainStore/VarLenInputMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ public int GetRMWModifiedValueLength(ref SpanByte t, ref RawStringInput input, b
return sizeof(int) + t.LengthWithoutMetadata;
case RespCommand.SETIFMATCH:
var newValue = input.parseState.GetArgSliceByRef(0).ReadOnlySpan;
// always preserves the metadata and includes the etag
return sizeof(int) + newValue.Length + Constants.EtagSize + t.MetadataSize;
int metadataSize = input.arg1 == 0 ? t.MetadataSize : sizeof(long);
return sizeof(int) + newValue.Length + Constants.EtagSize + metadataSize;
case RespCommand.EXPIRE:
case RespCommand.PEXPIRE:
case RespCommand.EXPIREAT:
Expand Down Expand Up @@ -250,7 +250,7 @@ public int GetRMWModifiedValueLength(ref SpanByte t, ref RawStringInput input, b
{
var functions = functionsState.GetCustomCommandFunctions((ushort)cmd);
// compute metadata for result
var metadataSize = input.arg1 switch
metadataSize = input.arg1 switch
{
-1 => 0,
0 => t.MetadataSize,
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Session/MainStore/HyperLogLogOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public unsafe GarnetStatus HyperLogLogMerge(ref RawStringInput input, out bool e
parseState.InitializeWithArgument(mergeSlice);

currInput.parseState = parseState;
SET_Conditional(ref dstKey, ref currInput, ref mergeBuffer, ref currLockableContext, input.header.cmd);
SET_Conditional(ref dstKey, ref currInput, ref mergeBuffer, ref currLockableContext);

#endregion
}
Expand Down
Loading

0 comments on commit 003a660

Please sign in to comment.