Skip to content

Commit

Permalink
SKYEDEN-3271 | new approach
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinBobinski committed Nov 26, 2024
1 parent 9a666af commit 94c237b
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public enum ErrorCode {
INVALID_QUERY(BAD_REQUEST),
IMPLEMENTATION_ABSENT(NOT_FOUND),
MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST),
CONSUMER_GROUP_DELETION_ERROR(INTERNAL_SERVER_ERROR),
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE);

private final int httpCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ public String toString() {
return "Subscription(" + getQualifiedName() + ")";
}

public static SubscriptionName getSubscriptionNameFromString(String input) {
if (!input.startsWith("Subscription(") || !input.endsWith(")")) {
throw new IllegalArgumentException("Invalid input: " + input);
}
return SubscriptionName.fromString(
input.replaceFirst("^Subscription\\(", "").replaceFirst("\\)$", ""));
}

public enum State {
PENDING,
ACTIVE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pl.allegro.tech.hermes.management.api;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import org.springframework.beans.factory.annotation.Autowired;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
import pl.allegro.tech.hermes.management.infrastructure.audit.AuditEvent;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static pl.allegro.tech.hermes.management.infrastructure.audit.AuditEventType.REMOVED;

@Path("subscriptions/termination-cleanup")
public class SubscriptionTerminationCleanupEndpoint {

private final SubscriptionService subscriptionService;

@Autowired
public SubscriptionTerminationCleanupEndpoint(SubscriptionService subscriptionService) {
this.subscriptionService = subscriptionService;
}

@POST
@Consumes(APPLICATION_JSON)
@Path("/from-audit-event")
public Response subscriptionTerminationCleanup(AuditEvent auditEvent) {
if (auditEvent.getEventType() != REMOVED
|| !auditEvent.getPayloadClass().equals(Subscription.class.getName())) {
return Response.ok().build();
}

SubscriptionName subscriptionName =
Subscription.getSubscriptionNameFromString(auditEvent.getResourceName());
subscriptionService.subscriptionTerminationCleanup(subscriptionName);
return Response.ok().build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,9 @@ public SubscriptionService subscriptionService(
public SubscriptionRemover subscriptionRemover(
Auditor auditor,
MultiDCAwareService multiDCAwareService,
MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
return new SubscriptionRemover(
auditor,
multiDCAwareService,
multiDatacenterRepositoryCommandExecutor,
subscriptionOwnerCache,
subscriptionRepository);
auditor, multiDCAwareService, subscriptionOwnerCache, subscriptionRepository);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package pl.allegro.tech.hermes.management.domain.subscription;

import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;

public interface ConsumerGroupManager {

void createConsumerGroup(Topic topic, Subscription subscription);

void deleteConsumerGroup(Topic topic, Subscription subscription);
void deleteConsumerGroup(SubscriptionName subscriptionName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,43 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.domain.topic.TopicNotEmptyException;
import pl.allegro.tech.hermes.management.domain.Auditor;
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;

public class SubscriptionRemover {

private static final Logger logger = LoggerFactory.getLogger(SubscriptionRemover.class);
private final Auditor auditor;
private final MultiDCAwareService multiDCAwareService;
private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
private final SubscriptionOwnerCache subscriptionOwnerCache;
private final SubscriptionRepository subscriptionRepository;

public SubscriptionRemover(
Auditor auditor,
MultiDCAwareService multiDCAwareService,
MultiDatacenterRepositoryCommandExecutor multiDcExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
this.auditor = auditor;
this.multiDCAwareService = multiDCAwareService;
this.multiDcExecutor = multiDcExecutor;
this.subscriptionOwnerCache = subscriptionOwnerCache;
this.subscriptionRepository = subscriptionRepository;
}

public void removeSubscription(Topic topic, Subscription subscription, RequestUser removedBy) {
public void removeSubscription(
TopicName topicName, String subscriptionName, RequestUser removedBy) {
auditor.beforeObjectRemoval(
removedBy.getUsername(), Subscription.class.getSimpleName(), subscription.getName());
multiDcExecutor.executeByUser(
new RemoveSubscriptionRepositoryCommand(topic.getName(), subscription.getName()),
removedBy);
multiDCAwareService.deleteConsumerGroups(topic, subscription); // to modify
removedBy.getUsername(), Subscription.class.getSimpleName(), subscriptionName);
Subscription subscription =
subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
multiDCAwareService.removeSubscription(topicName, subscriptionName, removedBy);
auditor.objectRemoved(removedBy.getUsername(), subscription);
subscriptionOwnerCache.onRemovedSubscription(subscription.getName(), topic.getName());
subscriptionOwnerCache.onRemovedSubscription(subscriptionName, topicName);
}

public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy) {
Expand All @@ -53,13 +49,17 @@ public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy)
logger.info(
"Removing subscriptions of topic: {}, subscriptions: {}", topic.getName(), subscriptions);
long start = System.currentTimeMillis();
subscriptions.forEach(sub -> removeSubscription(topic, sub, removedBy));
subscriptions.forEach(sub -> removeSubscription(topic.getName(), sub.getName(), removedBy));
logger.info(
"Removed subscriptions of topic: {} in {} ms",
topic.getName(),
System.currentTimeMillis() - start);
}

public void cleanupAfterSubscription(SubscriptionName subscriptionName) {
multiDCAwareService.deleteConsumerGroups(subscriptionName);
}

private void ensureSubscriptionsHaveAutoRemove(
List<Subscription> subscriptions, TopicName topicName) {
boolean anySubscriptionWithoutAutoRemove =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,19 @@ private Set<Subscription.State> loadSubscriptionStatesFromAllDc(

public void removeSubscription(
TopicName topicName, String subscriptionName, RequestUser removedBy) {
Topic topic = topicService.getTopicDetails(topicName);
Subscription subscription =
subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
subscriptionRemover.removeSubscription(topic, subscription, removedBy);
subscriptionRemover.removeSubscription(topicName, subscriptionName, removedBy);
}

public void subscriptionTerminationCleanup(SubscriptionName subscriptionName) {
if (subscriptionRepository.subscriptionExists(
subscriptionName.getTopicName(), subscriptionName.getName())) {
logger.info(
"Aborting subscription termination cleanup for subscription {} as it still exists.",
subscriptionName);
return;
}

subscriptionRemover.cleanupAfterSubscription(subscriptionName);
}

public void updateSubscription(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pl.allegro.tech.hermes.management.infrastructure.kafka;

import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.management.domain.ManagementException;

import static java.lang.String.format;
import static pl.allegro.tech.hermes.api.ErrorCode.CONSUMER_GROUP_DELETION_ERROR;

public class ConsumerGroupDeletionException extends ManagementException {

public ConsumerGroupDeletionException(SubscriptionName subscriptionName, Throwable e) {
super(format("Failed to delete consumer group, for subscription %s ", subscriptionName), e);
}

@Override
public ErrorCode getCode() {
return CONSUMER_GROUP_DELETION_ERROR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand;
import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
Expand Down Expand Up @@ -113,12 +115,18 @@ public void removeTopicByName(String topicName) {
clusters.forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName));
}

public void removeSubscription(
TopicName topicName, String subscriptionName, RequestUser removedBy) {
multiDcExecutor.executeByUser(
new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy);
}

public void createConsumerGroups(Topic topic, Subscription subscription) {
clusters.forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription));
}

public void deleteConsumerGroups(Topic topic, Subscription subscription) {
clusters.forEach(clusterService -> clusterService.deleteConsumerGroup(topic, subscription));
public void deleteConsumerGroups(SubscriptionName subscriptionName) {
clusters.forEach(clusterService -> clusterService.deleteConsumerGroup(subscriptionName));
}

private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
consumerGroupManager.createConsumerGroup(topic, subscription);
}

public void deleteConsumerGroup(Topic topic, Subscription subscription) {
consumerGroupManager.deleteConsumerGroup(topic, subscription);
public void deleteConsumerGroup(SubscriptionName subscriptionName) {
consumerGroupManager.deleteConsumerGroup(subscriptionName);
}

public Optional<ConsumerGroup> describeConsumerGroup(Topic topic, String subscriptionName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,24 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.ConsumerGroupDeletionException;

public class KafkaConsumerGroupManager implements ConsumerGroupManager {

Expand Down Expand Up @@ -88,29 +92,38 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
}

@Override
public void deleteConsumerGroup(Topic topic, Subscription subscription) {
public void deleteConsumerGroup(SubscriptionName subscriptionName)
throws ConsumerGroupDeletionException {
logger.info(
"Deleting consumer group for subscription {}, cluster: {}",
subscription.getQualifiedName(),
clusterName);
"Deleting consumer group for subscription {}, cluster: {}", subscriptionName, clusterName);

try {
ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscription.getQualifiedName());
ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscriptionName);
kafkaAdminClient
.deleteConsumerGroups(Collections.singletonList(groupId.asString()))
.all()
.get();

logger.info(
"Successfully deleted consumer group for subscription {}, cluster: {}",
subscription.getQualifiedName(),
subscriptionName,
clusterName);
} catch (Exception e) {

} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof GroupIdNotFoundException) {
logger.info(
"Consumer group for subscription {} not found, cluster: {}",
subscriptionName,
clusterName);
return;
}

logger.error(
"Failed to delete consumer group for subscription {}, cluster: {}",
subscription.getQualifiedName(),
subscriptionName,
clusterName,
e);
throw new ConsumerGroupDeletionException(subscriptionName, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;

Expand All @@ -11,7 +12,7 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
}

@Override
public void deleteConsumerGroup(Topic topic, Subscription subscription) {
public void deleteConsumerGroup(SubscriptionName subscriptionName) {
// no operation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ class KafkaConsumerGroupManagerSpec extends Specification {
consumerGroupManager.createConsumerGroup(topic, subscriptionToDelete)

when:
consumerGroupManager.deleteConsumerGroup(topic, subscriptionToDelete)
consumerGroupManager.deleteConsumerGroup(subscriptionToDelete.getQualifiedName())

then:
adminClient.listConsumerGroups().all().get().collect { it.groupId() } == [consumerGroupId.asString()]

cleanup:
consumerGroupManager.deleteConsumerGroup(topic, subscription)
consumerGroupManager.deleteConsumerGroup(subscription.getQualifiedName())
deleteKafkaTopic(kafkaTopicName)
}

Expand All @@ -195,7 +195,7 @@ class KafkaConsumerGroupManagerSpec extends Specification {
consumerGroupManager.createConsumerGroup(topic, subscription)

when:
consumerGroupManager.deleteConsumerGroup(topic, createTestSubscription(topic, "test-subscription-to-delete"))
consumerGroupManager.deleteConsumerGroup(createTestSubscription(topic, "test-subscription-to-delete").getQualifiedName())

then:
noExceptionThrown()
Expand All @@ -213,7 +213,7 @@ class KafkaConsumerGroupManagerSpec extends Specification {
Subscription subscription = createTestSubscription(topic, "test-subscription-to-delete")

when:
consumerGroupManager.deleteConsumerGroup(topic, subscription)
consumerGroupManager.deleteConsumerGroup(subscription.getQualifiedName())

then:
noExceptionThrown()
Expand Down

0 comments on commit 94c237b

Please sign in to comment.