Skip to content

Commit

Permalink
Add Looker SDK and streaming support (#2)
Browse files Browse the repository at this point in the history
* Initial Looker SDK branch commit

* Streaming LookerIterator

* True streaming support

* Make a specific Driver class and 'jdbc:looker:' protocol support

* Add read timeout for stream

* SQL Interface API endpoints

* Use BlockingQueue of FrameEnvelopes rather than IO pipes

* Fix illegal cast for queries with a single column

* Stub driver for LookerRemoteMetaTest

* Address feedback

* Handle null values in stream

* Rename driver to LookerDriver

* Better error handling from API response

* Support userAgent and make LookerResponseParser public for testing
  • Loading branch information
tjbanghart authored and wnob committed Feb 15, 2024
1 parent 00552ee commit f989f12
Show file tree
Hide file tree
Showing 18 changed files with 2,176 additions and 4 deletions.
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ allprojects {
plugins.withId("java-library") {
dependencies {
"implementation"(platform(project(":bom")))
// Add the locally bundled LookerSDK fat jar
"implementation"(files("../libs/looker-kotlin-sdk-a48011f.jar"))
}
}
if (!skipAutostyle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.calcite.avatica.remote.Service.ErrorResponse;
import org.apache.calcite.avatica.remote.Service.OpenConnectionRequest;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.calcite.avatica.remote.looker.LookerRemoteMeta;
import org.apache.calcite.avatica.util.ArrayFactoryImpl;

import java.sql.Array;
Expand Down Expand Up @@ -691,9 +692,15 @@ protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet, QueryState
// These are all the metadata operations, no updates
ResultSet resultSet = executeQueryInternal(statement, metaResultSet.signature.sanitize(),
metaResultSet.firstFrame, state, false);
if (metaResultSet.ownStatement) {

// TODO (b/300522680): All metadata responses from the Calcite adapter are being treated as
// "ownStatement" since there is no Avatica server to handle the result set response. We get
// the raw response from CalciteMetaImpl which correctly sets `closeOnCompletion`. We don't
// want that behavior here since metadata statements should remain open for multiple requests.
if (metaResultSet.ownStatement && !(meta instanceof LookerRemoteMeta)) {
resultSet.getStatement().closeOnCompletion();
}

return resultSet;
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/calcite/avatica/Helper.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public SQLException createException(String message, String sql, Exception e) {
return new AvaticaSqlException(message, rte.getSqlState(), rte.getErrorCode(),
rte.getServerExceptions(), serverAddress);
}
return new SQLException(message, e);
// TODO b/300529001: Tableau only surfaces the message to users but the important details
// are often in the main exception so present that as well.
return new SQLException(message + ": " + e.getMessage(), e);
}

public SQLException createException(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
* Implementation of {@link org.apache.calcite.avatica.Meta} for the remote
* driver.
*/
class RemoteMeta extends MetaImpl {
public class RemoteMeta extends MetaImpl {
final Service service;
final Map<String, ConnectionPropertiesImpl> propsMap = new HashMap<>();
private Map<DatabaseProperty, Object> databaseProperties;

RemoteMeta(AvaticaConnection connection, Service service) {
protected RemoteMeta(AvaticaConnection connection, Service service) {
super(connection);
this.service = service;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.avatica.remote.looker;

import org.apache.calcite.avatica.ConnectionConfigImpl.PropEnv;
import org.apache.calcite.avatica.ConnectionProperty;

import java.util.Properties;

/** TODO: implement Looker specific connection properties */
public class LookerConnectionProperty implements ConnectionProperty {

@Override
public String name() {
return null;
}

@Override
public String camelName() {
return null;
}

@Override
public Object defaultValue() {
return null;
}

@Override
public Type type() {
return null;
}

@Override
public PropEnv wrap(Properties properties) {
return null;
}

@Override
public boolean required() {
return false;
}

@Override
public Class valueClass() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.avatica.remote.looker;

import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.ConnectStringParser;
import org.apache.calcite.avatica.DriverVersion;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.UnregisteredDriver;
import org.apache.calcite.avatica.remote.Service;

import com.looker.sdk.LookerSDK;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

/**
* JDBC Driver for Looker's SQL Interface. Communicates with a Looker instance via
* {@link LookerSDK}. Backed by Looker-specific {@link LookerRemoteMeta} and
* {@link LookerRemoteService}.
*
* Use 'jdbc:looker' as the protocol to select this over the default remote Avatica driver.
*/
public class LookerDriver extends UnregisteredDriver {

static {
new LookerDriver().register();
}

public LookerDriver() {
super();
}

public static final String CONNECT_STRING_PREFIX = "jdbc:looker:";

@Override
protected DriverVersion createDriverVersion() {
return DriverVersion.load(
org.apache.calcite.avatica.remote.Driver.class,
"org-apache-calcite-jdbc.properties",
"Looker JDBC Driver",
"unknown version",
"Looker",
"unknown version");
}

@Override
protected String getConnectStringPrefix() {
return CONNECT_STRING_PREFIX;
}

@Override
public Meta createMeta(AvaticaConnection connection) {
final Service service = new LookerRemoteService();
connection.setService(service);
return new LookerRemoteMeta(connection, service);
}

@Override public Connection connect(String url, Properties info)
throws SQLException {
AvaticaConnection conn = (AvaticaConnection) super.connect(url, info);

if (conn == null) {
// the URL did not match Looker's JDBC connection string prefix
return null;
}

// the `looker` driver should always have a matching Service
Service service = conn.getService();
assert service instanceof LookerRemoteService;

// puts all additional url params into properties
Properties properties = ConnectStringParser.parse(url, info);

// create and set LookerSDK for the connection
LookerSDK sdk = LookerSdkFactory.createSdk(conn.config().url(), properties);
((LookerRemoteService) service).setSdk(sdk);
return conn;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.avatica.remote.looker;

import org.apache.calcite.avatica.Meta.Frame;

/**
* Wrapper for either a {@link Frame} or {@link Exception}. Allows for
* {@link LookerRemoteMeta#stmtQueueMap} to hold complete Frames and present exceptions when
* consumers are ready to encounter them.
*/
public class LookerFrameEnvelope {

private final Frame frame;
private final Exception exception;

private LookerFrameEnvelope(/*@Nullable*/ Frame frame, /*@Nullable*/ Exception exception) {
this.frame = frame;
this.exception = exception;
}

public Frame getFrame() {
return this.frame;
}

public Exception getException() {
return this.exception;
}

/**
* Constructs a LookerFrameEnvelope with a {@link Frame}.
*/
public static LookerFrameEnvelope ok(long offset, boolean done, Iterable<Object> rows) {
Frame frame = new Frame(offset, done, rows);
return new LookerFrameEnvelope(frame, null);
}

/**
* Constructs a LookerFrameEnvelope to hold an exception
*/
public static LookerFrameEnvelope error(Exception e) {
return new LookerFrameEnvelope(null, e);
}

/**
* Whether this LookerFrameEnvelope holds an exception. If true, the envelope holds no {@link Frame}.
*/
public boolean hasException() {
return this.exception != null;
}
}
Loading

0 comments on commit f989f12

Please sign in to comment.