Skip to content

Commit

Permalink
refactor(backpressure): introduce interface Checker (#2162)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Nov 15, 2024
1 parent bcdd7e1 commit 7d6c2c9
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

package com.automq.stream.s3.backpressure;

import java.util.function.Supplier;

/**
* It checks the {@link LoadLevel} of the system and takes actions based on the load level
* to prevent the system from being overwhelmed.
Expand All @@ -26,13 +24,9 @@ public interface BackPressureManager {

/**
* Register a checker to check the load level of the system.
* Note: It should be called after {@link #start()} and before {@link #shutdown()}.
*
* @param source The source of the checker, which should be unique to identify the checker.
* @param checker The checker to check the load level of the system.
* @param intervalMs The interval in milliseconds to check the load level of the system.
* Note: It should be called between {@link #start()} and {@link #shutdown()}.
*/
void registerChecker(String source, Supplier<LoadLevel> checker, long intervalMs);
void registerChecker(Checker checker);

/**
* Shutdown the back pressure manager, and release all resources.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;

/**
* A checker to check the load level of the system periodically.
*/
public interface Checker {

/**
* The source of the checker, which should be unique to identify the checker.
*/
String source();

/**
* Check the load level of the system.
*/
LoadLevel check();

/**
* The interval in milliseconds to check the load level of the system.
*/
long intervalMs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,11 +63,11 @@ public void start() {
}

@Override
public void registerChecker(String source, Supplier<LoadLevel> checker, long intervalMs) {
public void registerChecker(Checker checker) {
checkerScheduler.scheduleAtFixedRate(() -> {
loadLevels.put(source, checker.get());
loadLevels.put(checker.source(), checker.check());
maybeRegulate();
}, 0, intervalMs, TimeUnit.MILLISECONDS);
}, 0, checker.intervalMs(), TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,22 @@ private void initManager(long cooldownMs) {
}

private void callChecker(String source, LoadLevel level) {
manager.registerChecker(source, () -> level, 1);
manager.registerChecker(new Checker() {
@Override
public String source() {
return source;
}

@Override
public LoadLevel check() {
return level;
}

@Override
public long intervalMs() {
return 1;
}
});
}

private void assertRegulatorCalled(int increase, int decrease, int minimize) {
Expand Down

0 comments on commit 7d6c2c9

Please sign in to comment.