Skip to content

Commit

Permalink
[Feature] Support Cluster Snapshot Backup: deletion control (part4) (…
Browse files Browse the repository at this point in the history
…backport #54980) (#55206)

Co-authored-by: srlch <[email protected]>
  • Loading branch information
mergify[bot] and srlch authored Jan 20, 2025
1 parent 2479c0a commit e34b448
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ private void clearExpireFinishedOrCancelledAlterJobsV2() {
Iterator<Map.Entry<Long, AlterJobV2>> iterator = alterJobsV2.entrySet().iterator();
while (iterator.hasNext()) {
AlterJobV2 alterJobV2 = iterator.next().getValue();
if (alterJobV2.isExpire()) {
long validDeletionTimeMs = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr()
.getValidDeletionTimeMsByAutomatedSnapshot();
if (alterJobV2.isExpire() && alterJobV2.getFinishedTimeMs() < validDeletionTimeMs) {
iterator.remove();
RemoveAlterJobV2OperationLog log =
new RemoveAlterJobV2OperationLog(alterJobV2.getJobId(), alterJobV2.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ public synchronized void recycleTable(long dbId, Table table, boolean recoverabl
nameToTableInfo.remove(dbId, table.getName());
nameToTableInfo.put(dbId, oldTable.getName(), oldTableInfo);
// Speed up the deletion of this renamed table by modifying its recycle time to zero
idToRecycleTime.put(oldTable.getId(), 0L);
idToRecycleTime.put(oldTable.getId(), System.currentTimeMillis());
}

// If the table was force dropped, set recycle time to zero so that this table will be deleted immediately
// in the next cleanup round.
idToRecycleTime.put(table.getId(), !recoverable ? 0 : System.currentTimeMillis());
idToRecycleTime.put(table.getId(), System.currentTimeMillis());
idToTableInfo.put(dbId, table.getId(), newTableInfo);
nameToTableInfo.put(dbId, table.getName(), newTableInfo);

Expand Down Expand Up @@ -228,8 +228,7 @@ public synchronized void recyclePartition(RecyclePartitionInfo recyclePartitionI

disableRecoverPartitionWithSameName(dbId, tableId, partitionName);

long recycleTime = recyclePartitionInfo.isRecoverable() ? System.currentTimeMillis() : 0;
idToRecycleTime.put(partitionId, recycleTime);
idToRecycleTime.put(partitionId, System.currentTimeMillis());
idToPartition.put(partitionId, recyclePartitionInfo);
LOG.info("Finished put partition '{}' to recycle bin. dbId: {} tableId: {} partitionId: {} recoverable: {}",
partitionName, dbId, tableId, partitionId, recyclePartitionInfo.isRecoverable());
Expand Down Expand Up @@ -300,12 +299,42 @@ public synchronized List<PhysicalPartition> getPhysicalPartitions(long tableId)
.collect(Collectors.toList());
}

private synchronized boolean checkValidDeletionByClusterSnapshot(long id) {
    // An id with no recorded recycle time is not tracked here, so nothing
    // blocks its deletion.
    Long recycleTimeMs = idToRecycleTime.get(id);
    if (recycleTimeMs == null) {
        return true;
    }
    // Deletion is only safe for items recycled before the snapshot-derived
    // valid-deletion watermark.
    long validDeletionTimeMs = GlobalStateMgr.getCurrentState()
            .getClusterSnapshotMgr().getValidDeletionTimeMsByAutomatedSnapshot();
    return recycleTimeMs < validDeletionTimeMs;
}

/**
 * Returns the recycle timestamp to use for expiry checks on {@code id}.
 * A force-dropped (non-recoverable) table or partition is reported as
 * timestamp 0 so that it expires immediately; otherwise the recorded
 * recycle time is returned.
 *
 * Note: the previous implementation copied every recycled table of every
 * database into a temporary HashMap on each call just to perform a single
 * lookup; since this runs inside the erase loops, that was O(total recycled
 * tables) per id. We now probe each database's row map directly and stop at
 * the first hit.
 */
private synchronized long getAdjustedRecycleTimestamp(long id) {
    for (Map<Long, RecycleTableInfo> tablesInDb : idToTableInfo.rowMap().values()) {
        RecycleTableInfo tableInfo = tablesInDb.get(id);
        if (tableInfo != null) {
            if (!tableInfo.isRecoverable()) {
                return 0;
            }
            // id identifies a table; it cannot also be a partition id.
            break;
        }
    }

    RecyclePartitionInfo partitionInfo = idToPartition.get(id);
    if (partitionInfo != null && !partitionInfo.isRecoverable()) {
        return 0;
    }

    // NOTE(review): assumes every id passed here has an entry in
    // idToRecycleTime (the original code made the same assumption; a missing
    // entry would NPE on unboxing) — confirm against callers.
    return idToRecycleTime.get(id);
}

/**
* if we can erase this instance, we should check if anyone enable erase later.
* Only used by main loop.
*/
private synchronized boolean timeExpired(long id, long currentTimeMs) {
long latencyMs = currentTimeMs - idToRecycleTime.get(id);
long latencyMs = currentTimeMs - getAdjustedRecycleTimestamp(id);
long expireMs = max(Config.catalog_trash_expire_second * 1000L, MIN_ERASE_LATENCY);
if (enableEraseLater.contains(id)) {
// if enableEraseLater is set, extend the timeout by LATE_RECYCLE_INTERVAL_SECONDS
Expand All @@ -315,6 +344,10 @@ private synchronized boolean timeExpired(long id, long currentTimeMs) {
}

private synchronized boolean canEraseTable(RecycleTableInfo tableInfo, long currentTimeMs) {
if (!checkValidDeletionByClusterSnapshot(tableInfo.getTable().getId())) {
return false;
}

if (timeExpired(tableInfo.getTable().getId(), currentTimeMs)) {
return true;
}
Expand All @@ -327,6 +360,10 @@ private synchronized boolean canEraseTable(RecycleTableInfo tableInfo, long curr
}

private synchronized boolean canErasePartition(RecyclePartitionInfo partitionInfo, long currentTimeMs) {
if (!checkValidDeletionByClusterSnapshot(partitionInfo.getPartition().getId())) {
return false;
}

if (timeExpired(partitionInfo.getPartition().getId(), currentTimeMs)) {
return true;
}
Expand Down Expand Up @@ -355,11 +392,15 @@ public synchronized boolean ensureEraseLater(long id, long currentTimeMs) {
return false;
}
// 2. will expire after quite a long time, don't worry
long latency = currentTimeMs - idToRecycleTime.get(id);
long latency = currentTimeMs - getAdjustedRecycleTimestamp(id);
if (latency < (Config.catalog_trash_expire_second - LATE_RECYCLE_INTERVAL_SECONDS) * 1000L) {
return true;
}
// 3. already expired, sorry.
// 3. check valid by cluster snapshot
if (!checkValidDeletionByClusterSnapshot(id)) {
return true;
}
// 4. already expired, sorry.
if (latency > Config.catalog_trash_expire_second * 1000L) {
return false;
}
Expand All @@ -374,7 +415,7 @@ protected synchronized void eraseDatabase(long currentTimeMs) {
Map.Entry<Long, RecycleDatabaseInfo> entry = dbIter.next();
RecycleDatabaseInfo dbInfo = entry.getValue();
Database db = dbInfo.getDb();
if (timeExpired(db.getId(), currentTimeMs)) {
if (timeExpired(db.getId(), currentTimeMs) && checkValidDeletionByClusterSnapshot(db.getId())) {
// erase db
dbIter.remove();
removeRecycleMarkers(entry.getKey());
Expand Down Expand Up @@ -635,7 +676,7 @@ private synchronized void disableRecoverPartitionWithSameName(long dbId, long ta
continue;
}
partitionInfo.setRecoverable(false);
idToRecycleTime.replace(partitionInfo.getPartition().getId(), 0L);
idToRecycleTime.replace(partitionInfo.getPartition().getId(), System.currentTimeMillis());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ private List<Long> getAllPartitionShardGroupId() {
locker.lockDatabase(db.getId(), LockType.READ);
try {
for (Table table : GlobalStateMgr.getCurrentState().getLocalMetastore().getTablesIncludeRecycleBin(db)) {
if (table.isCloudNativeTableOrMaterializedView()) {
if (table.isCloudNativeTableOrMaterializedView() &&
GlobalStateMgr.getCurrentState().getClusterSnapshotMgr()
.checkValidDeletionForTableFromAlterJob(table.getId())) {
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getAllPartitionsIncludeRecycleBin((OlapTable) table)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,25 @@ public enum ClusterSnapshotType { AUTOMATED, MANUAL, INCREMENTAL }
private ClusterSnapshotType type;
@SerializedName(value = "storageVolumeName")
private String storageVolumeName;
@SerializedName(value = "createdTime")
private long createdTime;
@SerializedName(value = "finishedTime")
private long finishedTime;
@SerializedName(value = "createdTimeMs")
private long createdTimeMs;
@SerializedName(value = "finishedTimeMs")
private long finishedTimeMs;
@SerializedName(value = "feJournalId")
private long feJournalId;
@SerializedName(value = "starMgrJournal")
private long starMgrJournalId;

public ClusterSnapshot() {}

public ClusterSnapshot(long id, String snapshotName, String storageVolumeName, long createdTime,
long finishedTime, long feJournalId, long starMgrJournalId) {
public ClusterSnapshot(long id, String snapshotName, String storageVolumeName, long createdTimeMs,
long finishedTimeMs, long feJournalId, long starMgrJournalId) {
this.id = id;
this.snapshotName = snapshotName;
this.type = ClusterSnapshotType.AUTOMATED;
this.storageVolumeName = storageVolumeName;
this.createdTime = createdTime;
this.finishedTime = finishedTime;
this.createdTimeMs = createdTimeMs;
this.finishedTimeMs = finishedTimeMs;
this.feJournalId = feJournalId;
this.starMgrJournalId = starMgrJournalId;
}
Expand All @@ -59,8 +59,8 @@ public void setJournalIds(long feJournalId, long starMgrJournalId) {
this.starMgrJournalId = starMgrJournalId;
}

public void setFinishedTime(long finishedTime) {
this.finishedTime = finishedTime;
public void setFinishedTimeMs(long finishedTimeMs) {
this.finishedTimeMs = finishedTimeMs;
}

public String getSnapshotName() {
Expand All @@ -71,12 +71,12 @@ public String getStorageVolumeName() {
return storageVolumeName;
}

public long getCreatedTime() {
return createdTime;
public long getCreatedTimeMs() {
return createdTimeMs;
}

public long getFinishedTime() {
return finishedTime;
public long getFinishedTimeMs() {
return finishedTimeMs;
}

public long getFeJournalId() {
Expand All @@ -95,8 +95,8 @@ public TClusterSnapshotsItem getInfo() {
TClusterSnapshotsItem item = new TClusterSnapshotsItem();
item.setSnapshot_name(snapshotName);
item.setSnapshot_type(type.name());
item.setCreated_time(createdTime / 1000);
item.setFinished_time(finishedTime / 1000);
item.setCreated_time(createdTimeMs / 1000);
item.setFinished_time(finishedTimeMs / 1000);
item.setFe_jouranl_id(feJournalId);
item.setStarmgr_jouranl_id(starMgrJournalId);
item.setProperties("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ public enum ClusterSnapshotJobState { INITIALIZING, SNAPSHOTING, UPLOADING, FINI
@SerializedName(value = "errMsg")
private String errMsg;

public ClusterSnapshotJob(long id, String snapshotName, String storageVolumeName, long createdTime) {
this.snapshot = new ClusterSnapshot(id, snapshotName, storageVolumeName, createdTime, -1, 0, 0);
public ClusterSnapshotJob(long id, String snapshotName, String storageVolumeName, long createdTimeMs) {
this.snapshot = new ClusterSnapshot(id, snapshotName, storageVolumeName, createdTimeMs, -1, 0, 0);
this.state = ClusterSnapshotJobState.INITIALIZING;
this.errMsg = "";
}

public void setState(ClusterSnapshotJobState state) {
    this.state = state;
    // Stamp the completion time the moment the job reaches its terminal
    // FINISHED state.
    boolean justFinished = (state == ClusterSnapshotJobState.FINISHED);
    if (justFinished) {
        snapshot.setFinishedTimeMs(System.currentTimeMillis());
    }
}

Expand All @@ -75,12 +75,12 @@ public String getStorageVolumeName() {
return snapshot.getStorageVolumeName();
}

public long getCreatedTime() {
return snapshot.getCreatedTime();
public long getCreatedTimeMs() {
return snapshot.getCreatedTimeMs();
}

public long getFinishedTime() {
return snapshot.getFinishedTime();
public long getFinishedTimeMs() {
return snapshot.getFinishedTimeMs();
}

public long getFeJournalId() {
Expand All @@ -105,6 +105,10 @@ public boolean isUnFinishedState() {
state == ClusterSnapshotJobState.FINISHED;
}

// True only when the job has reached the terminal FINISHED state.
public boolean isFinished() {
    return ClusterSnapshotJobState.FINISHED == state;
}

public void logJob() {
ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setSnapshotJob(this);
Expand All @@ -123,8 +127,8 @@ public TClusterSnapshotJobsItem getInfo() {
TClusterSnapshotJobsItem item = new TClusterSnapshotJobsItem();
item.setSnapshot_name(getSnapshotName());
item.setJob_id(getId());
item.setCreated_time(getCreatedTime() / 1000);
item.setFinished_time(getFinishedTime() / 1000);
item.setCreated_time(getCreatedTimeMs() / 1000);
item.setFinished_time(getFinishedTimeMs() / 1000);
item.setState(state.name());
item.setDetail_info("");
item.setError_message(errMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.starrocks.lake.snapshot;

import com.google.gson.annotations.SerializedName;
import com.starrocks.alter.AlterJobV2;
import com.starrocks.common.Config;
import com.starrocks.common.UserException;
import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState;
Expand Down Expand Up @@ -117,11 +118,11 @@ protected void addAutomatedClusterSnapshot(ClusterSnapshot newAutomatedClusterSn
}

public ClusterSnapshotJob createAutomatedSnapshotJob() {
long createTime = System.currentTimeMillis();
long createTimeMs = System.currentTimeMillis();
long id = GlobalStateMgr.getCurrentState().getNextId();
String snapshotName = AUTOMATED_NAME_PREFIX + '_' + String.valueOf(createTime);
String snapshotName = AUTOMATED_NAME_PREFIX + '_' + String.valueOf(createTimeMs);
String storageVolumeName = automatedSnapshotSvName;
ClusterSnapshotJob job = new ClusterSnapshotJob(id, snapshotName, storageVolumeName, createTime);
ClusterSnapshotJob job = new ClusterSnapshotJob(id, snapshotName, storageVolumeName, createTimeMs);
job.logJob();

addJob(job);
Expand All @@ -148,13 +149,53 @@ public boolean containsAutomatedSnapshot() {
}

public synchronized void addJob(ClusterSnapshotJob job) {
if (Config.max_historical_automated_cluster_snapshot_jobs >= 1 &&
historyAutomatedSnapshotJobs.size() == Config.max_historical_automated_cluster_snapshot_jobs) {
int maxSize = Math.max(Config.max_historical_automated_cluster_snapshot_jobs, 2);
if (historyAutomatedSnapshotJobs.size() == maxSize) {
historyAutomatedSnapshotJobs.pollFirstEntry();
}
historyAutomatedSnapshotJobs.put(job.getId(), job);
}

/**
 * Returns the deletion watermark (ms) implied by automated cluster snapshots:
 * metadata may only be physically deleted if it became garbage before the
 * creation time of the second most recent successful snapshot job, since the
 * most recent successful snapshot may still reference newer state.
 *
 * @return Long.MAX_VALUE when automated snapshots are off (no constraint);
 *         0 when fewer than two successful jobs exist (nothing deletable yet).
 */
public synchronized long getValidDeletionTimeMsByAutomatedSnapshot() {
    if (!isAutomatedSnapshotOn()) {
        return Long.MAX_VALUE;
    }

    // Walk the job history from newest to oldest looking for the second
    // successful (FINISHED) job.
    boolean foundLatestSuccess = false;
    long previousAutomatedSnapshotCreatedTimeMs = 0;
    for (Map.Entry<Long, ClusterSnapshotJob> entry : historyAutomatedSnapshotJobs.descendingMap().entrySet()) {
        ClusterSnapshotJob job = entry.getValue();
        if (!job.isFinished()) {
            continue;
        }
        if (foundLatestSuccess) {
            previousAutomatedSnapshotCreatedTimeMs = job.getCreatedTimeMs();
            break;
        }
        foundLatestSuccess = true;
    }

    return previousAutomatedSnapshotCreatedTimeMs;
}

/**
 * Returns true if the alter-job history for {@code tableId} does not block
 * deletion under the automated-snapshot retention rule: every alter job on
 * the table must have finished before the valid-deletion watermark.
 *
 * Fixes two defects in the previous version:
 * 1) It called alterJobs.putAll(...) on the map returned by
 *    getRollupHandler().getAlterJobsV2(); if that map is the handler's live
 *    internal map, schema-change jobs were silently injected into the rollup
 *    handler's state. We now iterate the two maps separately without merging.
 * 2) It only inspected an arbitrary first job matching the table id; when a
 *    table had several alter jobs, later ones were ignored. All matching jobs
 *    are now checked, which is the conservative choice for deletion safety.
 */
public synchronized boolean checkValidDeletionForTableFromAlterJob(long tableId) {
    if (!isAutomatedSnapshotOn()) {
        return true;
    }

    long validDeletionTimeMs = getValidDeletionTimeMsByAutomatedSnapshot();
    Map<Long, AlterJobV2> rollupJobs =
            GlobalStateMgr.getCurrentState().getRollupHandler().getAlterJobsV2();
    Map<Long, AlterJobV2> schemaChangeJobs =
            GlobalStateMgr.getCurrentState().getSchemaChangeHandler().getAlterJobsV2();
    return noBlockingJobForTable(rollupJobs, tableId, validDeletionTimeMs)
            && noBlockingJobForTable(schemaChangeJobs, tableId, validDeletionTimeMs);
}

// True if no job in `jobs` touching `tableId` finished at or after the watermark.
private static boolean noBlockingJobForTable(Map<Long, AlterJobV2> jobs, long tableId,
                                             long validDeletionTimeMs) {
    for (AlterJobV2 job : jobs.values()) {
        if (job.getTableId() == tableId && job.getFinishedTimeMs() >= validDeletionTimeMs) {
            return false;
        }
    }
    return true;
}

public TClusterSnapshotJobsResponse getAllJobsInfo() {
TClusterSnapshotJobsResponse response = new TClusterSnapshotJobsResponse();
for (Map.Entry<Long, ClusterSnapshotJob> entry : historyAutomatedSnapshotJobs.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ private void vacuumPartitionImpl(Database db, OlapTable table, PhysicalPartition
vacuumRequest.minRetainVersion = minRetainVersion;
vacuumRequest.graceTimestamp =
startTime / MILLISECONDS_PER_SECOND - Config.lake_autovacuum_grace_period_minutes * 60;
vacuumRequest.graceTimestamp = Math.min(vacuumRequest.graceTimestamp,
Math.max(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr()
.getValidDeletionTimeMsByAutomatedSnapshot() / MILLISECONDS_PER_SECOND, 1));
vacuumRequest.minActiveTxnId = minActiveTxnId;
vacuumRequest.partitionId = partition.getId();
vacuumRequest.deleteTxnLog = needDeleteTxnLog;
Expand Down
Loading

0 comments on commit e34b448

Please sign in to comment.