Skip to content

Commit

Permalink
Fix meaningless thread creation every time checkpoint mysql load
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Oct 30, 2023
1 parent b4dba69 commit bd6c3f7
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,8 @@ private void startMasterOnlyDaemonThreads() {

// start threads that should running on all FE
private void startNonMasterDaemonThreads() {
// start load manager thread
loadManager.start();
tabletStatMgr.start();
// load and export job label cleaner thread
labelCleaner.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public LoadManager(LoadJobScheduler loadJobScheduler) {
this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
}

public void start() {
tokenManager.start();
mysqlLoadManager.start();
}

/**
* This method will be invoked by the broker load(v2) now.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
public class MysqlLoadManager {
private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class);

private final ThreadPoolExecutor mysqlLoadPool;
private ThreadPoolExecutor mysqlLoadPool;
private final TokenManager tokenManager;

private static class MySqlLoadContext {
Expand Down Expand Up @@ -138,16 +138,19 @@ public boolean isExpired() {
}

private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
private final EvictingQueue<MySqlLoadFailRecord> failedRecords;
private EvictingQueue<MySqlLoadFailRecord> failedRecords;
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1,
new CustomThreadFactory("mysql-load-fail-record-cleaner"));

public MysqlLoadManager(TokenManager tokenManager) {
this.tokenManager = tokenManager;
}

public void start() {
int poolSize = Config.mysql_load_thread_pool;
// MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default.
this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5,
"Mysql Load", true);
this.tokenManager = tokenManager;
this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record);
this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
public class TokenManager {
private static final Logger LOG = LogManager.getLogger(TokenManager.class);

private final int thriftTimeoutMs = 300 * 1000;
private final EvictingQueue<String> tokenQueue;
private final ScheduledExecutorService tokenGenerator;
private int thriftTimeoutMs = 300 * 1000;
private EvictingQueue<String> tokenQueue;
private ScheduledExecutorService tokenGenerator;

public TokenManager() {
}

public void start() {
this.tokenQueue = EvictingQueue.create(Config.token_queue_size);
// init one token to avoid async issue.
this.tokenQueue.offer(generateNewToken());
Expand Down

0 comments on commit bd6c3f7

Please sign in to comment.