Skip to content

Commit

Permalink
tmp - GeoAdd broken
Browse files Browse the repository at this point in the history
  • Loading branch information
TalZaccai committed May 23, 2024
1 parent dcac12e commit 7faeea6
Show file tree
Hide file tree
Showing 14 changed files with 415 additions and 199 deletions.
2 changes: 1 addition & 1 deletion libs/server/Objects/SortedSet/SortedSetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
SortedSetRangeByScore(_input, input.Length, ref output);
break;
case SortedSetOperation.GEOADD:
GeoAdd(_input, input.Length, _output);
GeoAdd(_input, input.Length, ref output);
break;
case SortedSetOperation.GEOHASH:
GeoHash(_input, input.Length, ref output);
Expand Down
201 changes: 147 additions & 54 deletions libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Buffers;
using System.Buffers.Text;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
Expand Down Expand Up @@ -46,88 +47,180 @@ private struct GeoSearchOptions
public bool WithHash { get; set; }
}

private void GeoAdd(byte* input, int length, byte* output)
private void GeoAdd(byte* input, int length, ref SpanByteAndMemory output)
{
var _input = (ObjectInputHeader*)input;
var _output = (ObjectOutputHeader*)output;
*_output = default;
ObjectOutputHeader _output = default;

int count = _input->count;

byte* input_startptr = input + sizeof(ObjectInputHeader);
byte* input_currptr = input_startptr;

// By default add new elements but do not update the ones already in the set
bool nx = true;
bool isMemory = false;
MemoryHandle ptrHandle = default;
byte* ptr = output.SpanByte.ToPointer();

bool ch = false;
var curr = ptr;
var end = curr + output.Length;

// Read the options
var optsCount = count % 3;
if (optsCount > 0 && optsCount <= 2)
try
{
// Is NX or XX, if not nx then use XX
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var byteOptions, ref input_currptr, input + length))
return;
nx = (byteOptions.Length == 2 && (byteOptions[0] == (int)'N' && byteOptions[1] == (int)'X') || (byteOptions[0] == (int)'n' && byteOptions[1] == (int)'x'));
if (optsCount == 2)
// By default add new elements but do not update the ones already in the set
var nx = false;
var xx = false;
var ch = false;

byte* tokenPtr = null;
var tokenSize = 0;
Span<byte> tokenSpan = default;

// Read the options
while (count > 0)
{
// Read CH option
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out byteOptions, ref input_currptr, input + length))
if (!RespReadUtils.ReadPtrWithLengthHeader(ref tokenPtr, ref tokenSize, ref input_currptr,
input + length))
return;
ch = (byteOptions.Length == 2 && (byteOptions[0] == (int)'C' && byteOptions[1] == (int)'H') || (byteOptions[0] == (int)'c' && byteOptions[1] == (int)'h'));

count--;
tokenSpan = new Span<byte>(tokenPtr, tokenSize);
if (tokenSpan.SequenceEqual("NX"u8))
{
nx = true;
}
else if (tokenSpan.SequenceEqual("XX"u8))
{
xx = true;
}
else if (tokenSpan.SequenceEqual("CH"u8))
{
ch = true;
}
else
{
break;
}
}
count -= optsCount;
}

int elementsChanged = 0;
ReadOnlySpan<byte> errorMessage = default;
// No members defined
if (count == 0)
{
errorMessage = Encoding.ASCII.GetBytes(string.Format(CmdStrings.GenericErrWrongNumArgs, nameof(SortedSetOperation.GEOADD)));
}
// NX & XX can't both be set
// Also, each member definition should contain 3 tokens - longitude latitude member
// Remaining token count should be a multiple of 3
else if ((nx && xx) || (count + 1) % 3 != 0)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR;
}
else
{
var elementsChanged = 0;

for (int c = 0; c < count / 3; c++)
{
if (!RespReadUtils.ReadDoubleWithLengthHeader(out var longitude, out var parsed, ref input_currptr, input + length))
return;
if (!RespReadUtils.ReadDoubleWithLengthHeader(out var latitude, out parsed, ref input_currptr, input + length))
return;
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var member, ref input_currptr, input + length))
return;
var memberCount = (count + 1) / 3;
for (var c = 0; c < memberCount; c++)
{
double longitude = default;
var longParsed = false;
// If this is the first member, use last token parsed
if (c == 0)
{
longParsed = Utf8Parser.TryParse(tokenSpan, out longitude, out _);
}
else
{
if (!RespReadUtils.ReadDoubleWithLengthHeader(out longitude, out longParsed, ref input_currptr,
input + length))
return;
count--;
}

if (c < _input->done)
continue;
if (!RespReadUtils.ReadDoubleWithLengthHeader(out var latitude, out var latParsed, ref input_currptr,
input + length))
return;
count--;

_output->countDone++;
if (!longParsed || !latParsed)
{
errorMessage = CmdStrings.RESP_ERR_NOT_VALID_FLOAT;
break;
}

if (parsed)
{
var score = server.GeoHash.GeoToLongValue(latitude, longitude);
if (score != -1)
{
if (!sortedSetDict.TryGetValue(member, out double scoreStored))
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var member, ref input_currptr,
input + length))
return;
count--;

if (c < _input->done)
continue;

var score = server.GeoHash.GeoToLongValue(latitude, longitude);
if (score != -1)
{
if (nx)
if (!sortedSetDict.TryGetValue(member, out var scoreStored))
{
sortedSetDict.Add(member, score);
sortedSet.Add((score, member));
_output->opsDone++;
if (!xx)
{
sortedSetDict.Add(member, score);
sortedSet.Add((score, member));
_output.opsDone++;

this.UpdateSize(member);
this.UpdateSize(member);
elementsChanged++;
}
}
else if (!nx && Math.Abs(scoreStored - score) > double.Epsilon)
{
sortedSetDict[member] = score;
var success = sortedSet.Remove((scoreStored, member));
Debug.Assert(success);
success = sortedSet.Add((score, member));
Debug.Assert(success);
elementsChanged++;
}
}
else if (!nx && scoreStored != score)
{
sortedSetDict[member] = score;
var success = sortedSet.Remove((scoreStored, member));
Debug.Assert(success);
success = sortedSet.Add((score, member));
Debug.Assert(success);
elementsChanged++;
}
}

_output.opsDone = ch ? elementsChanged : _output.opsDone;
if (elementsChanged == 0)
{

}
}
_output->opsDone = ch ? elementsChanged : _output->opsDone;

// Flush unread tokens
while (count > 0)
{
RespReadUtils.ReadPtrWithLengthHeader(ref tokenPtr, ref tokenSize, ref input_currptr,
input + length);
count--;
}

if (errorMessage != default)
{
while (!RespWriteUtils.WriteError(errorMessage, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}

if (_output.opsDone == 0)
{

}

// Write output
_output.bytesDone = (int)(input_currptr - input_startptr);
_output.countDone = _input->count - count;
}
finally
{
while (!RespWriteUtils.WriteDirect(ref _output, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

// Write output
_output->bytesDone = (int)(input_currptr - input_startptr);
if (isMemory) ptrHandle.Dispose();
output.Length = (int)(curr - ptr);
}
}

private void GeoHash(byte* input, int length, ref SpanByteAndMemory output)
Expand Down
9 changes: 1 addition & 8 deletions libs/server/Resp/ArrayCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -861,15 +861,8 @@ private bool NetworkTYPE<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storag
private bool NetworkMODULE<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count != 1)
return AbortWithWrongNumberOfArguments("MODULE", count);

// MODULE nameofmodule
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var nameofmodule, ref ptr, recvBufferPtr + bytesRead))
return false;

// TODO: pending implementation for module support.
while (!RespWriteUtils.WriteEmptyArray(ref dcurr, dend))
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERR_GENERIC_UNK_CMD, ref dcurr, dend))
SendAndReset();

// Advance pointers
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> RESP_RETURN_VAL_1 => ":1\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_0 => ":0\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_N1 => ":-1\r\n"u8;
public static ReadOnlySpan<byte> RESP_RETURN_VAL_N2 => ":-2\r\n"u8;
public static ReadOnlySpan<byte> SUSCRIBE_PONG => "*2\r\n$4\r\npong\r\n$0\r\n\r\n"u8;
public static ReadOnlySpan<byte> RESP_PONG => "+PONG\r\n"u8;
public static ReadOnlySpan<byte> RESP_EMPTY => "$0\r\n\r\n"u8;
Expand Down Expand Up @@ -123,6 +124,7 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_INVALID_INDEX => "ERR invalid database index."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SELECT_CLUSTER_MODE => "ERR SELECT is not allowed in cluster mode"u8;
public static ReadOnlySpan<byte> RESP_ERR_UNSUPPORTED_PROTOCOL_VERSION => "ERR Unsupported protocol version"u8;
public static ReadOnlySpan<byte> RESP_ERR_NOT_VALID_FLOAT => "ERR value is not a valid float"u8;
public static ReadOnlySpan<byte> RESP_WRONGPASS_INVALID_PASSWORD => "WRONGPASS Invalid password"u8;
public static ReadOnlySpan<byte> RESP_WRONGPASS_INVALID_USERNAME_PASSWORD => "WRONGPASS Invalid username/password combination"u8;

Expand All @@ -133,6 +135,8 @@ static partial class CmdStrings
public const string GenericErrWrongNumArgs = "ERR wrong number of arguments for '{0}' command";
public const string GenericErrUnknownOption = "ERR Unknown option or number of arguments for CONFIG SET - '{0}'";
public const string GenericErrUnknownSubCommand = "ERR unknown subcommand '{0}'. Try {1} HELP";
public const string GenericErrWrongNumArgsTxn =
"ERR Invalid number of parameters to stored proc {0}, expected {1}, actual {2}";

/// <summary>
/// Object types
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/KeyAdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private bool NetworkTTL<TGarnetApi>(byte* ptr, RespCommand command, ref TGarnetA
}
else
{
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_RETURN_VAL_N1, ref dcurr, dend))
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_RETURN_VAL_N2, ref dcurr, dend))
SendAndReset();
}
return true;
Expand Down
8 changes: 8 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,16 @@ private unsafe bool SetRandomMember<TGarnetApi>(int count, byte* ptr, ref TGarne
return false;
break;
case GarnetStatus.NOTFOUND:
if (count == 2)
{
while (!RespWriteUtils.WriteEmptyArray(ref dcurr, dend))
SendAndReset();
break;
}

while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_ERRNOTFOUND, ref dcurr, dend))
SendAndReset();

break;
}

Expand Down
Loading

0 comments on commit 7faeea6

Please sign in to comment.