Skip to content

Commit

Permalink
reverting changes to GeoAdd
Browse files Browse the repository at this point in the history
  • Loading branch information
TalZaccai committed May 23, 2024
1 parent 7faeea6 commit 5caeb13
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 148 deletions.
2 changes: 1 addition & 1 deletion libs/server/Objects/SortedSet/SortedSetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
SortedSetRangeByScore(_input, input.Length, ref output);
break;
case SortedSetOperation.GEOADD:
GeoAdd(_input, input.Length, ref output);
GeoAdd(_input, input.Length, _output);
break;
case SortedSetOperation.GEOHASH:
GeoHash(_input, input.Length, ref output);
Expand Down
201 changes: 54 additions & 147 deletions libs/server/Objects/SortedSetGeo/SortedSetGeoObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Buffers;
using System.Buffers.Text;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
Expand Down Expand Up @@ -47,180 +46,88 @@ private struct GeoSearchOptions
public bool WithHash { get; set; }
}

private void GeoAdd(byte* input, int length, ref SpanByteAndMemory output)
private void GeoAdd(byte* input, int length, byte* output)
{
var _input = (ObjectInputHeader*)input;
ObjectOutputHeader _output = default;
var _output = (ObjectOutputHeader*)output;
*_output = default;

int count = _input->count;

byte* input_startptr = input + sizeof(ObjectInputHeader);
byte* input_currptr = input_startptr;

bool isMemory = false;
MemoryHandle ptrHandle = default;
byte* ptr = output.SpanByte.ToPointer();
// By default add new elements but do not update the ones already in the set
bool nx = true;

var curr = ptr;
var end = curr + output.Length;
bool ch = false;

try
// Read the options
var optsCount = count % 3;
if (optsCount > 0 && optsCount <= 2)
{
// By default add new elements but do not update the ones already in the set
var nx = false;
var xx = false;
var ch = false;

byte* tokenPtr = null;
var tokenSize = 0;
Span<byte> tokenSpan = default;

// Read the options
while (count > 0)
// Is NX or XX, if not nx then use XX
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var byteOptions, ref input_currptr, input + length))
return;
nx = (byteOptions.Length == 2 && (byteOptions[0] == (int)'N' && byteOptions[1] == (int)'X') || (byteOptions[0] == (int)'n' && byteOptions[1] == (int)'x'));
if (optsCount == 2)
{
if (!RespReadUtils.ReadPtrWithLengthHeader(ref tokenPtr, ref tokenSize, ref input_currptr,
input + length))
// Read CH option
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out byteOptions, ref input_currptr, input + length))
return;

count--;
tokenSpan = new Span<byte>(tokenPtr, tokenSize);
if (tokenSpan.SequenceEqual("NX"u8))
{
nx = true;
}
else if (tokenSpan.SequenceEqual("XX"u8))
{
xx = true;
}
else if (tokenSpan.SequenceEqual("CH"u8))
{
ch = true;
}
else
{
break;
}
}

ReadOnlySpan<byte> errorMessage = default;
// No members defined
if (count == 0)
{
errorMessage = Encoding.ASCII.GetBytes(string.Format(CmdStrings.GenericErrWrongNumArgs, nameof(SortedSetOperation.GEOADD)));
ch = (byteOptions.Length == 2 && (byteOptions[0] == (int)'C' && byteOptions[1] == (int)'H') || (byteOptions[0] == (int)'c' && byteOptions[1] == (int)'h'));
}
// NX & XX can't both be set
// Also, each member definition should contain 3 tokens - longitude latitude member
// Remaining token count should be a multiple of 3
else if ((nx && xx) || (count + 1) % 3 != 0)
{
errorMessage = CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR;
}
else
{
var elementsChanged = 0;

var memberCount = (count + 1) / 3;
for (var c = 0; c < memberCount; c++)
{
double longitude = default;
var longParsed = false;
// If this is the first member, use last token parsed
if (c == 0)
{
longParsed = Utf8Parser.TryParse(tokenSpan, out longitude, out _);
}
else
{
if (!RespReadUtils.ReadDoubleWithLengthHeader(out longitude, out longParsed, ref input_currptr,
input + length))
return;
count--;
}
count -= optsCount;
}

if (!RespReadUtils.ReadDoubleWithLengthHeader(out var latitude, out var latParsed, ref input_currptr,
input + length))
return;
count--;
int elementsChanged = 0;

if (!longParsed || !latParsed)
{
errorMessage = CmdStrings.RESP_ERR_NOT_VALID_FLOAT;
break;
}
for (int c = 0; c < count / 3; c++)
{
if (!RespReadUtils.ReadDoubleWithLengthHeader(out var longitude, out var parsed, ref input_currptr, input + length))
return;
if (!RespReadUtils.ReadDoubleWithLengthHeader(out var latitude, out parsed, ref input_currptr, input + length))
return;
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var member, ref input_currptr, input + length))
return;

if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var member, ref input_currptr,
input + length))
return;
count--;
if (c < _input->done)
continue;

if (c < _input->done)
continue;
_output->countDone++;

var score = server.GeoHash.GeoToLongValue(latitude, longitude);
if (score != -1)
if (parsed)
{
var score = server.GeoHash.GeoToLongValue(latitude, longitude);
if (score != -1)
{
if (!sortedSetDict.TryGetValue(member, out double scoreStored))
{
if (!sortedSetDict.TryGetValue(member, out var scoreStored))
if (nx)
{
if (!xx)
{
sortedSetDict.Add(member, score);
sortedSet.Add((score, member));
_output.opsDone++;
sortedSetDict.Add(member, score);
sortedSet.Add((score, member));
_output->opsDone++;

this.UpdateSize(member);
elementsChanged++;
}
}
else if (!nx && Math.Abs(scoreStored - score) > double.Epsilon)
{
sortedSetDict[member] = score;
var success = sortedSet.Remove((scoreStored, member));
Debug.Assert(success);
success = sortedSet.Add((score, member));
Debug.Assert(success);
elementsChanged++;
this.UpdateSize(member);
}
}
}

_output.opsDone = ch ? elementsChanged : _output.opsDone;
if (elementsChanged == 0)
{

else if (!nx && scoreStored != score)
{
sortedSetDict[member] = score;
var success = sortedSet.Remove((scoreStored, member));
Debug.Assert(success);
success = sortedSet.Add((score, member));
Debug.Assert(success);
elementsChanged++;
}
}
}

// Flush unread tokens
while (count > 0)
{
RespReadUtils.ReadPtrWithLengthHeader(ref tokenPtr, ref tokenSize, ref input_currptr,
input + length);
count--;
}

if (errorMessage != default)
{
while (!RespWriteUtils.WriteError(errorMessage, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}

if (_output.opsDone == 0)
{

}

// Write output
_output.bytesDone = (int)(input_currptr - input_startptr);
_output.countDone = _input->count - count;
_output->opsDone = ch ? elementsChanged : _output->opsDone;
}
finally
{
while (!RespWriteUtils.WriteDirect(ref _output, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

if (isMemory) ptrHandle.Dispose();
output.Length = (int)(curr - ptr);
}
// Write output
_output->bytesDone = (int)(input_currptr - input_startptr);
}

private void GeoHash(byte* input, int length, ref SpanByteAndMemory output)
Expand Down

0 comments on commit 5caeb13

Please sign in to comment.