From 6339e3a6bf52fdaf1521b84fd80cd5273fd190fc Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Tue, 21 May 2024 14:14:39 -0600 Subject: [PATCH] KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure (#16002) This PR creates the required methods to post-process the result of TaskAssignor.assign into the required ClientMetadata map. This allows most of the internal logic to remain intact after the user's assignment code runs. Reviewers: Anna Sophie Blee-Goldman --- .../internals/StreamsPartitionAssignor.java | 11 +++++++++++ .../internals/assignment/ClientState.java | 14 ++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index c33f0b281f4f..4312445bae58 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -38,6 +38,8 @@ import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment; import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; @@ -493,6 +495,15 @@ private ApplicationState buildApplicationState(final Map c ); } + private static void processStreamsPartitionAssignment(final Map clientMetadataMap, + final TaskAssignment taskAssignment) { + taskAssignment.assignment().forEach(kafkaStreamsAssignment -> { + final ProcessId processId = kafkaStreamsAssignment.processId(); + final ClientMetadata clientMetadata = clientMetadataMap.get(processId.id()); + clientMetadata.state.setAssignedTasks(kafkaStreamsAssignment); + }); + } + /** * Verify the subscription versions are within the expected bounds and check for version probing. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 8978628cc1fe..1d9d8c47a4e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -19,6 +19,7 @@ import java.util.SortedMap; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; import org.apache.kafka.streams.processor.internals.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,8 @@ import static java.util.Comparator.comparing; import static java.util.Comparator.comparingLong; import static org.apache.kafka.common.utils.Utils.union; +import static org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask.Type.ACTIVE; +import static org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask.Type.STANDBY; import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; public class ClientState { @@ -473,6 +476,17 @@ public SortedMap> taskIdsByPreviousConsumer() { return new TreeMap<>(consumerToPreviousStatefulTaskIds); } + public void setAssignedTasks(final KafkaStreamsAssignment assignment) { + final Set activeTasks = assignment.assignment().stream() + .filter(task -> task.type() == ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id) + .collect(Collectors.toSet()); + final Set standbyTasks = assignment.assignment().stream() + .filter(task -> task.type() == STANDBY).map(KafkaStreamsAssignment.AssignedTask::id) + .collect(Collectors.toSet()); + assignedActiveTasks.taskIds(activeTasks); + assignedStandbyTasks.taskIds(standbyTasks); + } + public String currentAssignment() { return "[activeTasks: (" + assignedActiveTasks.taskIds() + ") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";