From 65fa51f040ec1ebd546eea86389e25ac63f95146 Mon Sep 17 00:00:00 2001 From: kurenchuksergey Date: Fri, 6 Oct 2023 00:00:49 +0200 Subject: [PATCH 1/4] Usability improvements: New SuperStream builder builder provide a way to configure: - maxAge - maxLength - maxSegmentSize --- .../rabbit/stream/config/SuperStream.java | 39 ++++- .../stream/config/SuperStreamBuilder.java | 151 ++++++++++++++++++ 2 files changed, 185 insertions(+), 5 deletions(-) create mode 100644 spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java index 0d74183d79..9a1a276f9e 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -37,7 +38,6 @@ * * @author Gary Russell * @since 3.0 - * */ public class SuperStream extends Declarables { @@ -47,9 +47,21 @@ public class SuperStream extends Declarables { * @param partitions the number of partitions. */ public SuperStream(String name, int partitions) { + this(name, partitions, Map.of()); + } + + /** + * Create a Super Stream with the provided parameters. + * @param name the stream name. + * @param partitions the number of partitions. + * @param arguments the stream arguments + */ + public SuperStream(String name, int partitions, Map arguments) { this(name, partitions, (q, i) -> IntStream.range(0, i) .mapToObj(String::valueOf) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + arguments + ); } /** @@ -61,19 +73,36 @@ public SuperStream(String name, int partitions) { * partitions, the returned list must have a size equal to the partitions. */ public SuperStream(String name, int partitions, BiFunction> routingKeyStrategy) { - super(declarables(name, partitions, routingKeyStrategy)); + this(name, partitions, routingKeyStrategy, Map.of()); + } + + /** + * Create a Super Stream with the provided parameters. + * @param name the stream name. + * @param partitions the number of partitions. + * @param routingKeyStrategy a strategy to determine routing keys to use for the + * partitions. The first parameter is the queue name, the second the number of + * partitions, the returned list must have a size equal to the partitions. + * @param arguments the stream arguments + */ + public SuperStream(String name, int partitions, BiFunction> routingKeyStrategy, Map arguments) { + super(declarables(name, partitions, routingKeyStrategy, arguments)); } private static Collection declarables(String name, int partitions, - BiFunction> routingKeyStrategy) { + BiFunction> routingKeyStrategy, + Map arguments) { List declarables = new ArrayList<>(); List rks = routingKeyStrategy.apply(name, partitions); Assert.state(rks.size() == partitions, () -> "Expected " + partitions + " routing keys, not " + rks.size()); declarables.add(new DirectExchange(name, true, false, Map.of("x-super-stream", true))); + + Map argumentsCopy = new HashMap<>(arguments); + argumentsCopy.put("x-queue-type", "stream"); for (int i = 0; i < partitions; i++) { String rk = rks.get(i); - Queue q = new Queue(name + "-" + i, true, false, false, Map.of("x-queue-type", "stream")); + Queue q = new Queue(name + "-" + i, true, false, false, argumentsCopy); declarables.add(q); declarables.add(new Binding(q.getName(), DestinationType.QUEUE, name, rk, Map.of("x-stream-partition-order", i))); diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java new file mode 100644 index 0000000000..fafeb94abf --- /dev/null +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java @@ -0,0 +1,151 @@ +package org.springframework.rabbit.stream.config; + +import org.springframework.util.StringUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * Builds a Spring AMQP Super Stream using a fluent API. + * Based on Streams documentation + * + * @author Sergei Kurenchuk + * @since 3.1.0 + */ +public class SuperStreamBuilder { + private final Map arguments = new HashMap<>(); + private String name; + private int partitions = -1; + + private BiFunction> routingKeyStrategy; + + /** + * Creates a builder for Super Stream + * + * @param name stream name + * @return the builder + */ + public static SuperStreamBuilder superStream(String name) { + SuperStreamBuilder builder = new SuperStreamBuilder(); + builder.name(name); + return builder; + } + + /** + * Creates a builder for Super Stream + * + * @param name stream name + * @param partitions partitions number + * @return the builder + */ + public static SuperStreamBuilder superStream(String name, int partitions) { + return superStream(name).partitions(partitions); + } + + /** + * Set the maximum age retention per stream, which will remove the oldest data. + * + * @param maxAge valid units: Y, M, D, h, m, s + * e.g. 7D for a week + * @return the builder + */ + public SuperStreamBuilder maxAge(String maxAge) { + return withArgument("x-max-age", maxAge); + } + + /** + * Set the maximum log size as the retention configuration for each stream, + * which will truncate the log based on the data size. + * + * @param bytes the max total size in bytes + * @return the builder + */ + public SuperStreamBuilder maxLength(int bytes) { + return withArgument("max-length-bytes", bytes); + } + + /** + * Set the maximum size limit for segment file. + * + * @param bytes the max segments size in bytes + * @return the builder + */ + public SuperStreamBuilder maxSegmentSize(int bytes) { + return withArgument("x-stream-max-segment-size-bytes", bytes); + } + + /** + * Set extra argument which is not covered by builder's methods. + * + * @param key argument name + * @param value argument value + * @return the builder + */ + public SuperStreamBuilder withArgument(String key, Object value) { + if ("x-queue-type".equals(key) && !"stream".equals(value)) { + throw new IllegalArgumentException("Changing x-queue-type argument is not permitted"); + } + arguments.put(key, value); + return this; + } + + /** + * Set the stream name + * + * @param name the stream name. + * @return the builder + */ + public SuperStreamBuilder name(String name) { + this.name = name; + return this; + } + + /** + * Set the partitions number + * + * @param partitions the partitions number + * @return the builder + */ + public SuperStreamBuilder partitions(int partitions) { + this.partitions = partitions; + return this; + } + + /** + * Set a strategy to determine routing keys to use for the + * partitions. The first parameter is the queue name, the second the number of + * partitions, the returned list must have a size equal to the partitions. + * + * @param routingKeyStrategy the strategy + * @return the builder + */ + public SuperStreamBuilder routingKeyStrategy(BiFunction> routingKeyStrategy) { + this.routingKeyStrategy = routingKeyStrategy; + return this; + } + + /** + * Builds a final Super Stream. + * + * @return the Super Stream instance + */ + public SuperStream build() { + if (!StringUtils.hasText(name)) { + throw new IllegalArgumentException("Stream name can't be empty"); + } + + if (partitions <= 0) { + throw new IllegalArgumentException( + String.format("Partitions number should be great then zero. Current value; %d", partitions) + ); + } + + if (routingKeyStrategy == null) { + return new SuperStream(name, partitions, arguments); + } + + return new SuperStream(this.name, this.partitions, routingKeyStrategy, arguments); + } +} From 2aba995f2f03f3bd0e665e02d2cd6ecb5843142f Mon Sep 17 00:00:00 2001 From: kurenchuksergey Date: Fri, 6 Oct 2023 09:44:53 +0200 Subject: [PATCH 2/4] Usability improvements: New SuperStream builder License --- .../rabbit/stream/config/SuperStreamBuilder.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java index fafeb94abf..0c1df39c15 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java @@ -1,3 +1,19 @@ +/* + * Copyright 2021-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.rabbit.stream.config; import org.springframework.util.StringUtils; From aa4ac8ed1e21d3a00fb74dbddb26d33f836b0533 Mon Sep 17 00:00:00 2001 From: kurenchuksergey Date: Tue, 10 Oct 2023 19:10:43 +0200 Subject: [PATCH 3/4] Usability improvements: New SuperStream builder Fix style tests and add a new one for the super stream builder --- .../rabbit/stream/config/SuperStream.java | 2 + .../stream/config/SuperStreamBuilder.java | 28 +-- .../config/SuperStreamConfigurationTest.java | 163 ++++++++++++++++++ 3 files changed, 179 insertions(+), 14 deletions(-) create mode 100644 spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamConfigurationTest.java diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java index 9a1a276f9e..9341c8eed8 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java @@ -55,6 +55,7 @@ public SuperStream(String name, int partitions) { * @param name the stream name. * @param partitions the number of partitions. * @param arguments the stream arguments + * @since 3.1 */ public SuperStream(String name, int partitions, Map arguments) { this(name, partitions, (q, i) -> IntStream.range(0, i) @@ -84,6 +85,7 @@ public SuperStream(String name, int partitions, BiFunction> routingKeyStrategy, Map arguments) { super(declarables(name, partitions, routingKeyStrategy, arguments)); diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java index 0c1df39c15..dd92af379f 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java @@ -16,19 +16,19 @@ package org.springframework.rabbit.stream.config; -import org.springframework.util.StringUtils; - import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.BiFunction; +import org.springframework.util.StringUtils; + /** * Builds a Spring AMQP Super Stream using a fluent API. * Based on Streams documentation * * @author Sergei Kurenchuk - * @since 3.1.0 + * @since 3.1 */ public class SuperStreamBuilder { private final Map arguments = new HashMap<>(); @@ -38,7 +38,7 @@ public class SuperStreamBuilder { private BiFunction> routingKeyStrategy; /** - * Creates a builder for Super Stream + * Creates a builder for Super Stream. * * @param name stream name * @return the builder @@ -50,7 +50,7 @@ public static SuperStreamBuilder superStream(String name) { } /** - * Creates a builder for Super Stream + * Creates a builder for Super Stream. * * @param name stream name * @param partitions partitions number @@ -103,12 +103,12 @@ public SuperStreamBuilder withArgument(String key, Object value) { if ("x-queue-type".equals(key) && !"stream".equals(value)) { throw new IllegalArgumentException("Changing x-queue-type argument is not permitted"); } - arguments.put(key, value); + this.arguments.put(key, value); return this; } /** - * Set the stream name + * Set the stream name. * * @param name the stream name. * @return the builder @@ -119,7 +119,7 @@ public SuperStreamBuilder name(String name) { } /** - * Set the partitions number + * Set the partitions number. * * @param partitions the partitions number * @return the builder @@ -148,20 +148,20 @@ public SuperStreamBuilder routingKeyStrategy(BiFunction testArguments = Map.of(argKey, argValue); + SuperStream superStream = new SuperStream("stream", partitions, testArguments); + + List streams = superStream.getDeclarablesByType(Queue.class); + Assertions.assertEquals(partitions, streams.size()); + + streams.forEach( + it -> { + Object value = it.getArguments().get(argKey); + Assertions.assertNotNull(value, "Arg value should be present"); + Assertions.assertEquals(argValue, value, "Value should be the same"); + } + ); + } + + @Test + void testCustomPartitionsRoutingStrategy() { + var streamName = "test-super-stream-name"; + var partitions = 3; + var names = List.of("test.stream.1", "test.stream.2", "test.stream.3"); + + SuperStream superStream = SuperStreamBuilder.superStream(streamName, partitions) + .routingKeyStrategy((name, partition) -> names) + .build(); + + List bindings = superStream.getDeclarablesByType(Binding.class); + Set routingKeys = bindings.stream().map(Binding::getRoutingKey).collect(Collectors.toSet()); + Assertions.assertTrue(routingKeys.containsAll(names)); + } + + @Test + void builderMustSetupNameAndPartitionsNumber() { + var name = "test-super-stream-name"; + var partitions = 3; + SuperStream superStream = SuperStreamBuilder.superStream(name, partitions).build(); + List streams = superStream.getDeclarablesByType(Queue.class); + Assertions.assertEquals(partitions, streams.size()); + + streams.forEach(it -> Assertions.assertTrue(it.getName().startsWith(name))); + } + + @Test + void builderMustSetupArguments() { + var finalPartitionsNumber = 4; + var finalName = "test-name"; + var maxAge = "1D"; + var maxLength = 10_000_000; + var maxSegmentsSize = 100_000; + + var testArgName = "test-key"; + var testArgValue = "test-value"; + + SuperStream superStream = SuperStreamBuilder.superStream("name", 3) + .partitions(finalPartitionsNumber) + .maxAge(maxAge) + .maxLength(maxLength) + .maxSegmentSize(maxSegmentsSize) + .name(finalName) + .withArgument(testArgName, testArgValue) + .build(); + + List streams = superStream.getDeclarablesByType(Queue.class); + + Assertions.assertEquals(finalPartitionsNumber, streams.size()); + streams.forEach( + it -> { + Assertions.assertTrue(it.getName().startsWith(finalName)); + Assertions.assertEquals(maxAge, it.getArguments().get("x-max-age")); + Assertions.assertEquals(maxLength, it.getArguments().get("max-length-bytes")); + Assertions.assertEquals(maxSegmentsSize, it.getArguments().get("x-stream-max-segment-size-bytes")); + Assertions.assertEquals(testArgValue, it.getArguments().get(testArgName)); + } + ); + } + + @Test + void builderShouldForbidInternalArgumentsChanges() { + SuperStreamBuilder builder = SuperStreamBuilder.superStream("name", 3); + + Assertions.assertThrows(IllegalArgumentException.class, () -> builder.withArgument("x-queue-type", "quorum")); + } + + @Test + void nameCantBeEmpty() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("", 3).build() + ); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("testName", 3).name("").build() + ); + + Assertions.assertDoesNotThrow( + () -> SuperStreamBuilder.superStream("testName", 3).build() + ); + } + + @Test + void partitionsNumberShouldBeGreatThenZero() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("testName", 0).build() + ); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("testName", -1).build() + ); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> SuperStreamBuilder.superStream("testName", 1).partitions(0).build() + ); + + Assertions.assertDoesNotThrow( + () -> SuperStreamBuilder.superStream("testName", 1).build() + ); + } + +} From 0a2a5219d766292307d0a4775f1a0698c0f925fb Mon Sep 17 00:00:00 2001 From: kurenchuksergey Date: Wed, 11 Oct 2023 18:21:01 +0200 Subject: [PATCH 4/4] Usability improvements: New SuperStream builder Covered x-initial-cluster-size. Fixes after review --- .../rabbit/stream/config/SuperStream.java | 3 ++- .../stream/config/SuperStreamBuilder.java | 26 +++++++++---------- ...ava => SuperStreamConfigurationTests.java} | 5 +++- 3 files changed, 18 insertions(+), 16 deletions(-) rename spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/{SuperStreamConfigurationTest.java => SuperStreamConfigurationTests.java} (95%) diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java index 9341c8eed8..6523ee8814 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ * Create Super Stream Topology {@link Declarable}s. * * @author Gary Russell + * @author Sergei Kurenchuk * @since 3.0 */ public class SuperStream extends Declarables { diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java index dd92af379f..b2a4b574a7 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStreamBuilder.java @@ -39,7 +39,6 @@ public class SuperStreamBuilder { /** * Creates a builder for Super Stream. - * * @param name stream name * @return the builder */ @@ -51,8 +50,7 @@ public static SuperStreamBuilder superStream(String name) { /** * Creates a builder for Super Stream. - * - * @param name stream name + * @param name stream name * @param partitions partitions number * @return the builder */ @@ -62,9 +60,7 @@ public static SuperStreamBuilder superStream(String name, int partitions) { /** * Set the maximum age retention per stream, which will remove the oldest data. - * - * @param maxAge valid units: Y, M, D, h, m, s - * e.g. 7D for a week + * @param maxAge valid units: Y, M, D, h, m, s. For example: "7D" for a week * @return the builder */ public SuperStreamBuilder maxAge(String maxAge) { @@ -74,7 +70,6 @@ public SuperStreamBuilder maxAge(String maxAge) { /** * Set the maximum log size as the retention configuration for each stream, * which will truncate the log based on the data size. - * * @param bytes the max total size in bytes * @return the builder */ @@ -84,7 +79,6 @@ public SuperStreamBuilder maxLength(int bytes) { /** * Set the maximum size limit for segment file. - * * @param bytes the max segments size in bytes * @return the builder */ @@ -92,10 +86,18 @@ public SuperStreamBuilder maxSegmentSize(int bytes) { return withArgument("x-stream-max-segment-size-bytes", bytes); } + /** + * Set initial replication factor for each partition. + * @param count number of nodes per partition + * @return the builder + */ + public SuperStreamBuilder initialClusterSize(int count) { + return withArgument("x-initial-cluster-size", count); + } + /** * Set extra argument which is not covered by builder's methods. - * - * @param key argument name + * @param key argument name * @param value argument value * @return the builder */ @@ -109,7 +111,6 @@ public SuperStreamBuilder withArgument(String key, Object value) { /** * Set the stream name. - * * @param name the stream name. * @return the builder */ @@ -120,7 +121,6 @@ public SuperStreamBuilder name(String name) { /** * Set the partitions number. - * * @param partitions the partitions number * @return the builder */ @@ -133,7 +133,6 @@ public SuperStreamBuilder partitions(int partitions) { * Set a strategy to determine routing keys to use for the * partitions. The first parameter is the queue name, the second the number of * partitions, the returned list must have a size equal to the partitions. - * * @param routingKeyStrategy the strategy * @return the builder */ @@ -144,7 +143,6 @@ public SuperStreamBuilder routingKeyStrategy(BiFunction