From 3ec6546aa913df44dd9d5792cff12fbda920840d Mon Sep 17 00:00:00 2001 From: srlch Date: Tue, 14 Jan 2025 14:17:43 +0800 Subject: [PATCH] fix Signed-off-by: srlch --- .../starrocks/catalog/CatalogRecycleBin.java | 36 +++++++++++++------ .../ClusterSnapshotCheckpointScheduler.java | 7 ++++ 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/com/starrocks/catalog/CatalogRecycleBin.java index 689a053e8a9827..1c604cc3485dee 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/CatalogRecycleBin.java @@ -299,13 +299,15 @@ public synchronized List getPhysicalPartitions(long tableId) .collect(Collectors.toList()); } - private synchronized long getAdjustedRecycleTimestamp(long id, long currentTimeMs) { - long originalRecycleTime = idToRecycleTime.get(id); - if (originalRecycleTime > - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getValidDeletionTimeMsByAutomatedSnapshot()) { - return currentTimeMs; // can not be erased + private synchronized boolean checkValidDeletionByClusterSnapshot(long id) { + if (!idToRecycleTime.containsKey(id)) { + return true; } + return idToRecycleTime.get(id) > GlobalStateMgr.getCurrentState() + .getClusterSnapshotMgr().getValidDeletionTimeMsByAutomatedSnapshot(); + } + private synchronized long getAdjustedRecycleTimestamp(long id) { Map idToRecycleTableInfo = Maps.newHashMap(); for (Map tableEntry : idToTableInfo.rowMap().values()) { for (Map.Entry entry : tableEntry.entrySet()) { @@ -323,7 +325,7 @@ private synchronized long getAdjustedRecycleTimestamp(long id, long currentTimeM return 0; } - return originalRecycleTime; + return idToRecycleTime.get(id); } /** @@ -331,7 +333,7 @@ private synchronized long getAdjustedRecycleTimestamp(long id, long currentTimeM * Only used by main loop. */ private synchronized boolean timeExpired(long id, long currentTimeMs) { - long latencyMs = currentTimeMs - getAdjustedRecycleTimestamp(id, currentTimeMs); + long latencyMs = currentTimeMs - getAdjustedRecycleTimestamp(id); long expireMs = max(Config.catalog_trash_expire_second * 1000L, MIN_ERASE_LATENCY); if (enableEraseLater.contains(id)) { // if enableEraseLater is set, extend the timeout by LATE_RECYCLE_INTERVAL_SECONDS @@ -341,6 +343,10 @@ private synchronized boolean timeExpired(long id, long currentTimeMs) { } private synchronized boolean canEraseTable(RecycleTableInfo tableInfo, long currentTimeMs) { + if (!checkValidDeletionByClusterSnapshot(tableInfo.getTable().getId())) { + return false; + } + if (timeExpired(tableInfo.getTable().getId(), currentTimeMs)) { return true; } @@ -353,6 +359,10 @@ private synchronized boolean canEraseTable(RecycleTableInfo tableInfo, long curr } private synchronized boolean canErasePartition(RecyclePartitionInfo partitionInfo, long currentTimeMs) { + if (!checkValidDeletionByClusterSnapshot(partitionInfo.getPartition().getId())) { + return false; + } + if (timeExpired(partitionInfo.getPartition().getId(), currentTimeMs)) { return true; } @@ -381,11 +391,15 @@ public synchronized boolean ensureEraseLater(long id, long currentTimeMs) { return false; } // 2. will expire after quite a long time, don't worry - long latency = currentTimeMs - getAdjustedRecycleTimestamp(id, currentTimeMs); + long latency = currentTimeMs - getAdjustedRecycleTimestamp(id); if (latency < (Config.catalog_trash_expire_second - LATE_RECYCLE_INTERVAL_SECONDS) * 1000L) { return true; } - // 3. already expired, sorry. + // 3. check valid by cluster snapshot + if (!checkValidDeletionByClusterSnapshot(id)) { + return true; + } + // 4. already expired, sorry. if (latency > Config.catalog_trash_expire_second * 1000L) { return false; } @@ -400,7 +414,7 @@ protected synchronized void eraseDatabase(long currentTimeMs) { Map.Entry entry = dbIter.next(); RecycleDatabaseInfo dbInfo = entry.getValue(); Database db = dbInfo.getDb(); - if (timeExpired(db.getId(), currentTimeMs)) { + if (timeExpired(db.getId(), currentTimeMs) && checkValidDeletionByClusterSnapshot(db.getId())) { // erase db dbIter.remove(); removeRecycleMarkers(entry.getKey()); @@ -982,6 +996,7 @@ protected void runAfterCatalogReady() { long currentTimeMs = System.currentTimeMillis(); // should follow the partition/table/db order // in case of partition(table) is still in recycle bin but table(db) is missing + LOG.info("catalog recycle begin"); try { erasePartition(currentTimeMs); // synchronized is unfair lock, sleep here allows other high-priority operations to obtain a lock @@ -993,6 +1008,7 @@ protected void runAfterCatalogReady() { } catch (InterruptedException e) { LOG.warn("Failed to execute runAfterCatalogReady", e); } + LOG.info("catalog recycle finished"); } @VisibleForTesting diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotCheckpointScheduler.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotCheckpointScheduler.java index a27568c8c850cc..dc24b4323a2bf2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotCheckpointScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotCheckpointScheduler.java @@ -33,6 +33,8 @@ public class ClusterSnapshotCheckpointScheduler extends FrontendDaemon { private final CheckpointController feController; private final CheckpointController starMgrController; + private boolean hasfinished = false; + public ClusterSnapshotCheckpointScheduler(CheckpointController feController, CheckpointController starMgrController) { super("cluster_snapshot_checkpoint_scheduler", Config.automated_cluster_snapshot_interval_seconds * 1000L); this.feController = feController; @@ -45,6 +47,10 @@ protected void runAfterCatalogReady() { return; } + if (hasfinished) { + return; + } + CheckpointController.exclusiveLock(); try { runCheckpointScheduler(); @@ -115,6 +121,7 @@ protected void runCheckpointScheduler() { job.addAutomatedClusterSnapshot(); LOG.info("Finish Cluster Snapshot checkpoint, FE checkpoint journal Id: {}, StarMgr checkpoint journal Id: {}", job.getFeJournalId(), job.getStarMgrJournalId()); + hasfinished = true; } }