
Commit

[Enhancement] Introduce TxnStateDispatcher for merge commit sync mode StarRocks#55001 (StarRocks#55071)

This is the second PR of the merge commit sync mode optimization StarRocks#54995. It introduces TxnStateDispatcher on the FE side; see StarRocks#54995 for details.

Signed-off-by: PengFei Li <[email protected]>
banmoy authored Jan 15, 2025
1 parent 3015cd7 commit d650aa8
Showing 17 changed files with 846 additions and 37 deletions.
26 changes: 17 additions & 9 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -3366,12 +3366,26 @@ public class Config extends ConfigBase {
public static int lake_remove_table_thread_num = 4;

@ConfField(mutable = true)
- public static int batch_write_gc_check_interval_ms = 60000;
+ public static int merge_commit_gc_check_interval_ms = 60000;

@ConfField(mutable = true)
- public static int batch_write_idle_ms = 3600000;
+ public static int merge_commit_idle_ms = 3600000;

- public static int batch_write_executor_threads_num = 4096;
+ @ConfField(mutable = false)
+ public static int merge_commit_executor_threads_num = 4096;
+
+ @ConfField(mutable = true)
+ public static int merge_commit_txn_state_dispatch_retry_times = 3;
+
+ @ConfField(mutable = true)
+ public static int merge_commit_txn_state_dispatch_retry_interval_ms = 200;
+
+ @ConfField(mutable = true)
+ public static int merge_commit_be_assigner_schedule_interval_ms = 5000;
+
+ @ConfField(mutable = true, comment = "Defines the maximum balance factor allowed " +
+ "between any two nodes before triggering a balance")
+ public static double merge_commit_be_assigner_balance_factor_threshold = 0.1;

/**
* Enable Arrow Flight SQL server only when the port is set to positive value.
@@ -3385,12 +3399,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int arrow_token_cache_expire = 3600;

- public static int batch_write_be_assigner_schedule_interval_ms = 5000;
-
- @ConfField(mutable = true, comment = "Defines the maximum balance factor allowed " +
- "between any two nodes before triggering a balance")
- public static double batch_write_be_assigner_balance_factor_threshold = 0.1;

@ConfField(mutable = false)
public static int query_deploy_threadpool_size = max(50, getRuntime().availableProcessors() * 10);

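The renamed and newly added merge_commit_* settings above configure the dispatcher introduced by this PR: merge_commit_txn_state_dispatch_retry_times and merge_commit_txn_state_dispatch_retry_interval_ms bound how often a failed transaction-state dispatch is retried and how long to back off between attempts. The TxnStateDispatcher class itself is not shown in this excerpt, so the sketch below only illustrates how these two knobs could drive a retry loop; the helper class and method names are assumptions, not the actual implementation.

    import com.starrocks.common.Config;

    // Hypothetical sketch: shows how the new retry knobs could be consumed.
    // TxnStateDispatcher is not part of this excerpt; this class is not real StarRocks code.
    public class TxnStateDispatchRetrySketch {

        /** Runs the given send action, retrying up to the configured number of attempts. */
        public static boolean dispatchWithRetry(Runnable sendTxnState) {
            int attempts = Math.max(1, Config.merge_commit_txn_state_dispatch_retry_times);
            for (int i = 0; i < attempts; i++) {
                try {
                    sendTxnState.run();   // push the txn state to one coordinator backend
                    return true;          // stop after the first successful dispatch
                } catch (Exception e) {
                    try {                 // back off before the next attempt
                        Thread.sleep(Config.merge_commit_txn_state_dispatch_retry_interval_ms);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
            return false;                 // all attempts exhausted
        }
    }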
@@ -60,14 +60,17 @@ public class BatchWriteMgr extends FrontendDaemon {
// A thread pool executor for executing batch write tasks.
private final ThreadPoolExecutor threadPoolExecutor;

+ private final TxnStateDispatcher txnStateDispatcher;
+
public BatchWriteMgr() {
- super("group-commit-mgr", Config.batch_write_gc_check_interval_ms);
+ super("merge-commit-mgr", Config.merge_commit_gc_check_interval_ms);
this.idGenerator = new AtomicLong(0L);
this.isomorphicBatchWriteMap = new ConcurrentHashMap<>();
this.lock = new ReentrantReadWriteLock();
this.coordinatorBackendAssigner = new CoordinatorBackendAssignerImpl();
this.threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(
- Config.batch_write_executor_threads_num, "batch-write-load", true);
+ Config.merge_commit_executor_threads_num, "batch-write-load", true);
+ this.txnStateDispatcher = new TxnStateDispatcher(threadPoolExecutor);
}

@Override
@@ -79,7 +82,7 @@ public synchronized void start() {

@Override
protected void runAfterCatalogReady() {
- setInterval(Config.batch_write_gc_check_interval_ms);
+ setInterval(Config.merge_commit_gc_check_interval_ms);
cleanupInactiveBatchWrite();
}

@@ -194,7 +197,7 @@ private Pair<TStatus, IsomorphicBatchWrite> getOrCreateTableBatchWrite(TableId t
long id = idGenerator.getAndIncrement();
IsomorphicBatchWrite newLoad = new IsomorphicBatchWrite(
id, tableId, warehouseName, streamLoadInfo, batchWriteIntervalMs, batchWriteParallel,
- params, coordinatorBackendAssigner, threadPoolExecutor);
+ params, coordinatorBackendAssigner, threadPoolExecutor, txnStateDispatcher);
coordinatorBackendAssigner.registerBatchWrite(id, newLoad.getWarehouseId(), tableId,
newLoad.getBatchWriteParallel());
return newLoad;
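BatchWriteMgr now builds a single TxnStateDispatcher on top of the shared merge-commit thread pool and hands it to every IsomorphicBatchWrite it creates. The dispatcher's own source is not included in this excerpt; the skeleton below is only a guess at its shape, consistent with how it is constructed and called here (a constructor taking the executor, and submitTask(dbName, txnId, backendId)).

    import java.util.concurrent.Executor;

    // Hypothetical skeleton, inferred from the call sites in this diff.
    // The real TxnStateDispatcher implementation is not shown in this excerpt.
    public class TxnStateDispatcherSkeleton {

        private final Executor executor;

        public TxnStateDispatcherSkeleton(Executor executor) {
            this.executor = executor;
        }

        /** Queues an asynchronous task that forwards the txn state to one coordinator backend. */
        public void submitTask(String dbName, long txnId, long backendId) throws Exception {
            executor.execute(() -> {
                // A real implementation would look up the transaction state for (dbName, txnId)
                // and send it to backendId, retrying on failure (see the retry sketch above).
            });
        }
    }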
@@ -122,7 +122,7 @@ private void runSchedule() {
checkIntervalMs = Integer.MAX_VALUE;
LOG.info("Disable periodical schedule because there is no load");
} else {
- checkIntervalMs = Math.max(MIN_CHECK_INTERVAL_MS, Config.batch_write_be_assigner_schedule_interval_ms);
+ checkIntervalMs = Math.max(MIN_CHECK_INTERVAL_MS, Config.merge_commit_be_assigner_schedule_interval_ms);
LOG.debug("Set schedule interval to {} ms", checkIntervalMs);
}
task = taskPriorityQueue.poll(checkIntervalMs, TimeUnit.MILLISECONDS);
@@ -379,7 +379,7 @@ void runPeriodicalCheck() {
LOG.info("Remove empty warehouse {}", warehouseMeta.warehouseId);
} else {
checkNodeStatusAndReassignment(warehouseMeta);
- doBalanceIfNeeded(warehouseMeta, Config.batch_write_be_assigner_balance_factor_threshold);
+ doBalanceIfNeeded(warehouseMeta, Config.merge_commit_be_assigner_balance_factor_threshold);
if (LOG.isDebugEnabled()) {
logStatistics(warehouseMeta);
}
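CoordinatorBackendAssignerImpl now reads its schedule interval and balance threshold from the renamed merge_commit_* settings. The config comment describes the threshold as the maximum balance factor allowed between any two nodes before a rebalance is triggered; the snippet below is only a hypothetical reading of that check, since doBalanceIfNeeded itself is not shown in this excerpt.

    // Hypothetical illustration of a balance-factor check; not the actual
    // doBalanceIfNeeded logic, which is outside this excerpt.
    public class BalanceFactorSketch {

        static boolean needBalance(long maxNodeLoad, long minNodeLoad, double threshold) {
            if (maxNodeLoad <= 0) {
                return false;                 // nothing is scheduled yet
            }
            // One plausible definition: relative gap between the busiest and idlest node.
            double balanceFactor = (maxNodeLoad - minNodeLoad) / (double) maxNodeLoad;
            return balanceFactor > threshold; // e.g. a gap above 0.1 triggers a rebalance
        }
    }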
@@ -49,14 +49,15 @@ public class IsomorphicBatchWrite implements LoadExecuteCallback {

private static final Logger LOG = LoggerFactory.getLogger(IsomorphicBatchWrite.class);

- private static final String LABEL_PREFIX = "batch_write_";
+ private static final String LABEL_PREFIX = "merge_commit_";

private final long id;
private final TableId tableId;
private final String warehouseName;
private final StreamLoadInfo streamLoadInfo;
private final int batchWriteIntervalMs;
private final int batchWriteParallel;
+ private final boolean asyncMode;
private final StreamLoadKvParams loadParameters;

/**
@@ -69,6 +70,9 @@ public class IsomorphicBatchWrite implements LoadExecuteCallback {
*/
private final Executor executor;

+ /** Update the transaction state of the backend if this is a sync mode. */
+ private final TxnStateDispatcher txnUpdateDispatch;
+
/**
* The factory to create query coordinators.
*/
@@ -95,16 +99,19 @@ public IsomorphicBatchWrite(
int batchWriteParallel,
StreamLoadKvParams loadParameters,
CoordinatorBackendAssigner coordinatorBackendAssigner,
- Executor executor) {
+ Executor executor,
+ TxnStateDispatcher txnUpdateDispatch) {
this.id = id;
this.tableId = tableId;
this.warehouseName = warehouseName;
this.streamLoadInfo = streamLoadInfo;
this.batchWriteIntervalMs = batchWriteIntervalMs;
this.batchWriteParallel = batchWriteParallel;
+ this.asyncMode = loadParameters.getBatchWriteAsync().orElse(false);
this.loadParameters = loadParameters;
this.coordinatorBackendAssigner = coordinatorBackendAssigner;
this.executor = executor;
+ this.txnUpdateDispatch = txnUpdateDispatch;
this.queryCoordinatorFactory = new DefaultCoordinator.Factory();
this.loadExecutorMap = new ConcurrentHashMap<>();
this.lock = new ReentrantReadWriteLock();
@@ -244,17 +251,29 @@ public RequestLoadResult requestLoad(long backendId, String backendHost) {
*/
public boolean isActive() {
long idleTime = System.currentTimeMillis() - lastLoadCreateTimeMs.get();
- return !loadExecutorMap.isEmpty() || idleTime < Config.batch_write_idle_ms;
+ return !loadExecutorMap.isEmpty() || idleTime < Config.merge_commit_idle_ms;
}

@Override
- public void finishLoad(String label) {
+ public void finishLoad(LoadExecutor executor) {
lock.writeLock().lock();
try {
- loadExecutorMap.remove(label);
+ loadExecutorMap.remove(executor.getLabel());
} finally {
lock.writeLock().unlock();
}
+
+ long txnId = executor.getTxnId();
+ if (!asyncMode && txnId > 0) {
+ for (long backendId : executor.getBackendIds()) {
+ try {
+ txnUpdateDispatch.submitTask(tableId.getDbName(), txnId, backendId);
+ } catch (Exception e) {
+ LOG.error("Fail to submit transaction state update task, db: {}, txn_id: {}, backend id: {}",
+ tableId.getDbName(), txnId, backendId, e);
+ }
+ }
+ }
}

@VisibleForTesting
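The new sync-mode path in finishLoad is the heart of this PR: once a load finishes, and only when the load did not request async mode and a transaction was actually started (txnId > 0), one dispatch task is submitted per coordinator backend, and a failure to schedule any single task is logged without failing the load. The helper below restates that flow in self-contained form; only the free-standing method shape is invented, and it assumes it sits in the same package as TxnStateDispatcher.

    import java.util.Set;

    // Self-contained restatement of the sync-mode fan-out added to finishLoad() above.
    // The logic mirrors the diff; the standalone method signature is an assumption.
    public class SyncModeFanOutSketch {

        static void dispatchTxnState(TxnStateDispatcher dispatcher, String dbName,
                                     boolean asyncMode, long txnId, Set<Long> backendIds) {
            if (asyncMode || txnId <= 0) {
                return;                       // nothing to push for async loads or unopened txns
            }
            for (long backendId : backendIds) {
                try {
                    dispatcher.submitTask(dbName, txnId, backendId);
                } catch (Exception e) {
                    // Scheduling failures are logged in the real code; they must not fail the load.
                }
            }
        }
    }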
@@ -22,7 +22,7 @@ public interface LoadExecuteCallback {
/**
* Called when the load operation is finished.
*
- * @param label The label associated with the load operation.
+ * @param loadExecutor The executor associated with the load operation.
*/
- void finishLoad(String label);
+ void finishLoad(LoadExecutor loadExecutor);
}
@@ -45,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -114,7 +115,7 @@ public void run() {
LOG.error("Failed to execute load, label: {}, load id: {}, txn id: {}",
label, DebugUtil.printId(loadId), txnId, e);
} finally {
- loadExecuteCallback.finishLoad(label);
+ loadExecuteCallback.finishLoad(this);
timeTrace.finishTimeMs = System.currentTimeMillis();
LOG.debug("Finish load, label: {}, load id: {}, txn_id: {}, {}",
label, DebugUtil.printId(loadId), txnId, timeTrace.summary());
@@ -125,6 +126,14 @@ public String getLabel() {
return label;
}

+ public long getTxnId() {
+ return txnId;
+ }
+
+ public Set<Long> getBackendIds() {
+ return Collections.unmodifiableSet(coordinatorBackendIds);
+ }
+
/**
* Checks if the given backend id is contained in the coordinator backend IDs.
*/
@@ -159,11 +168,11 @@ private void beginTxn() throws Exception {

private void commitAndPublishTxn() throws Exception {
timeTrace.commitTxnTimeMs = System.currentTimeMillis();
- Pair<Database, OlapTable> pair = getDbAndTable();
+ Database database = getDb();
long publishTimeoutMs =
streamLoadInfo.getTimeout() * 1000L - (timeTrace.commitTxnTimeMs - timeTrace.beginTxnTimeMs);
boolean publishSuccess = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().commitAndPublishTransaction(
- pair.first, txnId, tabletCommitInfo, tabletFailInfo, publishTimeoutMs, null);
+ database, txnId, tabletCommitInfo, tabletFailInfo, publishTimeoutMs, null);
if (!publishSuccess) {
LOG.warn("Publish timeout, txn_id: {}, label: {}, total timeout: {} ms, publish timeout: {} ms",
txnId, label, streamLoadInfo.getTimeout() * 1000, publishTimeoutMs);
@@ -175,9 +184,9 @@ private void abortTxn(Throwable reason) {
return;
}
try {
- Pair<Database, OlapTable> pair = getDbAndTable();
+ Database database = getDb();
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().abortTransaction(
- pair.first.getId(), txnId, reason == null ? "" : reason.getMessage());
+ database.getId(), txnId, reason == null ? "" : reason.getMessage());
} catch (Exception e) {
LOG.error("Failed to abort transaction {}", txnId, e);
}
@@ -253,6 +262,16 @@ private void executeLoad() throws Exception {
}
}

+ private Database getDb() throws Exception {
+ GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
+ Database db = globalStateMgr.getLocalMetastore().getDb(tableId.getDbName());
+ if (db == null) {
+ throw new LoadException(String.format("Database %s does not exist", tableId.getDbName()));
+ }
+
+ return db;
+ }
+
private Pair<Database, OlapTable> getDbAndTable() throws Exception {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
Database db = globalStateMgr.getLocalMetastore().getDb(tableId.getDbName());
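LoadExecutor gains a lighter getDb() helper for the paths that only need the Database (commit/publish and abort), while getDbAndTable() remains for callers that also need the table. Visible in the same hunk as context, commitAndPublishTxn budgets its publish wait as the overall stream load timeout minus the time already spent before the commit; the small example below just walks through that arithmetic with made-up numbers, only the formula comes from the code above.

    // Worked example of the publish-timeout budget computed in commitAndPublishTxn().
    // The numbers are hypothetical; only the formula is taken from the diff context.
    public class PublishTimeoutBudgetExample {
        public static void main(String[] args) {
            long timeoutSeconds = 60;      // hypothetical stream load timeout (streamLoadInfo.getTimeout())
            long beginTxnTimeMs = 1_000;   // hypothetical trace timestamps
            long commitTxnTimeMs = 11_000; // 10 s already spent writing data
            long publishTimeoutMs = timeoutSeconds * 1000L - (commitTxnTimeMs - beginTxnTimeMs);
            System.out.println(publishTimeoutMs); // 50000 ms remain for commit and publish
        }
    }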