Skip to content

Commit

Permalink
feat(backpressure): back pressure by system load (#2161)
Browse files Browse the repository at this point in the history
* feat(backpressure): init backpressure module

Signed-off-by: Ning Yu <[email protected]>

* feat(backpressure): implement `DefaultBackPressureManager`

Signed-off-by: Ning Yu <[email protected]>

* test(backpressure): test `DefaultBackPressureManager`

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Nov 15, 2024
1 parent 4597b74 commit bcdd7e1
Show file tree
Hide file tree
Showing 7 changed files with 472 additions and 4 deletions.
24 changes: 24 additions & 0 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

package kafka.log.stream.s3;

import com.automq.stream.s3.backpressure.BackPressureManager;
import com.automq.stream.s3.backpressure.DefaultBackPressureManager;
import com.automq.stream.s3.backpressure.Regulator;
import kafka.autobalancer.metricsreporter.metric.Derivator;
import kafka.log.stream.s3.metadata.StreamMetadataManager;
import kafka.log.stream.s3.network.ControllerRequestSender;
Expand Down Expand Up @@ -86,6 +89,8 @@ public class DefaultS3Client implements Client {

protected CompactionManager compactionManager;

protected BackPressureManager backPressureManager;

protected S3StreamClient streamClient;

protected KVClient kvClient;
Expand Down Expand Up @@ -146,6 +151,7 @@ public void start() {
this.objectManager.setCommitStreamSetObjectHook(localIndexCache::updateIndexFromRequest);
this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, objectStorage, objectReaderFactory);
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionobjectStorage);
this.backPressureManager = new DefaultBackPressureManager(backPressureRegulator());
this.writeAheadLog = buildWAL();
StorageFailureHandlerChain storageFailureHandler = new StorageFailureHandlerChain();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageFailureHandler);
Expand All @@ -162,11 +168,13 @@ public void start() {

this.storage.startup();
this.compactionManager.start();
this.backPressureManager.start();
LOGGER.info("S3Client started");
}

@Override
public void shutdown() {
this.backPressureManager.shutdown();
this.compactionManager.shutdown();
this.streamClient.shutdown();
this.storage.shutdown();
Expand Down Expand Up @@ -227,6 +235,22 @@ protected ObjectManager newObjectManager(int nodeId, long nodeEpoch, boolean fai
this::getAutoMQVersion, failoverMode);
}

/**
 * Creates the {@link Regulator} that is handed to the back pressure manager in {@link #start()}.
 * <p>
 * This default regulator is a placeholder: {@code increase()}, {@code decrease()} and
 * {@code minimize()} all do nothing. The method is {@code protected} so that a subclass
 * can supply a regulator that actually throttles requests.
 *
 * @return a new no-op regulator on every call
 */
protected Regulator backPressureRegulator() {
    final class NoopRegulator implements Regulator {
        @Override
        public void increase() {
            // Intentionally empty: nothing to speed up.
        }

        @Override
        public void decrease() {
            // Intentionally empty: nothing to slow down.
        }

        @Override
        public void minimize() {
            // Intentionally empty: nothing to minimize.
        }
    }

    return new NoopRegulator();
}

protected Failover failover() {
return new Failover(new FailoverFactory() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;

import java.util.function.Supplier;

/**
* It checks the {@link LoadLevel} of the system and takes actions based on the load level
* to prevent the system from being overwhelmed.
* <p>
* Expected lifecycle: {@link #start()}, then any number of
* {@link #registerChecker(String, Supplier, long)} calls, then {@link #shutdown()}.
*/
public interface BackPressureManager {

/**
* Start the back pressure manager.
* It must be called before any checker is registered.
*/
void start();

/**
* Register a checker to check the load level of the system.
* The checker is invoked repeatedly, once every {@code intervalMs} milliseconds.
* Note: It should be called after {@link #start()} and before {@link #shutdown()}.
*
* @param source The source of the checker, which should be unique to identify the checker.
* @param checker The checker to check the load level of the system.
* It is expected to return the load level currently observed by this source.
* @param intervalMs The interval in milliseconds to check the load level of the system.
*/
void registerChecker(String source, Supplier<LoadLevel> checker, long intervalMs);

/**
* Shutdown the back pressure manager, and release all resources.
* No checker should be registered after this call.
*/
void shutdown();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;

import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link BackPressureManager}: it runs every registered checker periodically on a single
 * scheduler thread, remembers the most recent {@link LoadLevel} reported by each source, and asks
 * the {@link Regulator} to act on the highest of those levels.
 * <p>
 * Regulation happens immediately when the highest level is {@link LoadLevel#CRITICAL}; otherwise
 * at most once per cooldown period.
 */
public class DefaultBackPressureManager implements BackPressureManager {

    public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(20);

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBackPressureManager.class);

    private final Regulator regulator;
    /**
     * The cooldown time in milliseconds to wait between two regulator actions.
     */
    private final long cooldownMs;

    /**
     * The scheduler to schedule the checker periodically.
     * Package-private for testing.
     */
    ScheduledExecutorService checkerScheduler;
    /**
     * The map to store the source and the most recent load level from the checker.
     * It never contains {@code null} values.
     * Note: It should only be accessed in the {@link #checkerScheduler} thread.
     */
    private final Map<String, LoadLevel> loadLevels = new HashMap<>();
    /**
     * The last time to trigger the regulator.
     * Note: It should only be accessed in the {@link #checkerScheduler} thread.
     */
    private long lastRegulateTime = System.currentTimeMillis();

    /**
     * Creates a manager with the {@link #DEFAULT_COOLDOWN_MS default cooldown}.
     *
     * @param regulator the regulator to drive, must not be {@code null}
     * @throws NullPointerException if {@code regulator} is {@code null}
     */
    public DefaultBackPressureManager(Regulator regulator) {
        this(regulator, DEFAULT_COOLDOWN_MS);
    }

    /**
     * @param regulator  the regulator to drive, must not be {@code null}
     * @param cooldownMs the minimum time in milliseconds between two non-critical regulator actions
     * @throws NullPointerException if {@code regulator} is {@code null}
     */
    public DefaultBackPressureManager(Regulator regulator, long cooldownMs) {
        // Fail fast here instead of failing later on the scheduler thread.
        this.regulator = Objects.requireNonNull(regulator, "regulator");
        this.cooldownMs = cooldownMs;
    }

    @Override
    public void start() {
        this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The first check runs immediately; subsequent checks run at a fixed rate of {@code intervalMs}.
     *
     * @throws NullPointerException  if {@code checker} is {@code null}
     * @throws IllegalStateException if {@link #start()} has not been called yet
     */
    @Override
    public void registerChecker(String source, Supplier<LoadLevel> checker, long intervalMs) {
        Objects.requireNonNull(checker, "checker");
        ScheduledExecutorService scheduler = checkerScheduler;
        if (scheduler == null) {
            throw new IllegalStateException("registerChecker must be called after start(), source=" + source);
        }
        scheduler.scheduleAtFixedRate(() -> check(source, checker), 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void shutdown() {
        ThreadUtils.shutdownExecutor(checkerScheduler, 1, TimeUnit.SECONDS);
    }

    /**
     * Runs one checker and, if it produced a usable level, records it and triggers regulation.
     * A failing checker or a {@code null} result keeps the previously recorded level of the source.
     */
    private void check(String source, Supplier<LoadLevel> checker) {
        LoadLevel level;
        try {
            level = checker.get();
        } catch (RuntimeException e) {
            // An exception escaping a fixed-rate task would suppress all its future executions.
            LOGGER.error("Failed to check load level, source={}", source, e);
            return;
        }
        if (level == null) {
            // A null value in the map would make every later currentLoadLevel() call throw.
            LOGGER.warn("Checker returned null load level, ignore it, source={}", source);
            return;
        }
        loadLevels.put(source, level);
        maybeRegulate();
    }

    private void maybeRegulate() {
        maybeRegulate(false);
    }

    /**
     * Regulate the system if necessary, which means either
     * <ul>
     * <li>the system is in a {@link LoadLevel#CRITICAL} state (the cooldown is ignored), or</li>
     * <li>the cooldown time has passed since the last regulator action.</li>
     * </ul>
     * Otherwise, a non-internal call schedules one delayed re-check for the moment the cooldown ends.
     *
     * @param isInternal True if it is an internal call, which means it should not schedule the next regulate action.
     */
    private void maybeRegulate(boolean isInternal) {
        LoadLevel loadLevel = currentLoadLevel();
        long now = System.currentTimeMillis();

        if (LoadLevel.CRITICAL.equals(loadLevel)) {
            // Regulate immediately regardless of the cooldown time.
            regulate(loadLevel, now);
            return;
        }

        long timeElapsed = now - lastRegulateTime;
        if (timeElapsed < cooldownMs) {
            // Skip regulating if the cooldown time has not passed.
            if (!isInternal) {
                // Schedule the next regulate action if it is not an internal call.
                // Several such re-checks may be pending at once; once one of them has regulated,
                // the others find the cooldown not passed and return without rescheduling.
                checkerScheduler.schedule(() -> maybeRegulate(true), cooldownMs - timeElapsed, TimeUnit.MILLISECONDS);
            }
            return;
        }

        regulate(loadLevel, now);
    }

    /**
     * Get the current load level of the system, which is, the maximum load level from all checkers.
     * It is {@link LoadLevel#NORMAL} when no checker has reported yet.
     */
    private LoadLevel currentLoadLevel() {
        return loadLevels.values().stream()
            .max(LoadLevel::compareTo)
            .orElse(LoadLevel.NORMAL);
    }

    /**
     * Let the regulator act on the given level and restart the cooldown from {@code now}.
     */
    private void regulate(LoadLevel loadLevel, long now) {
        loadLevel.regulate(regulator);
        lastRegulateTime = now;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;

/**
* Represents the load level of the system.
* {@link BackPressureManager} will take actions based on the load level.
* Note: It MUST be ordered by the severity.
*/
public enum LoadLevel {
/**
* The system is in a normal state.
*/
NORMAL {
@Override
public void regulate(Regulator regulator) {
regulator.increase();
}
},
/**
* The system is in a high load state, and some actions should be taken to reduce the load.
*/
HIGH {
@Override
public void regulate(Regulator regulator) {
regulator.decrease();
}
},
/**
* The system is in a critical state, and the most severe actions should be taken.
*/
CRITICAL {
@Override
public void regulate(Regulator regulator) {
regulator.minimize();
}
};

/**
* Take actions based on the load level.
*/
public abstract void regulate(Regulator regulator);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;

/**
* A Regulator is responsible for controlling and limiting the rate of external requests.
* It provides methods to increase, decrease, and minimize the flow of incoming requests.
* <p>
* The methods are invoked through {@link LoadLevel#regulate(Regulator)}:
* {@link LoadLevel#NORMAL} calls {@link #increase()}, {@link LoadLevel#HIGH} calls
* {@link #decrease()} and {@link LoadLevel#CRITICAL} calls {@link #minimize()}.
*/
public interface Regulator {

/**
* Increase the rate of incoming requests.
* If the rate is already at the maximum, this method does nothing.
*/
void increase();

/**
* Decrease the rate of incoming requests.
* If the rate is already at the minimum, this method does nothing.
*/
void decrease();

/**
* Minimize the rate of incoming requests.
* This is the most severe action and is used when the system is in a critical state.
*/
void minimize();
}
38 changes: 34 additions & 4 deletions s3stream/src/main/java/com/automq/stream/utils/ThreadUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@

package com.automq.stream.utils;

import org.slf4j.Logger;

import io.netty.util.concurrent.FastThreadLocalThread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.helpers.NOPLogger;

/**
* Utilities for working with threads.
Expand Down Expand Up @@ -79,4 +80,33 @@ public static Runnable wrapRunnable(Runnable runnable, Logger logger) {
}
};
}

/**
* A wrapper of {@link #shutdownExecutor(ExecutorService, long, TimeUnit, Logger)} without logging:
* it delegates with {@link NOPLogger#NOP_LOGGER}, so a termination timeout is not reported.
*
* @param executorService the executor to shut down, may be {@code null} (then nothing happens)
* @param timeout how long to wait for termination before forcing the shutdown
* @param timeUnit the unit of {@code timeout}
*/
public static void shutdownExecutor(ExecutorService executorService, long timeout, TimeUnit timeUnit) {
shutdownExecutor(executorService, timeout, timeUnit, NOPLogger.NOP_LOGGER);
}

/**
 * Shuts down an executor service in two phases: first {@code shutdown()} stops new tasks from
 * being accepted, then, if the executor has not terminated within the timeout or the waiting
 * thread is interrupted, {@code shutdownNow()} cancels the lingering tasks.
 * <p>
 * On interrupt the interrupt status of the calling thread is restored.
 *
 * @param executorService the executor to shut down, may be {@code null} (then nothing happens)
 * @param timeout         how long to wait for termination before forcing the shutdown
 * @param timeUnit        the unit of {@code timeout}
 * @param logger          the logger used to report that the executor did not terminate in time
 */
public static void shutdownExecutor(ExecutorService executorService, long timeout, TimeUnit timeUnit,
    Logger logger) {
    if (executorService != null) {
        // Phase 1: reject new tasks and wait for the running ones.
        executorService.shutdown();
        boolean terminatedInTime;
        try {
            terminatedInTime = executorService.awaitTermination(timeout, timeUnit);
        } catch (InterruptedException e) {
            // Phase 2 (interrupted): cancel what is left and keep the interrupt visible to the caller.
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
            return;
        }
        if (!terminatedInTime) {
            // Phase 2 (timed out): cancel what is left and report it.
            executorService.shutdownNow();
            logger.error("Executor {} did not terminate in time, forcefully shutting down", executorService);
        }
    }
}
}
Loading

0 comments on commit bcdd7e1

Please sign in to comment.