Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add progress listeners to the long-running operations #323

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/gradle-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ jobs:

steps:
# Set up build environment
- name: Prepare git
if: ${{ matrix.os != 'ubuntu-latest' }}
run: git config --global core.autocrlf false
- name: Checkout
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
Expand Down
7 changes: 3 additions & 4 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ subprojects {
excludes = mutableListOf(
"com.github.nagyesta.filebarj.core.backup.FileParseException",
"com.github.nagyesta.filebarj.core.backup.worker.WindowsFileMetadataParser",
"com.github.nagyesta.filebarj.core.progress.NoOpProgressTracker",
"com.github.nagyesta.filebarj.core.restore.worker.WindowsFileMetadataSetter",
"com.github.nagyesta.filebarj.job.Main",
"com.github.nagyesta.filebarj.job.Controller"
Expand Down Expand Up @@ -220,10 +221,8 @@ subprojects {
setOutputFormat("json")
//noinspection UnnecessaryQualifiedReference
val attachmentText = org.cyclonedx.model.AttachmentText()
attachmentText.setText(
Base64.getEncoder().encodeToString(
file("${project.rootProject.projectDir}/LICENSE").readBytes()
)
attachmentText.text = Base64.getEncoder().encodeToString(
file("${project.rootProject.projectDir}/LICENSE").readBytes()
)
attachmentText.encoding = "base64"
attachmentText.contentType = "text/plain"
Expand Down
44 changes: 32 additions & 12 deletions file-barj-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ final var configuration = BackupJobConfiguration.builder()
.chunkSizeMebibyte(1)
.encryptionKey(null)
.build();
final var backupController = new BackupController(configuration, false);
final var backupParameters = BackupParameters.builder()
.job(configuration)
.forceFull(false)
.build();
final var backupController = new BackupController(backupParameters);

//executing the backup
backupController.execute(1);
Expand All @@ -66,13 +70,14 @@ backupController.execute(1);
### Merging increments

```java
final var mergeController = new MergeController(
Path.of("/tmp/backup"),
"prefix",
null, //optional key encryption key
123L, //Backup start epoch seconds for the first file of the range (inclusive)
234L //Backup start epoch seconds for the last file of the range (inclusive)
);
final var mergeParameters = MergeParameters.builder()
.backupDirectory(Path.of("/tmp/backup"))
.fileNamePrefix("prefix")
.kek(null) //optional key encryption key
.rangeStartEpochSeconds(123L) //Backup start epoch seconds for the first file of the range (inclusive)
.rangeEndEpochSeconds(234L) //Backup start epoch seconds for the last file of the range (inclusive)
.build();
final var mergeController = new MergeController(mergeParameters);

mergeController.execute(false);
```
Expand All @@ -91,8 +96,13 @@ final var restoreTask = RestoreTask.builder()
.includedPath(BackupPath.of("/source/dir")) //optional path filter
.permissionComparisonStrategy(PermissionComparisonStrategy.STRICT) //optional
.build();
final var pointInTime = 123456L;
final var restoreController = new RestoreController(Path.of("/tmp/backup"), "test", null, pointInTime);
final var restoreParameters = RestoreParameters.builder()
.backupDirectory(Path.of("/tmp/backup"))
.fileNamePrefix("test")
.kek(null)
.atPointInTime(123456L)
.build();
final var restoreController = new RestoreController(restoreParameters);

//executing the restore
restoreController.execute(restoreTask);
Expand All @@ -104,7 +114,12 @@ restoreController.execute(restoreTask);
//configuring the inspection job
final var backupDir = Path.of("/backup/directory");
final var outputFile = Path.of("/backup/directory");
final var controller = new IncrementInspectionController(backupDir, "file-prefix", null);
final var inspectParameters = InspectParameters.builder()
.backupDirectory(backupDir)
.fileNamePrefix("file-prefix")
.kek(null)
.build();
final var controller = new IncrementInspectionController(inspectParameters);

//list the summary of the available increments
controller.inspectIncrements(System.out);
Expand All @@ -119,7 +134,12 @@ controller.inspectContent(Long.MAX_VALUE, outputFile);
//configuring the deletion job
final var backupDir = Path.of("/backup/directory");
final var outputFile = Path.of("/backup/directory");
final var controller = new IncrementDeletionController(backupDir, "file-prefix", null);
final var deletionParameters = IncrementDeletionParameters.builder()
.backupDirectory(backupDir)
.fileNamePrefix("file-prefix")
.kek(null)
.build();
final var controller = new IncrementDeletionController(deletionParameters);

//Delete all backup increments:
// - starting with the one created at 123456
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import com.github.nagyesta.filebarj.core.backup.worker.FileMetadataParser;
import com.github.nagyesta.filebarj.core.backup.worker.FileMetadataParserFactory;
import com.github.nagyesta.filebarj.core.common.*;
import com.github.nagyesta.filebarj.core.config.BackupJobConfiguration;
import com.github.nagyesta.filebarj.core.model.BackupIncrementManifest;
import com.github.nagyesta.filebarj.core.model.BackupPath;
import com.github.nagyesta.filebarj.core.model.FileMetadata;
import com.github.nagyesta.filebarj.core.model.enums.BackupType;
import com.github.nagyesta.filebarj.core.model.enums.FileType;
import com.github.nagyesta.filebarj.core.progress.ObservableProgressTracker;
import com.github.nagyesta.filebarj.core.progress.ProgressStep;
import com.github.nagyesta.filebarj.core.progress.ProgressTracker;
import com.github.nagyesta.filebarj.core.util.LogUtil;
import com.github.nagyesta.filebarj.io.stream.BarjCargoArchiverFileOutputStream;
import lombok.Getter;
Expand All @@ -22,10 +24,10 @@
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static com.github.nagyesta.filebarj.core.progress.ProgressStep.*;
import static com.github.nagyesta.filebarj.core.util.TimerUtil.toProcessSummary;
import static com.github.nagyesta.filebarj.io.stream.internal.ChunkingFileOutputStream.MEBIBYTE;

Expand All @@ -36,8 +38,9 @@
@Slf4j
public class BackupController {
private static final int BATCH_SIZE = 250000;
private static final List<ProgressStep> PROGRESS_STEPS = List.of(LOAD_MANIFESTS, SCAN_FILES, PARSE_METADATA, BACKUP);
private final FileMetadataParser metadataParser = FileMetadataParserFactory.newInstance();
private final ManifestManager manifestManager = new ManifestManagerImpl();
private final ManifestManager manifestManager;
@Getter
private final BackupIncrementManifest manifest;
private final SortedMap<Integer, BackupIncrementManifest> previousManifests;
Expand All @@ -47,22 +50,31 @@ public class BackupController {
private final ReentrantLock executionLock = new ReentrantLock();
private FileMetadataChangeDetector changeDetector;
private ForkJoinPool threadPool;
private final ProgressTracker progressTracker;

/**
* Creates a new instance and initializes it for the specified job.
*
* @param job the job configuration
* @param forceFull whether to force a full backup (overriding the configuration)
* @param parameters The parameters
*/
public BackupController(@NonNull final BackupJobConfiguration job, final boolean forceFull) {
public BackupController(final @NonNull BackupParameters parameters) {
this.progressTracker = new ObservableProgressTracker(PROGRESS_STEPS);
progressTracker.registerListener(parameters.getProgressListener());
this.manifestManager = new ManifestManagerImpl(progressTracker);

final var job = parameters.getJob();
var backupType = job.getBackupType();
this.previousManifests = new TreeMap<>();
final var forceFull = parameters.isForceFull();
if (!forceFull && backupType != BackupType.FULL) {
this.previousManifests.putAll(manifestManager.loadPreviousManifestsForBackup(job));
if (previousManifests.isEmpty()) {
backupType = BackupType.FULL;
}
}
if (forceFull) {
backupType = BackupType.FULL;
}
this.manifest = manifestManager.generateManifest(job, backupType, previousManifests.size());
}

Expand Down Expand Up @@ -107,19 +119,20 @@ private void listAffectedFilesFromBackupSources() {
if (uniquePaths.isEmpty()) {
throw new IllegalStateException("No files found in backup sources!");
}
progressTracker.completeStep(SCAN_FILES);
detectCaseInsensitivityIssues(uniquePaths);
log.info("Found {} unique paths in backup sources. Parsing metadata...", uniquePaths.size());
final var doneCount = new AtomicInteger(0);
progressTracker.estimateStepSubtotal(PARSE_METADATA, uniquePaths.size());
this.filesFound = threadPool.submit(() -> uniquePaths.parallelStream()
.map(path -> {
final var fileMetadata = metadataParser.parse(path.toFile(), manifest.getConfiguration());
LogUtil.logIfThresholdReached(doneCount.incrementAndGet(), uniquePaths.size(),
(done, total) -> log.info("Parsed {} of {} unique paths.", done, total));
progressTracker.recordProgressInSubSteps(PARSE_METADATA);
return fileMetadata;
})
.collect(Collectors.toList())).join();
LogUtil.logStatistics(filesFound,
(type, count) -> log.info("Found {} {} items in backup sources.", count, type));
progressTracker.completeStep(PARSE_METADATA);
}

private void detectCaseInsensitivityIssues(final SortedSet<Path> uniquePaths) {
Expand Down Expand Up @@ -182,6 +195,7 @@ private void executeBackup(final int threads) {
final var totalBackupSize = backupFileSet.values().stream()
.mapToLong(FileMetadata::getOriginalSizeBytes)
.sum();
progressTracker.estimateStepSubtotal(BACKUP, totalBackupSize);
final var totalSize = totalBackupSize / MEBIBYTE;
log.info("Backing up delta for {} files ({} MiB)", backupFileSet.size(), totalSize);
try (var pipeline = getPipeline(threads)) {
Expand Down Expand Up @@ -215,13 +229,14 @@ private void executeBackup(final int threads) {
manifest.setIndexFileName(pipeline.getIndexFileWritten().getFileName().toString());
final var endTimeMillis = System.currentTimeMillis();
final var durationMillis = endTimeMillis - startTimeMillis;
progressTracker.completeStep(BACKUP);
log.info("Archive write completed. Archive write took: {}", toProcessSummary(durationMillis, totalBackupSize));
} catch (final Exception e) {
throw new ArchivalException("Archival process failed.", e);
}
}

private void findPreviousVersionToReuseOrAddToBackupFileSet(@NotNull final FileMetadata file) {
private void findPreviousVersionToReuseOrAddToBackupFileSet(final @NotNull FileMetadata file) {
if (file.getFileType() == FileType.DIRECTORY) {
updateDirectoryChangeStatus(file);
manifest.getFiles().put(file.getId(), file);
Expand All @@ -239,25 +254,28 @@ private void findPreviousVersionToReuseOrAddToBackupFileSet(@NotNull final FileM
backupFileSet.put(file.getAbsolutePath(), file);
}

private void updateDirectoryChangeStatus(@NotNull final FileMetadata file) {
private void updateDirectoryChangeStatus(final @NotNull FileMetadata file) {
final var previousVersion = changeDetector.findPreviousVersionByAbsolutePath(file.getAbsolutePath());
if (previousVersion != null) {
final var change = changeDetector.classifyChange(previousVersion, file);
file.setStatus(change);
}
}

@NotNull
private BaseBackupPipeline<? extends BarjCargoArchiverFileOutputStream> getPipeline(
private @NotNull BaseBackupPipeline<? extends BarjCargoArchiverFileOutputStream> getPipeline(
final int threads) throws IOException {
final BaseBackupPipeline<? extends BarjCargoArchiverFileOutputStream> pipeline;
if (threads == 1) {
return new BackupPipeline(manifest);
pipeline = new BackupPipeline(manifest);
} else {
return new ParallelBackupPipeline(manifest, threads);
pipeline = new ParallelBackupPipeline(manifest, threads);
}
pipeline.setProgressTracker(progressTracker);
return pipeline;
}

private void saveManifest() {
manifestManager.persist(manifest);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.github.nagyesta.filebarj.core.backup.pipeline;

import com.github.nagyesta.filebarj.core.config.BackupJobConfiguration;
import com.github.nagyesta.filebarj.core.progress.LoggingProgressListener;
import com.github.nagyesta.filebarj.core.progress.ProgressListener;
import lombok.Builder;
import lombok.Data;
import lombok.NonNull;

/**
 * Parameter object grouping the inputs of a single backup run, passed to
 * {@code BackupController}.
 * <p>
 * Instances are created through the Lombok-generated builder; {@code job} is
 * mandatory (null-hostile via {@code @NonNull}), while {@code forceFull} and
 * {@code progressListener} fall back to the defaults below when not set.
 */
@Data
@Builder
public class BackupParameters {
    // The backup job configuration driving the run (required, non-null).
    private final @NonNull BackupJobConfiguration job;
    // When true, the controller performs a full backup even if the
    // configuration would otherwise allow an incremental one.
    @Builder.Default
    private final boolean forceFull = false;
    // Listener registered with the progress tracker to receive progress
    // notifications; defaults to the logging-based implementation.
    @Builder.Default
    private final @NonNull ProgressListener progressListener = LoggingProgressListener.INSTANCE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ public class BackupPipeline extends BaseBackupPipeline<BarjCargoArchiverFileOutp
* @param manifest The manifest
* @throws IOException When the stream cannot be created due to an I/O error
*/
public BackupPipeline(@NotNull final BackupIncrementManifest manifest) throws IOException {
public BackupPipeline(final @NotNull BackupIncrementManifest manifest) throws IOException {
super(manifest, convert(manifest));
}

@NonNull
private static BarjCargoArchiverFileOutputStream convert(
@NonNull final BackupIncrementManifest manifest) throws IOException {
private static @NonNull BarjCargoArchiverFileOutputStream convert(
final @NonNull BackupIncrementManifest manifest) throws IOException {
return new BarjCargoArchiverFileOutputStream(
BarjCargoOutputStreamConfiguration.builder()
.folder(manifest.getConfiguration().getDestinationDirectory())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
import com.github.nagyesta.filebarj.core.model.BackupIncrementManifest;
import com.github.nagyesta.filebarj.core.model.FileMetadata;
import com.github.nagyesta.filebarj.core.model.enums.FileType;
import com.github.nagyesta.filebarj.core.progress.NoOpProgressTracker;
import com.github.nagyesta.filebarj.core.progress.ProgressStep;
import com.github.nagyesta.filebarj.core.progress.ProgressTracker;
import com.github.nagyesta.filebarj.io.stream.BarjCargoArchiverFileOutputStream;
import com.github.nagyesta.filebarj.io.stream.BarjCargoBoundarySource;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.jetbrains.annotations.NotNull;
Expand All @@ -28,6 +32,8 @@ public class BaseBackupPipeline<T extends BarjCargoArchiverFileOutputStream> imp

private final BackupIncrementManifest manifest;
private final T outputStream;
@Setter
private @NonNull ProgressTracker progressTracker = new NoOpProgressTracker();

/**
* Creates a new instance for the manifest that must be used for the backup.
Expand All @@ -36,8 +42,8 @@ public class BaseBackupPipeline<T extends BarjCargoArchiverFileOutputStream> imp
* @param outputStream The stream to write to
*/
protected BaseBackupPipeline(
@NotNull final BackupIncrementManifest manifest,
@NotNull final T outputStream) {
final @NotNull BackupIncrementManifest manifest,
final @NotNull T outputStream) {
this.manifest = manifest;
this.outputStream = outputStream;
manifest.getVersions().forEach(version -> {
Expand Down Expand Up @@ -77,7 +83,7 @@ public Path getIndexFileWritten() {
* @throws ArchivalException When the file cannot be archived due to an I/O error from the stream
*/
public List<ArchivedFileMetadata> storeEntries(
@NonNull final List<List<FileMetadata>> groupedFileMetadataList) throws ArchivalException {
final @NonNull List<List<FileMetadata>> groupedFileMetadataList) throws ArchivalException {
return groupedFileMetadataList.stream().map(fileMetadataList -> {
if (fileMetadataList == null || fileMetadataList.isEmpty()) {
throw new IllegalArgumentException("File metadata list cannot be null or empty");
Expand All @@ -92,6 +98,7 @@ public List<ArchivedFileMetadata> storeEntries(
warnIfHashDoesNotMatch(duplicate, archivedFileMetadata);
archivedFileMetadata.getFiles().add(duplicate.getId());
duplicate.setArchiveMetadataId(archivedFileMetadata.getId());
reportProgress(duplicate);
});
return archivedFileMetadata;
} catch (final Exception e) {
Expand Down Expand Up @@ -163,6 +170,7 @@ private void archiveContentAndUpdateMetadata(
warnIfHashDoesNotMatch(fileMetadata, archivedFileMetadata);
//commit
fileMetadata.setArchiveMetadataId(archivedFileMetadata.getId());
reportProgress(fileMetadata);
}

/**
Expand All @@ -173,12 +181,18 @@ private void archiveContentAndUpdateMetadata(
*/
protected void warnIfHashDoesNotMatch(final FileMetadata fileMetadata, final ArchivedFileMetadata archivedFileMetadata) {
if (!Objects.equals(archivedFileMetadata.getOriginalHash(), fileMetadata.getOriginalHash())) {
log.warn("The hash changed between delta calculation and archival for: " + fileMetadata.getAbsolutePath()
+ " The archive might contain corrupt data for the file.");
log.warn("The hash changed between delta calculation and archival for: {} The archive might contain corrupt data for the file.",
fileMetadata.getAbsolutePath());
fileMetadata.setError("The hash changed between delta calculation and archival.");
}
}

/**
 * Records archival progress for the given file by adding its original size
 * (in bytes) to the sub-steps of the {@code BACKUP} progress step. Entries
 * with a non-positive original size are ignored.
 *
 * @param fileMetadata the metadata of the archived file
 */
protected void reportProgress(final FileMetadata fileMetadata) {
    final var originalSizeBytes = fileMetadata.getOriginalSizeBytes();
    if (originalSizeBytes <= 0) {
        // nothing to record for empty entries
        return;
    }
    progressTracker.recordProgressInSubSteps(ProgressStep.BACKUP, originalSizeBytes);
}

private ArchiveEntryLocator createArchiveEntryLocator(final UUID archiveId) {
return ArchiveEntryLocator.builder()
.entryName(archiveId)
Expand Down
Loading