Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add result file compression with zstd #303

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions schema/iguana-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@
},
"path": {
"type": "string"
},
"compressionLevel": {
"type": "integer",
"minimum": 1,
"maximum": 19,
"default": 3
}
},
"required": [
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/aksw/iguana/cc/storage/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ default void storeData(Storable data) {
storeResult(((Storable.AsRDF) data).toRDF());
}
}

/**
 * Hook that lets an implementation finalize its output (e.g. close streams it keeps open).
 * The default implementation does nothing.
 */
default void close() {}
}
36 changes: 35 additions & 1 deletion src/main/java/org/aksw/iguana/cc/storage/impl/CSVStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ public class CSVStorage implements Storage {
/** This private record is used to store information about the connections used in a task. */
private record ConnectionInfo(String connection, String version, String dataset) {}

public record Config(String directory) implements StorageConfig {
public record Config(String directory, Integer compressionLevel) implements StorageConfig {
public Config(String directory) {
this(directory, null);
}

public Config(String directory, Integer compressionLevel) {
if (directory == null) {
directory = "results";
}
Expand All @@ -42,6 +46,7 @@ public Config(String directory) {
throw new IllegalArgumentException("The given path is not a directory.");
}
this.directory = directory;
this.compressionLevel = 3;
}
}

Expand All @@ -58,8 +63,14 @@ public Config(String directory) {
private Resource taskRes;
List<ConnectionInfo> connections;

private Integer compressionLevel = 3;

public CSVStorage(Config config, List<Metric> metrics, String suiteID) {
this(config.directory(), metrics, suiteID);
if (config.compressionLevel() != null && (config.compressionLevel() < 1 || config.compressionLevel() > 19)) {
throw new IllegalArgumentException("Compression level must be between 1 and 19.");
}
this.compressionLevel = 3;
}

public CSVStorage(String folderPath, List<Metric> metrics, String suiteID) {
Expand Down Expand Up @@ -230,6 +241,29 @@ public void storeData(Storable data) {
}
}

/**
 * Packs the suite folder into a {@code <folder>.tar.zst} archive next to it by running
 * {@code tar} with {@code zstd} as the compression program. Does nothing when no compression
 * level is set. Failures are logged and not propagated.
 */
@Override
public void close() {
    if (this.compressionLevel == null) {
        return; // compression disabled, nothing to do
    }

    final var parentDir = this.suiteFolder.getParent();
    final var folderName = parentDir.relativize(this.suiteFolder).toString();
    // ProcessBuilder hands every argument to the child verbatim (there is no shell that would
    // strip quotes), so the compressor command must not be wrapped in literal '"' characters.
    final var command = List.of(
            "tar", "-I", String.format("zstd -%d", this.compressionLevel),
            "-cf", folderName + ".tar.zst", folderName);

    LOGGER.info("Compressing the suite folder.");
    LOGGER.info(String.join(" ", command));
    LOGGER.info(parentDir.toFile().toString());
    try {
        final var process = new ProcessBuilder(command)
                .directory(parentDir.toFile().getAbsoluteFile())
                .redirectErrorStream(true) // merge stderr so diagnostics end up in the log as well
                .start();
        LOGGER.info("Waiting for the compression process to finish.");
        // Drain the output before waiting: a child that fills the pipe buffer would otherwise
        // block forever while this thread sits in waitFor().
        final var output = new String(process.getInputStream().readAllBytes(),
                java.nio.charset.StandardCharsets.UTF_8);
        final int exitCode = process.waitFor();
        if (!output.isBlank()) {
            LOGGER.info(output.strip());
        }
        if (exitCode == 0) {
            LOGGER.info("Compression process finished.");
        } else {
            LOGGER.error("Compression process exited with code {}.", exitCode);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can react to the interruption.
        Thread.currentThread().interrupt();
        LOGGER.warn("Interrupted while waiting for the compression process.");
    } catch (IOException e) {
        LOGGER.error("Error while compressing the suite folder.", e);
    }
}

/**
* This method sets the objects attributes by querying the given model.
*
Expand Down
57 changes: 51 additions & 6 deletions src/main/java/org/aksw/iguana/cc/storage/impl/RDFFileStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import java.util.function.Supplier;

public class RDFFileStorage implements Storage {
public record Config(String path) implements StorageConfig {}
/**
 * Configuration of the RDF file storage.
 *
 * @param path             path handed to the storage as its file name
 * @param compressionLevel zstd compression level, or {@code null} if none is configured
 */
public record Config(String path, Integer compressionLevel) implements StorageConfig {
/** Creates a configuration without a compression level ({@code compressionLevel} is {@code null}). */
public Config(String path) {
this(path, null);
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(RDFFileStorage.class.getName());

Expand All @@ -36,11 +40,14 @@ public record Config(String path) implements StorageConfig {}
now.get(Calendar.MILLISECOND));
};

final private Lang lang;
private final Lang lang;
private Path path;
private boolean compression;
private int compressionLevel;
private OutputStream outputStream;

public RDFFileStorage(Config config) {
this(config.path());
this(config.path(), config.compressionLevel());
}

/**
Expand All @@ -50,6 +57,15 @@ public RDFFileStorage() {
this("");
}

public RDFFileStorage(String filename, Integer compressionLevel) {
this(filename);
if (compressionLevel != null && (compressionLevel < 1 || compressionLevel > 19)) {
throw new IllegalArgumentException("Compression level must be between 1 and 19.");
}
this.compression = compressionLevel != null;
this.compressionLevel = compressionLevel != null ? compressionLevel : 0;
}

/**
* Uses the provided filename. If the filename is null or empty, a generated file called
* results_{DD}-{MM}-{YYYY}_{HH}-{mm}.ttl is used. The file extension determines the file format.
Expand All @@ -62,7 +78,8 @@ public RDFFileStorage(String fileName) {
}
else {
path = Paths.get(fileName);
if (Files.exists(path) && Files.isDirectory(path)) {
if ((Files.exists(path) || Files.exists(path.resolveSibling(path.getFileName() + ".zstd")))
&& Files.isDirectory(path)) {
path = path.resolve(defaultFileNameSupplier.get() + ".ttl");
} else if (Files.exists(path)) {
path = Paths.get(FilenameUtils.removeExtension(fileName) + "_" + defaultFileNameSupplier.get() + ".ttl"); // we're just going to assume that that's enough to make it unique
Expand All @@ -79,14 +96,27 @@ public RDFFileStorage(String fileName) {
}

@Override
public void storeResult(Model data){
try (OutputStream os = new FileOutputStream(path.toString(), true)) {
public void storeResult(Model data) {
try {
OutputStream os = getFileOutputstream();
RDFDataMgr.write(os, data, this.lang);
} catch (IOException e) {
LOGGER.error("Could not write to RDFFileStorage using lang: " + lang, e);
}
}

/**
 * Closes the cached output stream, if there is one, and forgets it. An {@link IOException}
 * raised while closing is logged and not rethrown.
 */
@Override
public void close() {
    if (outputStream == null) {
        return; // nothing was opened
    }
    try {
        outputStream.close();
    } catch (IOException closeFailure) {
        LOGGER.error("Could not close output stream for RDFFileStorage", closeFailure);
    }
    outputStream = null;
}

@Override
public String toString() {
return this.getClass().getSimpleName();
Expand All @@ -95,4 +125,19 @@ public String toString() {
/**
 * @return the string form of the path this storage uses
 */
public String getFileName() {
return this.path.toString();
}

/**
 * Returns the stream results are written to, opening it on first use and caching it in
 * {@code outputStream} afterwards.
 * <p>
 * With compression enabled the stream is the standard input of a {@code zstd} process that
 * writes to {@code <path>.zstd}; otherwise it is an appending {@link FileOutputStream} on
 * {@code path}. In both cases the stream stays open until the cached stream is closed, so no
 * file handle is leaked per write.
 *
 * @return the cached or newly opened output stream
 * @throws IOException if the file cannot be opened or the zstd process cannot be started
 */
private OutputStream getFileOutputstream() throws IOException {
    if (outputStream != null) {
        return outputStream;
    }

    if (compression) { // TODO: check if process closes properly
        final var process = new ProcessBuilder("zstd", "-o", path.toString() + ".zstd", "-T0", "-" + compressionLevel, "-q", "-")
                // Nobody reads the child's stdout/stderr pipes; redirect them so the child can
                // never block on a full pipe and its error messages remain visible.
                .redirectOutput(ProcessBuilder.Redirect.DISCARD)
                .redirectError(ProcessBuilder.Redirect.INHERIT)
                .start();
        outputStream = process.getOutputStream();
    } else {
        // Keep the appending stream in the field as well: a fresh stream per call that is
        // neither stored nor closed by the caller would leak a file handle on every write.
        outputStream = new FileOutputStream(path.toString(), true);
    }
    return outputStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ public void calculateAndSaveMetrics(Calendar start, Calendar end) {
}
}
}
for (var storage : storages) {
storage.close();
}
}

/**
Expand Down
8 changes: 7 additions & 1 deletion src/main/resources/iguana-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@
},
"path": {
"type": "string"
},
"compressionLevel": {
"type": "integer",
"minimum": 1,
"maximum": 19,
"default": 3
}
},
"required": [
Expand Down Expand Up @@ -354,7 +360,7 @@
}
},
"required": [
"endpoint"
"endpoint"
],
"title": "Template"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ class StorageConfigTest {

private static Stream<Arguments> testData() {
return Stream.of(
Arguments.of(new RDFFileStorage.Config("some.ttl"),
Arguments.of(new RDFFileStorage.Config("some.ttl", 2),
"""
{"type":"rdf file","path":"some.ttl"}
{"type":"rdf file","path":"some.ttl", "compressionLevel": 2}
"""
),
Arguments.of(new CSVStorage.Config("csv_results/"),
Expand Down
Loading