Skip to content

Commit

Permalink
GH-41947: [Java] Support catalog in JDBC driver with session options (#…
Browse files Browse the repository at this point in the history
…42035)

### Rationale for this change

See Issue #41947

### What changes are included in this PR?

### Are these changes tested?

Yes

### Are there any user-facing changes?

Introductiona of an optional catalog query parameter in the JDBC url string.

* GitHub Issue: #41947

Authored-by: Steve Lord <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
stevelorddremio authored Jun 13, 2024
1 parent 7d84c1e commit cecd771
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 6 deletions.
7 changes: 7 additions & 0 deletions java/flight/flight-sql-jdbc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ under the License.
<artifactId>bcpkix-jdk18on</artifactId>
<version>1.78.1</version>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private static ArrowFlightSqlClientHandler createNewClientHandler(
.withCallOptions(config.toCallOption())
.withRetainCookies(config.retainCookies())
.withRetainAuth(config.retainAuth())
.withCatalog(config.getCatalog())
.build();
} catch (final SQLException e) {
try {
Expand Down Expand Up @@ -171,6 +172,7 @@ public Properties getClientInfo() {

@Override
public void close() throws SQLException {
clientHandler.close();
if (executorService != null) {
executorService.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc.client;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;
Expand All @@ -25,9 +26,14 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.CloseSessionRequest;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightClientMiddleware;
import org.apache.arrow.flight.FlightEndpoint;
Expand All @@ -36,6 +42,10 @@
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.LocationSchemes;
import org.apache.arrow.flight.SessionOptionValue;
import org.apache.arrow.flight.SessionOptionValueFactory;
import org.apache.arrow.flight.SetSessionOptionsRequest;
import org.apache.arrow.flight.SetSessionOptionsResult;
import org.apache.arrow.flight.auth2.BearerCredentialWriter;
import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler;
import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
Expand All @@ -57,19 +67,24 @@
/** A {@link FlightSqlClient} handler. */
public final class ArrowFlightSqlClientHandler implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFlightSqlClientHandler.class);
// JDBC connection string query parameter
private static final String CATALOG = "catalog";

private final FlightSqlClient sqlClient;
private final Set<CallOption> options = new HashSet<>();
private final Builder builder;
private final Optional<String> catalog;

ArrowFlightSqlClientHandler(
final FlightSqlClient sqlClient,
final Builder builder,
final Collection<CallOption> credentialOptions) {
final Collection<CallOption> credentialOptions,
final Optional<String> catalog) {
this.options.addAll(builder.options);
this.options.addAll(credentialOptions);
this.sqlClient = Preconditions.checkNotNull(sqlClient);
this.builder = builder;
this.catalog = catalog;
}

/**
Expand All @@ -80,9 +95,15 @@ public final class ArrowFlightSqlClientHandler implements AutoCloseable {
* @param options the {@link CallOption}s to persist in between subsequent client calls.
* @return a new {@link ArrowFlightSqlClientHandler}.
*/
public static ArrowFlightSqlClientHandler createNewHandler(
final FlightClient client, final Builder builder, final Collection<CallOption> options) {
return new ArrowFlightSqlClientHandler(new FlightSqlClient(client), builder, options);
static ArrowFlightSqlClientHandler createNewHandler(
final FlightClient client,
final Builder builder,
final Collection<CallOption> options,
final Optional<String> catalog) {
final ArrowFlightSqlClientHandler handler =
new ArrowFlightSqlClientHandler(new FlightSqlClient(client), builder, options, catalog);
handler.setSetCatalogInSessionIfPresent();
return handler;
}

/**
Expand Down Expand Up @@ -199,6 +220,9 @@ public FlightInfo getInfo(final String query) {

@Override
public void close() throws SQLException {
if (catalog.isPresent()) {
sqlClient.closeSession(new CloseSessionRequest(), getOptions());
}
try {
AutoCloseables.close(sqlClient);
} catch (final Exception e) {
Expand Down Expand Up @@ -250,6 +274,31 @@ public interface PreparedStatement extends AutoCloseable {
void close();
}

/** A connection is created with catalog set as a session option. */
private void setSetCatalogInSessionIfPresent() {
if (catalog.isPresent()) {
final SetSessionOptionsRequest setSessionOptionRequest =
new SetSessionOptionsRequest(
ImmutableMap.<String, SessionOptionValue>builder()
.put(CATALOG, SessionOptionValueFactory.makeSessionOptionValue(catalog.get()))
.build());
final SetSessionOptionsResult result =
sqlClient.setSessionOptions(setSessionOptionRequest, getOptions());

if (result.hasErrors()) {
Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
for (Map.Entry<String, SetSessionOptionsResult.Error> error : errors.entrySet()) {
LOGGER.warn(error.toString());
}
throw CallStatus.INVALID_ARGUMENT
.withDescription(
String.format(
"Cannot set session option for catalog = %s. Check log for details.", catalog))
.toRuntimeException();
}
}
}

/**
* Creates a new {@link PreparedStatement} for the given {@code query}.
*
Expand Down Expand Up @@ -492,6 +541,8 @@ public static final class Builder {

@VisibleForTesting boolean retainAuth = true;

@VisibleForTesting Optional<String> catalog = Optional.empty();

// These two middleware are for internal use within build() and should not be exposed by builder
// APIs.
// Note that these middleware may not necessarily be registered.
Expand Down Expand Up @@ -527,6 +578,7 @@ public Builder() {}
this.clientCertificatePath = original.clientCertificatePath;
this.clientKeyPath = original.clientKeyPath;
this.allocator = original.allocator;
this.catalog = original.catalog;

if (original.retainCookies) {
this.cookieFactory = original.cookieFactory;
Expand Down Expand Up @@ -762,6 +814,17 @@ public Builder withCallOptions(final Collection<CallOption> options) {
return this;
}

/**
* Sets the catalog for this handler if it is not null.
*
* @param catalog the catalog
* @return this instance.
*/
public Builder withCatalog(@Nullable final String catalog) {
this.catalog = Optional.ofNullable(catalog);
return this;
}

/**
* Builds a new {@link ArrowFlightSqlClientHandler} from the provided fields.
*
Expand Down Expand Up @@ -841,7 +904,8 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
new CredentialCallOption(new BearerCredentialWriter(token)),
options.toArray(new CallOption[0])));
}
return ArrowFlightSqlClientHandler.createNewHandler(client, this, credentialOptions);
return ArrowFlightSqlClientHandler.createNewHandler(
client, this, credentialOptions, catalog);

} catch (final IllegalArgumentException
| GeneralSecurityException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ public boolean retainAuth() {
return ArrowFlightConnectionProperty.RETAIN_AUTH.getBoolean(properties);
}

/**
* The catalog to which a connection is made.
*
* @return the catalog.
*/
public String getCatalog() {
return ArrowFlightConnectionProperty.CATALOG.getString(properties);
}

/**
* Gets the {@link CallOption}s from this {@link ConnectionConfig}.
*
Expand Down Expand Up @@ -203,7 +212,8 @@ public enum ArrowFlightConnectionProperty implements ConnectionProperty {
THREAD_POOL_SIZE("threadPoolSize", 1, Type.NUMBER, false),
TOKEN("token", null, Type.STRING, false),
RETAIN_COOKIES("retainCookies", true, Type.BOOLEAN, false),
RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false);
RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false),
CATALOG("catalog", null, Type.STRING, false);

private final String camelName;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/
package org.apache.arrow.driver.jdbc.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.Optional;
import org.apache.arrow.driver.jdbc.FlightServerTestRule;
import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
import org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -142,5 +144,30 @@ public void testDefaults() {
assertNull(builder.tlsRootCertificatesPath);
assertNull(builder.clientCertificatePath);
assertNull(builder.clientKeyPath);
assertEquals(Optional.empty(), builder.catalog);
}

@Test
public void testCatalog() {
ArrowFlightSqlClientHandler.Builder rootBuilder = new ArrowFlightSqlClientHandler.Builder();

rootBuilder.withCatalog(null);
assertFalse(rootBuilder.catalog.isPresent());

rootBuilder.withCatalog("");
assertTrue(rootBuilder.catalog.isPresent());

rootBuilder.withCatalog(" ");
assertTrue(rootBuilder.catalog.isPresent());

final String noSpaces = "noSpaces";
rootBuilder.withCatalog(noSpaces);
assertTrue(rootBuilder.catalog.isPresent());
assertEquals(noSpaces, rootBuilder.catalog.get());

final String nameWithSpaces = " spaces ";
rootBuilder.withCatalog(nameWithSpaces);
assertTrue(rootBuilder.catalog.isPresent());
assertEquals(nameWithSpaces, rootBuilder.catalog.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static java.lang.Runtime.getRuntime;
import static java.util.Arrays.asList;
import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.CATALOG;
import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.HOST;
import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PASSWORD;
import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT;
Expand Down Expand Up @@ -107,6 +108,12 @@ public static List<Object[]> provideParameters() {
(Function<ArrowFlightConnectionConfigImpl, ?>)
ArrowFlightConnectionConfigImpl::threadPoolSize
},
{
CATALOG,
"catalog",
(Function<ArrowFlightConnectionConfigImpl, ?>)
ArrowFlightConnectionConfigImpl::getCatalog
},
});
}
}

0 comments on commit cecd771

Please sign in to comment.