Skip to content

Commit

Permalink
[fix](backup) Load backup meta and job info bytes from disk #43276 (#…
Browse files Browse the repository at this point in the history
…43518)

cherry pick from #43276
  • Loading branch information
w41ter authored Nov 8, 2024
1 parent 4b3aa2e commit a843ca3
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ public void setTypeRead(boolean isTypeRead) {

// Returns true if this job is in its cancelled state.
public abstract boolean isCancelled();

// Returns true if this job is in its finished state.
public abstract boolean isFinished();

public static AbstractJob read(DataInput in) throws IOException {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ public class BackupHandler extends MasterDaemon implements Writable {

private Env env;

// map to store backup info, key is label name, value is Pair<meta, info>, meta && info is bytes
// this map not present in persist && only in fe master memory
// map to store backup info, key is label name, value is the BackupJob
// this map not present in persist && only in fe memory
// one table only keep one snapshot info, only keep last
private final Map<String, Snapshot> localSnapshots = new HashMap<>();
private final Map<String, BackupJob> localSnapshots = new HashMap<>();
private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();

public BackupHandler() {
Expand Down Expand Up @@ -166,6 +166,7 @@ private boolean init() {
return false;
}
}

isInit = true;
return true;
}
Expand Down Expand Up @@ -484,11 +485,15 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
return;
}

List<String> removedLabels = Lists.newArrayList();
jobLock.lock();
try {
Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
jobs.removeFirst();
AbstractJob removedJob = jobs.removeFirst();
if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) {
removedLabels.add(removedJob.getLabel());
}
}
AbstractJob lastJob = jobs.peekLast();

Expand All @@ -501,6 +506,17 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
} finally {
jobLock.unlock();
}

if (job.isFinished() && job instanceof BackupJob) {
// Save snapshot to local repo, when reload backupHandler from image.
BackupJob backupJob = (BackupJob) job;
if (backupJob.isLocalSnapshot()) {
addSnapshot(backupJob.getLabel(), backupJob);
}
}
for (String label : removedLabels) {
removeSnapshot(label);
}
}

private List<AbstractJob> getAllCurrentJobs() {
Expand Down Expand Up @@ -737,22 +753,42 @@ public boolean report(TTaskType type, long jobId, long taskId, int finishedNum,
return false;
}

public void addSnapshot(String labelName, Snapshot snapshot) {
public void addSnapshot(String labelName, BackupJob backupJob) {
assert backupJob.isFinished();

LOG.info("add snapshot {} to local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
localSnapshots.put(labelName, snapshot);
localSnapshots.put(labelName, backupJob);
} finally {
localSnapshotsLock.writeLock().unlock();
}
}

// Remove the entry stored under labelName from localSnapshots.
// The removal is logged first and then performed while holding the write lock
// of localSnapshotsLock.
public void removeSnapshot(String labelName) {
LOG.info("remove snapshot {} from local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
localSnapshots.remove(labelName);
} finally {
// release the write lock on every exit path
localSnapshotsLock.writeLock().unlock();
}
}

public Snapshot getSnapshot(String labelName) {
BackupJob backupJob;
localSnapshotsLock.readLock().lock();
try {
return localSnapshots.get(labelName);
backupJob = localSnapshots.get(labelName);
} finally {
localSnapshotsLock.readLock().unlock();
}

if (backupJob == null) {
return null;
}

return backupJob.getSnapshot();
}

public static BackupHandler read(DataInput in) throws IOException {
Expand Down
45 changes: 30 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ public enum BackupJobState {
// backup properties && table commit seq with table id
private Map<String, String> properties = Maps.newHashMap();

private byte[] metaInfoBytes = null;
private byte[] jobInfoBytes = null;

public BackupJob() {
super(JobType.BACKUP);
}
Expand Down Expand Up @@ -333,11 +330,7 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas

@Override
public synchronized void replayRun() {
LOG.info("replay run backup job: {}", this);
if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
env.getBackupHandler().addSnapshot(label, snapshot);
}
// nothing to do
}

@Override
Expand All @@ -355,6 +348,11 @@ public boolean isCancelled() {
return state == BackupJobState.CANCELLED;
}

// Returns true when this backup job's state is BackupJobState.FINISHED.
@Override
public boolean isFinished() {
return state == BackupJobState.FINISHED;
}

// Polling the job state and do the right things.
@Override
public synchronized void run() {
Expand Down Expand Up @@ -792,8 +790,6 @@ private void saveMetaInfo() {
}
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
// read meta info to metaInfoBytes
metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());

// 3. save job info file
Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
Expand All @@ -818,8 +814,6 @@ private void saveMetaInfo() {
}
jobInfo.writeToFile(jobInfoFile);
localJobInfoFilePath = jobInfoFile.getAbsolutePath();
// read job info to jobInfoBytes
jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
} catch (Exception e) {
status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
return;
Expand Down Expand Up @@ -873,7 +867,6 @@ private void uploadMetaAndJobInfoFile() {
}
}


finishedTime = System.currentTimeMillis();
state = BackupJobState.FINISHED;

Expand All @@ -882,8 +875,7 @@ private void uploadMetaAndJobInfoFile() {
LOG.info("job is finished. {}", this);

if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
env.getBackupHandler().addSnapshot(label, snapshot);
env.getBackupHandler().addSnapshot(label, this);
return;
}
}
Expand Down Expand Up @@ -976,6 +968,29 @@ private void cancelInternal() {
LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
}

// Returns true when this job's repoId equals Repository.KEEP_ON_LOCAL_REPO_ID.
public boolean isLocalSnapshot() {
return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
}

// Read the meta info and job info bytes from disk and return them as a snapshot.
//
// Returns null when:
//   - the job is not FINISHED or its repoId is not Repository.KEEP_ON_LOCAL_REPO_ID,
//   - either local file path has not been recorded on this job, or
//   - the files cannot be read (the failure is logged).
public synchronized Snapshot getSnapshot() {
    if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
        return null;
    }

    // new File((String) null) throws NullPointerException, which the IOException handler
    // below would not catch. Treat a missing path like any other unreadable snapshot so
    // callers keep getting the "null means not available" contract.
    if (localMetaInfoFilePath == null || localJobInfoFilePath == null) {
        LOG.warn("meta info file path or job info file path is not set, meta info file {}, job info file {}",
                localMetaInfoFilePath, localJobInfoFilePath);
        return null;
    }

    try {
        File metaInfoFile = new File(localMetaInfoFilePath);
        File jobInfoFile = new File(localJobInfoFilePath);
        byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
        byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
        return new Snapshot(label, metaInfoBytes, jobInfoBytes);
    } catch (IOException e) {
        // the trailing exception argument is logged as the cause
        LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
                localMetaInfoFilePath, localJobInfoFilePath, e);
        return null;
    }
}

public synchronized List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ public boolean isCancelled() {
return state == RestoreJobState.CANCELLED;
}

// Returns true when this restore job's state is RestoreJobState.FINISHED.
@Override
public boolean isFinished() {
return state == RestoreJobState.FINISHED;
}

@Override
public synchronized void run() {
if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2803,15 +2803,18 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
}

// Step 3: get snapshot
String label = request.getLabelName();
TGetSnapshotResult result = new TGetSnapshotResult();
result.setStatus(new TStatus(TStatusCode.OK));
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label);
if (snapshot == null) {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
result.getStatus().addToErrorMsgs("snapshot not exist");
result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
} else {
result.setMeta(snapshot.getMeta());
result.setJobInfo(snapshot.getJobInfo());
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}",
label, snapshot.getMeta().length, snapshot.getJobInfo().length);
}

return result;
Expand Down

0 comments on commit a843ca3

Please sign in to comment.