Skip to content

Commit

Permalink
KAFKA-15045: (KIP-924 pt. 6) Post process new assignment structure (a…
Browse files Browse the repository at this point in the history
…pache#16002)

This PR creates the required methods to post-process the result of TaskAssignor.assign into the required ClientMetadata map. This allows most of the internal logic to remain intact after the user's assignment code runs.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
apourchet authored May 21, 2024
1 parent 4cc99cb commit 6339e3a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment;
import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
Expand Down Expand Up @@ -493,6 +495,15 @@ private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> c
);
}

private static void processStreamsPartitionAssignment(final Map<UUID, ClientMetadata> clientMetadataMap,
final TaskAssignment taskAssignment) {
taskAssignment.assignment().forEach(kafkaStreamsAssignment -> {
final ProcessId processId = kafkaStreamsAssignment.processId();
final ClientMetadata clientMetadata = clientMetadataMap.get(processId.id());
clientMetadata.state.setAssignedTasks(kafkaStreamsAssignment);
});
}

/**
* Verify the subscription versions are within the expected bounds and check for version probing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.SortedMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +42,8 @@
import static java.util.Comparator.comparing;
import static java.util.Comparator.comparingLong;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;
import static org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask.Type.STANDBY;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;

public class ClientState {
Expand Down Expand Up @@ -473,6 +476,17 @@ public SortedMap<String, Set<TaskId>> taskIdsByPreviousConsumer() {
return new TreeMap<>(consumerToPreviousStatefulTaskIds);
}

public void setAssignedTasks(final KafkaStreamsAssignment assignment) {
final Set<TaskId> activeTasks = assignment.assignment().stream()
.filter(task -> task.type() == ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id)
.collect(Collectors.toSet());
final Set<TaskId> standbyTasks = assignment.assignment().stream()
.filter(task -> task.type() == STANDBY).map(KafkaStreamsAssignment.AssignedTask::id)
.collect(Collectors.toSet());
assignedActiveTasks.taskIds(activeTasks);
assignedStandbyTasks.taskIds(standbyTasks);
}

public String currentAssignment() {
return "[activeTasks: (" + assignedActiveTasks.taskIds() +
") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";
Expand Down

0 comments on commit 6339e3a

Please sign in to comment.