Skip to content

Commit

Permalink
Big ole refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
hamdaankhalidmsft committed Jan 9, 2025
1 parent bc21fc4 commit adabc24
Show file tree
Hide file tree
Showing 16 changed files with 487 additions and 415 deletions.
1 change: 1 addition & 0 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ private unsafe void RecoverReplay(long untilAddress)
{
count++;
ProcessAofRecord(entry, length);
this.respServerSession.storageSession.functionsState.etagState.ResetToDefaultVals();
if (count % 100_000 == 0)
logger?.LogInformation("Completed AOF replay of {count} records, until AOF address {nextAofAddress}", count, nextAofAddress);
}
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ internal static class Constants
{
public const byte EtagSize = sizeof(long);

public const int BaseEtag = 0;
public const long BaseEtag = 0;
}
}
169 changes: 0 additions & 169 deletions libs/server/Resp/BasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using Garnet.common;
Expand Down Expand Up @@ -311,174 +310,6 @@ private bool NetworkGETSET<TGarnetApi>(ref TGarnetApi storageApi)
return NetworkSETEXNX(ref storageApi);
}

/// <summary>
/// GETWITHETAG key
/// Given a key get the value and ETag
/// </summary>
private bool NetworkGETWITHETAG<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
Debug.Assert(parseState.Count == 1);

var key = parseState.GetArgSliceByRef(0).SpanByte;
var input = new RawStringInput(RespCommand.GETWITHETAG, ref parseState, startIdx: 1);
var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
var status = storageApi.GET(ref key, ref input, ref output);

switch (status)
{
case GarnetStatus.NOTFOUND:
Debug.Assert(output.IsSpanByte);
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
default:
if (!output.IsSpanByte)
SendAndReset(output.Memory, output.Length);
else
dcurr += output.Length;
break;
}

return true;
}

/// <summary>
/// GETIFNOTMATCH key etag
/// Given a key and an etag, return the value and it's etag only if the sent ETag does not match the existing ETag.
/// </summary>
private bool NetworkGETIFNOTMATCH<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
Debug.Assert(parseState.Count == 2);

var key = parseState.GetArgSliceByRef(0).SpanByte;
var input = new RawStringInput(RespCommand.GETIFNOTMATCH, ref parseState, startIdx: 1);
var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
var status = storageApi.GET(ref key, ref input, ref output);

switch (status)
{
case GarnetStatus.NOTFOUND:
Debug.Assert(output.IsSpanByte);
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
default:
if (!output.IsSpanByte)
SendAndReset(output.Memory, output.Length);
else
dcurr += output.Length;
break;
}

return true;
}

/// <summary>
/// SETIFMATCH key val etag EX|PX expiry
/// Sets a key value pair only if an already existing etag does not match the etag sent as a part of the request
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="storageApi"></param>
/// <returns></returns>
private bool NetworkSETIFMATCH<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (parseState.Count < 3 || parseState.Count > 5)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.SETIFMATCH));
}

// SETIFMATCH Args: KEY VAL ETAG -> [ ((EX || PX) expiration)]
int expiry = 0;
ReadOnlySpan<byte> errorMessage = default;
var expOption = ExpirationOption.None;

var tokenIdx = 3;
Span<byte> nextOpt = default;
var optUpperCased = false;
while (tokenIdx < parseState.Count || optUpperCased)
{
if (!optUpperCased)
{
nextOpt = parseState.GetArgSliceByRef(tokenIdx++).Span;
}

if (nextOpt.SequenceEqual(CmdStrings.EX))
{
// Validate expiry
if (!parseState.TryGetInt(tokenIdx++, out expiry))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
break;
}

if (expOption != ExpirationOption.None)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR;
break;
}

expOption = ExpirationOption.EX;
if (expiry <= 0)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_INVALIDEXP_IN_SET;
break;
}
}
else if (nextOpt.SequenceEqual(CmdStrings.PX))
{
// Validate expiry
if (!parseState.TryGetInt(tokenIdx++, out expiry))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
break;
}

if (expOption != ExpirationOption.None)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR;
break;
}

expOption = ExpirationOption.PX;
if (expiry <= 0)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_INVALIDEXP_IN_SET;
break;
}
}
else
{
if (!optUpperCased)
{
AsciiUtils.ToUpperInPlace(nextOpt);
optUpperCased = true;
continue;
}

errorMessage = CmdStrings.RESP_ERR_GENERIC_UNK_CMD;
break;
}

optUpperCased = false;
}

if (!errorMessage.IsEmpty)
{
while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend))
SendAndReset();
return true;
}

SpanByte key = parseState.GetArgSliceByRef(0).SpanByte;

NetworkSET_Conditional(RespCommand.SETIFMATCH, expiry, ref key, getValue: true, highPrecision: expOption == ExpirationOption.PX, withEtag: true, ref storageApi);

return true;
}

/// <summary>
/// SETRANGE
/// </summary>
Expand Down
187 changes: 187 additions & 0 deletions libs/server/Resp/BasicEtagCommands.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.server
{
/// <summary>
/// Server session for RESP protocol - ETag associated commands are in this file
/// </summary>
internal sealed unsafe partial class RespServerSession : ServerSessionBase
{
/// <summary>
/// GETWITHETAG key
/// Given a key get the value and it's ETag
/// </summary>
private bool NetworkGETWITHETAG<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
Debug.Assert(parseState.Count == 1);

var key = parseState.GetArgSliceByRef(0).SpanByte;
var input = new RawStringInput(RespCommand.GETWITHETAG, ref parseState, startIdx: 1);
var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
var status = storageApi.GET(ref key, ref input, ref output);

switch (status)
{
case GarnetStatus.NOTFOUND:
Debug.Assert(output.IsSpanByte);
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
default:
if (!output.IsSpanByte)
SendAndReset(output.Memory, output.Length);
else
dcurr += output.Length;
break;
}

return true;
}

/// <summary>
/// GETIFNOTMATCH key etag
/// Given a key and an etag, return the value and it's etag.
/// </summary>
private bool NetworkGETIFNOTMATCH<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
Debug.Assert(parseState.Count == 2);

var key = parseState.GetArgSliceByRef(0).SpanByte;
var input = new RawStringInput(RespCommand.GETIFNOTMATCH, ref parseState, startIdx: 1);
var output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
var status = storageApi.GET(ref key, ref input, ref output);

switch (status)
{
case GarnetStatus.NOTFOUND:
Debug.Assert(output.IsSpanByte);
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();
break;
default:
if (!output.IsSpanByte)
SendAndReset(output.Memory, output.Length);
else
dcurr += output.Length;
break;
}

return true;
}

/// <summary>
/// SETIFMATCH key val etag EX|PX expiry
/// Sets a key value pair only if the already existing etag matches the etag sent as a part of the request.
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="storageApi"></param>
/// <returns></returns>
private bool NetworkSETIFMATCH<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (parseState.Count < 3 || parseState.Count > 5)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.SETIFMATCH));
}

// SETIFMATCH Args: KEY VAL ETAG -> [ ((EX || PX) expiration)]
int expiry = 0;
ReadOnlySpan<byte> errorMessage = default;
var expOption = ExpirationOption.None;

var tokenIdx = 3;
Span<byte> nextOpt = default;
var optUpperCased = false;
while (tokenIdx < parseState.Count || optUpperCased)
{
if (!optUpperCased)
{
nextOpt = parseState.GetArgSliceByRef(tokenIdx++).Span;
}

if (nextOpt.SequenceEqual(CmdStrings.EX))
{
// Validate expiry
if (!parseState.TryGetInt(tokenIdx++, out expiry))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
break;
}

if (expOption != ExpirationOption.None)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR;
break;
}

expOption = ExpirationOption.EX;
if (expiry <= 0)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_INVALIDEXP_IN_SET;
break;
}
}
else if (nextOpt.SequenceEqual(CmdStrings.PX))
{
// Validate expiry
if (!parseState.TryGetInt(tokenIdx++, out expiry))
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER;
break;
}

if (expOption != ExpirationOption.None)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR;
break;
}

expOption = ExpirationOption.PX;
if (expiry <= 0)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_INVALIDEXP_IN_SET;
break;
}
}
else
{
if (!optUpperCased)
{
AsciiUtils.ToUpperInPlace(nextOpt);
optUpperCased = true;
continue;
}

errorMessage = CmdStrings.RESP_ERR_GENERIC_UNK_CMD;
break;
}

optUpperCased = false;
}

if (!errorMessage.IsEmpty)
{
while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend))
SendAndReset();
return true;
}

SpanByte key = parseState.GetArgSliceByRef(0).SpanByte;

NetworkSET_Conditional(RespCommand.SETIFMATCH, expiry, ref key, getValue: true, highPrecision: expOption == ExpirationOption.PX, withEtag: true, ref storageApi);

return true;
}
}
}
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ private void ProcessMessages()

while (bytesRead - readHead >= 4)
{
storageSession.functionsState.etagState.ResetToDefaultVals();
// First, parse the command, making sure we have the entire command available
// We use endReadHead to track the end of the current command
// On success, readHead is left at the start of the command payload for legacy operators
Expand Down
Loading

0 comments on commit adabc24

Please sign in to comment.