Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c64d234
FLINK-38350: Upgrading Hadoop to 3.4.2
pra91 Sep 12, 2025
a9aede1
FLINK-38350: Upgrading Hadoop to 3.4.2
pra91 Sep 12, 2025
e7a6676
FLINK-38350: Enhance HadoopS3AccessHelper with proper exception handl…
Sep 16, 2025
be809ff
Enhance HadoopS3AccessHelper with comprehensive property copying from…
Sep 17, 2025
9a417cd
Fix S3 multipart upload NoSuchUploadException with client consistency
Sep 17, 2025
64509c0
Update tests for S3 callback implementation fixes
Sep 17, 2025
ca119a5
Fix S5cmd integration test and checkpoint race condition
Sep 17, 2025
2429598
Fix S3 client consistency issue causing NoSuchUploadException
Sep 17, 2025
6c4b459
feat: Comprehensive S3 client improvements for Hadoop 3.4.2 compatibi…
Sep 18, 2025
c40c91c
feat: Modernize S3 Hadoop filesystem architecture with comprehensive …
Sep 18, 2025
152f32f
refactor: Simplify S3 client architecture to single cached client
Sep 18, 2025
441b0b5
fix: Restore accidentally deleted HadoopS3FileSystemTest.java
Sep 18, 2025
93007fa
fix: Add Java 8 compatibility for release-1.20 branch
Sep 18, 2025
5726fd4
fix: Correct SSL configuration in S3ConfigurationBuilder
Sep 18, 2025
df8f094
refactor: Simplify S3 architecture by removing complex components
Sep 18, 2025
b7f364e
Fix unit test: Remove metrics expectations from HadoopS3AccessHelperTest
Sep 18, 2025
6f770ae
CRITICAL FIX: Eliminate S3 client resource leaks causing E2E test ins…
Sep 18, 2025
e08c993
Fix S3 client null config issue with lazy initialization
Sep 19, 2025
1b9f9bf
CRITICAL FIX: Implement shared S3 client with reference counting to p…
Sep 19, 2025
e5b85d0
CRITICAL: Apply E2E SSL networking fix to main development branch
Sep 19, 2025
58d5f7a
RADICAL FIX: Use S3AFileSystem's internal S3 client via reflection to…
Sep 22, 2025
7b8fd68
TEST: Remove S3ClientConfigurationFactory entirely to isolate SSL issue
Sep 22, 2025
27a319b
CRITICAL: Fix SSL/networking errors by excluding unshaded Netty from …
Sep 22, 2025
a48ff27
cleanup: Remove debug output from reflection approach
Sep 22, 2025
0e54283
CRITICAL: Remove shutdown hook causing SSL handler interference
Sep 22, 2025
e025897
TEST: Revert filesystem Hadoop version to 3.3.4 to isolate SSL issue
Sep 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions flink-end-to-end-tests/test-scripts/.minio.sys/format.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":"1","format":"xl-single","id":"4d697b34-d749-4917-9a08-5796e6a79518","xl":{"version":"3","this":"912d2300-8fca-4ef9-9d89-0d43ff4e722e","sets":[["912d2300-8fca-4ef9-9d89-0d43ff4e722e"]],"distributionAlgo":"SIPMOD+PARITY"}}
Binary file not shown.
5 changes: 5 additions & 0 deletions flink-filesystems/flink-s3-fs-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<!-- Exclude unshaded Netty to prevent conflicts with flink-shaded-netty -->
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
5 changes: 5 additions & 0 deletions flink-filesystems/flink-s3-fs-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<!-- Exclude unshaded Netty to prevent conflicts with flink-shaded-netty -->
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.s3hadoop;

import org.apache.flink.annotation.Internal;

import javax.annotation.Nullable;

import java.net.URI;
import java.time.Duration;
import java.util.Objects;

/**
* Immutable configuration object for S3 client settings. This centralizes all S3 configuration and
* provides a clean, type-safe API for accessing configuration values.
*/
@Internal
public final class S3Configuration {

// Connection settings
private final Duration connectionTimeout;
private final Duration socketTimeout;
private final int maxConnections;

// Retry settings
private final int maxRetries;
private final Duration retryInterval;

// Timeouts
private final Duration apiCallTimeout;
private final Duration apiCallAttemptTimeout;

// Credentials
private final String accessKey;
private final String secretKey;
private final String sessionToken;

// Regional settings
private final String region;
private final URI endpoint;

// Service settings
private final boolean pathStyleAccess;
private final boolean checksumValidation;

// SSL settings
private final boolean sslEnabled;
private final boolean verifySslCertificates;
private final String trustStorePath;
private final String trustStorePassword;

// Buffer settings
private final int bufferSize;

S3Configuration(S3ConfigurationBuilder builder) {
this.connectionTimeout = builder.getConnectionTimeout();
this.socketTimeout = builder.getSocketTimeout();
this.maxConnections = builder.getMaxConnections();
this.maxRetries = builder.getMaxRetries();
this.retryInterval = builder.getRetryInterval();
this.apiCallTimeout = builder.getApiCallTimeout();
this.apiCallAttemptTimeout = builder.getApiCallAttemptTimeout();
this.accessKey = builder.getAccessKey();
this.secretKey = builder.getSecretKey();
this.sessionToken = builder.getSessionToken();
this.region = builder.getRegion();
this.endpoint = builder.getEndpoint();
this.pathStyleAccess = builder.isPathStyleAccess();
this.checksumValidation = builder.isChecksumValidation();
this.sslEnabled = builder.isSslEnabled();
this.verifySslCertificates = builder.isVerifySslCertificates();
this.trustStorePath = builder.getTrustStorePath();
this.trustStorePassword = builder.getTrustStorePassword();
this.bufferSize = builder.getBufferSize();
}

// Getters
public Duration getConnectionTimeout() {
return connectionTimeout;
}

public Duration getSocketTimeout() {
return socketTimeout;
}

public int getMaxConnections() {
return maxConnections;
}

public int getMaxRetries() {
return maxRetries;
}

public Duration getRetryInterval() {
return retryInterval;
}

public Duration getApiCallTimeout() {
return apiCallTimeout;
}

public Duration getApiCallAttemptTimeout() {
return apiCallAttemptTimeout;
}

@Nullable
public String getAccessKey() {
return accessKey;
}

@Nullable
public String getSecretKey() {
return secretKey;
}

@Nullable
public String getSessionToken() {
return sessionToken;
}

public String getRegion() {
return region;
}

@Nullable
public URI getEndpoint() {
return endpoint;
}

public boolean isPathStyleAccess() {
return pathStyleAccess;
}

public boolean isChecksumValidation() {
return checksumValidation;
}

public boolean isSslEnabled() {
return sslEnabled;
}

public boolean isVerifySslCertificates() {
return verifySslCertificates;
}

@Nullable
public String getTrustStorePath() {
return trustStorePath;
}

@Nullable
public String getTrustStorePassword() {
return trustStorePassword;
}

public int getBufferSize() {
return bufferSize;
}

/**
* Generates a configuration hash for caching purposes. This should be consistent across
* different instances with the same configuration.
*/
public String getConfigurationHash() {
return Integer.toHexString(
Objects.hash(
connectionTimeout,
socketTimeout,
maxConnections,
maxRetries,
retryInterval,
apiCallTimeout,
apiCallAttemptTimeout,
accessKey,
secretKey,
sessionToken,
region,
endpoint,
pathStyleAccess,
checksumValidation,
sslEnabled,
verifySslCertificates,
trustStorePath,
trustStorePassword,
bufferSize));
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
S3Configuration that = (S3Configuration) obj;
return maxConnections == that.maxConnections
&& maxRetries == that.maxRetries
&& pathStyleAccess == that.pathStyleAccess
&& checksumValidation == that.checksumValidation
&& sslEnabled == that.sslEnabled
&& verifySslCertificates == that.verifySslCertificates
&& bufferSize == that.bufferSize
&& Objects.equals(connectionTimeout, that.connectionTimeout)
&& Objects.equals(socketTimeout, that.socketTimeout)
&& Objects.equals(retryInterval, that.retryInterval)
&& Objects.equals(apiCallTimeout, that.apiCallTimeout)
&& Objects.equals(apiCallAttemptTimeout, that.apiCallAttemptTimeout)
&& Objects.equals(accessKey, that.accessKey)
&& Objects.equals(secretKey, that.secretKey)
&& Objects.equals(sessionToken, that.sessionToken)
&& Objects.equals(region, that.region)
&& Objects.equals(endpoint, that.endpoint)
&& Objects.equals(trustStorePath, that.trustStorePath)
&& Objects.equals(trustStorePassword, that.trustStorePassword);
}

@Override
public int hashCode() {
return Objects.hash(
connectionTimeout,
socketTimeout,
maxConnections,
maxRetries,
retryInterval,
apiCallTimeout,
apiCallAttemptTimeout,
accessKey,
secretKey,
sessionToken,
region,
endpoint,
pathStyleAccess,
checksumValidation,
sslEnabled,
verifySslCertificates,
trustStorePath,
trustStorePassword,
bufferSize);
}

@Override
public String toString() {
return "S3Configuration{"
+ "connectionTimeout="
+ connectionTimeout
+ ", socketTimeout="
+ socketTimeout
+ ", maxConnections="
+ maxConnections
+ ", maxRetries="
+ maxRetries
+ ", retryInterval="
+ retryInterval
+ ", region='"
+ region
+ '\''
+ ", endpoint="
+ endpoint
+ ", pathStyleAccess="
+ pathStyleAccess
+ ", checksumValidation="
+ checksumValidation
+ ", sslEnabled="
+ sslEnabled
+ ", bufferSize="
+ bufferSize
+
// Note: Don't include credentials in toString for security
'}';
}
}
Loading
Loading