Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Cluster Snapshot Backup: deletion control (part4) (backport #54980) #55206

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ private void clearExpireFinishedOrCancelledAlterJobsV2() {
Iterator<Map.Entry<Long, AlterJobV2>> iterator = alterJobsV2.entrySet().iterator();
while (iterator.hasNext()) {
AlterJobV2 alterJobV2 = iterator.next().getValue();
if (alterJobV2.isExpire()) {
long validDeletionTimeMs = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr()
.getValidDeletionTimeMsByAutomatedSnapshot();
if (alterJobV2.isExpire() && alterJobV2.getFinishedTimeMs() < validDeletionTimeMs) {
iterator.remove();
RemoveAlterJobV2OperationLog log =
new RemoveAlterJobV2OperationLog(alterJobV2.getJobId(), alterJobV2.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ public synchronized void recycleTable(long dbId, Table table, boolean recoverabl
nameToTableInfo.remove(dbId, table.getName());
nameToTableInfo.put(dbId, oldTable.getName(), oldTableInfo);
// Speed up the deletion of this renamed table by modifying its recycle time to zero
idToRecycleTime.put(oldTable.getId(), 0L);
idToRecycleTime.put(oldTable.getId(), System.currentTimeMillis());
}

// If the table was force dropped, set recycle time to zero so that this table will be deleted immediately
// in the next cleanup round.
idToRecycleTime.put(table.getId(), !recoverable ? 0 : System.currentTimeMillis());
idToRecycleTime.put(table.getId(), System.currentTimeMillis());
idToTableInfo.put(dbId, table.getId(), newTableInfo);
nameToTableInfo.put(dbId, table.getName(), newTableInfo);

Expand Down Expand Up @@ -228,8 +228,7 @@ public synchronized void recyclePartition(RecyclePartitionInfo recyclePartitionI

disableRecoverPartitionWithSameName(dbId, tableId, partitionName);

long recycleTime = recyclePartitionInfo.isRecoverable() ? System.currentTimeMillis() : 0;
idToRecycleTime.put(partitionId, recycleTime);
idToRecycleTime.put(partitionId, System.currentTimeMillis());
idToPartition.put(partitionId, recyclePartitionInfo);
LOG.info("Finished put partition '{}' to recycle bin. dbId: {} tableId: {} partitionId: {} recoverable: {}",
partitionName, dbId, tableId, partitionId, recyclePartitionInfo.isRecoverable());
Expand Down Expand Up @@ -300,12 +299,42 @@ public synchronized List<PhysicalPartition> getPhysicalPartitions(long tableId)
.collect(Collectors.toList());
}

/**
 * Returns true when the recycled entity identified by {@code id} may be
 * physically deleted without invalidating a retained automated cluster snapshot.
 * An id with no recorded recycle time is not tracked by the bin, so nothing
 * blocks its deletion.
 */
private synchronized boolean checkValidDeletionByClusterSnapshot(long id) {
    Long recycledAtMs = idToRecycleTime.get(id);
    if (recycledAtMs == null) {
        return true;
    }
    // Deletion is safe only for entries recycled strictly before the time
    // window still protected by the automated snapshot mechanism.
    long validDeletionTimeMs = GlobalStateMgr.getCurrentState()
            .getClusterSnapshotMgr().getValidDeletionTimeMsByAutomatedSnapshot();
    return recycledAtMs < validDeletionTimeMs;
}

/**
 * Returns the recycle timestamp to use for expiry checks of the entity
 * identified by {@code id}.
 *
 * <p>Force-dropped (unrecoverable) tables and partitions are reported as
 * recycled at time 0 so that they expire immediately, regardless of the
 * timestamp recorded in {@code idToRecycleTime}.
 *
 * <p>Note: the previous implementation copied the entire {@code idToTableInfo}
 * table into a temporary HashMap on every call just to perform one lookup;
 * this version looks the id up in each row map directly, avoiding the O(n)
 * copy per invocation.
 */
private synchronized long getAdjustedRecycleTimestamp(long id) {
    for (Map<Long, RecycleTableInfo> tablesOfDb : idToTableInfo.rowMap().values()) {
        RecycleTableInfo tableInfo = tablesOfDb.get(id);
        if (tableInfo != null) {
            if (!tableInfo.isRecoverable()) {
                return 0;
            }
            break; // found and recoverable — fall through to the recorded time
        }
    }

    RecyclePartitionInfo partitionInfo = idToPartition.get(id);
    if (partitionInfo != null && !partitionInfo.isRecoverable()) {
        return 0;
    }

    // NOTE(review): assumes callers only pass ids present in idToRecycleTime;
    // a missing id would NPE on unboxing here, same as the original code.
    return idToRecycleTime.get(id);
}

/**
* if we can erase this instance, we should check if anyone enable erase later.
* Only used by main loop.
*/
private synchronized boolean timeExpired(long id, long currentTimeMs) {
long latencyMs = currentTimeMs - idToRecycleTime.get(id);
long latencyMs = currentTimeMs - getAdjustedRecycleTimestamp(id);
long expireMs = max(Config.catalog_trash_expire_second * 1000L, MIN_ERASE_LATENCY);
if (enableEraseLater.contains(id)) {
// if enableEraseLater is set, extend the timeout by LATE_RECYCLE_INTERVAL_SECONDS
Expand All @@ -315,6 +344,10 @@ private synchronized boolean timeExpired(long id, long currentTimeMs) {
}

private synchronized boolean canEraseTable(RecycleTableInfo tableInfo, long currentTimeMs) {
if (!checkValidDeletionByClusterSnapshot(tableInfo.getTable().getId())) {
return false;
}

if (timeExpired(tableInfo.getTable().getId(), currentTimeMs)) {
return true;
}
Expand All @@ -327,6 +360,10 @@ private synchronized boolean canEraseTable(RecycleTableInfo tableInfo, long curr
}

private synchronized boolean canErasePartition(RecyclePartitionInfo partitionInfo, long currentTimeMs) {
if (!checkValidDeletionByClusterSnapshot(partitionInfo.getPartition().getId())) {
return false;
}

if (timeExpired(partitionInfo.getPartition().getId(), currentTimeMs)) {
return true;
}
Expand Down Expand Up @@ -355,11 +392,15 @@ public synchronized boolean ensureEraseLater(long id, long currentTimeMs) {
return false;
}
// 2. will expire after quite a long time, don't worry
long latency = currentTimeMs - idToRecycleTime.get(id);
long latency = currentTimeMs - getAdjustedRecycleTimestamp(id);
if (latency < (Config.catalog_trash_expire_second - LATE_RECYCLE_INTERVAL_SECONDS) * 1000L) {
return true;
}
// 3. already expired, sorry.
// 3. check valid by cluster snapshot
if (!checkValidDeletionByClusterSnapshot(id)) {
return true;
}
// 4. already expired, sorry.
if (latency > Config.catalog_trash_expire_second * 1000L) {
return false;
}
Expand All @@ -374,7 +415,7 @@ protected synchronized void eraseDatabase(long currentTimeMs) {
Map.Entry<Long, RecycleDatabaseInfo> entry = dbIter.next();
RecycleDatabaseInfo dbInfo = entry.getValue();
Database db = dbInfo.getDb();
if (timeExpired(db.getId(), currentTimeMs)) {
if (timeExpired(db.getId(), currentTimeMs) && checkValidDeletionByClusterSnapshot(db.getId())) {
// erase db
dbIter.remove();
removeRecycleMarkers(entry.getKey());
Expand Down Expand Up @@ -635,7 +676,7 @@ private synchronized void disableRecoverPartitionWithSameName(long dbId, long ta
continue;
}
partitionInfo.setRecoverable(false);
idToRecycleTime.replace(partitionInfo.getPartition().getId(), 0L);
idToRecycleTime.replace(partitionInfo.getPartition().getId(), System.currentTimeMillis());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ private List<Long> getAllPartitionShardGroupId() {
locker.lockDatabase(db.getId(), LockType.READ);
try {
for (Table table : GlobalStateMgr.getCurrentState().getLocalMetastore().getTablesIncludeRecycleBin(db)) {
if (table.isCloudNativeTableOrMaterializedView()) {
if (table.isCloudNativeTableOrMaterializedView() &&
GlobalStateMgr.getCurrentState().getClusterSnapshotMgr()
.checkValidDeletionForTableFromAlterJob(table.getId())) {
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getAllPartitionsIncludeRecycleBin((OlapTable) table)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,25 @@ public enum ClusterSnapshotType { AUTOMATED, MANUAL, INCREMENTAL }
private ClusterSnapshotType type;
@SerializedName(value = "storageVolumeName")
private String storageVolumeName;
@SerializedName(value = "createdTime")
private long createdTime;
@SerializedName(value = "finishedTime")
private long finishedTime;
@SerializedName(value = "createdTimeMs")
private long createdTimeMs;
@SerializedName(value = "finishedTimeMs")
private long finishedTimeMs;
@SerializedName(value = "feJournalId")
private long feJournalId;
@SerializedName(value = "starMgrJournal")
private long starMgrJournalId;

public ClusterSnapshot() {}

public ClusterSnapshot(long id, String snapshotName, String storageVolumeName, long createdTime,
long finishedTime, long feJournalId, long starMgrJournalId) {
public ClusterSnapshot(long id, String snapshotName, String storageVolumeName, long createdTimeMs,
long finishedTimeMs, long feJournalId, long starMgrJournalId) {
this.id = id;
this.snapshotName = snapshotName;
this.type = ClusterSnapshotType.AUTOMATED;
this.storageVolumeName = storageVolumeName;
this.createdTime = createdTime;
this.finishedTime = finishedTime;
this.createdTimeMs = createdTimeMs;
this.finishedTimeMs = finishedTimeMs;
this.feJournalId = feJournalId;
this.starMgrJournalId = starMgrJournalId;
}
Expand All @@ -59,8 +59,8 @@ public void setJournalIds(long feJournalId, long starMgrJournalId) {
this.starMgrJournalId = starMgrJournalId;
}

public void setFinishedTime(long finishedTime) {
this.finishedTime = finishedTime;
public void setFinishedTimeMs(long finishedTimeMs) {
this.finishedTimeMs = finishedTimeMs;
}

public String getSnapshotName() {
Expand All @@ -71,12 +71,12 @@ public String getStorageVolumeName() {
return storageVolumeName;
}

public long getCreatedTime() {
return createdTime;
public long getCreatedTimeMs() {
return createdTimeMs;
}

public long getFinishedTime() {
return finishedTime;
public long getFinishedTimeMs() {
return finishedTimeMs;
}

public long getFeJournalId() {
Expand All @@ -95,8 +95,8 @@ public TClusterSnapshotsItem getInfo() {
TClusterSnapshotsItem item = new TClusterSnapshotsItem();
item.setSnapshot_name(snapshotName);
item.setSnapshot_type(type.name());
item.setCreated_time(createdTime / 1000);
item.setFinished_time(finishedTime / 1000);
item.setCreated_time(createdTimeMs / 1000);
item.setFinished_time(finishedTimeMs / 1000);
item.setFe_jouranl_id(feJournalId);
item.setStarmgr_jouranl_id(starMgrJournalId);
item.setProperties("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ public enum ClusterSnapshotJobState { INITIALIZING, SNAPSHOTING, UPLOADING, FINI
@SerializedName(value = "errMsg")
private String errMsg;

public ClusterSnapshotJob(long id, String snapshotName, String storageVolumeName, long createdTime) {
this.snapshot = new ClusterSnapshot(id, snapshotName, storageVolumeName, createdTime, -1, 0, 0);
public ClusterSnapshotJob(long id, String snapshotName, String storageVolumeName, long createdTimeMs) {
this.snapshot = new ClusterSnapshot(id, snapshotName, storageVolumeName, createdTimeMs, -1, 0, 0);
this.state = ClusterSnapshotJobState.INITIALIZING;
this.errMsg = "";
}

public void setState(ClusterSnapshotJobState state) {
this.state = state;
if (state == ClusterSnapshotJobState.FINISHED) {
snapshot.setFinishedTime(System.currentTimeMillis());
snapshot.setFinishedTimeMs(System.currentTimeMillis());
}
}

Expand All @@ -75,12 +75,12 @@ public String getStorageVolumeName() {
return snapshot.getStorageVolumeName();
}

public long getCreatedTime() {
return snapshot.getCreatedTime();
public long getCreatedTimeMs() {
return snapshot.getCreatedTimeMs();
}

public long getFinishedTime() {
return snapshot.getFinishedTime();
public long getFinishedTimeMs() {
return snapshot.getFinishedTimeMs();
}

public long getFeJournalId() {
Expand All @@ -105,6 +105,10 @@ public boolean isUnFinishedState() {
state == ClusterSnapshotJobState.FINISHED;
}

/** Returns true once this job has reached the terminal FINISHED state. */
public boolean isFinished() {
    return ClusterSnapshotJobState.FINISHED == state;
}

public void logJob() {
ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setSnapshotJob(this);
Expand All @@ -123,8 +127,8 @@ public TClusterSnapshotJobsItem getInfo() {
TClusterSnapshotJobsItem item = new TClusterSnapshotJobsItem();
item.setSnapshot_name(getSnapshotName());
item.setJob_id(getId());
item.setCreated_time(getCreatedTime() / 1000);
item.setFinished_time(getFinishedTime() / 1000);
item.setCreated_time(getCreatedTimeMs() / 1000);
item.setFinished_time(getFinishedTimeMs() / 1000);
item.setState(state.name());
item.setDetail_info("");
item.setError_message(errMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.starrocks.lake.snapshot;

import com.google.gson.annotations.SerializedName;
import com.starrocks.alter.AlterJobV2;
import com.starrocks.common.Config;
import com.starrocks.common.UserException;
import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState;
Expand Down Expand Up @@ -117,11 +118,11 @@ protected void addAutomatedClusterSnapshot(ClusterSnapshot newAutomatedClusterSn
}

public ClusterSnapshotJob createAutomatedSnapshotJob() {
long createTime = System.currentTimeMillis();
long createTimeMs = System.currentTimeMillis();
long id = GlobalStateMgr.getCurrentState().getNextId();
String snapshotName = AUTOMATED_NAME_PREFIX + '_' + String.valueOf(createTime);
String snapshotName = AUTOMATED_NAME_PREFIX + '_' + String.valueOf(createTimeMs);
String storageVolumeName = automatedSnapshotSvName;
ClusterSnapshotJob job = new ClusterSnapshotJob(id, snapshotName, storageVolumeName, createTime);
ClusterSnapshotJob job = new ClusterSnapshotJob(id, snapshotName, storageVolumeName, createTimeMs);
job.logJob();

addJob(job);
Expand All @@ -148,13 +149,53 @@ public boolean containsAutomatedSnapshot() {
}

public synchronized void addJob(ClusterSnapshotJob job) {
if (Config.max_historical_automated_cluster_snapshot_jobs >= 1 &&
historyAutomatedSnapshotJobs.size() == Config.max_historical_automated_cluster_snapshot_jobs) {
int maxSize = Math.max(Config.max_historical_automated_cluster_snapshot_jobs, 2);
if (historyAutomatedSnapshotJobs.size() == maxSize) {
historyAutomatedSnapshotJobs.pollFirstEntry();
}
historyAutomatedSnapshotJobs.put(job.getId(), job);
}

/**
 * Returns the upper bound (in ms) below which recycled data may be physically
 * deleted without breaking a retained automated snapshot.
 *
 * <p>When automated snapshots are disabled there is no retention window and
 * {@link Long#MAX_VALUE} is returned (everything deletable). Otherwise the
 * jobs are walked from newest to oldest and the creation time of the
 * second-most-recent successfully finished snapshot job is returned; 0 is
 * returned when fewer than two finished jobs exist.
 */
public synchronized long getValidDeletionTimeMsByAutomatedSnapshot() {
    if (!isAutomatedSnapshotOn()) {
        return Long.MAX_VALUE;
    }

    boolean seenLatestFinished = false;
    long secondLatestFinishedCreatedTimeMs = 0;
    for (Map.Entry<Long, ClusterSnapshotJob> entry : historyAutomatedSnapshotJobs.descendingMap().entrySet()) {
        ClusterSnapshotJob job = entry.getValue();
        if (!job.isFinished()) {
            continue;
        }
        if (seenLatestFinished) {
            // Second finished job encountered (newest-first order).
            secondLatestFinishedCreatedTimeMs = job.getCreatedTimeMs();
            break;
        }
        seenLatestFinished = true;
    }
    return secondLatestFinishedCreatedTimeMs;
}

/**
 * Returns true when the table identified by {@code tableId} may be deleted
 * with respect to alter jobs and the automated-snapshot retention window.
 *
 * <p>Without automated snapshots there is no window to respect. When an alter
 * job targeting the table exists, deletion is valid only if that job finished
 * before {@link #getValidDeletionTimeMsByAutomatedSnapshot()}.
 *
 * <p>Bug fix: the previous implementation called {@code putAll(...)} on the
 * map returned by {@code getRollupHandler().getAlterJobsV2()}; if that map is
 * the handler's live internal state, schema-change jobs were merged into the
 * rollup handler's map as a side effect. The two maps are now searched
 * without merging or mutating either one.
 */
public synchronized boolean checkValidDeletionForTableFromAlterJob(long tableId) {
    if (!isAutomatedSnapshotOn()) {
        return true;
    }

    // Schema-change jobs are consulted first, mirroring the old merge order in
    // which putAll() let schema-change entries override rollup entries.
    AlterJobV2 matchedJob = findAlterJobForTable(
            GlobalStateMgr.getCurrentState().getSchemaChangeHandler().getAlterJobsV2(), tableId);
    if (matchedJob == null) {
        matchedJob = findAlterJobForTable(
                GlobalStateMgr.getCurrentState().getRollupHandler().getAlterJobsV2(), tableId);
    }
    if (matchedJob == null) {
        return true;
    }
    return matchedJob.getFinishedTimeMs() < getValidDeletionTimeMsByAutomatedSnapshot();
}

// Returns the first alter job in {@code jobs} targeting {@code tableId}, or null if none.
private static AlterJobV2 findAlterJobForTable(Map<Long, AlterJobV2> jobs, long tableId) {
    for (AlterJobV2 job : jobs.values()) {
        if (job.getTableId() == tableId) {
            return job;
        }
    }
    return null;
}

public TClusterSnapshotJobsResponse getAllJobsInfo() {
TClusterSnapshotJobsResponse response = new TClusterSnapshotJobsResponse();
for (Map.Entry<Long, ClusterSnapshotJob> entry : historyAutomatedSnapshotJobs.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ private void vacuumPartitionImpl(Database db, OlapTable table, PhysicalPartition
vacuumRequest.minRetainVersion = minRetainVersion;
vacuumRequest.graceTimestamp =
startTime / MILLISECONDS_PER_SECOND - Config.lake_autovacuum_grace_period_minutes * 60;
vacuumRequest.graceTimestamp = Math.min(vacuumRequest.graceTimestamp,
Math.max(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr()
.getValidDeletionTimeMsByAutomatedSnapshot() / MILLISECONDS_PER_SECOND, 1));
vacuumRequest.minActiveTxnId = minActiveTxnId;
vacuumRequest.partitionId = partition.getId();
vacuumRequest.deleteTxnLog = needDeleteTxnLog;
Expand Down
Loading
Loading