Skip to content

Commit

Permalink
SKYEDEN-3271 | consumer group cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinBobinski committed Nov 28, 2024
1 parent f4099e4 commit e57248b
Show file tree
Hide file tree
Showing 27 changed files with 545 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public enum ErrorCode {
INVALID_QUERY(BAD_REQUEST),
IMPLEMENTATION_ABSENT(NOT_FOUND),
MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST),
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE);
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE),
CONSUMER_GROUP_DELETION_ERROR(INTERNAL_SERVER_ERROR);

private final int httpCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class ZookeeperPaths {
public static final String DATACENTER_READINESS_PATH = "datacenter-readiness";
public static final String OFFLINE_RETRANSMISSION_PATH = "offline-retransmission";
public static final String OFFLINE_RETRANSMISSION_TASKS_PATH = "tasks";
public static final String CONSUMER_GROUP_TO_DELETE = "consumer-group-to-delete";
public static final String CONSUMER_GROUP_TO_DELETE_TASKS = "tasks";

private final String basePath;

Expand Down Expand Up @@ -182,6 +184,16 @@ public String offlineRetransmissionPath(String taskId) {
.join(basePath, OFFLINE_RETRANSMISSION_PATH, OFFLINE_RETRANSMISSION_TASKS_PATH, taskId);
}

public String consumerGroupToDeletePath() {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS);
}

public String consumerGroupToDeletePath(String taskId) {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS, taskId);
}

public String join(String... parts) {
return Joiner.on(URL_SEPARATOR).join(parts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@
import pl.allegro.tech.hermes.common.kafka.KafkaConsumerPoolConfig;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.management.config.SubscriptionProperties;
import pl.allegro.tech.hermes.management.config.subscription.SubscriptionProperties;
import pl.allegro.tech.hermes.management.config.TopicProperties;
import pl.allegro.tech.hermes.management.domain.dc.DatacenterBoundRepositoryHolder;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
Expand All @@ -58,8 +57,6 @@ public class KafkaConfiguration implements MultipleDcKafkaNamesMappersFactory {

@Autowired SubscriptionProperties subscriptionProperties;

@Autowired CompositeMessageContentWrapper compositeMessageContentWrapper;

@Autowired ZookeeperRepositoryManager zookeeperRepositoryManager;

@Autowired MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
Expand Down Expand Up @@ -104,6 +101,7 @@ MultiDCAwareService multiDCAwareService(
new KafkaSingleMessageReader(
kafkaRawMessageReader, schemaRepository, jsonAvroConverter);
return new BrokersClusterService(
kafkaProperties.getDatacenter(),
kafkaProperties.getQualifiedClusterName(),
messageReader,
retransmissionService,
Expand All @@ -112,7 +110,8 @@ MultiDCAwareService multiDCAwareService(
new OffsetsAvailableChecker(consumerPool, storage),
new LogEndOffsetChecker(consumerPool),
brokerAdminClient,
createConsumerGroupManager(kafkaProperties, kafkaNamesMapper),
createConsumerGroupManager(
kafkaProperties, kafkaNamesMapper, brokerAdminClient),
createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
})
.collect(toList());
Expand All @@ -126,13 +125,16 @@ MultiDCAwareService multiDCAwareService(
}

private ConsumerGroupManager createConsumerGroupManager(
KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) {
KafkaProperties kafkaProperties,
KafkaNamesMapper kafkaNamesMapper,
AdminClient kafkaAdminClient) {
return subscriptionProperties.isCreateConsumerGroupManuallyEnabled()
? new KafkaConsumerGroupManager(
kafkaNamesMapper,
kafkaProperties.getQualifiedClusterName(),
kafkaProperties.getBrokerList(),
kafkaProperties)
kafkaProperties,
kafkaAdminClient)
: new NoOpConsumerGroupManager();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import static java.util.stream.Collectors.toList;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Clock;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand Down Expand Up @@ -123,11 +124,13 @@ public SubscriptionRemover subscriptionRemover(
Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
SubscriptionRepository subscriptionRepository,
Clock clock) {
return new SubscriptionRemover(
auditor,
multiDatacenterRepositoryCommandExecutor,
subscriptionOwnerCache,
subscriptionRepository);
subscriptionRepository,
clock);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import org.springframework.boot.context.properties.ConfigurationProperties;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.config;
package pl.allegro.tech.hermes.management.config.subscription;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pl.allegro.tech.hermes.management.config.subscription.consumergroup;

import java.time.Clock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupCleanUpService;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.ConsumerGroupToDeleteRepository;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperRepositoryManager;

@Configuration
@EnableConfigurationProperties(ConsumerGroupCleanUpProperties.class)
@ConditionalOnProperty(value = "consumer-group-clean-up.enabled", havingValue = "true")
public class ConsumerGroupCleanUpConfig {

@Autowired
ZookeeperRepositoryManager zookeeperRepositoryManager;

@Bean
ConsumerGroupCleanUpService consumerGroupCleanUpService(
MultiDCAwareService multiDCAwareService,
SubscriptionService subscriptionService,
ConsumerGroupCleanUpProperties properties,
Clock clock
) {
return new ConsumerGroupCleanUpService(
multiDCAwareService,
zookeeperRepositoryManager.getRepositoriesByType(ConsumerGroupToDeleteRepository.class),
subscriptionService,
properties,
clock
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package pl.allegro.tech.hermes.management.config.subscription.consumergroup;

import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "consumer-group-clean-up")
public class ConsumerGroupCleanUpProperties {
private boolean enabled = true;
private Duration interval = Duration.ofMinutes(5);
private Duration initialDelay = Duration.ofMinutes(1);
private Duration timeout = Duration.ofHours(24);

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Duration getInterval() {
return interval;
}

public void setInterval(Duration interval) {
this.interval = interval;
}

public Duration getInitialDelay() {
return initialDelay;
}

public void setInitialDelay(Duration initialDelay) {
this.initialDelay = initialDelay;
}

public Duration getTimeout() {
return timeout;
}

public void setTimeout(Duration timeout) {
this.timeout = timeout;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package pl.allegro.tech.hermes.management.domain.subscription;

import java.time.Clock;
import java.time.Instant;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
Expand All @@ -12,6 +15,7 @@
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.subscription.consumergroup.command.ScheduleConsumerGroupToDeleteCommand;

public class SubscriptionRemover {

Expand All @@ -20,16 +24,19 @@ public class SubscriptionRemover {
private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
private final SubscriptionOwnerCache subscriptionOwnerCache;
private final SubscriptionRepository subscriptionRepository;
private final Clock clock;

public SubscriptionRemover(
Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDcExecutor,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
SubscriptionRepository subscriptionRepository,
Clock clock) {
this.auditor = auditor;
this.multiDcExecutor = multiDcExecutor;
this.subscriptionOwnerCache = subscriptionOwnerCache;
this.subscriptionRepository = subscriptionRepository;
this.clock = clock;
}

public void removeSubscription(
Expand All @@ -40,6 +47,10 @@ public void removeSubscription(
subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
multiDcExecutor.executeByUser(
new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy);
multiDcExecutor.executeByUser(
new ScheduleConsumerGroupToDeleteCommand(
new SubscriptionName(subscriptionName, topicName), Instant.now(clock)),
removedBy);
auditor.objectRemoved(removedBy.getUsername(), subscription);
subscriptionOwnerCache.onRemovedSubscription(subscriptionName, topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,4 +461,8 @@ private List<SubscriptionNameWithMetrics> getSubscriptionsMetrics(
})
.collect(toList());
}

public boolean subscriptionExists(SubscriptionName subscriptionName) {
return subscriptionRepository.subscriptionExists(subscriptionName.getTopicName(), subscriptionName.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pl.allegro.tech.hermes.management.domain.subscription.consumergroup;

import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.management.domain.ManagementException;

import static java.lang.String.format;
import static pl.allegro.tech.hermes.api.ErrorCode.OTHER;

public class ConsumerGroupAlreadyScheduledToDeleteException extends ManagementException {

public ConsumerGroupAlreadyScheduledToDeleteException(
ConsumerGroupToDelete consumerGroupToDelete, Throwable e) {
super(
format(
"Consumer group already scheduled to delete, for subscription %s ",
consumerGroupToDelete.subscriptionName().getQualifiedName()),
e);
}

@Override
public ErrorCode getCode() {
return OTHER;
}
}
Loading

0 comments on commit e57248b

Please sign in to comment.