diff --git a/README.md b/README.md index c18ab96e..768fa034 100644 --- a/README.md +++ b/README.md @@ -589,6 +589,10 @@ gcs.credentials.json={"type":"...", ...} gcs.credentials.default=true ## +# The value of object metadata Content-Encoding. +# This can be used for leveraging storage-side de-compression before download. +# Optional, the default is null. +gcs.object.content.encoding=gzip # The set of the fields that are to be output, comma separated. # Supported values are: `key`, `value`, `offset`, `timestamp`, and `headers`. diff --git a/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java b/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java index 9709242f..5ac3a26b 100644 --- a/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java @@ -57,11 +57,13 @@ public final class GcsSinkConfig extends AivenCommonConfig { public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json"; public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default"; public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name"; + public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding"; public static final String GCS_USER_AGENT = "gcs.user.agent"; private static final String GROUP_FILE = "File"; public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix"; public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; public static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type"; + public static final String FILE_MAX_RECORDS = "file.max.records"; public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone"; public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source"; @@ -135,6 +137,11 @@ private static void addGcsConfigGroup(final ConfigDef configDef) { + GCS_CREDENTIALS_PATH_CONFIG + "\"", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG); + configDef.define(GCS_OBJECT_CONTENT_ENCODING_CONFIG, ConfigDef.Type.STRING, null, + new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW, + "The GCS object metadata value of Content-Encoding.", GROUP_GCS, gcsGroupCounter++, + ConfigDef.Width.NONE, GCS_OBJECT_CONTENT_ENCODING_CONFIG); + configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, "The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD @@ -332,7 +339,7 @@ private void validate() { .filter(Objects::nonNull) .count(); - // only validate non nulls here, since all nulls means falling back to the default "no credential" behavour. + // only validate non nulls here, since all nulls means falling back to the default "no credential" behaviour. if (nonNulls > MAX_ALLOWED_CREDENTIAL_CONFIGS) { throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.", GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG)); @@ -371,6 +378,10 @@ public String getBucketName() { return getString(GCS_BUCKET_NAME_CONFIG); } + public String getObjectContentEncoding() { + return getString(GCS_OBJECT_CONTENT_ENCODING_CONFIG); + } + @Override public CompressionType getCompressionType() { return CompressionType.forName(getString(FILE_COMPRESSION_TYPE_CONFIG)); diff --git a/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java b/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java index f510723b..eda879e4 100644 --- a/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java +++ b/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java @@ -118,7 +118,9 @@ public void flush(final Map currentOffsets) { } private void flushFile(final String filename, final List records) { - final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename).build(); + final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename) + .setContentEncoding(config.getObjectContentEncoding()) + .build(); try (var out = Channels.newOutputStream(storage.writer(blob)); var writer = OutputWriter.builder() .withExternalProperties(config.originalsStrings()) diff --git a/src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java b/src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java index 4bb903ba..b084dc3b 100644 --- a/src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java +++ b/src/test/java/io/aiven/kafka/connect/gcs/GcsSinkTaskTest.java @@ -63,6 +63,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.threeten.bp.Duration; @@ -252,6 +253,45 @@ void compression(final String compression) { readSplittedAndDecodedLinesFromBlob("topic1-1-40" + compressionType.extension(), compression, 0)); } + @ParameterizedTest + @CsvSource({ "none,none", "gzip,none", "none,gzip", "gzip,gzip" }) + void contentEncodingAwareDownload(final String compression, final String encoding) { + properties.put(GcsSinkConfig.FILE_COMPRESSION_TYPE_CONFIG, compression); + properties.put(GcsSinkConfig.GCS_OBJECT_CONTENT_ENCODING_CONFIG, encoding); + final GcsSinkTask task = new GcsSinkTask(properties, storage); + + task.put(basicRecords); + task.flush(null); + + final CompressionType compressionType = CompressionType.forName(compression); + + final List names = Lists.newArrayList("topic0-0-10", "topic0-1-20", "topic0-2-50", "topic1-0-30", + "topic1-1-40"); + final List blobNames = names.stream() + .map(n -> n + compressionType.extension()) + .collect(Collectors.toList()); + + assertIterableEquals(blobNames, testBucketAccessor.getBlobNames()); + // given a blob with metadata Content-Encoding equal to its byte compression, + // the result of its GS-downloaded bytes is automatically un-compressed (gzip support only) + // see https://cloud.google.com/storage/docs/metadata#content-encoding + assertIterableEquals( + Lists.newArrayList(Collections.singletonList("value0"), Collections.singletonList("value5")), + readDecodedFieldsFromDownload("topic0-0-10" + compressionType.extension(), compression, 0)); + assertIterableEquals( + Lists.newArrayList(Collections.singletonList("value1"), Collections.singletonList("value6")), + readDecodedFieldsFromDownload("topic0-1-20" + compressionType.extension(), compression, 0)); + assertIterableEquals( + Lists.newArrayList(Collections.singletonList("value4"), Collections.singletonList("value9")), + readDecodedFieldsFromDownload("topic0-2-50" + compressionType.extension(), compression, 0)); + assertIterableEquals( + Lists.newArrayList(Collections.singletonList("value2"), Collections.singletonList("value7")), + readDecodedFieldsFromDownload("topic1-0-30" + compressionType.extension(), compression, 0)); + assertIterableEquals( + Lists.newArrayList(Collections.singletonList("value3"), Collections.singletonList("value8")), + readDecodedFieldsFromDownload("topic1-1-40" + compressionType.extension(), compression, 0)); + } + @ParameterizedTest @ValueSource(strings = { "none", "gzip", "snappy", "zstd" }) void allFields(final String compression) { @@ -745,6 +785,11 @@ private Collection> readSplittedAndDecodedLinesFromBlob(final Strin return testBucketAccessor.readAndDecodeLines(blobName, compression, fieldsToDecode); } + private Collection> readDecodedFieldsFromDownload(final String blobName, final String compression, + final int... fieldsToDecode) { + return testBucketAccessor.downloadBlobAndDecodeFields(blobName, compression, fieldsToDecode); + } + private Map>> buildBlobNameValuesMap(final String compression) { final CompressionType compressionType = CompressionType.forName(compression); final String extension = compressionType.extension(); diff --git a/src/test/java/io/aiven/kafka/connect/gcs/testutils/BucketAccessor.java b/src/test/java/io/aiven/kafka/connect/gcs/testutils/BucketAccessor.java index 0f20c1d7..e7adc492 100644 --- a/src/test/java/io/aiven/kafka/connect/gcs/testutils/BucketAccessor.java +++ b/src/test/java/io/aiven/kafka/connect/gcs/testutils/BucketAccessor.java @@ -18,10 +18,14 @@ import java.io.BufferedReader; import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Base64; import java.util.HashMap; @@ -41,6 +45,7 @@ import com.github.luben.zstd.ZstdInputStream; import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import org.xerial.snappy.SnappyInputStream; @@ -53,6 +58,7 @@ public final class BucketAccessor { private List blobNamesCache; private final Map stringContentCache = new HashMap<>(); private final Map> linesCache = new HashMap<>(); + private final Map> downloadedLinesCache = new HashMap<>(); private final Map>> decodedLinesCache = new HashMap<>(); public BucketAccessor(final Storage storage, final String bucketName, final boolean cache) { @@ -121,6 +127,7 @@ public void clear(final String prefix) { stringContentCache.clear(); linesCache.clear(); decodedLinesCache.clear(); + downloadedLinesCache.clear(); } } @@ -165,13 +172,49 @@ private List readLines0(final String blobName, final String compression) InputStream decompressedStream = getDecompressedStream(bais, compression); InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8); BufferedReader bufferedReader = new BufferedReader(reader)) { - return bufferedReader.lines().collect(Collectors.toList()); } catch (final IOException e) { throw new RuntimeException(e); // NOPMD } } + public List downloadBlobAndReadLines(final String blobName, final String compression) { + Objects.requireNonNull(blobName, "blobName cannot be null"); + Objects.requireNonNull(compression, "compression cannot be null"); + if (cache) { + return downloadedLinesCache.computeIfAbsent(blobName, + k -> downloadBlobAndReadLines0(blobName, compression)); + } else { + return downloadBlobAndReadLines0(blobName, compression); + } + } + + private List downloadBlobAndReadLines0(final String blobName, final String compression) { + final String filePath = downloadBlobToTempFile(blobName); + try { + final byte[] bytes = Files.readAllBytes(Path.of(filePath)); + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + InputStream decompressedStream = getDecompressedStream(bais, compression); + InputStreamReader reader = new InputStreamReader(decompressedStream, StandardCharsets.UTF_8); + BufferedReader bufferedReader = new BufferedReader(reader)) { + return bufferedReader.lines().collect(Collectors.toList()); + } + } catch (IOException exception) { + throw new RuntimeException(exception); // NOPMD + } + } + + private String downloadBlobToTempFile(final String blobName) { + try { + final File file = File.createTempFile("tmp", null); + final String filePath = file.getAbsolutePath(); + storage.downloadTo(BlobId.fromGsUtilUri("gs://" + bucketName + "/" + blobName), Paths.get(filePath)); + return filePath; + } catch (final IOException e) { + throw new RuntimeException(e); // NOPMD + } + } + private InputStream getDecompressedStream(final InputStream inputStream, final String compression) throws IOException { Objects.requireNonNull(inputStream, "inputStream cannot be null"); @@ -211,6 +254,27 @@ private List> readAndDecodeLines0(final String blobName, final Stri .collect(Collectors.toList()); } + public List> downloadBlobAndDecodeFields(final String blobName, final String compression, + final int... fieldsToDecode) { + Objects.requireNonNull(blobName, "blobName cannot be null"); + Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null"); + + if (cache) { + return decodedLinesCache.computeIfAbsent(blobName, + k -> downloadBlobAndDecodeFields0(blobName, compression, fieldsToDecode)); + } else { + return downloadBlobAndDecodeFields0(blobName, compression, fieldsToDecode); + } + } + + private List> downloadBlobAndDecodeFields0(final String blobName, final String compression, + final int... fieldsToDecode) { + return downloadBlobAndReadLines(blobName, compression).stream() + .map(l -> l.split(",")) + .map(fields -> decodeRequiredFields(fields, fieldsToDecode)) + .collect(Collectors.toList()); + } + private List decodeRequiredFields(final String[] originalFields, final int[] fieldsToDecode) { Objects.requireNonNull(originalFields, "originalFields cannot be null"); Objects.requireNonNull(fieldsToDecode, "fieldsToDecode cannot be null");