Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: srlch <[email protected]>
  • Loading branch information
srlch committed Jan 14, 2025
1 parent ffb9b98 commit 3ec6546
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,15 @@ public synchronized List<PhysicalPartition> getPhysicalPartitions(long tableId)
.collect(Collectors.toList());
}

private synchronized long getAdjustedRecycleTimestamp(long id, long currentTimeMs) {
long originalRecycleTime = idToRecycleTime.get(id);
if (originalRecycleTime >
GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getValidDeletionTimeMsByAutomatedSnapshot()) {
return currentTimeMs; // can not be erased
private synchronized boolean checkValidDeletionByClusterSnapshot(long id) {
if (!idToRecycleTime.containsKey(id)) {
return true;
}
return idToRecycleTime.get(id) > GlobalStateMgr.getCurrentState()
.getClusterSnapshotMgr().getValidDeletionTimeMsByAutomatedSnapshot();
}

private synchronized long getAdjustedRecycleTimestamp(long id) {
Map<Long, RecycleTableInfo> idToRecycleTableInfo = Maps.newHashMap();
for (Map<Long, RecycleTableInfo> tableEntry : idToTableInfo.rowMap().values()) {
for (Map.Entry<Long, RecycleTableInfo> entry : tableEntry.entrySet()) {
Expand All @@ -323,15 +325,15 @@ private synchronized long getAdjustedRecycleTimestamp(long id, long currentTimeM
return 0;
}

return originalRecycleTime;
return idToRecycleTime.get(id);
}

/**
* if we can erase this instance, we should check if anyone enable erase later.
* Only used by main loop.
*/
private synchronized boolean timeExpired(long id, long currentTimeMs) {
long latencyMs = currentTimeMs - getAdjustedRecycleTimestamp(id, currentTimeMs);
long latencyMs = currentTimeMs - getAdjustedRecycleTimestamp(id);
long expireMs = max(Config.catalog_trash_expire_second * 1000L, MIN_ERASE_LATENCY);
if (enableEraseLater.contains(id)) {
// if enableEraseLater is set, extend the timeout by LATE_RECYCLE_INTERVAL_SECONDS
Expand All @@ -341,6 +343,10 @@ private synchronized boolean timeExpired(long id, long currentTimeMs) {
}

private synchronized boolean canEraseTable(RecycleTableInfo tableInfo, long currentTimeMs) {
if (!checkValidDeletionByClusterSnapshot(tableInfo.getTable().getId())) {
return false;
}

if (timeExpired(tableInfo.getTable().getId(), currentTimeMs)) {
return true;
}
Expand All @@ -353,6 +359,10 @@ private synchronized boolean canEraseTable(RecycleTableInfo tableInfo, long curr
}

private synchronized boolean canErasePartition(RecyclePartitionInfo partitionInfo, long currentTimeMs) {
if (!checkValidDeletionByClusterSnapshot(partitionInfo.getPartition().getId())) {
return false;
}

if (timeExpired(partitionInfo.getPartition().getId(), currentTimeMs)) {
return true;
}
Expand Down Expand Up @@ -381,11 +391,15 @@ public synchronized boolean ensureEraseLater(long id, long currentTimeMs) {
return false;
}
// 2. will expire after quite a long time, don't worry
long latency = currentTimeMs - getAdjustedRecycleTimestamp(id, currentTimeMs);
long latency = currentTimeMs - getAdjustedRecycleTimestamp(id);
if (latency < (Config.catalog_trash_expire_second - LATE_RECYCLE_INTERVAL_SECONDS) * 1000L) {
return true;
}
// 3. already expired, sorry.
// 3. check valid by cluster snapshot
if (!checkValidDeletionByClusterSnapshot(id)) {
return true;
}
// 4. already expired, sorry.
if (latency > Config.catalog_trash_expire_second * 1000L) {
return false;
}
Expand All @@ -400,7 +414,7 @@ protected synchronized void eraseDatabase(long currentTimeMs) {
Map.Entry<Long, RecycleDatabaseInfo> entry = dbIter.next();
RecycleDatabaseInfo dbInfo = entry.getValue();
Database db = dbInfo.getDb();
if (timeExpired(db.getId(), currentTimeMs)) {
if (timeExpired(db.getId(), currentTimeMs) && checkValidDeletionByClusterSnapshot(db.getId())) {
// erase db
dbIter.remove();
removeRecycleMarkers(entry.getKey());
Expand Down Expand Up @@ -982,6 +996,7 @@ protected void runAfterCatalogReady() {
long currentTimeMs = System.currentTimeMillis();
// should follow the partition/table/db order
// in case of partition(table) is still in recycle bin but table(db) is missing
LOG.info("catalog recycle begin");
try {
erasePartition(currentTimeMs);
// synchronized is unfair lock, sleep here allows other high-priority operations to obtain a lock
Expand All @@ -993,6 +1008,7 @@ protected void runAfterCatalogReady() {
} catch (InterruptedException e) {
LOG.warn("Failed to execute runAfterCatalogReady", e);
}
LOG.info("catalog recycle finished");
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ClusterSnapshotCheckpointScheduler extends FrontendDaemon {
private final CheckpointController feController;
private final CheckpointController starMgrController;

private boolean hasfinished = false;

public ClusterSnapshotCheckpointScheduler(CheckpointController feController, CheckpointController starMgrController) {
super("cluster_snapshot_checkpoint_scheduler", Config.automated_cluster_snapshot_interval_seconds * 1000L);
this.feController = feController;
Expand All @@ -45,6 +47,10 @@ protected void runAfterCatalogReady() {
return;
}

if (hasfinished) {
return;
}

CheckpointController.exclusiveLock();
try {
runCheckpointScheduler();
Expand Down Expand Up @@ -115,6 +121,7 @@ protected void runCheckpointScheduler() {
job.addAutomatedClusterSnapshot();
LOG.info("Finish Cluster Snapshot checkpoint, FE checkpoint journal Id: {}, StarMgr checkpoint journal Id: {}",
job.getFeJournalId(), job.getStarMgrJournalId());
hasfinished = true;
}
}

Expand Down

0 comments on commit 3ec6546

Please sign in to comment.