Add progress listeners to the long-running operations (#323)
- Defines progress tracker and listener interface
- Implements progress tracker
- Implements logging progress listener
- Fixes log patterns to make logs easier to read
- Integrates progress tracking into the backup, restore, merge, and inspect operations
- Adds new tests
- Fixes the order of modifiers across the whole codebase
- Fixes an issue causing an incorrect backup type to be reported in the case of forced full backups
- Changes the approach for passing input parameters to controllers (see the sketch below)
- Defines new parameter VOs with builders
- Updates tests and documentation

Resolves #297
{major}
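
To illustrate the new input style in one place (a minimal sketch assembled from the README changes in this commit; `configuration` is assumed to be a `BackupJobConfiguration` built as in the README example, and both `forceFull` and `progressListener` are optional):

```java
// Sketch only: names come from the diff below. The progressListener
// property is optional and defaults to LoggingProgressListener.INSTANCE.
final var backupParameters = BackupParameters.builder()
        .job(configuration)
        .forceFull(false)
        .progressListener(LoggingProgressListener.INSTANCE)
        .build();
final var backupController = new BackupController(backupParameters);
backupController.execute(1);
```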

Signed-off-by: Esta Nagy <[email protected]>
nagyesta authored Aug 23, 2024
1 parent a4c2b8c commit 6bfcfd4
Showing 119 changed files with 2,071 additions and 1,022 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/gradle-ci.yml
@@ -38,6 +38,9 @@ jobs:

steps:
# Set up build environment
- name: Prepare git
if: ${{ matrix.os != 'ubuntu-latest' }}
run: git config --global core.autocrlf false
- name: Checkout
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
7 changes: 3 additions & 4 deletions build.gradle.kts
@@ -146,6 +146,7 @@ subprojects {
excludes = mutableListOf(
"com.github.nagyesta.filebarj.core.backup.FileParseException",
"com.github.nagyesta.filebarj.core.backup.worker.WindowsFileMetadataParser",
"com.github.nagyesta.filebarj.core.progress.NoOpProgressTracker",
"com.github.nagyesta.filebarj.core.restore.worker.WindowsFileMetadataSetter",
"com.github.nagyesta.filebarj.job.Main",
"com.github.nagyesta.filebarj.job.Controller"
@@ -220,10 +221,8 @@ subprojects {
setOutputFormat("json")
//noinspection UnnecessaryQualifiedReference
val attachmentText = org.cyclonedx.model.AttachmentText()
attachmentText.setText(
Base64.getEncoder().encodeToString(
file("${project.rootProject.projectDir}/LICENSE").readBytes()
)
attachmentText.text = Base64.getEncoder().encodeToString(
file("${project.rootProject.projectDir}/LICENSE").readBytes()
)
attachmentText.encoding = "base64"
attachmentText.contentType = "text/plain"
44 changes: 32 additions & 12 deletions file-barj-core/README.md
@@ -57,7 +57,11 @@ final var configuration = BackupJobConfiguration.builder()
.chunkSizeMebibyte(1)
.encryptionKey(null)
.build();
final var backupController = new BackupController(configuration, false);
final var backupParameters = BackupParameters.builder()
.job(configuration)
.forceFull(false)
.build();
final var backupController = new BackupController(backupParameters);

//executing the backup
backupController.execute(1);
@@ -66,13 +70,14 @@ backupController.execute(1);
### Merging increments

```java
final var mergeController = new MergeController(
Path.of("/tmp/backup"),
"prefix",
null, //optional key encryption key
123L, //Backup start epoch seconds for the first file of the range (inclusive)
234L //Backup start epoch seconds for the last file of the range (inclusive)
);
final var mergeParameters = MergeParameters.builder()
.backupDirectory(Path.of("/tmp/backup"))
.fileNamePrefix("prefix")
.kek(null) //optional key encryption key
.rangeStartEpochSeconds(123L) //Backup start epoch seconds for the first file of the range (inclusive)
.rangeEndEpochSeconds(234L) //Backup start epoch seconds for the last file of the range (inclusive)
.build();
final var mergeController = new MergeController(mergeParameters);

mergeController.execute(false);
```
@@ -91,8 +96,13 @@ final var restoreTask = RestoreTask.builder()
.includedPath(BackupPath.of("/source/dir")) //optional path filter
.permissionComparisonStrategy(PermissionComparisonStrategy.STRICT) //optional
.build();
final var pointInTime = 123456L;
final var restoreController = new RestoreController(Path.of("/tmp/backup"), "test", null, pointInTime);
final var restoreParameters = RestoreParameters.builder()
.backupDirectory(Path.of("/tmp/backup"))
.fileNamePrefix("test")
.kek(null)
.atPointInTime(123456L)
.build();
final var restoreController = new RestoreController(restoreParameters);

//executing the restore
restoreController.execute(restoreTask);
@@ -104,7 +114,12 @@ restoreController.execute(restoreTask);
//configuring the inspection job
final var backupDir = Path.of("/backup/directory");
final var outputFile = Path.of("/backup/directory");
final var controller = new IncrementInspectionController(backupDir, "file-prefix", null);
final var inspectParameters = InspectParameters.builder()
.backupDirectory(backupDir)
.fileNamePrefix("file-prefix")
.kek(null)
.build();
final var controller = new IncrementInspectionController(inspectParameters);

//list the summary of the available increments
controller.inspectIncrements(System.out);
@@ -119,7 +134,12 @@ controller.inspectContent(Long.MAX_VALUE, outputFile);
//configuring the deletion job
final var backupDir = Path.of("/backup/directory");
final var outputFile = Path.of("/backup/directory");
final var controller = new IncrementDeletionController(backupDir, "file-prefix", null);
final var deletionParameters = IncrementDeletionParameters.builder()
.backupDirectory(backupDir)
.fileNamePrefix("file-prefix")
.kek(null)
.build();
final var controller = new IncrementDeletionController(deletionParameters);

//Delete all backup increments:
// - starting with the one created at 123456
@@ -5,12 +5,14 @@
import com.github.nagyesta.filebarj.core.backup.worker.FileMetadataParser;
import com.github.nagyesta.filebarj.core.backup.worker.FileMetadataParserFactory;
import com.github.nagyesta.filebarj.core.common.*;
import com.github.nagyesta.filebarj.core.config.BackupJobConfiguration;
import com.github.nagyesta.filebarj.core.model.BackupIncrementManifest;
import com.github.nagyesta.filebarj.core.model.BackupPath;
import com.github.nagyesta.filebarj.core.model.FileMetadata;
import com.github.nagyesta.filebarj.core.model.enums.BackupType;
import com.github.nagyesta.filebarj.core.model.enums.FileType;
import com.github.nagyesta.filebarj.core.progress.ObservableProgressTracker;
import com.github.nagyesta.filebarj.core.progress.ProgressStep;
import com.github.nagyesta.filebarj.core.progress.ProgressTracker;
import com.github.nagyesta.filebarj.core.util.LogUtil;
import com.github.nagyesta.filebarj.io.stream.BarjCargoArchiverFileOutputStream;
import lombok.Getter;
@@ -22,10 +24,10 @@
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static com.github.nagyesta.filebarj.core.progress.ProgressStep.*;
import static com.github.nagyesta.filebarj.core.util.TimerUtil.toProcessSummary;
import static com.github.nagyesta.filebarj.io.stream.internal.ChunkingFileOutputStream.MEBIBYTE;

@@ -36,8 +38,9 @@
@Slf4j
public class BackupController {
private static final int BATCH_SIZE = 250000;
private static final List<ProgressStep> PROGRESS_STEPS = List.of(LOAD_MANIFESTS, SCAN_FILES, PARSE_METADATA, BACKUP);
private final FileMetadataParser metadataParser = FileMetadataParserFactory.newInstance();
private final ManifestManager manifestManager = new ManifestManagerImpl();
private final ManifestManager manifestManager;
@Getter
private final BackupIncrementManifest manifest;
private final SortedMap<Integer, BackupIncrementManifest> previousManifests;
@@ -47,22 +50,31 @@ public class BackupController {
private final ReentrantLock executionLock = new ReentrantLock();
private FileMetadataChangeDetector changeDetector;
private ForkJoinPool threadPool;
private final ProgressTracker progressTracker;

/**
* Creates a new instance and initializes it for the specified job.
*
* @param job the job configuration
* @param forceFull whether to force a full backup (overriding the configuration)
* @param parameters The parameters
*/
public BackupController(@NonNull final BackupJobConfiguration job, final boolean forceFull) {
public BackupController(final @NonNull BackupParameters parameters) {
this.progressTracker = new ObservableProgressTracker(PROGRESS_STEPS);
progressTracker.registerListener(parameters.getProgressListener());
this.manifestManager = new ManifestManagerImpl(progressTracker);

final var job = parameters.getJob();
var backupType = job.getBackupType();
this.previousManifests = new TreeMap<>();
final var forceFull = parameters.isForceFull();
if (!forceFull && backupType != BackupType.FULL) {
this.previousManifests.putAll(manifestManager.loadPreviousManifestsForBackup(job));
if (previousManifests.isEmpty()) {
backupType = BackupType.FULL;
}
}
if (forceFull) {
backupType = BackupType.FULL;
}
this.manifest = manifestManager.generateManifest(job, backupType, previousManifests.size());
}

@@ -107,19 +119,20 @@ private void listAffectedFilesFromBackupSources() {
if (uniquePaths.isEmpty()) {
throw new IllegalStateException("No files found in backup sources!");
}
progressTracker.completeStep(SCAN_FILES);
detectCaseInsensitivityIssues(uniquePaths);
log.info("Found {} unique paths in backup sources. Parsing metadata...", uniquePaths.size());
final var doneCount = new AtomicInteger(0);
progressTracker.estimateStepSubtotal(PARSE_METADATA, uniquePaths.size());
this.filesFound = threadPool.submit(() -> uniquePaths.parallelStream()
.map(path -> {
final var fileMetadata = metadataParser.parse(path.toFile(), manifest.getConfiguration());
LogUtil.logIfThresholdReached(doneCount.incrementAndGet(), uniquePaths.size(),
(done, total) -> log.info("Parsed {} of {} unique paths.", done, total));
progressTracker.recordProgressInSubSteps(PARSE_METADATA);
return fileMetadata;
})
.collect(Collectors.toList())).join();
LogUtil.logStatistics(filesFound,
(type, count) -> log.info("Found {} {} items in backup sources.", count, type));
progressTracker.completeStep(PARSE_METADATA);
}

private void detectCaseInsensitivityIssues(final SortedSet<Path> uniquePaths) {
@@ -182,6 +195,7 @@ private void executeBackup(final int threads) {
final var totalBackupSize = backupFileSet.values().stream()
.mapToLong(FileMetadata::getOriginalSizeBytes)
.sum();
progressTracker.estimateStepSubtotal(BACKUP, totalBackupSize);
final var totalSize = totalBackupSize / MEBIBYTE;
log.info("Backing up delta for {} files ({} MiB)", backupFileSet.size(), totalSize);
try (var pipeline = getPipeline(threads)) {
@@ -215,13 +229,14 @@ private void executeBackup(final int threads) {
manifest.setIndexFileName(pipeline.getIndexFileWritten().getFileName().toString());
final var endTimeMillis = System.currentTimeMillis();
final var durationMillis = endTimeMillis - startTimeMillis;
progressTracker.completeStep(BACKUP);
log.info("Archive write completed. Archive write took: {}", toProcessSummary(durationMillis, totalBackupSize));
} catch (final Exception e) {
throw new ArchivalException("Archival process failed.", e);
}
}

private void findPreviousVersionToReuseOrAddToBackupFileSet(@NotNull final FileMetadata file) {
private void findPreviousVersionToReuseOrAddToBackupFileSet(final @NotNull FileMetadata file) {
if (file.getFileType() == FileType.DIRECTORY) {
updateDirectoryChangeStatus(file);
manifest.getFiles().put(file.getId(), file);
@@ -239,25 +254,28 @@ private void findPreviousVersionToReuseOrAddToBackupFileSet(final @NotNull FileM
backupFileSet.put(file.getAbsolutePath(), file);
}

private void updateDirectoryChangeStatus(@NotNull final FileMetadata file) {
private void updateDirectoryChangeStatus(final @NotNull FileMetadata file) {
final var previousVersion = changeDetector.findPreviousVersionByAbsolutePath(file.getAbsolutePath());
if (previousVersion != null) {
final var change = changeDetector.classifyChange(previousVersion, file);
file.setStatus(change);
}
}

@NotNull
private BaseBackupPipeline<? extends BarjCargoArchiverFileOutputStream> getPipeline(
private @NotNull BaseBackupPipeline<? extends BarjCargoArchiverFileOutputStream> getPipeline(
final int threads) throws IOException {
final BaseBackupPipeline<? extends BarjCargoArchiverFileOutputStream> pipeline;
if (threads == 1) {
return new BackupPipeline(manifest);
pipeline = new BackupPipeline(manifest);
} else {
return new ParallelBackupPipeline(manifest, threads);
pipeline = new ParallelBackupPipeline(manifest, threads);
}
pipeline.setProgressTracker(progressTracker);
return pipeline;
}

private void saveManifest() {
manifestManager.persist(manifest);
}

}
@@ -0,0 +1,18 @@
package com.github.nagyesta.filebarj.core.backup.pipeline;

import com.github.nagyesta.filebarj.core.config.BackupJobConfiguration;
import com.github.nagyesta.filebarj.core.progress.LoggingProgressListener;
import com.github.nagyesta.filebarj.core.progress.ProgressListener;
import lombok.Builder;
import lombok.Data;
import lombok.NonNull;

@Data
@Builder
public class BackupParameters {
private final @NonNull BackupJobConfiguration job;
@Builder.Default
private final boolean forceFull = false;
@Builder.Default
private final @NonNull ProgressListener progressListener = LoggingProgressListener.INSTANCE;
}
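
Because the new VO relies on Lombok's `@Builder` with `@Builder.Default` values, only the job is mandatory. A hedged usage sketch (not part of the diff; builder method names follow Lombok's conventions, and `configuration` stands for a previously built `BackupJobConfiguration`):

```java
// Sketch only: forceFull defaults to false and progressListener defaults
// to LoggingProgressListener.INSTANCE, so both can be omitted. Passing an
// explicit null listener would fail the @NonNull check instead.
final var parameters = BackupParameters.builder()
        .job(configuration)
        .build();
```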
@@ -21,13 +21,12 @@ public class BackupPipeline extends BaseBackupPipeline<BarjCargoArchiverFileOutp
* @param manifest The manifest
* @throws IOException When the stream cannot be created due to an I/O error
*/
public BackupPipeline(@NotNull final BackupIncrementManifest manifest) throws IOException {
public BackupPipeline(final @NotNull BackupIncrementManifest manifest) throws IOException {
super(manifest, convert(manifest));
}

@NonNull
private static BarjCargoArchiverFileOutputStream convert(
@NonNull final BackupIncrementManifest manifest) throws IOException {
private static @NonNull BarjCargoArchiverFileOutputStream convert(
final @NonNull BackupIncrementManifest manifest) throws IOException {
return new BarjCargoArchiverFileOutputStream(
BarjCargoOutputStreamConfiguration.builder()
.folder(manifest.getConfiguration().getDestinationDirectory())
@@ -6,9 +6,13 @@
import com.github.nagyesta.filebarj.core.model.BackupIncrementManifest;
import com.github.nagyesta.filebarj.core.model.FileMetadata;
import com.github.nagyesta.filebarj.core.model.enums.FileType;
import com.github.nagyesta.filebarj.core.progress.NoOpProgressTracker;
import com.github.nagyesta.filebarj.core.progress.ProgressStep;
import com.github.nagyesta.filebarj.core.progress.ProgressTracker;
import com.github.nagyesta.filebarj.io.stream.BarjCargoArchiverFileOutputStream;
import com.github.nagyesta.filebarj.io.stream.BarjCargoBoundarySource;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.jetbrains.annotations.NotNull;
@@ -28,6 +32,8 @@ public class BaseBackupPipeline<T extends BarjCargoArchiverFileOutputStream> imp

private final BackupIncrementManifest manifest;
private final T outputStream;
@Setter
private @NonNull ProgressTracker progressTracker = new NoOpProgressTracker();

/**
* Creates a new instance for the manifest that must be used for the backup.
@@ -36,8 +42,8 @@
* @param outputStream The stream to write to
*/
protected BaseBackupPipeline(
@NotNull final BackupIncrementManifest manifest,
@NotNull final T outputStream) {
final @NotNull BackupIncrementManifest manifest,
final @NotNull T outputStream) {
this.manifest = manifest;
this.outputStream = outputStream;
manifest.getVersions().forEach(version -> {
@@ -77,7 +83,7 @@ public Path getIndexFileWritten() {
* @throws ArchivalException When the file cannot be archived due to an I/O error from the stream
*/
public List<ArchivedFileMetadata> storeEntries(
@NonNull final List<List<FileMetadata>> groupedFileMetadataList) throws ArchivalException {
final @NonNull List<List<FileMetadata>> groupedFileMetadataList) throws ArchivalException {
return groupedFileMetadataList.stream().map(fileMetadataList -> {
if (fileMetadataList == null || fileMetadataList.isEmpty()) {
throw new IllegalArgumentException("File metadata list cannot be null or empty");
@@ -92,6 +98,7 @@ public List<ArchivedFileMetadata> storeEntries(
warnIfHashDoesNotMatch(duplicate, archivedFileMetadata);
archivedFileMetadata.getFiles().add(duplicate.getId());
duplicate.setArchiveMetadataId(archivedFileMetadata.getId());
reportProgress(duplicate);
});
return archivedFileMetadata;
} catch (final Exception e) {
@@ -163,6 +170,7 @@ private void archiveContentAndUpdateMetadata(
warnIfHashDoesNotMatch(fileMetadata, archivedFileMetadata);
//commit
fileMetadata.setArchiveMetadataId(archivedFileMetadata.getId());
reportProgress(fileMetadata);
}

/**
@@ -173,12 +181,18 @@
*/
protected void warnIfHashDoesNotMatch(final FileMetadata fileMetadata, final ArchivedFileMetadata archivedFileMetadata) {
if (!Objects.equals(archivedFileMetadata.getOriginalHash(), fileMetadata.getOriginalHash())) {
log.warn("The hash changed between delta calculation and archival for: " + fileMetadata.getAbsolutePath()
+ " The archive might contain corrupt data for the file.");
log.warn("The hash changed between delta calculation and archival for: {} The archive might contain corrupt data for the file.",
fileMetadata.getAbsolutePath());
fileMetadata.setError("The hash changed between delta calculation and archival.");
}
}

protected void reportProgress(final FileMetadata fileMetadata) {
if (fileMetadata.getOriginalSizeBytes() > 0) {
progressTracker.recordProgressInSubSteps(ProgressStep.BACKUP, fileMetadata.getOriginalSizeBytes());
}
}

private ArchiveEntryLocator createArchiveEntryLocator(final UUID archiveId) {
return ArchiveEntryLocator.builder()
.entryName(archiveId)
