Skip to content

Commit

Permalink
[improve] update S3File connector config option (#8615)
Browse files Browse the repository at this point in the history
Co-authored-by: litiliu <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
  • Loading branch information
3 people authored Feb 8, 2025
1 parent 087087e commit 80cc9fa
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

import java.util.Arrays;
import java.util.Map;

import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;

public class S3ConfigOptions extends BaseSourceConfigOptions {
public static final Option<String> S3_ACCESS_KEY =
Options.key("access_key")
Expand All @@ -55,22 +48,6 @@ public class S3ConfigOptions extends BaseSourceConfigOptions {
.defaultValue(S3aAwsCredentialsProvider.InstanceProfileCredentialsProvider)
.withDescription("s3a aws credentials provider");

public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription(
"Before the synchronization task begins, process the existing path");

public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.singleChoice(
DataSaveMode.class,
Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
.defaultValue(APPEND_DATA)
.withDescription(
"Before the synchronization task begins, different processing of data files that already exist in the directory");

/**
* The current key for that config option. if you need to add a new option, you can add it here
* and refer to this:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.s3.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;

import java.util.Arrays;

import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;

public class S3FileSinkOptions extends S3ConfigOptions {

public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription(
"Before the synchronization task begins, process the existing path");

public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.singleChoice(
DataSaveMode.class,
Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
.defaultValue(APPEND_DATA)
.withDescription(
"Before the synchronization task begins, different processing of data files that already exist in the directory");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.s3.config;

public class S3FileSourceOptions extends S3ConfigOptions {}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

Expand Down Expand Up @@ -62,8 +62,8 @@ public S3FileSink(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
S3ConfigOptions.FILE_PATH.key(),
S3ConfigOptions.S3_BUCKET.key());
S3FileSinkOptions.FILE_PATH.key(),
S3FileSinkOptions.S3_BUCKET.key());
if (!result.isSuccess()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
Expand All @@ -83,8 +83,8 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
return Optional.empty();
}
final Catalog catalog = catalogFactory.createCatalog(S3, readonlyConfig);
SchemaSaveMode schemaSaveMode = readonlyConfig.get(S3ConfigOptions.SCHEMA_SAVE_MODE);
DataSaveMode dataSaveMode = readonlyConfig.get(S3ConfigOptions.DATA_SAVE_MODE);
SchemaSaveMode schemaSaveMode = readonlyConfig.get(S3FileSinkOptions.SCHEMA_SAVE_MODE);
DataSaveMode dataSaveMode = readonlyConfig.get(S3FileSinkOptions.DATA_SAVE_MODE);
return Optional.of(
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, catalogTable, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileSinkOptions;

import com.google.auto.service.AutoService;

Expand All @@ -42,18 +42,18 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(S3ConfigOptions.FILE_PATH)
.required(S3ConfigOptions.S3_BUCKET)
.required(S3ConfigOptions.FS_S3A_ENDPOINT)
.required(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER)
.required(S3ConfigOptions.SCHEMA_SAVE_MODE)
.required(S3ConfigOptions.DATA_SAVE_MODE)
.required(S3FileSinkOptions.FILE_PATH)
.required(S3FileSinkOptions.S3_BUCKET)
.required(S3FileSinkOptions.FS_S3A_ENDPOINT)
.required(S3FileSinkOptions.S3A_AWS_CREDENTIALS_PROVIDER)
.required(S3FileSinkOptions.SCHEMA_SAVE_MODE)
.required(S3FileSinkOptions.DATA_SAVE_MODE)
.conditional(
S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER,
S3ConfigOptions.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
S3ConfigOptions.S3_ACCESS_KEY,
S3ConfigOptions.S3_SECRET_KEY)
.optional(S3ConfigOptions.S3_PROPERTIES)
S3FileSinkOptions.S3A_AWS_CREDENTIALS_PROVIDER,
S3FileSinkOptions.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
S3FileSinkOptions.S3_ACCESS_KEY,
S3FileSinkOptions.S3_SECRET_KEY)
.optional(S3FileSinkOptions.S3_PROPERTIES)
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileSourceOptions;

import com.google.auto.service.AutoService;

Expand All @@ -51,17 +51,17 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(S3ConfigOptions.FILE_PATH)
.required(S3ConfigOptions.FILE_FORMAT_TYPE)
.required(S3ConfigOptions.S3_BUCKET)
.required(S3ConfigOptions.FS_S3A_ENDPOINT)
.required(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER)
.required(S3FileSourceOptions.FILE_PATH)
.required(S3FileSourceOptions.FILE_FORMAT_TYPE)
.required(S3FileSourceOptions.S3_BUCKET)
.required(S3FileSourceOptions.FS_S3A_ENDPOINT)
.required(S3FileSourceOptions.S3A_AWS_CREDENTIALS_PROVIDER)
.conditional(
S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER,
S3ConfigOptions.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
S3ConfigOptions.S3_ACCESS_KEY,
S3ConfigOptions.S3_SECRET_KEY)
.optional(S3ConfigOptions.S3_PROPERTIES)
S3FileSourceOptions.S3A_AWS_CREDENTIALS_PROVIDER,
S3FileSourceOptions.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
S3FileSourceOptions.S3_ACCESS_KEY,
S3FileSourceOptions.S3_SECRET_KEY)
.optional(S3FileSourceOptions.S3_PROPERTIES)
.conditional(
BaseSourceConfigOptions.FILE_FORMAT_TYPE,
FileFormat.TEXT,
Expand Down

0 comments on commit 80cc9fa

Please sign in to comment.