Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-#1862][s3] Added support for reading compressed format files , writing multiple objects etc. #1863

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chunjun-connectors/chunjun-connector-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,13 @@ public class S3Config extends CommonConfig implements Serializable {
* Default is false.
*/
private boolean safetySwitch = false;

/** Compression codec of the objects being read/written (e.g. gzip); null means uncompressed. */
private String compress;

/** Whether to write a single object (true) or one object per subtask (false). */
private boolean writeSingleObject = true;

/** Suffix appended to the generated object name(s), e.g. ".csv" or ".gz". */
private String suffix;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.chunjun.connector.s3.enums;

/**
 * Compression formats the S3 connector recognizes for object contents.
 *
 * <p>Matched case-insensitively against {@code S3Config#getCompress()} (see
 * {@code CompressType.GZIP.name().equalsIgnoreCase(...)} usage in the input format).
 * NOTE(review): the visible reader code only handles {@link #GZIP}; BZIP2 and ZIP
 * appear declared for future use — confirm before relying on them.
 */
public enum CompressType {
    GZIP,
    BZIP2,
    ZIP
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.dtstack.chunjun.connector.s3.sink;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.SpeedConfig;
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.converter.S3SqlConverter;
Expand Down Expand Up @@ -66,11 +67,14 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
}
s3Config.setColumn(columnList);
S3OutputFormatBuilder builder = new S3OutputFormatBuilder(new S3OutputFormat());
builder.setSpeedConf(new SpeedConfig());
builder.setS3Conf(s3Config);
builder.setRowConverter(
new S3SqlConverter(InternalTypeInfo.of(logicalType).toRowType(), s3Config));

return SinkFunctionProvider.of(new DtOutputFormatSinkFunction<>(builder.finish()), 1);
int sinkParallelism = s3Config.getParallelism() == null ? 1 : s3Config.getParallelism();
return SinkFunctionProvider.of(
new DtOutputFormatSinkFunction<>(builder.finish()), sinkParallelism);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.util.S3Util;
import com.dtstack.chunjun.connector.s3.util.WriterUtil;
import com.dtstack.chunjun.constants.ConstantValue;
import com.dtstack.chunjun.restore.FormatState;
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
Expand All @@ -32,7 +33,6 @@

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PartETag;
import com.esotericsoftware.minlog.Log;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -54,10 +54,10 @@ public class S3OutputFormat extends BaseRichOutputFormat {
private S3Config s3Config;

/** Must start at 1 and cannot be greater than 10,000 */
private static int currentPartNumber;
private int currentPartNumber;

private static String currentUploadId;
private static boolean willClose = false;
private String currentUploadId;
private boolean willClose = false;
private transient StringWriter sw;
private transient List<MyPartETag> myPartETags;

Expand All @@ -66,22 +66,50 @@ public class S3OutputFormat extends BaseRichOutputFormat {

private static final long MIN_SIZE = 1024 * 1024 * 25L;

@Override
public void initializeGlobal(int parallelism) {
    // Runs once on the client before any subtask opens: build an S3 client
    // and, when the write mode is "overwrite", clear previously written
    // objects so every run starts from a clean target.
    this.amazonS3 = S3Util.getS3Client(s3Config);
    boolean overwrite = OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode());
    if (!overwrite) {
        return;
    }
    checkOutputDir();
}

@Override
protected void openInternal(int taskNumber, int numTasks) {
    // Decide the final object key for this subtask before anything is written.
    if (!s3Config.isWriteSingleObject()) {
        // Multi-object mode: each subtask writes its own object under the
        // configured prefix: <object>/<jobId>_<taskNumber><suffix>.
        s3Config.setObject(
                s3Config.getObject()
                        + ConstantValue.SINGLE_SLASH_SYMBOL
                        + jobId
                        + "_"
                        + taskNumber
                        + getExtension());
    } else {
        if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) {
            // Overwrite mode: stable name, old object was removed in initializeGlobal().
            s3Config.setObject(s3Config.getObject() + getExtension());
        } else {
            // Append mode: make the name unique per job and subtask.
            s3Config.setObject(
                    s3Config.getObject() + "_" + jobId + "_" + taskNumber + getExtension());
        }
    }
    log.info("current write object name: {}", s3Config.getObject());
    // Cache column metadata once. The original text declared this local twice
    // (a leftover of the diff) which would not compile; keep a single copy.
    List<FieldConfig> column = s3Config.getColumn();
    columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList());
    columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList());
    openSource();
    restore();
    // NOTE(review): checkOutputDir() is intentionally NOT called here — it runs
    // once in initializeGlobal(); invoking it from every subtask would delete
    // objects just written by sibling subtasks. Confirm against the merged PR.
    createActionFinishedTag();
    nextBlock();
}

private void openSource() {
    // Initialize per-subtask write state: the S3 client, the collected part
    // ETags for the multipart upload, and the part counter. The counter starts
    // at 0 because nextBlock() increments it before the first upload (S3 part
    // numbers must begin at 1).
    amazonS3 = S3Util.getS3Client(s3Config);
    myPartETags = new ArrayList<>();
    currentPartNumber = 0;
    beforeWriteRecords();
}

Expand All @@ -95,17 +123,50 @@ private void restore() {
}

private void checkOutputDir() {
if (S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) {
if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) {
S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject());
// 覆盖写单个对象时
if (s3Config.isWriteSingleObject()
&& S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) {
S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject());
}
// 覆盖写多个对象时
if (!s3Config.isWriteSingleObject()) {
List<String> subObjects;
if (s3Config.isUseV2()) {
subObjects =
S3Util.listObjectsKeyByPrefix(
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
s3Config.getFetchSize());
} else {
subObjects =
S3Util.listObjectsByv1(
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
s3Config.getFetchSize());
}
String[] keys = subObjects.toArray(new String[] {});
S3Util.deleteObjects(amazonS3, s3Config.getBucket(), keys);
log.info("delete objects num:" + keys.length);
log.debug("delete objects list:" + StringUtils.join(keys, ","));
}
}

/**
 * Returns the configured object-name suffix, or an empty string when the
 * suffix is unset or blank.
 */
public String getExtension() {
    String suffix = s3Config.getSuffix();
    return StringUtils.isNotBlank(suffix) ? suffix : "";
}

private void nextBlock() {
    // Lazily (re)create the in-memory buffer; it is set to null after each
    // uploaded part, so a fresh writer starts on an empty buffer.
    if (sw == null) {
        sw = new StringWriter();
    }
    writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter());
    // Advance the multipart part number (must start at 1; openSource()
    // initializes the counter to 0).
    currentPartNumber++;
}

/** Create file multipart upload ID */
Expand Down Expand Up @@ -159,6 +220,7 @@ protected void flushDataInternal() {
myPartETags.stream().map(Objects::toString).collect(Collectors.joining(",")));
writerUtil.close();
writerUtil = null;
sw = null;
}
}

Expand Down Expand Up @@ -192,7 +254,7 @@ public void closeInternal() {
flushDataInternal();
completeMultipartUploadFile();
S3Util.closeS3(amazonS3);
Log.info("S3Client close!");
log.info("S3Client close!");
}

@Override
Expand All @@ -218,7 +280,7 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
}
String[] stringRecord = new String[columnNameList.size()];
// convert row to string
rowConverter.toExternal(rowData, stringRecord);
stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord);
try {
for (int i = 0; i < columnNameList.size(); ++i) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.dtstack.chunjun.connector.s3.source;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.RestoreConfig;
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.converter.S3SqlConverter;
Expand Down Expand Up @@ -70,6 +71,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
}
s3Config.setColumn(columnList);
S3InputFormatBuilder builder = new S3InputFormatBuilder(new S3InputFormat());
builder.setRestoreConf(new RestoreConfig());
builder.setRowConverter(
new S3SqlConverter(InternalTypeInfo.of(logicalType).toRowType(), s3Config));
builder.setS3Conf(s3Config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.dtstack.chunjun.config.RestoreConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.enums.CompressType;
import com.dtstack.chunjun.connector.s3.util.ReaderUtil;
import com.dtstack.chunjun.connector.s3.util.S3SimpleObject;
import com.dtstack.chunjun.connector.s3.util.S3Util;
Expand All @@ -40,6 +41,7 @@
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -50,6 +52,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

@Slf4j
public class S3InputFormat extends BaseRichInputFormat {
Expand Down Expand Up @@ -195,9 +198,15 @@ public boolean reachedEndWithoutCheckState() throws IOException {
// the file has not been read
S3Object o = amazonS3.getObject(rangeObjectRequest);
S3ObjectInputStream s3is = o.getObjectContent();
InputStream inputStream = s3is;
if (StringUtils.isNotEmpty(s3Config.getCompress())) {
if (CompressType.GZIP.name().equalsIgnoreCase(s3Config.getCompress())) {
inputStream = new GZIPInputStream(s3is);
}
}
readerUtil =
new ReaderUtil(
new InputStreamReader(s3is, s3Config.getEncoding()),
new InputStreamReader(inputStream, s3Config.getEncoding()),
s3Config.getFieldDelimiter(),
0L,
s3Config.isSafetySwitch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dtstack.chunjun.connector.s3.sink.S3DynamicTableSink;
import com.dtstack.chunjun.connector.s3.source.S3DynamicTableSource;
import com.dtstack.chunjun.connector.s3.table.options.S3Options;
import com.dtstack.chunjun.table.options.SinkOptions;
import com.dtstack.chunjun.util.GsonUtil;

import org.apache.flink.configuration.ConfigOption;
Expand Down Expand Up @@ -58,7 +59,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
s3Config.setEncoding(options.get(S3Options.ENCODING));
s3Config.setRegion(options.get(S3Options.REGION));
s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));

s3Config.setEndpoint(options.get(S3Options.ENDPOINT));
s3Config.setCompress(options.get(S3Options.COMPRESS));
return new S3DynamicTableSource(context.getCatalogTable().getResolvedSchema(), s3Config);
}

Expand All @@ -85,6 +87,13 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(S3Options.IS_FIRST_LINE_HEADER);
options.add(S3Options.OBJECTS);
options.add(S3Options.OBJECT);
options.add(S3Options.ENDPOINT);
options.add(S3Options.COMPRESS);
options.add(S3Options.WRITE_SINGLE_OBJECT);
options.add(S3Options.USE_V2);
options.add(S3Options.SUFFIX);
options.add(SinkOptions.SINK_PARALLELISM);
options.add(S3Options.WRITE_MODE);
return options;
}

Expand All @@ -105,6 +114,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {
s3Config.setEncoding(options.get(S3Options.ENCODING));
s3Config.setRegion(options.get(S3Options.REGION));
s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));
s3Config.setEndpoint(options.get(S3Options.ENDPOINT));
s3Config.setCompress(options.get(S3Options.COMPRESS));
s3Config.setWriteSingleObject(options.get(S3Options.WRITE_SINGLE_OBJECT));
s3Config.setUseV2(options.get(S3Options.USE_V2));
s3Config.setSuffix(options.get(S3Options.SUFFIX));
s3Config.setParallelism(options.get(SinkOptions.SINK_PARALLELISM));
s3Config.setWriteMode(options.get(S3Options.WRITE_MODE));

return new S3DynamicTableSink(context.getCatalogTable().getResolvedSchema(), s3Config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,31 @@ public class S3Options {
.defaultValue(false)
.withDescription(
"whether the first line is a header line, if so, the first line is not read");

public static final ConfigOption<String> ENDPOINT =
key("endpoint").stringType().noDefaultValue().withDescription("endpoint");

public static final ConfigOption<String> COMPRESS =
key("compress")
.stringType()
.noDefaultValue()
.withDescription("s3 file compression type");

public static final ConfigOption<Boolean> WRITE_SINGLE_OBJECT =
key("writeSingleObject")
.booleanType()
.defaultValue(true)
.withDescription("whether to write a single or multiple objects");

public static final ConfigOption<Boolean> USE_V2 =
key("useV2")
.booleanType()
.defaultValue(true)
.withDescription("Get the api version of the number of files in a directory");

public static final ConfigOption<String> SUFFIX =
key("suffix").stringType().noDefaultValue().withDescription("s3 file suffix");

public static final ConfigOption<String> WRITE_MODE =
key("writeMode").stringType().defaultValue("overwrite").withDescription("writeMode");
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
Expand Down Expand Up @@ -196,6 +197,13 @@ public static void deleteObject(AmazonS3 s3Client, String bucketName, String obj
s3Client.deleteObject(bucketName, object);
}

/**
 * Bulk-deletes the given keys from the bucket in a single request.
 * A no-op when {@code keys} is empty.
 */
public static void deleteObjects(AmazonS3 s3Client, String bucketName, String[] keys) {
    if (keys.length == 0) {
        return;
    }
    s3Client.deleteObjects(new DeleteObjectsRequest(bucketName).withKeys(keys));
}

public static void closeS3(AmazonS3 amazonS3) {
if (amazonS3 != null) {
amazonS3.shutdown();
Expand Down
Loading
Loading