Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #359 from jclarysse/jclarysse/config-gcs-object-en…
Browse files Browse the repository at this point in the history
…coding

feat: Add support for setting object metadata `Content-Encoding`
  • Loading branch information
jjaakola-aiven authored Jun 6, 2024
2 parents 227fb1d + d93f29c commit a303f93
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 3 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,10 @@ gcs.credentials.json={"type":"...", ...}
gcs.credentials.default=true
##

# The value of object metadata Content-Encoding.
# This can be used for leveraging storage-side de-compression before download.
# Optional, the default is null.
gcs.object.content.encoding=gzip

# The set of the fields that are to be output, comma separated.
# Supported values are: `key`, `value`, `offset`, `timestamp`, and `headers`.
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ public final class GcsSinkConfig extends AivenCommonConfig {
public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default";
public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding";
public static final String GCS_USER_AGENT = "gcs.user.agent";
private static final String GROUP_FILE = "File";
public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix";
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type";

public static final String FILE_MAX_RECORDS = "file.max.records";
public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
Expand Down Expand Up @@ -135,6 +137,11 @@ private static void addGcsConfigGroup(final ConfigDef configDef) {
+ GCS_CREDENTIALS_PATH_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG);

configDef.define(GCS_OBJECT_CONTENT_ENCODING_CONFIG, ConfigDef.Type.STRING, null,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW,
"The GCS object metadata value of Content-Encoding.", GROUP_GCS, gcsGroupCounter++,
ConfigDef.Width.NONE, GCS_OBJECT_CONTENT_ENCODING_CONFIG);

configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
"The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD
Expand Down Expand Up @@ -332,7 +339,7 @@ private void validate() {
.filter(Objects::nonNull)
.count();

// only validate non nulls here, since all nulls means falling back to the default "no credential" behavour.
// only validate non nulls here, since all nulls means falling back to the default "no credential" behaviour.
if (nonNulls > MAX_ALLOWED_CREDENTIAL_CONFIGS) {
throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.",
GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG));
Expand Down Expand Up @@ -371,6 +378,10 @@ public String getBucketName() {
return getString(GCS_BUCKET_NAME_CONFIG);
}

public String getObjectContentEncoding() {
return getString(GCS_OBJECT_CONTENT_ENCODING_CONFIG);
}

@Override
public CompressionType getCompressionType() {
return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG));
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
}

private void flushFile(final String filename, final List<SinkRecord> records) {
final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename).build();
final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename)
.setContentEncoding(config.getObjectContentEncoding())
.build();
try (var out = Channels.newOutputStream(storage.writer(blob));
var writer = OutputWriter.builder()
.withExternalProperties(config.originalsStrings())
Expand Down
45 changes: 45 additions & 0 deletions src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -252,6 +253,45 @@ void compression(final String compression) {
readSplittedAndDecodedLinesFromBlob("topic1-1-40" + compressionType.extension(), compression, 0));
}

@ParameterizedTest
@CsvSource({ "none,none", "gzip,none", "none,gzip", "gzip,gzip" })
void contentEncodingAwareDownload(final String compression, final String encoding) {
properties.put(GcsSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression);
properties.put(GcsSinkConfig.GCS_OBJECT_CONTENT_ENCODING_CONFIG, encoding);
final GcsSinkTask task = new GcsSinkTask(properties, storage);

task.put(basicRecords);
task.flush(null);

final CompressionType compressionType = CompressionType.forName(compression);

final List<String> names = Lists.newArrayList("topic0-0-10", "topic0-1-20", "topic0-2-50", "topic1-0-30",
"topic1-1-40");
final List<String> blobNames = names.stream()
.map(n -> n + compressionType.extension())
.collect(Collectors.toList());

assertIterableEquals(blobNames, testBucketAccessor.getBlobNames());
// given a blob with metadata Content-Encoding equal to its byte compression,
// the result of its GS-downloaded bytes is automatically un-compressed (gzip support only)
// see https://cloud.google.com/storage/docs/metadata#content-encoding
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value0"), Collections.singletonList("value5")),
readDecodedFieldsFromDownload("topic0-0-10" + compressionType.extension(), compression, 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value1"), Collections.singletonList("value6")),
readDecodedFieldsFromDownload("topic0-1-20" + compressionType.extension(), compression, 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value4"), Collections.singletonList("value9")),
readDecodedFieldsFromDownload("topic0-2-50" + compressionType.extension(), compression, 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value2"), Collections.singletonList("value7")),
readDecodedFieldsFromDownload("topic1-0-30" + compressionType.extension(), compression, 0));
assertIterableEquals(
Lists.newArrayList(Collections.singletonList("value3"), Collections.singletonList("value8")),
readDecodedFieldsFromDownload("topic1-1-40" + compressionType.extension(), compression, 0));
}

@ParameterizedTest
@ValueSource(strings = { "none", "gzip", "snappy", "zstd" })
void allFields(final String compression) {
Expand Down Expand Up @@ -745,6 +785,11 @@ private Collection<List<String>> readSplittedAndDecodedLinesFromBlob(final Strin
return testBucketAccessor.readAndDecodeLines(blobName, compression, fieldsToDecode);
}

private Collection<List<String>> readDecodedFieldsFromDownload(final String blobName, final String compression,
final int... fieldsToDecode) {
return testBucketAccessor.downloadBlobAndDecodeFields(blobName, compression, fieldsToDecode);
}

private Map<String, Collection<List<String>>> buildBlobNameValuesMap(final String compression) {
final CompressionType compressionType = CompressionType.forName(compression);
final String extension = compressionType.extension();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
Expand All @@ -41,6 +45,7 @@

import com.github.luben.zstd.ZstdInputStream;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.xerial.snappy.SnappyInputStream;
Expand All @@ -53,6 +58,7 @@ public final class BucketAccessor {
private List<String> blobNamesCache;
private final Map<String, String> stringContentCache = new HashMap<>();
private final Map<String, List<String>> linesCache = new HashMap<>();
private final Map<String, List<String>> downloadedLinesCache = new HashMap<>();
private final Map<String, List<List<String>>> decodedLinesCache = new HashMap<>();

public BucketAccessor(final Storage storage, final String bucketName, final boolean cache) {
Expand Down Expand Up @@ -121,6 +127,7 @@ public void clear(final String prefix) {
stringContentCache.clear();
linesCache.clear();
decodedLinesCache.clear();
downloadedLinesCache.clear();
}
}

Expand Down Expand Up @@ -165,13 +172,49 @@ private List<String> readLines0(final String blobName, final String compression)
InputStream decompressedStream = getDecompressedStream(bais, compression);
InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(reader)) {

return bufferedReader.lines().collect(Collectors.toList());
} catch (final IOException e) {
throw new RuntimeException(e); // NOPMD
}
}

public List<String> downloadBlobAndReadLines(final String blobName, final String compression) {
Objects.requireNonNull(blobName, "blobName cannot be null");
Objects.requireNonNull(compression, "compression cannot be null");
if (cache) {
return downloadedLinesCache.computeIfAbsent(blobName,
k -> downloadBlobAndReadLines0(blobName, compression));
} else {
return downloadBlobAndReadLines0(blobName, compression);
}
}

private List<String> downloadBlobAndReadLines0(final String blobName, final String compression) {
final String filePath = downloadBlobToTempFile(blobName);
try {
final byte[] bytes = Files.readAllBytes(Path.of(filePath));
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
InputStream decompressedStream = getDecompressedStream(bais, compression);
InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(reader)) {
return bufferedReader.lines().collect(Collectors.toList());
}
} catch (IOException exception) {
throw new RuntimeException(exception); // NOPMD
}
}

private String downloadBlobToTempFile(final String blobName) {
try {
final File file = File.createTempFile("tmp", null);
final String filePath = file.getAbsolutePath();
storage.downloadTo(BlobId.fromGsUtilUri("gs://" + bucketName + "/" + blobName), Paths.get(filePath));
return filePath;
} catch (final IOException e) {
throw new RuntimeException(e); // NOPMD
}
}

private InputStream getDecompressedStream(final InputStream inputStream, final String compression)
throws IOException {
Objects.requireNonNull(inputStream, "inputStream cannot be null");
Expand Down Expand Up @@ -211,6 +254,27 @@ private List<List<String>> readAndDecodeLines0(final String blobName, final Stri
.collect(Collectors.toList());
}

public List<List<String>> downloadBlobAndDecodeFields(final String blobName, final String compression,
final int... fieldsToDecode) {
Objects.requireNonNull(blobName, "blobName cannot be null");
Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");

if (cache) {
return decodedLinesCache.computeIfAbsent(blobName,
k -> downloadBlobAndDecodeFields0(blobName, compression, fieldsToDecode));
} else {
return downloadBlobAndDecodeFields0(blobName, compression, fieldsToDecode);
}
}

private List<List<String>> downloadBlobAndDecodeFields0(final String blobName, final String compression,
final int... fieldsToDecode) {
return downloadBlobAndReadLines(blobName, compression).stream()
.map(l -> l.split(","))
.map(fields -> decodeRequiredFields(fields, fieldsToDecode))
.collect(Collectors.toList());
}

private List<String> decodeRequiredFields(final String[] originalFields, final int[] fieldsToDecode) {
Objects.requireNonNull(originalFields, "originalFields cannot be null");
Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");
Expand Down

0 comments on commit a303f93

Please sign in to comment.