Skip to content

Commit

Permalink
[Compatibility] Added BLMPOP command (#842)
Browse files Browse the repository at this point in the history
* Added BLMPOP command

* Code format fix

* Fixed review commants

* Removed commented code

---------

Co-authored-by: Vasileios Zois <[email protected]>
  • Loading branch information
Vijay-Nirmal and vazois authored Dec 10, 2024
1 parent 4ebe239 commit cf2f89e
Show file tree
Hide file tree
Showing 12 changed files with 371 additions and 21 deletions.
58 changes: 58 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,64 @@
}
]
},
{
"Command": "BLMPOP",
"Name": "BLMPOP",
"Summary": "Pops the first element from one of multiple lists. Blocks until an element is available otherwise. Deletes the list if the last element was popped.",
"Group": "List",
"Complexity": "O(N\u002BM) where N is the number of provided keys and M is the number of elements returned.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "Double"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "NUMKEYS",
"DisplayText": "numkeys",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "WHERE",
"Type": "OneOf",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "LEFT",
"DisplayText": "left",
"Type": "PureToken",
"Token": "LEFT"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "RIGHT",
"DisplayText": "right",
"Type": "PureToken",
"Token": "RIGHT"
}
]
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "COUNT",
"DisplayText": "count",
"Type": "Integer",
"Token": "COUNT",
"ArgumentFlags": "Optional"
}
]
},
{
"Command": "BLPOP",
"Name": "BLPOP",
Expand Down
22 changes: 22 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,28 @@
}
]
},
{
"Command": "BLMPOP",
"Name": "BLMPOP",
"Arity": -5,
"Flags": "Blocking, MovableKeys, Write",
"AclCategories": "Blocking, List, Slow, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysKeyNum",
"KeyNumIdx": 0,
"FirstKey": 1,
"KeyStep": 1
},
"Flags": "RW, Access, Delete"
}
]
},
{
"Command": "BLPOP",
"Name": "BLPOP",
Expand Down
53 changes: 37 additions & 16 deletions libs/server/Objects/ItemBroker/CollectionItemBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Tsavorite.core;
Expand Down Expand Up @@ -54,11 +53,12 @@ public class CollectionItemBroker : IDisposable
/// <param name="keys">Keys of objects to observe</param>
/// <param name="session">Calling session instance</param>
/// <param name="timeoutInSeconds">Timeout of operation (in seconds, 0 for waiting indefinitely)</param>
/// <param name="cmdArgs">Additional arguments for command</param>
/// <returns>Result of operation</returns>
internal async Task<CollectionItemResult> GetCollectionItemAsync(RespCommand command, byte[][] keys,
RespServerSession session, double timeoutInSeconds)
RespServerSession session, double timeoutInSeconds, ArgSlice[] cmdArgs = null)
{
var observer = new CollectionItemObserver(session, command);
var observer = new CollectionItemObserver(session, command, cmdArgs);
return await this.GetCollectionItemAsync(observer, keys, timeoutInSeconds);
}

Expand Down Expand Up @@ -223,12 +223,12 @@ private void InitializeObserver(CollectionItemObserver observer, byte[][] keys)
// If the key already has a non-empty observer queue, it does not have an item to retrieve
// Otherwise, try to retrieve next available item
if ((KeysToObservers.ContainsKey(key) && KeysToObservers[key].Count > 0) ||
!TryGetNextItem(key, observer.Session.storageSession, observer.Command, observer.CommandArgs,
out _, out var nextItem)) continue;
!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs,
out _, out var result)) continue;

// An item was found - set the observer result and return
SessionIdToObserver.TryRemove(observer.Session.ObjectStoreSessionID, out _);
observer.HandleSetResult(new CollectionItemResult(key, nextItem));
observer.HandleSetResult(result);
return;
}

Expand Down Expand Up @@ -279,8 +279,8 @@ private bool TryAssignItemFromKey(byte[] key)
}

// Try to get next available item from object stored in key
if (!TryGetNextItem(key, observer.Session.storageSession, observer.Command, observer.CommandArgs,
out var currCount, out var nextItem))
if (!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs,
out var currCount, out var result))
{
// If unsuccessful getting next item but there is at least one item in the collection,
// continue to next observer in the queue, otherwise return
Expand All @@ -292,7 +292,7 @@ private bool TryAssignItemFromKey(byte[] key)
observers.TryDequeue(out observer);

SessionIdToObserver.TryRemove(observer!.Session.ObjectStoreSessionID, out _);
observer.HandleSetResult(new CollectionItemResult(key, nextItem));
observer.HandleSetResult(result);

return true;
}
Expand Down Expand Up @@ -412,17 +412,17 @@ private static bool TryGetNextSetObject(SortedSetObject sortedSetObj, RespComman
/// <param name="command">RESP command</param>
/// <param name="cmdArgs">Additional command arguments</param>
/// <param name="currCount">Collection size</param>
/// <param name="nextItem">Retrieved item</param>
/// <param name="result">Result of command</param>
/// <returns>True if found available item</returns>
private bool TryGetNextItem(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out byte[] nextItem)
private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out CollectionItemResult result)
{
currCount = default;
nextItem = default;
result = default;
var createTransaction = false;

var objectType = command switch
{
RespCommand.BLPOP or RespCommand.BRPOP or RespCommand.BLMOVE => GarnetObjectType.List,
RespCommand.BLPOP or RespCommand.BRPOP or RespCommand.BLMOVE or RespCommand.BLMPOP => GarnetObjectType.List,
_ => throw new NotSupportedException()
};

Expand Down Expand Up @@ -472,11 +472,15 @@ private bool TryGetNextItem(byte[] key, StorageSession storageSession, RespComma
case ListObject listObj:
currCount = listObj.LnkList.Count;
if (objectType != GarnetObjectType.List) return false;
if (currCount == 0) return false;

switch (command)
{
case RespCommand.BLPOP:
case RespCommand.BRPOP:
return TryGetNextListItem(listObj, command, out nextItem);
var isSuccessful = TryGetNextListItem(listObj, command, out var nextItem);
result = new CollectionItemResult(key, nextItem);
return isSuccessful;
case RespCommand.BLMOVE:
ListObject dstList;
var newObj = false;
Expand All @@ -491,8 +495,9 @@ private bool TryGetNextItem(byte[] key, StorageSession storageSession, RespComma
}
else return false;

var isSuccessful = TryMoveNextListItem(listObj, dstList, (OperationDirection)cmdArgs[1].ReadOnlySpan[0],
isSuccessful = TryMoveNextListItem(listObj, dstList, (OperationDirection)cmdArgs[1].ReadOnlySpan[0],
(OperationDirection)cmdArgs[2].ReadOnlySpan[0], out nextItem);
result = new CollectionItemResult(key, nextItem);

if (isSuccessful && newObj)
{
Expand All @@ -501,13 +506,29 @@ private bool TryGetNextItem(byte[] key, StorageSession storageSession, RespComma
}

return isSuccessful;
case RespCommand.BLMPOP:
var popDirection = (OperationDirection)cmdArgs[0].ReadOnlySpan[0];
var popCount = *(int*)(cmdArgs[1].ptr);
popCount = Math.Min(popCount, listObj.LnkList.Count);

var items = new byte[popCount][];
for (var i = 0; i < popCount; i++)
{
var _ = TryGetNextListItem(listObj, popDirection == OperationDirection.Left ? RespCommand.BLPOP : RespCommand.BRPOP, out items[i]); // Return can be ignored because it is guaranteed to return true
}

result = new CollectionItemResult(key, items);
return true;
default:
return false;
}
case SortedSetObject setObj:
currCount = setObj.Dictionary.Count;
if (objectType != GarnetObjectType.SortedSet) return false;
return TryGetNextSetObject(setObj, command, out nextItem);

var hasValue = TryGetNextSetObject(setObj, command, out var sortedSetNextItem);
result = new CollectionItemResult(key, sortedSetNextItem);
return hasValue;
default:
return false;
}
Expand Down
25 changes: 21 additions & 4 deletions libs/server/Objects/ItemBroker/CollectionItemResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,20 @@ namespace Garnet.server
/// <summary>
/// Result of item retrieved from observed collection
/// </summary>
internal readonly struct CollectionItemResult(byte[] key, byte[] item)
internal readonly struct CollectionItemResult
{
public CollectionItemResult(byte[] key, byte[] item)
{
Key = key;
Item = item;
}

public CollectionItemResult(byte[] key, byte[][] items)
{
Key = key;
Items = items;
}

/// <summary>
/// True if item was found
/// </summary>
Expand All @@ -16,16 +28,21 @@ internal readonly struct CollectionItemResult(byte[] key, byte[] item)
/// <summary>
/// Key of collection from which item was retrieved
/// </summary>
internal byte[] Key { get; } = key;
internal byte[] Key { get; }

/// <summary>
/// Item retrieved from collection
/// </summary>
internal byte[] Item { get; }

/// <summary>
/// Item retrieved from collection
/// </summary>
internal byte[] Item { get; } = item;
internal byte[][] Items { get; }

/// <summary>
/// Instance of empty result
/// </summary>
internal static readonly CollectionItemResult Empty = new(null, null);
internal static readonly CollectionItemResult Empty = new(null, item: null);
}
}
103 changes: 103 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -901,5 +901,108 @@ public bool ListSet<TGarnetApi>(ref TGarnetApi storageApi)

return true;
}

/// <summary>
/// BLMPOP timeout numkeys key [key ...] LEFT|RIGHT [COUNT count]
/// </summary>
/// <returns></returns>
private unsafe bool ListBlockingPopMultiple()
{
if (parseState.Count < 4)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.BLMPOP));
}

var currTokenId = 0;

// Read timeout
if (!parseState.TryGetDouble(currTokenId++, out var timeout))
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT);
}

// Read count of keys
if (!parseState.TryGetInt(currTokenId++, out var numKeys))
{
var err = string.Format(CmdStrings.GenericParamShouldBeGreaterThanZero, "numkeys");
return AbortWithErrorMessage(Encoding.ASCII.GetBytes(err));
}

if (parseState.Count != numKeys + 3 && parseState.Count != numKeys + 5)
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR);
}

// Get the keys for Lists
var keysBytes = new byte[numKeys][];
for (var i = 0; i < keysBytes.Length; i++)
{
keysBytes[i] = parseState.GetArgSliceByRef(currTokenId++).SpanByte.ToByteArray();
}

var cmdArgs = new ArgSlice[2];

// Get the direction
var dir = parseState.GetArgSliceByRef(currTokenId++);
var popDirection = GetOperationDirection(dir);

if (popDirection == OperationDirection.Unknown)
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR);
}
cmdArgs[0] = new ArgSlice((byte*)&popDirection, 1);

var popCount = 1;

// Get the COUNT keyword & parameter value, if specified
if (parseState.Count == numKeys + 5)
{
var countKeyword = parseState.GetArgSliceByRef(currTokenId++);

if (!countKeyword.ReadOnlySpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.COUNT))
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR);
}

// Read count
if (!parseState.TryGetInt(currTokenId, out popCount) || popCount < 1)
{
var err = string.Format(CmdStrings.GenericParamShouldBeGreaterThanZero, "count");
return AbortWithErrorMessage(Encoding.ASCII.GetBytes(err));
}
}

cmdArgs[1] = new ArgSlice((byte*)&popCount, sizeof(int));

if (storeWrapper.itemBroker == null)
throw new GarnetException("Object store is disabled");

var result = storeWrapper.itemBroker.GetCollectionItemAsync(RespCommand.BLMPOP, keysBytes, this, timeout, cmdArgs).Result;

if (!result.Found)
{
while (!RespWriteUtils.WriteNull(ref dcurr, dend))
SendAndReset();
return true;
}

while (!RespWriteUtils.WriteArrayLength(2, ref dcurr, dend))
SendAndReset();

while (!RespWriteUtils.WriteBulkString(result.Key, ref dcurr, dend))
SendAndReset();

var elements = result.Items;
while (!RespWriteUtils.WriteArrayLength(elements.Length, ref dcurr, dend))
SendAndReset();

foreach (var element in elements)
{
while (!RespWriteUtils.WriteBulkString(element, ref dcurr, dend))
SendAndReset();
}

return true;
}
}
}
Loading

0 comments on commit cf2f89e

Please sign in to comment.