Implement duplicate handling strategies (#99)
- Removes the KEEP_ONE_PER_INCREMENT constant, as this duplicate handling strategy does not make sense
- Implements duplicate handling strategies, comparing files as the configured hash algorithm dictates
- Adds an error to the backup if the hash of a file changed between parsing and archival
- Makes the restore pipeline log a warning when restoring an archived file that has errors saved in the backup
- Adds new test cases
- Updates documentation

Resolves #96
{minor}

Signed-off-by: Esta Nagy <[email protected]>
nagyesta authored Jan 15, 2024
1 parent 4de65a1 commit fc3b13d
Showing 16 changed files with 699 additions and 57 deletions.
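Review note: the core of the change is that the configured duplicate handling strategy and hash algorithm are combined into a grouping key, and files mapping to the same key are archived only once. A minimal sketch of obtaining that key function, using only the types and methods introduced in the diffs below (whether they are meant as public entry points is an assumption):

```java
import com.github.nagyesta.filebarj.core.config.enums.DuplicateHandlingStrategy;
import com.github.nagyesta.filebarj.core.config.enums.HashAlgorithm;
import com.github.nagyesta.filebarj.core.model.FileMetadata;

import java.util.function.Function;

class GroupingKeySketch {
    public static void main(final String[] args) {
        // With KEEP_ONE_PER_BACKUP the key is derived from the content hash (and size),
        // so files with identical content collapse into one group.
        final Function<FileMetadata, String> deduplicating = DuplicateHandlingStrategy.KEEP_ONE_PER_BACKUP
                .fileGroupingFunctionForHash(HashAlgorithm.SHA512);
        // With KEEP_EACH the default implementation keys by the unique file id,
        // so every file stays in its own group and nothing is deduplicated.
        final Function<FileMetadata, String> keepEach = DuplicateHandlingStrategy.KEEP_EACH
                .fileGroupingFunctionForHash(HashAlgorithm.SHA512);
    }
}
```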
2 changes: 1 addition & 1 deletion README.md
@@ -38,10 +38,10 @@ File BaRJ comes with the following features
- Backup archive splitting to configurable chunks
- Backup archive integrity checks
- Restore/unpack previous backup
- Duplicate handling (storing duplicates of the same file only once)

### Planned features

- Duplicate handling (storing duplicates of the same file only once)
- Merge previous backup increments
- UI for convenient configuration

@@ -1,6 +1,7 @@
package com.github.nagyesta.filebarj.core.backup.pipeline;

import com.github.nagyesta.filebarj.core.backup.ArchivalException;
import com.github.nagyesta.filebarj.core.backup.worker.DefaultBackupScopePartitioner;
import com.github.nagyesta.filebarj.core.backup.worker.FileMetadataParser;
import com.github.nagyesta.filebarj.core.backup.worker.FileMetadataParserLocal;
import com.github.nagyesta.filebarj.core.common.FileMetadataChangeDetector;
@@ -31,7 +32,7 @@
/**
* Controller implementation for the backup process.
*/
@SuppressWarnings({"checkstyle:TodoComment", "ConstantValue"})
@SuppressWarnings({"checkstyle:TodoComment"})
@Slf4j
public class BackupController {
private static final int BATCH_SIZE = 250000;
@@ -152,17 +153,20 @@ private void executeBackup(final int threads) {
.filter(metadata -> metadata.getStatus().isStoreContent())
.filter(metadata -> metadata.getFileType() == FileType.DIRECTORY)
.forEach(metadata -> manifest.getFiles().put(metadata.getId(), metadata));
final var scope = this.backupFileSet.values().stream()
.filter(metadata -> metadata.getStatus().isStoreContent())
.filter(metadata -> metadata.getFileType().isContentSource())
.toList();
final var config = manifest.getConfiguration();
final var duplicateStrategy = config.getDuplicateStrategy();
final var hashAlgorithm = config.getHashAlgorithm();
final var scope = new DefaultBackupScopePartitioner(BATCH_SIZE, duplicateStrategy, hashAlgorithm)
.partitionBackupScope(backupFileSet.values());
try {
for (var i = 0; i < scope.size(); i += BATCH_SIZE) {
final var batch = scope.subList(i, Math.min(i + BATCH_SIZE, scope.size()));
for (final var batch : scope) {
final var archived = pipeline.storeEntries(batch);
archived.forEach(entry -> manifest.getArchivedEntries().put(entry.getId(), entry));
}
scope.forEach(metadata -> manifest.getFiles().put(metadata.getId(), metadata));
scope.stream()
.flatMap(Collection::stream)
.flatMap(Collection::stream)
.forEach(metadata -> manifest.getFiles().put(metadata.getId(), metadata));
} catch (final Exception e) {
throw new ArchivalException("Failed to store files: " + e.getMessage(), e);
}
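The partitioned scope handled above is a three-level structure: batches, each holding duplicate groups, each holding the individual file entries. That is why two flatMap calls are needed before the metadata can be put back into the manifest. A self-contained sketch of the same traversal, with plain strings standing in for FileMetadata:

```java
import java.util.Collection;
import java.util.List;

class PartitionTraversalSketch {
    public static void main(final String[] args) {
        final List<List<List<String>>> scope = List.of(        // batches
                List.of(                                        // a single batch
                        List.of("a.txt", "copy-of-a.txt"),      // duplicate group (same content)
                        List.of("b.txt")));                     // singleton group
        scope.stream()
                .flatMap(Collection::stream)                    // batches -> groups
                .flatMap(Collection::stream)                    // groups  -> files
                .forEach(System.out::println);                  // a.txt, copy-of-a.txt, b.txt
    }
}
```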
@@ -70,21 +70,29 @@ public Path getIndexFileWritten() {
/**
* Stores the given files in the archive.
*
* @param fileMetadataList The list of file metadata we should store
* @param groupedFileMetadataList The list of file metadata we should store grouped by their
* contents (each list in this list represents duplicates of the
* same file)
* @return the list of archived files
* @throws ArchivalException When the file cannot be archived due to an I/O error from the stream
*/
public List<ArchivedFileMetadata> storeEntries(
@NonNull final List<FileMetadata> fileMetadataList) throws ArchivalException {
return fileMetadataList.stream().map(fileMetadata -> {
if (fileMetadata == null) {
throw new IllegalArgumentException("File metadata cannot be null");
@NonNull final List<List<FileMetadata>> groupedFileMetadataList) throws ArchivalException {
return groupedFileMetadataList.stream().map(fileMetadataList -> {
if (fileMetadataList == null || fileMetadataList.isEmpty()) {
throw new IllegalArgumentException("File metadata list cannot be null or empty");
}
final var fileMetadata = fileMetadataList.get(0);
try {
log.debug("Storing {}", fileMetadata.getAbsolutePath());
fileMetadata.assertContentSource();
final var archivedFileMetadata = generateArchiveFileMetadata(fileMetadata);
archiveContentAndUpdateMetadata(fileMetadata, archivedFileMetadata);
fileMetadataList.stream().skip(1).forEach(duplicate -> {
warnIfHashDoesNotMatch(duplicate, archivedFileMetadata);
archivedFileMetadata.getFiles().add(duplicate.getId());
duplicate.setArchiveMetadataId(archivedFileMetadata.getId());
});
return archivedFileMetadata;
} catch (final Exception e) {
log.error("Failed to store {}", fileMetadata.getAbsolutePath(), e);
@@ -152,12 +160,23 @@ private void archiveContentAndUpdateMetadata(
}
archivedFileMetadata.setOriginalHash(source.getContentBoundary().getOriginalHash());
archivedFileMetadata.setArchivedHash(source.getContentBoundary().getArchivedHash());
warnIfHashDoesNotMatch(fileMetadata, archivedFileMetadata);
//commit
fileMetadata.setArchiveMetadataId(archivedFileMetadata.getId());
}

/**
* Logs a warning if the hash of the file changed between delta calculation and archival.
*
* @param fileMetadata the file metadata
* @param archivedFileMetadata the archived file metadata
*/
protected void warnIfHashDoesNotMatch(final FileMetadata fileMetadata, final ArchivedFileMetadata archivedFileMetadata) {
if (!Objects.equals(archivedFileMetadata.getOriginalHash(), fileMetadata.getOriginalHash())) {
log.warn("The hash changed between delta calculation and archival for: " + fileMetadata.getAbsolutePath()
+ " The archive might contain corrupt data for the file.");
fileMetadata.setError("The hash changed between delta calculation and archival.");
}
//commit
fileMetadata.setArchiveMetadataId(archivedFileMetadata.getId());
}

private ArchiveEntryLocator createArchiveEntryLocator(final UUID archiveId) {
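The grouped storeEntries logic above boils down to an "archive the first, link the rest" pattern: only the first member of a group is written to the archive, while the remaining members receive the same archive metadata id after the hash sanity check. A generic, self-contained sketch of that pattern; the record types here are stand-ins, not file-barj classes:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

class LinkDuplicatesSketch {
    record Entry(UUID id) { }
    record Archived(UUID id, Set<UUID> files) { }

    static Archived storeGroup(final List<Entry> duplicates) {
        final var first = duplicates.get(0);                   // only this entry's content is archived
        final var archived = new Archived(UUID.randomUUID(), new HashSet<>());
        archived.files().add(first.id());
        duplicates.stream().skip(1)                            // the rest are only linked to it
                .forEach(duplicate -> archived.files().add(duplicate.id()));
        return archived;
    }

    public static void main(final String[] args) {
        final var group = List.of(new Entry(UUID.randomUUID()), new Entry(UUID.randomUUID()));
        System.out.println(storeGroup(group).files().size());  // 2 files share one archived entry
    }
}
```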
@@ -16,7 +16,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

@@ -55,21 +54,30 @@ private static ParallelBarjCargoArchiverFileOutputStream convert(
/**
* Stores the given files in the archive.
*
* @param fileMetadataList The list of file metadata we should store
* @param groupedFileMetadataList The list of file metadata we should store
* @return the list of archived files
* @throws ArchivalException When the file cannot be archived due to an I/O error from the stream
*/
public List<ArchivedFileMetadata> storeEntries(
@NonNull final List<FileMetadata> fileMetadataList) throws ArchivalException {
final var list = fileMetadataList.stream().map(fileMetadata -> {
if (fileMetadata == null) {
throw new IllegalArgumentException("File metadata cannot be null");
@NonNull final List<List<FileMetadata>> groupedFileMetadataList) throws ArchivalException {
final var list = groupedFileMetadataList.stream().map(fileMetadataList -> {
if (fileMetadataList == null || fileMetadataList.isEmpty()) {
throw new IllegalArgumentException("File metadata list cannot be null or empty");
}
final var fileMetadata = fileMetadataList.get(0);
try {
log.debug("Storing {}", fileMetadata.getAbsolutePath());
fileMetadata.assertContentSource();
final var archivedFileMetadata = generateArchiveFileMetadata(fileMetadata);
return archiveContentAndUpdateMetadata(fileMetadata, archivedFileMetadata);
return archiveContentAndUpdateMetadata(fileMetadata, archivedFileMetadata)
.thenApply(archived -> {
fileMetadataList.stream().skip(1).forEach(duplicate -> {
warnIfHashDoesNotMatch(duplicate, archived);
duplicate.setArchiveMetadataId(archived.getId());
archived.getFiles().add(duplicate.getId());
});
return archived;
});
} catch (final Exception e) {
log.error("Failed to store {}", fileMetadata.getAbsolutePath(), e);
throw new ArchivalException("Failed to store " + fileMetadata.getAbsolutePath(), e);
@@ -103,10 +111,7 @@ private CompletableFuture<ArchivedFileMetadata> archiveContentAndUpdateMetadata(
return futureSource.thenApplyAsync(boundarySource -> {
archivedFileMetadata.setOriginalHash(boundarySource.getContentBoundary().getOriginalHash());
archivedFileMetadata.setArchivedHash(boundarySource.getContentBoundary().getArchivedHash());
if (!Objects.equals(archivedFileMetadata.getOriginalHash(), fileMetadata.getOriginalHash())) {
log.warn("The hash changed between delta calculation and archival for: " + fileMetadata.getAbsolutePath()
+ " The archive might contain corrupt data for the file.");
}
warnIfHashDoesNotMatch(fileMetadata, archivedFileMetadata);
//commit
fileMetadata.setArchiveMetadataId(archivedFileMetadata.getId());
return archivedFileMetadata;
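In the parallel pipeline the same duplicate linking must wait for the asynchronous archival to finish, which is why it is chained with thenApply on the returned future. A minimal CompletableFuture sketch of the pattern, with plain strings standing in for the archived metadata:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncLinkSketch {
    public static void main(final String[] args) {
        final CompletableFuture<String> archived = CompletableFuture.supplyAsync(() -> "archived-entry");
        final CompletableFuture<String> linked = archived.thenApply(entry -> {
            // runs only after the content was archived, mirroring the duplicate linking above
            List.of("duplicate-1", "duplicate-2").forEach(duplicate ->
                    System.out.println(duplicate + " -> " + entry));
            return entry;
        });
        linked.join();
    }
}
```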
@@ -0,0 +1,22 @@
package com.github.nagyesta.filebarj.core.backup.worker;

import com.github.nagyesta.filebarj.core.model.FileMetadata;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.List;

/**
* Partitions the backup scope into smaller batches.
*/
public interface BackupScopePartitioner {

/**
* Partitions the backup scope into smaller batches.
*
* @param scope the backup scope
* @return the partitioned scope
*/
@NotNull
List<List<List<FileMetadata>>> partitionBackupScope(@NotNull Collection<FileMetadata> scope);
}
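A possible usage of this interface with the default implementation added below; the batch size of 250000 mirrors the constant used by BackupController, the remaining values are only an illustration:

```java
import com.github.nagyesta.filebarj.core.backup.worker.BackupScopePartitioner;
import com.github.nagyesta.filebarj.core.backup.worker.DefaultBackupScopePartitioner;
import com.github.nagyesta.filebarj.core.config.enums.DuplicateHandlingStrategy;
import com.github.nagyesta.filebarj.core.config.enums.HashAlgorithm;
import com.github.nagyesta.filebarj.core.model.FileMetadata;

import java.util.Collection;
import java.util.List;

class PartitionerUsageSketch {
    static List<List<List<FileMetadata>>> partition(final Collection<FileMetadata> parsedScope) {
        final BackupScopePartitioner partitioner = new DefaultBackupScopePartitioner(
                250_000, DuplicateHandlingStrategy.KEEP_ONE_PER_BACKUP, HashAlgorithm.SHA512);
        // returns batches of duplicate groups, ready to be handed to the backup pipeline
        return partitioner.partitionBackupScope(parsedScope);
    }
}
```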
@@ -0,0 +1,73 @@
package com.github.nagyesta.filebarj.core.backup.worker;

import com.github.nagyesta.filebarj.core.config.enums.DuplicateHandlingStrategy;
import com.github.nagyesta.filebarj.core.config.enums.HashAlgorithm;
import com.github.nagyesta.filebarj.core.model.FileMetadata;
import lombok.NonNull;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Base implementation of {@link BackupScopePartitioner}.
*/
public class DefaultBackupScopePartitioner implements BackupScopePartitioner {

private final int batchSize;
private final Function<FileMetadata, String> groupingFunction;

/**
* Creates a new instance with the specified batch size.
*
* @param batchSize the batch size
* @param duplicateHandlingStrategy the duplicate handling strategy
* @param hashAlgorithm the hash algorithm the backup is using
*/
public DefaultBackupScopePartitioner(
final int batchSize,
@NonNull final DuplicateHandlingStrategy duplicateHandlingStrategy,
@NonNull final HashAlgorithm hashAlgorithm) {
this.batchSize = batchSize;
this.groupingFunction = duplicateHandlingStrategy.fileGroupingFunctionForHash(hashAlgorithm);
}

@Override
@NotNull
public List<List<List<FileMetadata>>> partitionBackupScope(@NonNull final Collection<FileMetadata> scope) {
final var groupedScope = filterAndGroup(scope);
return partition(groupedScope);
}

@NotNull
private Collection<List<FileMetadata>> filterAndGroup(@NotNull final Collection<FileMetadata> scope) {
return scope.stream()
.filter(metadata -> metadata.getStatus().isStoreContent())
.filter(metadata -> metadata.getFileType().isContentSource())
.collect(Collectors.groupingBy(groupingFunction))
.values();
}

@NotNull
private List<List<List<FileMetadata>>> partition(@NotNull final Collection<List<FileMetadata>> groupedScope) {
final List<List<List<FileMetadata>>> partitionedScope = new ArrayList<>();
var batch = new ArrayList<List<FileMetadata>>();
var size = 0;
for (final var group : groupedScope) {
batch.add(group);
size += group.size();
if (size >= batchSize) {
partitionedScope.add(batch);
batch = new ArrayList<>();
size = 0;
}
}
if (!batch.isEmpty()) {
partitionedScope.add(batch);
}
return partitionedScope;
}
}
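Worth noting about the partitioning loop above: a duplicate group is never split across batches, so a batch may overshoot the configured batch size by at most one group. A small self-contained sketch of the same batching rule applied to plain lists:

```java
import java.util.ArrayList;
import java.util.List;

class GroupBatchingSketch {
    public static void main(final String[] args) {
        final List<List<String>> groups = List.of(
                List.of("a", "a-copy"), List.of("b"), List.of("c", "c-copy", "c-copy-2"));
        final int batchSize = 3;
        final List<List<List<String>>> batches = new ArrayList<>();
        var batch = new ArrayList<List<String>>();
        var size = 0;
        for (final var group : groups) {
            batch.add(group);                 // whole groups only, never individual members
            size += group.size();
            if (size >= batchSize) {          // close the batch once the threshold is reached
                batches.add(batch);
                batch = new ArrayList<>();
                size = 0;
            }
        }
        if (!batch.isEmpty()) {
            batches.add(batch);
        }
        System.out.println(batches);          // [[[a, a-copy], [b]], [[c, c-copy, c-copy-2]]]
    }
}
```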
@@ -1,5 +1,10 @@
package com.github.nagyesta.filebarj.core.config.enums;

import com.github.nagyesta.filebarj.core.model.FileMetadata;
import lombok.NonNull;

import java.util.function.Function;

/**
* Defines the strategy used in case a file is found in more than one place.
*/
@@ -10,19 +15,29 @@ public enum DuplicateHandlingStrategy {
* Each duplicate is added as many times as it is found in the source.
*/
KEEP_EACH,
/**
* Archives one copy for each backup increment.
* <br/>e.g.,<br/>
* The second instance of the same file is not added to the current backup increment if it was
* already saved once. Each duplicate can point to the same archive file.
*/
KEEP_ONE_PER_INCREMENT,
/**
* Archives one copy per any increment of the backup since the last full backup.
* <br/>e.g.,<br/>
 * The file is not added to the current archive even if the duplicate is found archived in a
 * previous backup version, e.g., when a file was overwritten with a previously archived version
 * of the same file.
*/
KEEP_ONE_PER_BACKUP
KEEP_ONE_PER_BACKUP {

@Override
public Function<FileMetadata, String> fileGroupingFunctionForHash(final @NonNull HashAlgorithm hashAlgorithm) {
return hashAlgorithm.fileGroupingFunction();
}
};

/**
* Returns the file metadata grouping function for the specified hash algorithm. The grouping
* function is used to form groups containing the files with the same content in the backup.
*
* @param hashAlgorithm the hash algorithm
* @return the grouping function
*/
public Function<FileMetadata, String> fileGroupingFunctionForHash(@NonNull final HashAlgorithm hashAlgorithm) {
return fileMetadata -> fileMetadata.getId().toString();
}
}
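The enum above relies on constant-specific method bodies: KEEP_ONE_PER_BACKUP overrides the grouping function to delegate to the hash algorithm, while the other constants inherit the default that keys by the unique file id, so nothing is ever grouped together. A tiny self-contained illustration of that Java technique, deliberately not using the file-barj types:

```java
import java.util.function.Function;

class ConstantSpecificBodySketch {
    enum Strategy {
        KEEP_EACH,                       // inherits the default: every value is its own group
        GROUP_EQUAL_VALUES {             // constant-specific override: equal values share a key
            @Override
            Function<String, String> keyFunction() {
                return value -> value;
            }
        };

        Function<String, String> keyFunction() {
            // default: key by identity, so distinct objects never end up in the same group
            return value -> Integer.toHexString(System.identityHashCode(value));
        }
    }

    public static void main(final String[] args) {
        final var first = new String("same");
        final var second = new String("same");
        // equal values -> same key with the override, distinct keys with the default
        System.out.println(Strategy.GROUP_EQUAL_VALUES.keyFunction().apply(first)
                .equals(Strategy.GROUP_EQUAL_VALUES.keyFunction().apply(second)));  // true
        System.out.println(Strategy.KEEP_EACH.keyFunction().apply(first)
                .equals(Strategy.KEEP_EACH.keyFunction().apply(second)));           // false in practice
    }
}
```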
@@ -1,21 +1,31 @@
package com.github.nagyesta.filebarj.core.config.enums;

import com.github.nagyesta.filebarj.core.model.FileMetadata;
import com.github.nagyesta.filebarj.io.stream.internal.OptionalDigestOutputStream;
import lombok.Getter;
import lombok.ToString;

import java.io.OutputStream;
import java.util.function.Function;

/**
* Defines the supported hash algorithms used for hash calculations.
*/
@Getter
@ToString
public enum HashAlgorithm {

/**
* No hash calculation needed.
*/
NONE(null),
NONE(null) {
@Override
public Function<FileMetadata, String> fileGroupingFunction() {
return fileMetadata -> fileMetadata.getAbsolutePath().getFileName().toString()
+ SEPARATOR + fileMetadata.getOriginalSizeBytes()
+ SEPARATOR + fileMetadata.getLastModifiedUtcEpochSeconds();
}
},
/**
* MD5.
*/
@@ -33,6 +43,8 @@ public enum HashAlgorithm {
*/
SHA512("SHA-512");

private static final String SEPARATOR = "_";

private final String algorithmName;

/**
@@ -54,4 +66,14 @@ public enum HashAlgorithm {
public OptionalDigestOutputStream decorate(final OutputStream stream) {
return new OptionalDigestOutputStream(stream, this.getAlgorithmName());
}

/**
* Returns the file metadata grouping function for the hash algorithm.
*
* @return the grouping function
*/
public Function<FileMetadata, String> fileGroupingFunction() {
return fileMetadata -> fileMetadata.getOriginalHash()
+ SEPARATOR + fileMetadata.getOriginalSizeBytes();
}
}
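The two grouping key shapes introduced above differ deliberately: when no hash is calculated (NONE), the key falls back to file name, size, and last modification time, while hashing algorithms key on the content hash plus size. A short sketch of the resulting key strings, using made-up example values:

```java
class GroupingKeyShapeSketch {
    public static void main(final String[] args) {
        final var separator = "_";
        // HashAlgorithm.NONE: fileName _ sizeBytes _ lastModifiedUtcEpochSeconds
        final var noneKey = "report.pdf" + separator + 10_240L + separator + 1_705_276_800L;
        // Hashing algorithms: originalHash _ sizeBytes
        final var sha512Key = "3a1f...e9" + separator + 10_240L;
        System.out.println(noneKey);     // report.pdf_10240_1705276800
        System.out.println(sha512Key);   // 3a1f...e9_10240
    }
}
```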
@@ -111,6 +111,10 @@ public void restoreFiles(
.map(FileMetadata::getAbsolutePath)
.collect(Collectors.toSet());
final var files = manifest.getFilesOfLastManifest();
files.values().stream()
.filter(fileMetadata -> fileMetadata.getError() != null)
.forEach(fileMetadata -> log.warn("File {} might be corrupted. The following error was saved during backup:\n {}",
fileMetadata.getAbsolutePath(), fileMetadata.getError()));
final var entries = manifest.getArchivedEntriesOfLastManifest();
final var restoreScope = new RestoreScope(files, entries, changeStatus, pathsToRestore);
final var filesWithContentChanges = restoreScope.getChangedContentSourcesByPath();