Skip to content

Commit

Permalink
Support gcs-connector 3.x in GcsUtil (#33368)
Browse files Browse the repository at this point in the history
* Parameterize GoogleCloudStorage provider in GcsUtil to unblock gcs-connector 3.x

* Use Reflection to attempt 3.x Builder, and fall back to 2.x Constructor

* Attempt constructing 3.x-style GCS via reflection; fall back to 2.x constructor

* Update CHANGES.md

* Add TODO note on reflection block

* Try non-reflected construction first

* Fix SpotBugs error
  • Loading branch information
clairemcginty authored Jan 14, 2025
1 parent 9268f69 commit ff8b6a1
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

## I/Os

* Support gcs-connector 3.x+ in GcsUtil ([#33368](https://github.com/apache/beam/pull/33368))
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
Expand All @@ -52,6 +53,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.AccessDeniedException;
Expand Down Expand Up @@ -739,7 +741,47 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO

GoogleCloudStorage createGoogleCloudStorage(
GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
return new GoogleCloudStorageImpl(options, storage, credentials);
try {
return new GoogleCloudStorageImpl(options, storage, credentials);
} catch (NoSuchMethodError e) {
// gcs-connector 3.x drops the direct constructor and exclusively uses Builder
// TODO eliminate reflection once Beam drops Java 8 support and upgrades to gcsio 3.x
try {
final Method builderMethod = GoogleCloudStorageImpl.class.getMethod("builder");
Object builder = builderMethod.invoke(null);
final Class<?> builderClass =
Class.forName(
"com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder");

final Method setOptionsMethod =
builderClass.getMethod("setOptions", GoogleCloudStorageOptions.class);
setOptionsMethod.setAccessible(true);
builder = setOptionsMethod.invoke(builder, options);

final Method setHttpTransportMethod =
builderClass.getMethod("setHttpTransport", HttpTransport.class);
setHttpTransportMethod.setAccessible(true);
builder =
setHttpTransportMethod.invoke(builder, storage.getRequestFactory().getTransport());

final Method setCredentialsMethod =
builderClass.getMethod("setCredentials", Credentials.class);
setCredentialsMethod.setAccessible(true);
builder = setCredentialsMethod.invoke(builder, credentials);

final Method setHttpRequestInitializerMethod =
builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class);
setHttpRequestInitializerMethod.setAccessible(true);
builder = setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer);

final Method buildMethod = builderClass.getMethod("build");
buildMethod.setAccessible(true);
return (GoogleCloudStorage) buildMethod.invoke(builder);
} catch (Exception reflectionError) {
throw new RuntimeException(
"Failed to construct GoogleCloudStorageImpl from gcsio 3.x Builder", reflectionError);
}
}
}

/**
Expand Down

0 comments on commit ff8b6a1

Please sign in to comment.