diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index d3be2d12f3..127422ef70 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -34,6 +34,7 @@ import io.lettuce.core.output.KeyValueStreamingChannel; import io.lettuce.core.output.ScoredValueStreamingChannel; import io.lettuce.core.output.ValueStreamingChannel; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.Command; import io.lettuce.core.protocol.CommandArgs; @@ -390,7 +391,7 @@ public RedisFuture clientTracking(TrackingArgs args) { } @Override - public RedisFuture> clientTrackinginfo() { + public RedisFuture clientTrackinginfo() { return dispatch(commandBuilder.clientTrackinginfo()); } diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 92fb5246d0..019103e013 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -34,6 +34,7 @@ import io.lettuce.core.output.KeyValueStreamingChannel; import io.lettuce.core.output.ScoredValueStreamingChannel; import io.lettuce.core.output.ValueStreamingChannel; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.Command; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandType; @@ -408,8 +409,8 @@ public Mono clientTracking(TrackingArgs args) { } @Override - public Flux clientTrackinginfo() { - return createDissolvingFlux(commandBuilder::clientTrackinginfo); + public Mono clientTrackinginfo() { + return createMono(commandBuilder::clientTrackinginfo); } @Override diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index 8371607b38..091cb0861d 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -28,6 +28,7 @@ import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; import io.lettuce.core.output.*; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.BaseRedisCommandBuilder; import io.lettuce.core.protocol.Command; import io.lettuce.core.protocol.CommandArgs; @@ -528,10 +529,10 @@ Command clientTracking(TrackingArgs trackingArgs) { return createCommand(CLIENT, new StatusOutput<>(codec), args); } - Command> clientTrackinginfo() { + Command clientTrackinginfo() { CommandArgs args = new CommandArgs<>(codec).add(TRACKINGINFO); - return new Command<>(CLIENT, new ArrayOutput<>(codec), args); + return new Command<>(CLIENT, new DynamicAggregateOutput<>(codec), args); } Command clientUnblock(long id, UnblockType type) { diff --git a/src/main/java/io/lettuce/core/api/async/RedisServerAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/RedisServerAsyncCommands.java index bb8294e5d4..3010e4df7a 100644 --- a/src/main/java/io/lettuce/core/api/async/RedisServerAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/RedisServerAsyncCommands.java @@ -30,6 +30,7 @@ import io.lettuce.core.ShutdownArgs; import io.lettuce.core.TrackingArgs; import io.lettuce.core.UnblockType; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.CommandType; /** @@ -180,12 +181,12 @@ public interface RedisServerAsyncCommands { /** * Returns information about the current client connection's use of the server assisted client side caching feature. * - * @return List<Object> array-list-reply, for more information check the documentation - * @see io.lettuce.core.cluster.models.tracking.TrackingInfoParser - * @see io.lettuce.core.cluster.models.tracking.TrackingInfo + * @return {@link DynamicAggregateData}, for more information check the documentation + * @see io.lettuce.core.api.parsers.tracking.TrackingInfoParser + * @see io.lettuce.core.api.parsers.tracking.TrackingInfo * @since 7.0 */ - RedisFuture> clientTrackinginfo(); + RedisFuture clientTrackinginfo(); /** * Unblock the specified blocked client. diff --git a/src/main/java/io/lettuce/core/cluster/models/tracking/TrackingInfo.java b/src/main/java/io/lettuce/core/api/parsers/tracking/TrackingInfo.java similarity index 73% rename from src/main/java/io/lettuce/core/cluster/models/tracking/TrackingInfo.java rename to src/main/java/io/lettuce/core/api/parsers/tracking/TrackingInfo.java index e3ceec2436..892be765ec 100644 --- a/src/main/java/io/lettuce/core/cluster/models/tracking/TrackingInfo.java +++ b/src/main/java/io/lettuce/core/api/parsers/tracking/TrackingInfo.java @@ -1,8 +1,29 @@ -package io.lettuce.core.cluster.models.tracking; +/* + * Copyright 2024, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.api.parsers.tracking; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import java.util.List; @@ -57,6 +78,26 @@ public List getPrefixes() { return Collections.unmodifiableList(prefixes); } + @Override + public String toString() { + return "TrackingInfo{" + "flags=" + flags + ", redirect=" + redirect + ", prefixes=" + prefixes + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TrackingInfo that = (TrackingInfo) o; + return redirect == that.redirect && Objects.equals(flags, that.flags) && Objects.equals(prefixes, that.prefixes); + } + + @Override + public int hashCode() { + return Objects.hash(flags, redirect, prefixes); + } + /** * CLIENT TRACKINGINFO flags * diff --git a/src/main/java/io/lettuce/core/api/parsers/tracking/TrackingInfoParser.java b/src/main/java/io/lettuce/core/api/parsers/tracking/TrackingInfoParser.java new file mode 100644 index 0000000000..eaa5da890c --- /dev/null +++ b/src/main/java/io/lettuce/core/api/parsers/tracking/TrackingInfoParser.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.api.parsers.tracking; + +import io.lettuce.core.output.data.DynamicAggregateData; +import io.lettuce.core.protocol.CommandKeyword; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Parser for Redis CLIENT TRACKINGINFO command output. + * + * @author Tihomir Mateev + * @since 7.0 + */ +public class TrackingInfoParser { + + /** + * Utility constructor. + */ + private TrackingInfoParser() { + } + + /** + * Parse the output of the Redis CLIENT TRACKINGINFO command and convert it to a {@link TrackingInfo} + * + * @param trackinginfoOutput output of CLIENT TRACKINGINFO command + * @return an {@link TrackingInfo} instance + */ + public static TrackingInfo parse(DynamicAggregateData trackinginfoOutput) { + + verifyStructure(trackinginfoOutput); + + Map data = trackinginfoOutput.getDynamicMap(); + Set flags = ((DynamicAggregateData) data.get(CommandKeyword.FLAGS.toString().toLowerCase())).getDynamicSet(); + Long clientId = (Long) data.get(CommandKeyword.REDIRECT.toString().toLowerCase()); + List prefixes = ((DynamicAggregateData) data.get(CommandKeyword.PREFIXES.toString().toLowerCase())).getDynamicList(); + + Set parsedFlags = new HashSet<>(); + List parsedPrefixes = new ArrayList<>(); + + for (Object flag : flags) { + String toParse = (String) flag; + parsedFlags.add(TrackingInfo.TrackingFlag.from(toParse)); + } + + for (Object prefix : prefixes) { + parsedPrefixes.add((String) prefix); + } + + return new TrackingInfo(parsedFlags, clientId, parsedPrefixes); + } + + private static void verifyStructure(DynamicAggregateData trackinginfoOutput) { + + if (trackinginfoOutput == null || trackinginfoOutput.getDynamicMap().isEmpty()) { + throw new IllegalArgumentException("trackinginfoOutput must not be null or empty"); + } + + Map data = trackinginfoOutput.getDynamicMap(); + + if (!data.containsKey(CommandKeyword.FLAGS.toString().toLowerCase()) + || !data.containsKey(CommandKeyword.REDIRECT.toString().toLowerCase()) + || !data.containsKey(CommandKeyword.PREFIXES.toString().toLowerCase())) { + throw new IllegalArgumentException("trackinginfoOutput has missing flags"); + } + } + +} diff --git a/src/main/java/io/lettuce/core/api/parsers/tracking/package-info.java b/src/main/java/io/lettuce/core/api/parsers/tracking/package-info.java new file mode 100644 index 0000000000..11ff697bb7 --- /dev/null +++ b/src/main/java/io/lettuce/core/api/parsers/tracking/package-info.java @@ -0,0 +1,24 @@ +/* + * Copyright 2024, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Model and parser for the {@code CLIENT TRACKINGINFO} output. + */ +package io.lettuce.core.api.parsers.tracking; diff --git a/src/main/java/io/lettuce/core/api/reactive/RedisServerReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisServerReactiveCommands.java index 772cf45690..51fb1f3b1f 100644 --- a/src/main/java/io/lettuce/core/api/reactive/RedisServerReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisServerReactiveCommands.java @@ -28,6 +28,7 @@ import io.lettuce.core.ShutdownArgs; import io.lettuce.core.TrackingArgs; import io.lettuce.core.UnblockType; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.CommandType; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -180,12 +181,12 @@ public interface RedisServerReactiveCommands { /** * Returns information about the current client connection's use of the server assisted client side caching feature. * - * @return Object array-list-reply, for more information check the documentation - * @see io.lettuce.core.cluster.models.tracking.TrackingInfoParser - * @see io.lettuce.core.cluster.models.tracking.TrackingInfo + * @return {@link DynamicAggregateData}, for more information check the documentation + * @see io.lettuce.core.api.parsers.tracking.TrackingInfoParser + * @see io.lettuce.core.api.parsers.tracking.TrackingInfo * @since 7.0 */ - Flux clientTrackinginfo(); + Mono clientTrackinginfo(); /** * Unblock the specified blocked client. diff --git a/src/main/java/io/lettuce/core/api/sync/RedisServerCommands.java b/src/main/java/io/lettuce/core/api/sync/RedisServerCommands.java index 287e0ea408..20c76789cd 100644 --- a/src/main/java/io/lettuce/core/api/sync/RedisServerCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/RedisServerCommands.java @@ -29,6 +29,7 @@ import io.lettuce.core.ShutdownArgs; import io.lettuce.core.TrackingArgs; import io.lettuce.core.UnblockType; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.CommandType; /** @@ -179,12 +180,12 @@ public interface RedisServerCommands { /** * Returns information about the current client connection's use of the server assisted client side caching feature. * - * @return List<Object> array-list-reply, for more information check the documentation - * @see io.lettuce.core.cluster.models.tracking.TrackingInfoParser - * @see io.lettuce.core.cluster.models.tracking.TrackingInfo + * @return {@link DynamicAggregateData}, for more information check the documentation + * @see io.lettuce.core.api.parsers.tracking.TrackingInfoParser + * @see io.lettuce.core.api.parsers.tracking.TrackingInfo * @since 7.0 */ - List clientTrackinginfo(); + DynamicAggregateData clientTrackinginfo(); /** * Unblock the specified blocked client. diff --git a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionServerAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionServerAsyncCommands.java index f6054f9414..d3668a4b28 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionServerAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionServerAsyncCommands.java @@ -28,6 +28,7 @@ import io.lettuce.core.KillArgs; import io.lettuce.core.TrackingArgs; import io.lettuce.core.UnblockType; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.CommandType; /** @@ -178,12 +179,12 @@ public interface NodeSelectionServerAsyncCommands { /** * Returns information about the current client connection's use of the server assisted client side caching feature. * - * @return List<Object> array-list-reply, for more information check the documentation - * @see io.lettuce.core.cluster.models.tracking.TrackingInfoParser - * @see io.lettuce.core.cluster.models.tracking.TrackingInfo + * @return {@link DynamicAggregateData}, for more information check the documentation + * @see io.lettuce.core.api.parsers.tracking.TrackingInfoParser + * @see io.lettuce.core.api.parsers.tracking.TrackingInfo * @since 7.0 */ - AsyncExecutions> clientTrackinginfo(); + AsyncExecutions clientTrackinginfo(); /** * Unblock the specified blocked client. diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionServerCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionServerCommands.java index b57bb18232..c1b1f26b4d 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionServerCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionServerCommands.java @@ -28,6 +28,7 @@ import io.lettuce.core.KillArgs; import io.lettuce.core.TrackingArgs; import io.lettuce.core.UnblockType; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.CommandType; /** @@ -178,12 +179,12 @@ public interface NodeSelectionServerCommands { /** * Returns information about the current client connection's use of the server assisted client side caching feature. * - * @return List<Object> array-list-reply, for more information check the documentation - * @see io.lettuce.core.cluster.models.tracking.TrackingInfoParser - * @see io.lettuce.core.cluster.models.tracking.TrackingInfo + * @return {@link DynamicAggregateData}, for more information check the documentation + * @see io.lettuce.core.api.parsers.tracking.TrackingInfoParser + * @see io.lettuce.core.api.parsers.tracking.TrackingInfo * @since 7.0 */ - Executions> clientTrackinginfo(); + Executions clientTrackinginfo(); /** * Unblock the specified blocked client. diff --git a/src/main/java/io/lettuce/core/cluster/models/tracking/TrackingInfoParser.java b/src/main/java/io/lettuce/core/cluster/models/tracking/TrackingInfoParser.java deleted file mode 100644 index b34cf55116..0000000000 --- a/src/main/java/io/lettuce/core/cluster/models/tracking/TrackingInfoParser.java +++ /dev/null @@ -1,75 +0,0 @@ -package io.lettuce.core.cluster.models.tracking; - -import io.lettuce.core.protocol.CommandKeyword; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * Parser for Redis CLIENT TRACKINGINFO command output. - * - * @author Tihomir Mateev - * @since 7.0 - */ -public class TrackingInfoParser { - - /** - * Utility constructor. - */ - private TrackingInfoParser() { - } - - /** - * Parse the output of the Redis CLIENT TRACKINGINFO command and convert it to a {@link TrackingInfo} - * - * @param trackinginfoOutput output of CLIENT TRACKINGINFO command - * @return an {@link TrackingInfo} instance - */ - public static TrackingInfo parse(List trackinginfoOutput) { - - verifyStructure(trackinginfoOutput); - - List flags = (List) trackinginfoOutput.get(1); - Long clientId = (Long) trackinginfoOutput.get(3); - List prefixes = (List) trackinginfoOutput.get(5); - - Set parsedFlags = new HashSet<>(); - List parsedPrefixes = new ArrayList<>(); - - for (Object flag : flags) { - String toParse = (String) flag; - parsedFlags.add(TrackingInfo.TrackingFlag.from(toParse)); - } - - for (Object prefix : prefixes) { - parsedPrefixes.add((String) prefix); - } - - return new TrackingInfo(parsedFlags, clientId, parsedPrefixes); - } - - private static void verifyStructure(List trackinginfoOutput) { - - if (trackinginfoOutput == null || trackinginfoOutput.isEmpty()) { - throw new IllegalArgumentException("trackinginfoOutput must not be null or empty"); - } - - if (trackinginfoOutput.size() != 6) { - throw new IllegalArgumentException("trackinginfoOutput has wrong number of elements"); - } - - if (!CommandKeyword.FLAGS.toString().equalsIgnoreCase(trackinginfoOutput.get(0).toString()) - || !CommandKeyword.REDIRECT.toString().equalsIgnoreCase(trackinginfoOutput.get(2).toString()) - || !CommandKeyword.PREFIXES.toString().equalsIgnoreCase(trackinginfoOutput.get(4).toString())) { - throw new IllegalArgumentException("trackinginfoOutput has unsupported argument order"); - } - - if (!(trackinginfoOutput.get(1) instanceof List) || !(trackinginfoOutput.get(3) instanceof Long) - || !(trackinginfoOutput.get(5) instanceof List)) { - throw new IllegalArgumentException("trackinginfoOutput has unsupported argument format"); - } - } - -} diff --git a/src/main/java/io/lettuce/core/cluster/models/tracking/package-info.java b/src/main/java/io/lettuce/core/cluster/models/tracking/package-info.java deleted file mode 100644 index 3ace85df5c..0000000000 --- a/src/main/java/io/lettuce/core/cluster/models/tracking/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Model and parser for the {@code CLIENT TRACKINGINFO} output. - */ -package io.lettuce.core.cluster.models.tracking; diff --git a/src/main/java/io/lettuce/core/output/DynamicAggregateOutput.java b/src/main/java/io/lettuce/core/output/DynamicAggregateOutput.java new file mode 100644 index 0000000000..2e4fd5c8d0 --- /dev/null +++ b/src/main/java/io/lettuce/core/output/DynamicAggregateOutput.java @@ -0,0 +1,158 @@ +/* + * Copyright 2024, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.output; + +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.internal.LettuceFactories; +import io.lettuce.core.output.data.DynamicAggregateData; +import io.lettuce.core.output.data.ArrayAggregateData; +import io.lettuce.core.output.data.MapAggregateData; +import io.lettuce.core.output.data.SetAggregateData; + +import java.nio.ByteBuffer; +import java.util.Deque; + +/** + * An implementation of the {@link CommandOutput} that heuristically attempts to parse any response the Redis server could + * provide, leaving out the user to provide the knowledge of how this data is to be processed + *

+ * The Redis server could return, besides the simple types such as boolean, long, String, etc. also aggregations of the former, + * inside aggregate data structures such as arrays, maps and sets, defined in the specification for both the + * RESP2 and + * RESP3 protocol. + *

+ * Commands typically result in simple types, however some of the commands could return complex nested structures. A simple + * solution to parse such a structure is to have a dynamic data type and leave the user to parse the result to a domain object. + * If there is some breach of contract then the code consuming the driver could simply stop using the provided parser and start + * parsing the new dynamic data itself, without having to change the version of the library. This allows a certain degree of + * stability against change. Consult the members of the {@link io.lettuce.core.api.parsers} package for details on how aggregate + * Objects could be parsed. + *

+ * The {@link DynamicAggregateOutput} is supposed to be compatible with all Redis commands. + * + * @see DynamicAggregateData + * @author Tihomir Mateev + * @since 7.0 + */ +public class DynamicAggregateOutput extends CommandOutput { + + private final Deque dataStack; + + /** + * Constructs a new instance of the {@link DynamicAggregateOutput} + * + * @param codec the {@link RedisCodec} to be applied + */ + public DynamicAggregateOutput(RedisCodec codec) { + super(codec, null); + dataStack = LettuceFactories.newSpScQueue(); + } + + @Override + public void set(long integer) { + if (output == null) { + throw new RuntimeException("Invalid output received for dynamic aggregate output." + + "Integer value should have been preceded by some sort of aggregation."); + } + + output.store(integer); + } + + @Override + public void set(double number) { + if (output == null) { + throw new RuntimeException("Invalid output received for dynamic aggregate output." + + "Double value should have been preceded by some sort of aggregation."); + } + + output.store(number); + } + + @Override + public void set(boolean value) { + if (output == null) { + throw new RuntimeException("Invalid output received for dynamic aggregate output." + + "Double value should have been preceded by some sort of aggregation."); + } + + output.store(value); + } + + @Override + public void set(ByteBuffer bytes) { + if (output == null) { + throw new RuntimeException("Invalid output received for dynamic aggregate output." + + "ByteBuffer value should have been preceded by some sort of aggregation."); + } + + output.storeObject(bytes == null ? null : codec.decodeValue(bytes)); + } + + @Override + public void setSingle(ByteBuffer bytes) { + if (output == null) { + throw new RuntimeException("Invalid output received for dynamic aggregate output." + + "String value should have been preceded by some sort of aggregation."); + } + + output.store(bytes == null ? null : StringCodec.UTF8.decodeValue(bytes)); + } + + @Override + public void complete(int depth) { + if (!dataStack.isEmpty() && depth == dataStack.size()) { + output = dataStack.pop(); + } + } + + private void multi(DynamicAggregateData data) { + // if there is no data set, then we are at the root object + if (output == null) { + output = data; + return; + } + + // otherwise we need to nest the provided structure + output.storeObject(data); + dataStack.push(output); + output = data; + } + + @Override + public void multiSet(int count) { + SetAggregateData dynamicData = new SetAggregateData(count); + multi(dynamicData); + } + + @Override + public void multiArray(int count) { + ArrayAggregateData dynamicData = new ArrayAggregateData(count); + multi(dynamicData); + } + + @Override + public void multiMap(int count) { + MapAggregateData dynamicData = new MapAggregateData(count); + multi(dynamicData); + } + +} diff --git a/src/main/java/io/lettuce/core/output/data/ArrayAggregateData.java b/src/main/java/io/lettuce/core/output/data/ArrayAggregateData.java new file mode 100644 index 0000000000..259e711239 --- /dev/null +++ b/src/main/java/io/lettuce/core/output/data/ArrayAggregateData.java @@ -0,0 +1,93 @@ +/* + * Copyright 2024, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.output.data; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * An implementation of the {@link DynamicAggregateData} that handles arrays. + *

+ * For RESP2 calling the {@link DynamicAggregateData#getDynamicMap()} would heuristically go over the list of elements assuming + * every odd element is a key and every even object is the value and then adding them to an {@link Map}. The logic would follow + * the same order that was used when the elements were added to the {@link ArrayAggregateData}. Similarly calling the + * {@link DynamicAggregateData#getDynamicSet()} would return a set of all the elements, adding them in the same order. If - for + * some reason - duplicate elements exist they would be overwritten. + *

+ * All data structures that the implementation returns are unmodifiable + * + * @see DynamicAggregateData + * @author Tihomir Mateev + * @since 7.0 + */ +public class ArrayAggregateData extends DynamicAggregateData { + + private final List data; + + public ArrayAggregateData(int count) { + data = new ArrayList<>(count); + } + + @Override + public void storeObject(Object value) { + data.add(value); + } + + @Override + public List getDynamicList() { + return Collections.unmodifiableList(data); + } + + @Override + public Set getDynamicSet() { + // RESP2 compatibility mode - assuming the caller is aware that the array really contains a set (because in RESP2 we + // lack support for this data type) we make the conversion here + Set set = new LinkedHashSet<>(data); + return Collections.unmodifiableSet(set); + } + + @Override + public Map getDynamicMap() { + // RESP2 compatibility mode - assuming the caller is aware that the array really contains a map (because in RESP2 we + // lack support for this data type) we make the conversion here + Map map = new LinkedHashMap<>(); + final Boolean[] isKey = { true }; + final Object[] key = new Object[1]; + + data.forEach(element -> { + if (isKey[0]) { + key[0] = element; + isKey[0] = false; + } else { + map.put(key[0], element); + isKey[0] = true; + } + }); + + return Collections.unmodifiableMap(map); + } + +} diff --git a/src/main/java/io/lettuce/core/output/data/DynamicAggregateData.java b/src/main/java/io/lettuce/core/output/data/DynamicAggregateData.java new file mode 100644 index 0000000000..dc494b3bdf --- /dev/null +++ b/src/main/java/io/lettuce/core/output/data/DynamicAggregateData.java @@ -0,0 +1,133 @@ +/* + * Copyright 2024, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.output.data; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * The base type of all aggregate data, collected by a {@link io.lettuce.core.output.DynamicAggregateOutput} + *

+ * Commands typically result in simple types, however some of the commands could return complex nested structures. A simple + * solution to parse such a structure is to have a dynamic data type and leave the user to parse the result to a domain object. + * If there is some breach of contract then the code consuming the driver could simply stop using the provided parser and start + * parsing the new dynamic data itself, without having to change the version of the library. This allows a certain degree of + * stability against change. Consult the members of the {@link io.lettuce.core.api.parsers} package for details on how aggregate + * Objects could be parsed. + *

+ * An aggregate data object could contain multiple (or no) units of simple data, as per the + * RESP2 and + * RESP3 protocol definitions. + *

+ * For RESP2 the only possible aggregation is an array. RESP2 commands could also return sets (obviously, by simply making sure + * the elements of the array are unique) or maps (by sending the keys as odd elements and their values as even elements in the + * right order one after another. + *

+ * For RESP3 all the three aggregate types are supported (and indicated with special characters when the result is returned by + * the server). + *

+ * Aggregate data types could also be nested by using the {@link DynamicAggregateData#storeObject(Object)} call. + *

+ * Implementations of this class override the {@link DynamicAggregateData#getDynamicSet()}, + * {@link DynamicAggregateData#getDynamicList()} and {@link DynamicAggregateData#getDynamicMap()} methods to return the data + * received in the server in a implementation of the Collections framework. If a given implementation could not do the + * conversion in a meaningful way an {@link UnsupportedOperationException} would be thrown. + * + * @see io.lettuce.core.output.DynamicAggregateOutput + * @see ArrayAggregateData + * @see SetAggregateData + * @see MapAggregateData + * @author Tihomir Mateev + * @since 7.0 + */ +public abstract class DynamicAggregateData { + + /** + * Store a long value in the underlying data structure + * + * @param value the value to store + */ + public void store(long value) { + storeObject(value); + } + + /** + * Store a double value in the underlying data structure + * + * @param value the value to store + */ + public void store(double value) { + storeObject(value); + } + + /** + * Store a boolean value in the underlying data structure + * + * @param value the value to store + */ + public void store(boolean value) { + storeObject(value); + } + + /** + * Store an {@link Object} value in the underlying data structure. This method should be used when nesting one instance of + * {@link DynamicAggregateData} inside another + * + * @param value the value to store + */ + public abstract void storeObject(Object value); + + public void store(String value) { + storeObject(value); + } + + /** + * Returns the aggregate data in a {@link List} form. If the underlying implementation could not convert the data structure + * to a {@link List} then an {@link UnsupportedOperationException} would be thrown. + * + * @return a {@link List} of {@link Object} values + */ + public List getDynamicList() { + throw new UnsupportedOperationException("The type of data stored in this dynamic object is not a list"); + } + + /** + * Returns the aggregate data in a {@link Set} form. If the underlying implementation could not convert the data structure + * to a {@link Set} then an {@link UnsupportedOperationException} would be thrown. + * + * @return a {@link Set} of {@link Object} values + */ + public Set getDynamicSet() { + throw new UnsupportedOperationException("The type of data stored in this dynamic object is not a set"); + } + + /** + * Returns the aggregate data in a {@link Map} form. If the underlying implementation could not convert the data structure + * to a {@link Map} then an {@link UnsupportedOperationException} would be thrown. + * + * @return a {@link Map} of {@link Object} values + */ + public Map getDynamicMap() { + throw new UnsupportedOperationException("The type of data stored in this dynamic object is not a map"); + } + +} diff --git a/src/main/java/io/lettuce/core/output/data/MapAggregateData.java b/src/main/java/io/lettuce/core/output/data/MapAggregateData.java new file mode 100644 index 0000000000..0f42baabd4 --- /dev/null +++ b/src/main/java/io/lettuce/core/output/data/MapAggregateData.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.output.data; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * An implementation of the {@link DynamicAggregateData} that handles maps. + *

+ * All data structures that the implementation returns are unmodifiable + * + * @see DynamicAggregateData + * @author Tihomir Mateev + * @since 7.0 + */ +public class MapAggregateData extends DynamicAggregateData { + + private final Map data; + + private Object key; + + public MapAggregateData(int count) { + data = new HashMap<>(count); + } + + @Override + public void storeObject(Object value) { + if (key == null) { + key = value; + } else { + data.put(key, value); + key = null; + } + } + + @Override + public Map getDynamicMap() { + return Collections.unmodifiableMap(data); + } + +} diff --git a/src/main/java/io/lettuce/core/output/data/SetAggregateData.java b/src/main/java/io/lettuce/core/output/data/SetAggregateData.java new file mode 100644 index 0000000000..3a0718ab9b --- /dev/null +++ b/src/main/java/io/lettuce/core/output/data/SetAggregateData.java @@ -0,0 +1,63 @@ +/* + * Copyright 2024, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.output.data; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * An implementation of the {@link DynamicAggregateData} that handles maps. + *

+ * All data structures that the implementation returns are unmodifiable + * + * @see DynamicAggregateData + * @author Tihomir Mateev + * @since 7.0 + */ +public class SetAggregateData extends DynamicAggregateData { + + private final Set data; + + public SetAggregateData(int count) { + data = new HashSet<>(count); + } + + @Override + public void storeObject(Object value) { + data.add(value); + } + + @Override + public Set getDynamicSet() { + return Collections.unmodifiableSet(data); + } + + @Override + public List getDynamicList() { + List list = new ArrayList<>(data.size()); + list.addAll(data); + return Collections.unmodifiableList(list); + } + +} diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommands.kt index a7488ba3d8..3bb92e8d3b 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommands.kt @@ -21,6 +21,7 @@ package io.lettuce.core.api.coroutines import io.lettuce.core.* +import io.lettuce.core.output.data.DynamicAggregateData import io.lettuce.core.protocol.CommandType import java.util.* @@ -172,12 +173,12 @@ interface RedisServerCoroutinesCommands { /** * Returns information about the current client connection's use of the server assisted client side caching feature. * - * @return List array-list-reply, for more information check the documentation - * @see io.lettuce.core.cluster.models.tracking.TrackingInfoParser - * @see io.lettuce.core.cluster.models.tracking.TrackingInfo + * @return @link DynamicAggregateData}, for more information check the documentation + * @see io.lettuce.core.api.parsers.tracking.TrackingInfoParser + * @see io.lettuce.core.api.parsers.tracking.TrackingInfo * @since 7.0 */ - suspend fun clientTrackinginfo(): List + suspend fun clientTrackinginfo(): DynamicAggregateData? /** * Unblock the specified blocked client. diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommandsImpl.kt index 1754bd87ba..e84b67807c 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommandsImpl.kt @@ -22,6 +22,7 @@ package io.lettuce.core.api.coroutines import io.lettuce.core.* import io.lettuce.core.api.reactive.RedisServerReactiveCommands +import io.lettuce.core.output.data.DynamicAggregateData import io.lettuce.core.protocol.CommandType import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.asFlow @@ -75,7 +76,7 @@ internal class RedisServerCoroutinesCommandsImpl(internal val override suspend fun clientTracking(args: TrackingArgs): String? = ops.clientTracking(args).awaitFirstOrNull() - override suspend fun clientTrackinginfo(): List = ops.clientTrackinginfo().asFlow().toList() + override suspend fun clientTrackinginfo(): DynamicAggregateData? = ops.clientTrackinginfo().awaitFirstOrNull() override suspend fun clientUnblock(id: Long, type: UnblockType): Long? = ops.clientUnblock(id, type).awaitFirstOrNull() diff --git a/src/main/templates/io/lettuce/core/api/RedisServerCommands.java b/src/main/templates/io/lettuce/core/api/RedisServerCommands.java index ef51ff9f2d..e3a4c699c7 100644 --- a/src/main/templates/io/lettuce/core/api/RedisServerCommands.java +++ b/src/main/templates/io/lettuce/core/api/RedisServerCommands.java @@ -178,12 +178,12 @@ public interface RedisServerCommands { /** * Returns information about the current client connection's use of the server assisted client side caching feature. * - * @return List<Object> array-list-reply, for more information check the documentation - * @see io.lettuce.core.cluster.models.tracking.TrackingInfoParser - * @see io.lettuce.core.cluster.models.tracking.TrackingInfo + * @return {@link DynamicAggregateData}, for more information check the documentation + * @see io.lettuce.core.api.parsers.tracking.TrackingInfoParser + * @see io.lettuce.core.api.parsers.tracking.TrackingInfo * @since 7.0 */ - List clientTrackinginfo(); + DynamicAggregateData clientTrackinginfo(); /** * Unblock the specified blocked client. diff --git a/src/test/java/io/lettuce/core/cluster/models/tracking/TrackingInfoParserTest.java b/src/test/java/io/lettuce/core/cluster/models/tracking/TrackingInfoParserTest.java index 1742c4e87a..d9499fdccb 100644 --- a/src/test/java/io/lettuce/core/cluster/models/tracking/TrackingInfoParserTest.java +++ b/src/test/java/io/lettuce/core/cluster/models/tracking/TrackingInfoParserTest.java @@ -1,26 +1,34 @@ package io.lettuce.core.cluster.models.tracking; +import io.lettuce.core.api.parsers.tracking.TrackingInfo; +import io.lettuce.core.api.parsers.tracking.TrackingInfoParser; +import io.lettuce.core.output.data.ArrayAggregateData; +import io.lettuce.core.output.data.DynamicAggregateData; +import io.lettuce.core.output.data.MapAggregateData; +import io.lettuce.core.output.data.SetAggregateData; import io.lettuce.core.protocol.CommandKeyword; import org.junit.jupiter.api.Test; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; class TrackingInfoParserTest { @Test - void parse() { - List input = new ArrayList<>(); - input.add(CommandKeyword.FLAGS.toString()); - input.add(Arrays.asList(TrackingInfo.TrackingFlag.ON.toString(), TrackingInfo.TrackingFlag.OPTIN.toString())); - input.add(CommandKeyword.REDIRECT.toString()); - input.add(0L); - input.add(CommandKeyword.PREFIXES.toString()); - input.add(new ArrayList<>()); + void parseResp3() { + DynamicAggregateData flags = new SetAggregateData(2); + flags.store(TrackingInfo.TrackingFlag.ON.toString()); + flags.store(TrackingInfo.TrackingFlag.OPTIN.toString()); + + DynamicAggregateData prefixes = new ArrayAggregateData(0); + + DynamicAggregateData input = new MapAggregateData(3); + input.store(CommandKeyword.FLAGS.toString().toLowerCase()); + input.storeObject(flags); + input.store(CommandKeyword.REDIRECT.toString().toLowerCase()); + input.store(0L); + input.store(CommandKeyword.PREFIXES.toString().toLowerCase()); + input.storeObject(prefixes); TrackingInfo info = TrackingInfoParser.parse(input); @@ -31,7 +39,7 @@ void parse() { @Test void parseFailEmpty() { - List input = new ArrayList<>(); + DynamicAggregateData input = new MapAggregateData(0); Exception exception = assertThrows(IllegalArgumentException.class, () -> { TrackingInfo info = TrackingInfoParser.parse(input); @@ -40,25 +48,17 @@ void parseFailEmpty() { @Test void parseFailNumberOfElements() { - List input = new ArrayList<>(); - input.add(CommandKeyword.FLAGS.toString()); - input.add(Arrays.asList(TrackingInfo.TrackingFlag.ON.toString(), TrackingInfo.TrackingFlag.OPTIN.toString())); - input.add(CommandKeyword.REDIRECT.toString()); + DynamicAggregateData flags = new SetAggregateData(2); + flags.store(TrackingInfo.TrackingFlag.ON.toString()); + flags.store(TrackingInfo.TrackingFlag.OPTIN.toString()); - Exception exception = assertThrows(IllegalArgumentException.class, () -> { - TrackingInfo info = TrackingInfoParser.parse(input); - }); - } + DynamicAggregateData prefixes = new ArrayAggregateData(0); - @Test - void parseFailOrder() { - List input = new ArrayList<>(); - input.add(CommandKeyword.FLAGS.toString()); - input.add(Arrays.asList(TrackingInfo.TrackingFlag.ON.toString(), TrackingInfo.TrackingFlag.OPTIN.toString())); - input.add(CommandKeyword.PREFIXES.toString()); - input.add(new ArrayList<>()); - input.add(CommandKeyword.REDIRECT.toString()); - input.add(0L); + DynamicAggregateData input = new MapAggregateData(3); + input.store(CommandKeyword.FLAGS.toString().toLowerCase()); + input.storeObject(flags); + input.store(CommandKeyword.REDIRECT.toString().toLowerCase()); + input.store(-1L); Exception exception = assertThrows(IllegalArgumentException.class, () -> { TrackingInfo info = TrackingInfoParser.parse(input); @@ -66,18 +66,26 @@ void parseFailOrder() { } @Test - void parseFailTypes() { - List input = new ArrayList<>(); - input.add(CommandKeyword.FLAGS.toString()); - input.add(Arrays.asList(TrackingInfo.TrackingFlag.ON.toString(), TrackingInfo.TrackingFlag.OPTIN.toString())); - input.add(CommandKeyword.REDIRECT.toString()); - input.add(Boolean.FALSE); - input.add(CommandKeyword.PREFIXES.toString()); - input.add(new ArrayList<>()); + void parseResp2Compatibility() { + DynamicAggregateData flags = new ArrayAggregateData(2); + flags.store(TrackingInfo.TrackingFlag.ON.toString()); + flags.store(TrackingInfo.TrackingFlag.OPTIN.toString()); - Exception exception = assertThrows(IllegalArgumentException.class, () -> { - TrackingInfo info = TrackingInfoParser.parse(input); - }); + DynamicAggregateData prefixes = new ArrayAggregateData(0); + + DynamicAggregateData input = new ArrayAggregateData(3); + input.store(CommandKeyword.FLAGS.toString().toLowerCase()); + input.storeObject(flags); + input.store(CommandKeyword.REDIRECT.toString().toLowerCase()); + input.store(0L); + input.store(CommandKeyword.PREFIXES.toString().toLowerCase()); + input.storeObject(prefixes); + + TrackingInfo info = TrackingInfoParser.parse(input); + + assertThat(info.getFlags()).contains(TrackingInfo.TrackingFlag.ON, TrackingInfo.TrackingFlag.OPTIN); + assertThat(info.getRedirect()).isEqualTo(0L); + assertThat(info.getPrefixes()).isEmpty(); } } diff --git a/src/test/java/io/lettuce/core/commands/ServerCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/ServerCommandIntegrationTests.java index 3bc1bfce5c..43a21d551f 100644 --- a/src/test/java/io/lettuce/core/commands/ServerCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/ServerCommandIntegrationTests.java @@ -43,13 +43,14 @@ import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.push.PushMessage; import io.lettuce.core.api.sync.RedisCommands; -import io.lettuce.core.cluster.models.tracking.TrackingInfo; -import io.lettuce.core.cluster.models.tracking.TrackingInfoParser; +import io.lettuce.core.api.parsers.tracking.TrackingInfo; +import io.lettuce.core.api.parsers.tracking.TrackingInfoParser; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.models.command.CommandDetail; import io.lettuce.core.models.command.CommandDetailParser; import io.lettuce.core.models.role.RedisInstance; import io.lettuce.core.models.role.RoleParser; +import io.lettuce.core.output.data.DynamicAggregateData; import io.lettuce.core.protocol.CommandType; import io.lettuce.test.LettuceExtension; import io.lettuce.test.Wait; @@ -124,7 +125,7 @@ void clientCaching() { @Test void clientTrackinginfoDefaults() { - List rawTrackingInfo = redis.clientTrackinginfo(); + DynamicAggregateData rawTrackingInfo = redis.clientTrackinginfo(); TrackingInfo info = TrackingInfoParser.parse(rawTrackingInfo); @@ -137,7 +138,7 @@ void clientTrackinginfoDefaults() { void clientTrackinginfo() { try { redis.clientTracking(TrackingArgs.Builder.enabled(true).bcast().prefixes("usr:", "grp:")); - List rawTrackingInfo = redis.clientTrackinginfo(); + DynamicAggregateData rawTrackingInfo = redis.clientTrackinginfo(); TrackingInfo info = TrackingInfoParser.parse(rawTrackingInfo);