From aa93bd90a48bc55d54238d13ca883aa1bd0443c0 Mon Sep 17 00:00:00 2001 From: TJ Banghart Date: Fri, 8 Sep 2023 09:42:08 -0700 Subject: [PATCH] Use BlockingQueue of FrameEnvelopes rather than IO pipes --- .../calcite/avatica/AvaticaConnection.java | 4 +- .../org/apache/calcite/avatica/MetaImpl.java | 19 +- .../calcite/avatica/remote/RemoteMeta.java | 4 +- .../calcite/avatica/remote/Service.java | 5 +- .../calcite/avatica/remote/looker/Driver.java | 14 +- .../remote/looker/LookerRemoteMeta.java | 381 ++++++++++-------- .../remote/looker/LookerRemoteService.java | 7 +- .../looker/{utils => }/LookerSdkFactory.java | 24 +- .../remote/looker/utils/package-info.java | 23 -- .../avatica/remote/looker/DriverTest.java | 21 +- .../remote/looker/LookerRemoteMetaTest.java | 55 ++- 11 files changed, 296 insertions(+), 261 deletions(-) rename core/src/main/java/org/apache/calcite/avatica/remote/looker/{utils => }/LookerSdkFactory.java (85%) delete mode 100644 core/src/main/java/org/apache/calcite/avatica/remote/looker/utils/package-info.java diff --git a/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java index 5465983745..f740d85ffb 100644 --- a/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java +++ b/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java @@ -698,7 +698,7 @@ protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet, QueryState } /** Creates a statement wrapper around an existing handle. */ - public AvaticaStatement lookupStatement(Meta.StatementHandle h) + protected AvaticaStatement lookupStatement(Meta.StatementHandle h) throws SQLException { final AvaticaStatement statement = statementMap.get(h.id); if (statement != null) { @@ -796,7 +796,7 @@ public T invokeWithRetries(CallableWithoutException callable) { } catch (AvaticaClientRuntimeException e) { lastException = e; if (ErrorResponse.MISSING_CONNECTION_ERROR_CODE == e.getErrorCode() - && transparentReconnectEnabled) { + && transparentReconnectEnabled) { this.openConnection(); continue; } diff --git a/core/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/core/src/main/java/org/apache/calcite/avatica/MetaImpl.java index eb240ca86a..2cf389bf82 100644 --- a/core/src/main/java/org/apache/calcite/avatica/MetaImpl.java +++ b/core/src/main/java/org/apache/calcite/avatica/MetaImpl.java @@ -1527,12 +1527,12 @@ public Object next() { /** Iterable that yields an iterator over rows coming from a sequence of * {@link Meta.Frame}s. */ - public class FetchIterable implements Iterable { + private class FetchIterable implements Iterable { private final AvaticaStatement stmt; private final QueryState state; private final Frame firstFrame; - public FetchIterable(AvaticaStatement stmt, QueryState state, Frame firstFrame) { + private FetchIterable(AvaticaStatement stmt, QueryState state, Frame firstFrame) { this.stmt = stmt; this.state = state; this.firstFrame = firstFrame; @@ -1544,7 +1544,7 @@ public Iterator iterator() { } /** Iterator over rows coming from a sequence of {@link Meta.Frame}s. */ - public class FetchIterator implements Iterator { + private class FetchIterator implements Iterator { private final AvaticaStatement stmt; private final QueryState state; private final int fetchSize; @@ -1552,7 +1552,7 @@ public class FetchIterator implements Iterator { private Iterator rows; private long currentOffset = 0; - public FetchIterator(AvaticaStatement stmt, QueryState state, Frame firstFrame) { + private FetchIterator(AvaticaStatement stmt, QueryState state, Frame firstFrame) { this.stmt = stmt; this.state = state; int fetchRowCount; @@ -1590,14 +1590,7 @@ public Object next() { return o; } - /** A helper method to call {@link Meta#fetch}. Pulled out from {@link #moveNext()} so that - * extending classes can override the fetch implementation without calling out to a Meta. */ - protected Frame doFetch(StatementHandle h, long currentOffset, int fetchSize) - throws NoSuchStatementException, MissingResultsException { - return fetch(h, currentOffset, fetchSize); - } - - void moveNext() { + private void moveNext() { for (;;) { if (rows.hasNext()) { break; @@ -1608,7 +1601,7 @@ void moveNext() { } try { // currentOffset updated after element is read from `rows` iterator - frame = doFetch(stmt.handle, currentOffset, fetchSize); + frame = fetch(stmt.handle, currentOffset, fetchSize); } catch (NoSuchStatementException e) { resetStatement(); // re-fetch the batch where we left off diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java index bb55812507..29d283b8be 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java @@ -40,11 +40,11 @@ * driver. */ public class RemoteMeta extends MetaImpl { - public final Service service; + final Service service; final Map propsMap = new HashMap<>(); private Map databaseProperties; - public RemoteMeta(AvaticaConnection connection, Service service) { + protected RemoteMeta(AvaticaConnection connection, Service service) { super(connection); this.service = service; } diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/Service.java b/core/src/main/java/org/apache/calcite/avatica/remote/Service.java index dd92a9bbc0..270a0c33c4 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/Service.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/Service.java @@ -38,7 +38,6 @@ import com.google.protobuf.Message; import com.google.protobuf.UnsafeByteOperations; -import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.sql.SQLException; @@ -147,7 +146,7 @@ protected static int p(int result, long v) { name = "prepareAndExecuteBatch"), @JsonSubTypes.Type(value = ExecuteBatchRequest.class, name = "executeBatch") }) abstract class Request extends Base { - abstract Response accept(Service service) throws IOException; + abstract Response accept(Service service); abstract Request deserialize(Message genericMsg); abstract Message serialize(); } @@ -949,7 +948,7 @@ public PrepareAndExecuteRequest( this.maxRowsInFirstFrame = maxRowsInFirstFrame; } - @Override ExecuteResponse accept(Service service) throws IOException { + @Override ExecuteResponse accept(Service service) { return service.apply(this); } diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/looker/Driver.java b/core/src/main/java/org/apache/calcite/avatica/remote/looker/Driver.java index acdfd36b30..77ac75803b 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/looker/Driver.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/looker/Driver.java @@ -21,7 +21,6 @@ import org.apache.calcite.avatica.Meta; import org.apache.calcite.avatica.UnregisteredDriver; import org.apache.calcite.avatica.remote.Service; -import org.apache.calcite.avatica.remote.looker.utils.LookerSdkFactory; import com.looker.sdk.LookerSDK; @@ -29,6 +28,13 @@ import java.sql.SQLException; import java.util.Properties; +/** + * JDBC Driver for Looker's SQL Interface. Communicates with a Looker instance via + * {@link LookerSDK}. Backed by Looker-specific {@link LookerRemoteMeta} and + * {@link LookerRemoteService}. + * + * Use 'jdbc:looker' as the protocol to select this over the default remote Avatica driver. + */ public class Driver extends UnregisteredDriver { static { @@ -69,12 +75,14 @@ public Meta createMeta(AvaticaConnection connection) { AvaticaConnection conn = (AvaticaConnection) super.connect(url, info); if (conn == null) { - // It's not an url for our driver + // the URL did not match Looker's JDBC connection string prefix return null; } - Service service = conn.getService(); + // the `looker` driver should always have a matching Service + Service service = conn.getService(); assert service instanceof LookerRemoteService; + // create and set LookerSDK for the connection LookerSDK sdk = LookerSdkFactory.createSdk(conn.config().url(), info); ((LookerRemoteService) service).setSdk(sdk); diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMeta.java b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMeta.java index 3d070790c6..8b0a01c286 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMeta.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMeta.java @@ -18,14 +18,17 @@ import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaStatement; +import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.ColumnMetaData.Rep; import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.MissingResultsException; +import org.apache.calcite.avatica.NoSuchStatementException; import org.apache.calcite.avatica.QueryState; import org.apache.calcite.avatica.remote.RemoteMeta; import org.apache.calcite.avatica.remote.Service; import org.apache.calcite.avatica.remote.Service.PrepareAndExecuteRequest; +import org.apache.calcite.avatica.remote.Service.PrepareRequest; import org.apache.calcite.avatica.remote.TypedValue; -import org.apache.calcite.avatica.remote.looker.utils.LookerSdkFactory; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; @@ -35,38 +38,90 @@ import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.net.URL; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; -import static org.apache.calcite.avatica.remote.looker.utils.LookerSdkFactory.DEFAULT_STREAM_BUFFER_SIZE; - /** * Implementation of Meta that works in tandem with {@link LookerRemoteService} to stream results * from the Looker SDK. */ public class LookerRemoteMeta extends RemoteMeta implements Meta { - - /** Authenticated LookerSDK lazily set from LookerRemoteService. */ - LookerSDK sdk; + private final LookerRemoteService lookerService; public LookerRemoteMeta(AvaticaConnection connection, Service service) { super(connection, service); // this class _must_ be backed by a LookerRemoteService assert service.getClass() == LookerRemoteService.class; + lookerService = (LookerRemoteService) service; + } + + /** + * Constants used in JSON parsing + */ + private static final String ROWS_KEY = "rows"; + private static final String VALUE_KEY = "value"; + + /** + * Default queue size. Could probably be more or less. 10 chosen for now. + */ + private static final int DEFAULT_FRAME_QUEUE_SIZE = 10; + + /** + * Returns authenticated LookerSDK from LookerRemoteService. + */ + private LookerSDK getSdk() { + return lookerService.sdk; + } + + /** + * A single meta can have multiple running statements. This map keeps track of + * {@code FrameEnvelopes}s that belong to a running statement. See {@link #prepareStreamingThread} + * for more details. + */ + private final ConcurrentMap> stmtQueueMap = + new ConcurrentHashMap(); + + /** + * Wrapper for either a {@link Frame} or {@link Exception}. Allows for {@link #stmtQueueMap} to + * hold complete Frames and present exceptions when consumers are ready to encounter them. + */ + private class FrameEnvelope { + + final Frame frame; + final Exception exception; + + FrameEnvelope(/*@Nullable*/ Frame frame, /*@Nullable*/ Exception exception) { + this.frame = frame; + this.exception = exception; + } + + boolean hasException() { + return this.exception != null; + } + } + + private FrameEnvelope makeFrame(long offset, boolean done, Iterable rows) { + Frame frame = new Frame(offset, done, rows); + return new FrameEnvelope(frame, null); + } + + private FrameEnvelope makeException(Exception e) { + return new FrameEnvelope(null, e); } /** @@ -74,16 +129,17 @@ public LookerRemoteMeta(AvaticaConnection connection, Service service) { * not advance the current token, so they can be called multiple times without changing the state * of the parser. * - * @param columnTypeRep the internal Avatica representation for this value. It is important to - * use the {@link Rep} rather than the type name since Avatica represents most datetime values - * as milliseconds since epoch via {@code long}s or {@code int}s. + * @param columnMetaData the {@link ColumnMetaData} for this value. It is important to use the + * {@link Rep} rather than the type name since Avatica represents most datetime values as + * milliseconds since epoch via {@code long}s or {@code int}s. * @param parser a JsonParser whose current token is a value from the JSON response. Callers * must ensure that the parser is ready to consume a value token. This method does not change * the state of the parser. * @return the parsed value. */ - static Object deserializeValue(Rep columnTypeRep, JsonParser parser) throws IOException { - switch (columnTypeRep) { + static Object deserializeValue(JsonParser parser, ColumnMetaData columnMetaData) + throws IOException { + switch (columnMetaData.type.rep) { case PRIMITIVE_BOOLEAN: case BOOLEAN: return parser.getBooleanValue(); @@ -110,44 +166,81 @@ static Object deserializeValue(Rep columnTypeRep, JsonParser parser) throws IOEx case NUMBER: // NUMBER is represented as BigDecimal return parser.getDecimalValue(); + // TODO: MEASURE types are appearing as Objects. This should have been solved by CALCITE-5549. + // Be sure that the signature of a prepared query matches the metadata we see from JDBC. + case OBJECT: + switch (columnMetaData.type.id) { + case Types.INTEGER: + return parser.getIntValue(); + case Types.BIGINT: + return parser.getBigIntegerValue(); + case Types.DOUBLE: + return parser.getDoubleValue(); + case Types.DECIMAL: + case Types.NUMERIC: + return parser.getDecimalValue(); + + } default: - throw new RuntimeException("Unable to parse " + columnTypeRep + " from stream!"); + throw new IOException("Unable to parse " + columnMetaData.type.rep + " from stream!"); + } + } + + @Override + public Frame fetch(final StatementHandle h, final long offset, final int fetchMaxRowCount) + throws NoSuchStatementException, MissingResultsException { + // If this statement was initiated as a LookerFrame then it will have an entry in the queue map + if (stmtQueueMap.containsKey(h)) { + try { + BlockingQueue queue = stmtQueueMap.get(h); + + // `take` blocks until there is an entry in the queue + FrameEnvelope nextEnvelope = (FrameEnvelope) queue.take(); + + // remove the statement from the map if it has an exception, or it is the last frame + if (nextEnvelope.hasException()) { + stmtQueueMap.remove(h); + throw new RuntimeException(nextEnvelope.exception); + } else if (nextEnvelope.frame.done) { + stmtQueueMap.remove(h); + } + return nextEnvelope.frame; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + // not a streaming query - default to RemoteMeta behavior + return super.fetch(h, offset, fetchMaxRowCount); } /** - * An initially empty frame specific to Looker result sets. The {@code statementSlug} is used to - * begin a streaming query. + * An initially empty frame specific to Looker result sets. {@link #sqlInterfaceQueryId} is used + * to begin a streaming query. */ static class LookerFrame extends Frame { /** * A unique ID for the current SQL statement to run. Prepared and set during - * {@link LookerRemoteService#apply(PrepareAndExecuteRequest)}. + * {@link LookerRemoteService#apply(PrepareAndExecuteRequest)} or + * {@link LookerRemoteService#apply(PrepareRequest)}. This is distinct from a statement ID. + * Multiple statements may execute the same query ID. */ - public final Long statementId; + public final Long sqlInterfaceQueryId; private LookerFrame(long offset, boolean done, Iterable rows, Long statementId) { super(offset, done, rows); - this.statementId = statementId; + this.sqlInterfaceQueryId = statementId; } /** * Creates a {@code LookerFrame} for the statement slug * - * @param statementId id for the prepared statement generated by a Looker instance. + * @param sqlInterfaceQueryId id for the prepared statement generated by a Looker instance. * @return the {@code firstFrame} for the result set. */ - public static final LookerFrame create(Long statementId) { - return new LookerFrame(0, false, Collections.emptyList(), statementId); - } - } - - LookerSDK getSdk() { - if (null == sdk) { - sdk = ((LookerRemoteService) service).sdk; + public static final LookerFrame create(Long sqlInterfaceQueryId) { + return new LookerFrame(0, false, Collections.emptyList(), sqlInterfaceQueryId); } - return sdk; } /** @@ -160,9 +253,11 @@ private void trustAllHosts(HttpsURLConnection connection) { public java.security.cert.X509Certificate[] getAcceptedIssuers() { return null; } + public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { } + public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { } @@ -182,199 +277,157 @@ public void checkServerTrusted(java.security.cert.X509Certificate[] certs, /** * A regrettable necessity. The Kotlin SDK relies on an outdated Ktor HTTP client based on Kotlin - * Coroutines which are a tad difficult to work with in Java. Here we make a HTTP client to handle - * the response stream ourselves which is "good enough" for now. This method should be revisited - * when the Kotlin SDK has built-in streams. + * Coroutines which are difficult to work with in Java. Here we make a HTTP client to handle the + * request and input stream ourselves. This adds complexity that would normally be handled by the + * SDK. We should revisit this once the SDK has built-in streams. * * TODO https://github.com/looker-open-source/sdk-codegen/issues/1341: * Add streaming support to the Kotlin SDK. */ - private void streamResponse(String url, OutputStream outputStream) throws IOException { - // use some SDK client helpers to tighten up this call + private InputStream makeRunQueryRequest(String url) throws IOException { AuthSession authSession = getSdk().getAuthSession(); Transport sdkTransport = authSession.getTransport(); + // makes a proper URL from the API endpoint path as the SDK would. String endpoint = sdkTransport.makeUrl(url, Collections.emptyMap(), null); URL httpsUrl = new URL(endpoint); HttpsURLConnection connection = (HttpsURLConnection) httpsUrl.openConnection(); + // WARNING: You should only set `verifySSL=false` for local/dev instances!! if (!sdkTransport.getOptions().getVerifySSL()) { trustAllHosts(connection); } + // timeout is given as seconds int timeout = sdkTransport.getOptions().getTimeout() * 1000; connection.setReadTimeout(timeout); connection.setRequestMethod("GET"); connection.setRequestProperty("Accept", "application/json"); connection.setDoOutput(true); + // Set the auth header as the SDK would connection.setRequestProperty("Authorization", "token " + authSession.getAuthToken().getAccessToken()); + int responseCode = connection.getResponseCode(); if (responseCode == 200) { - // grab the input stream and write to the output for the main thread to consume. - InputStream inputStream = connection.getInputStream(); - int bytesRead; - byte[] buffer = new byte[DEFAULT_STREAM_BUFFER_SIZE]; - while ((bytesRead = inputStream.read(buffer)) != -1) { - outputStream.write(buffer, 0, bytesRead); - } - outputStream.close(); - inputStream.close(); + // return the input stream to parse from. + return connection.getInputStream(); } else { throw new IOException("HTTP request failed with status code: " + responseCode); } } - /** - * Prepares a thread to stream query response into an OutputStream. Sets an exception handler for - * the thread for reporting. - */ - private Thread streamingThread(String baseUrl, InputStream in, OutputStream out) { - Thread stream = new Thread(() -> { - try { - streamResponse(baseUrl, out); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - Thread.UncaughtExceptionHandler exceptionHandler = (th, ex) -> { - try { - in.close(); - out.close(); - throw new RuntimeException(ex); - } catch (IOException e) { - throw new RuntimeException(e); - } - }; - stream.setUncaughtExceptionHandler(exceptionHandler); - return stream; + private void seekToRows(JsonParser parser) throws IOException { + while (parser.nextToken() != null && !ROWS_KEY.equals(parser.currentName())) { + // move position to start of `rows` + } } - /** - * Creates a streaming iterable that parses a JSON {@link InputStream} into a series of - * {@link Frame}s. - */ - @Override - public Iterable createIterable(StatementHandle h, QueryState state, Signature signature, - List parameters, Frame firstFrame) { - if (LookerFrame.class.isAssignableFrom(firstFrame.getClass())) { - try { - LookerFrame lookerFrame = (LookerFrame) firstFrame; - String endpoint = LookerSdkFactory.queryEndpoint(lookerFrame.statementId); - // set up in/out streams - PipedOutputStream out = new PipedOutputStream(); - PipedInputStream in = new PipedInputStream(out); - // grab the statement - AvaticaStatement stmt = connection.lookupStatement(h); - // init a new thread to stream from a Looker instance - Thread stream = streamingThread(endpoint, in, out); - stream.start(); - // TODO bug in Avatica - the statement handle never has the signature updated - // workaround by handing the signature to the iterable directly. - return new LookerIterable(stmt, state, firstFrame, in, signature); - } catch (IOException | SQLException e) { - throw new RuntimeException(e); - } + private void seekToValue(JsonParser parser) throws IOException { + while (parser.nextToken() != null && !VALUE_KEY.equals(parser.currentName())) { + // seeking to `value` key for the field e.g. `"rows": [{"field_1": {"value": 123 }}] + } + // now move to the actual value + parser.nextToken(); + } + + private void putExceptionOrFail(BlockingQueue queue, Exception e) { + try { + // `put` blocks until there is room on the queue but needs a catch + queue.put(makeException(e)); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); } - // if this is a normal Frame from Avatica no special treatment is needed. - return super.createIterable(h, state, signature, parameters, firstFrame); } /** - * Iterable that yields an iterator that parses an {@link InputStream} of JSON into a sequence of - * {@link Meta.Frame}s. + * Prepares a thread to stream a query response into a series of {@link FrameEnvelope}s. */ - public class LookerIterable extends FetchIterable implements Iterable { - - static final String ROWS_KEY = "rows"; - static final String VALUE_KEY = "value"; - - final AvaticaStatement statement; - final QueryState state; - final Frame firstFrame; - final JsonParser parser; - final Signature signature; - - LookerIterable(AvaticaStatement statement, QueryState state, Frame firstFrame, InputStream in, - Signature signature) { - super(statement, state, firstFrame); - assert null != in; - this.statement = statement; - this.state = state; - this.firstFrame = firstFrame; - this.signature = signature; + private Thread prepareStreamingThread(String baseUrl, Signature signature, int fetchSize, + BlockingQueue frameQueue) { + Thread stream = new Thread(() -> { try { - // Create a streaming parser based on the provided InputStream - this.parser = new JsonFactory().createParser(in); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Iterator iterator() { - return new LookerIterator(statement, state, firstFrame); - } - - /** - * Iterator that parses an {@link InputStream} of JSON into a sequence of {@link Meta.Frame}s. - */ - public class LookerIterator extends FetchIterator implements Iterator { - - LookerIterator(AvaticaStatement stmt, QueryState state, Frame firstFrame) { - super(stmt, state, firstFrame); - } - - private void seekToRows() throws IOException { - while (parser.nextToken() != null && !ROWS_KEY.equals(parser.currentName())) { - // move position to start of `rows` - } - } + InputStream in = makeRunQueryRequest(baseUrl); + JsonParser parser = new JsonFactory().createParser(in); - private void seekToValue() throws IOException { - while (parser.nextToken() != null && !VALUE_KEY.equals(parser.currentName())) { - // seeking to `value` key for the field e.g. `"rows": [{"field_1": {"value": 123 }}] - } - // now move to the actual value - parser.nextToken(); - } + int currentOffset = 0; - @Override - public Frame doFetch(StatementHandle h, long currentOffset, int fetchSize) { - try { + while (parser.nextToken() != null) { if (currentOffset == 0) { // TODO: Handle `metadata`. We are currently ignoring it and seeking to `rows` array - seekToRows(); + seekToRows(parser); } + int rowsRead = 0; List rows = new ArrayList<>(); - // only read the number of rows requested by the connection config `fetchSize` + while (rowsRead < fetchSize) { - // TODO: could probably be optimized List columns = new ArrayList<>(); // the signature should _always_ have the correct number of columns. // if not, something went wrong during query preparation on the Looker instance. for (int i = 0; i < signature.columns.size(); i++) { - seekToValue(); + seekToValue(parser); + if (parser.isClosed()) { // the stream is closed - all rows should be accounted for - return new Frame(currentOffset + rowsRead, true, rows); + currentOffset += rowsRead; + frameQueue.put(makeFrame(currentOffset, /*done=*/true, rows)); + return; } + // add the value to the column list - Object value = LookerRemoteMeta.deserializeValue(signature.columns.get(i).type.rep, - parser); - columns.add(value); + columns.add(deserializeValue(parser, signature.columns.get(i))); } + rows.add(columns); rowsRead++; } - // we fetched the allowed number of rows so return the frame with the current batch - return new Frame(currentOffset + rowsRead, false, rows); - } catch (IOException e) { - throw new RuntimeException(e); + // we fetched the allowed number of rows so add the complete frame to the queue + currentOffset += rowsRead; + frameQueue.put(makeFrame(currentOffset, /*done=*/false, rows)); } + } catch (Exception e) { + // enqueue all exceptions for the main thread to report + putExceptionOrFail(frameQueue, e); + } + }); + return stream; + } + + /** + * Creates a streaming iterable that parses a JSON {@link InputStream} into a series of + * {@link FrameEnvelope}s. + */ + @Override + public Iterable createIterable(StatementHandle h, QueryState state, Signature signature, + List parameters, Frame firstFrame) { + // If this a LookerFrame, then we must be targeting the sql_interface APIs + if (LookerFrame.class.isAssignableFrom(firstFrame.getClass())) { + try { + // generate the endpoint URL to begin the request + LookerFrame lookerFrame = (LookerFrame) firstFrame; + String url = LookerSdkFactory.queryEndpoint(lookerFrame.sqlInterfaceQueryId); + + // grab the statement + AvaticaStatement stmt = connection.statementMap.get(h.id); + if (null == stmt) { + throw new NoSuchStatementException(h); + } + + // setup queue to place complete frames + BlockingQueue frameQueue = new ArrayBlockingQueue(DEFAULT_FRAME_QUEUE_SIZE); + + // update map so this statement is associated with a queue + stmtQueueMap.put(stmt.handle, frameQueue); + + // init and start a new thread to stream from a Looker instance and populate the frameQueue + prepareStreamingThread(url, signature, stmt.getFetchSize(), frameQueue).start(); + } catch (SQLException | NoSuchStatementException e) { + throw new RuntimeException(e); } } + // always return a FetchIterable - we'll check in LookerRemoteMeta#fetch for any enqueued Frames + return super.createIterable(h, state, signature, parameters, firstFrame); } } diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteService.java b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteService.java index a310d12544..ec4a8111ef 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteService.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteService.java @@ -28,13 +28,14 @@ import java.io.IOException; import java.util.Arrays; -import static org.apache.calcite.avatica.remote.looker.utils.LookerSdkFactory.safeSdkCall; +import static org.apache.calcite.avatica.remote.looker.LookerSdkFactory.safeSdkCall; /** * Implementation of {@link org.apache.calcite.avatica.remote.Service} that uses the Looker SDK to * send Avatica request/responses to a Looker instance via JSON. */ public class LookerRemoteService extends JsonService { + public LookerSDK sdk; void setSdk(LookerSDK sdk) { @@ -67,7 +68,7 @@ public String apply(String request) { /** * Handles PrepareAndExecuteRequests by preparing a query via {@link LookerSDK#create_sql_query} - * whose response contains a slug. This slug is used to execute the query via + * whose response contains a query id. This id is used to execute the query via * {@link LookerSDK#run_sql_query} with the 'json_bi' format. * * @param request the base Avatica request to convert into a Looker SDK call. @@ -76,10 +77,12 @@ public String apply(String request) { @Override public ExecuteResponse apply(PrepareAndExecuteRequest request) { assert null != sdk; + WriteSqlInterfaceQueryCreate queryRequest = new WriteSqlInterfaceQueryCreate( request.sql, /*jdbcClient=*/true); SqlInterfaceQuery preparedQuery = safeSdkCall( () -> sdk.create_sql_interface_query(queryRequest)); + Signature signature; try { signature = JsonService.MAPPER.readValue(preparedQuery.getSignature(), Signature.class); diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/looker/utils/LookerSdkFactory.java b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerSdkFactory.java similarity index 85% rename from core/src/main/java/org/apache/calcite/avatica/remote/looker/utils/LookerSdkFactory.java rename to core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerSdkFactory.java index 05eeaed4d9..22a66a61b4 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/looker/utils/LookerSdkFactory.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerSdkFactory.java @@ -14,9 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.calcite.avatica.remote.looker.utils; - - +package org.apache.calcite.avatica.remote.looker; import com.looker.rtl.AuthSession; import com.looker.rtl.ConfigurationProvider; @@ -49,29 +47,15 @@ private LookerSdkFactory() { private static final String RESULT_FORMAT = "json_bi"; private static final String QUERY_ENDPOINT = "/api/4.0/sql_interface_queries/%s/run/%s"; - /** - * Default buffer size. Could probably be more or less. 1024 chosen for now. - */ - public static final int DEFAULT_STREAM_BUFFER_SIZE = 1024; /** - * Simple functional interface to wrap SDK calls + * Simple interface to wrap SDK calls */ public interface LookerSDKCall { SDKResponse call(); } - /** - * Wraps {@link SQLException}s as {@link RuntimeException}s. Almost all exceptions in Avatica are - * thrown as RuntimeExceptions. There are 'TODO's to change this behavior but until those are - * resolved we should do the same. RuntimeExceptions do not have to be part of the method - * signature so it does make things nicer to work with. - */ - public static RuntimeException handle(String errorMessage) { - return new RuntimeException(errorMessage); - } - /** * Makes the API endpoint to run a previously made query. */ @@ -80,7 +64,7 @@ public static String queryEndpoint(Long id) { } /** - * Makes the SDK call and throws any errors as runtime {@link SQLException}s + * Makes the SDK call and throws any errors as runtime exceptions */ public static T safeSdkCall(LookerSDKCall sdkCall) { try { @@ -88,7 +72,7 @@ public static T safeSdkCall(LookerSDKCall sdkCall) { } catch (Error e) { SDKErrorInfo error = parseSDKError(e.toString()); // TODO: Get full errors from error.errors array - throw handle(error.getMessage()); + throw new RuntimeException(error.getMessage()); } } diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/looker/utils/package-info.java b/core/src/main/java/org/apache/calcite/avatica/remote/looker/utils/package-info.java deleted file mode 100644 index c33f6306a9..0000000000 --- a/core/src/main/java/org/apache/calcite/avatica/remote/looker/utils/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Static utilities for {@link org.apache.calcite.avatica.remote.looker} - */ -package org.apache.calcite.avatica.remote.looker.utils; - -// End diff --git a/core/src/test/java/org/apache/calcite/avatica/remote/looker/DriverTest.java b/core/src/test/java/org/apache/calcite/avatica/remote/looker/DriverTest.java index 441186d7d5..8139b03be2 100644 --- a/core/src/test/java/org/apache/calcite/avatica/remote/looker/DriverTest.java +++ b/core/src/test/java/org/apache/calcite/avatica/remote/looker/DriverTest.java @@ -18,9 +18,7 @@ import org.apache.calcite.avatica.AvaticaConnection; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.sql.Connection; import java.sql.Driver; @@ -32,32 +30,39 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; public class DriverTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); @Test public void lookerDriverIsRegistered() throws SQLException { Driver driver = DriverManager.getDriver("jdbc:looker:url=foobar.com"); + assertThat(driver, is(instanceOf(org.apache.calcite.avatica.remote.looker.Driver.class))); } @Test public void driverThrowsAuthExceptionForBlankProperties() throws SQLException { Properties props = new Properties(); - Driver driver = DriverManager.getDriver("jdbc:looker:url=foobar.com"); - thrown.expect(SQLInvalidAuthorizationSpecException.class); - thrown.expectMessage("Missing either API3 credentials or access token"); - driver.connect("jdbc:looker:url=foobar.com", props); + try { + Driver driver = DriverManager.getDriver("jdbc:looker:url=foobar.com"); + driver.connect("jdbc:looker:url=foobar.com", props); + + fail("Should have thrown an auth exception!"); + } catch (SQLInvalidAuthorizationSpecException e) { + assertThat(e.getMessage(), is("Invalid connection params.\nMissing either API3 credentials" + + " or access token")); + } } @Test public void createsAvaticaConnections() throws SQLException { Properties props = new Properties(); props.put("token", "foobar"); + Driver driver = DriverManager.getDriver("jdbc:looker:url=foobar.com"); Connection connection = driver.connect("jdbc:looker:url=foobar.com", props); + assertThat(connection, is(instanceOf(AvaticaConnection.class))); } } diff --git a/core/src/test/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMetaTest.java b/core/src/test/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMetaTest.java index 70da8868b4..be644d3d86 100644 --- a/core/src/test/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMetaTest.java +++ b/core/src/test/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMetaTest.java @@ -18,6 +18,7 @@ import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.ColumnMetaData.AvaticaType; import org.apache.calcite.avatica.ColumnMetaData.Rep; import org.apache.calcite.avatica.util.ByteString; import org.apache.calcite.avatica.util.StructImpl; @@ -61,6 +62,7 @@ * Test for Looker specific functionality in {@link LookerRemoteMeta} implementations. */ public class LookerRemoteMetaTest { + @Rule public ExpectedException thrown = ExpectedException.none(); @@ -89,6 +91,9 @@ public class LookerRemoteMetaTest { buildingMap.put(Rep.DOUBLE, new Double(1.99)); buildingMap.put(Rep.STRING, "hello"); buildingMap.put(Rep.NUMBER, new BigDecimal(1000000)); + // TODO: We shouldn't need to support OBJECT but MEASUREs are appearing as generic objects in + // the signature + buildingMap.put(Rep.OBJECT, 1000); supportedRepValues = new HashMap(buildingMap); buildingMap.clear(); @@ -99,17 +104,16 @@ public class LookerRemoteMetaTest { buildingMap.put(Rep.JAVA_UTIL_DATE, new java.util.Date(1000000)); // Unsupported object types buildingMap.put(Rep.ARRAY, new Array[]{}); - buildingMap.put(Rep.BYTE_STRING, new ByteString(new byte[] {'h', 'e', 'l', 'l', 'o'})); + buildingMap.put(Rep.BYTE_STRING, new ByteString(new byte[]{'h', 'e', 'l', 'l', 'o'})); buildingMap.put(Rep.PRIMITIVE_CHAR, 'c'); buildingMap.put(Rep.CHARACTER, new Character('c')); buildingMap.put(Rep.MULTISET, new ArrayList()); buildingMap.put(Rep.STRUCT, new StructImpl(new ArrayList())); - buildingMap.put(Rep.OBJECT, new Object()); unsupportedRepValues = new HashMap(buildingMap); buildingMap.clear(); } - private JsonParser makeTestParserFromRep(Rep rep, Object value) { + private JsonParser makeTestParserFromValue(Object value) throws IOException { String template = "{ \"value\": %s }"; try { String valAsJson = mapper.writeValueAsString(value); @@ -120,9 +124,16 @@ private JsonParser makeTestParserFromRep(Rep rep, Object value) { jp.nextValue(); // move to value itself return jp; } catch (IOException e) { - fail(e.getMessage()); + throw e; } - return null; + } + + private ColumnMetaData makeDummyMetadata(Rep rep) { + // MEASUREs appear as Objects but typeId is the underlying data type (usually int or double) + // See relevant TODO in LookerRemoteMeta#deserializeValue + int typeId = rep == Rep.OBJECT ? 4 : rep.typeId; + AvaticaType type = new AvaticaType(typeId, rep.name(), rep); + return ColumnMetaData.dummy(type, false); } @Test @@ -130,21 +141,22 @@ public void deserializeValueTestingIsExhaustive() { HashMap allMap = new HashMap(); allMap.putAll(supportedRepValues); allMap.putAll(unsupportedRepValues); - Arrays.stream(Rep.values()).forEach(val -> { - assertNotNull(allMap.get(val)); - }); + + Arrays.stream(Rep.values()).forEach(val -> assertNotNull(allMap.get(val))); } @Test public void deserializeValueThrowsErrorOnUnsupportedType() { unsupportedRepValues.forEach((rep, value) -> { - JsonParser parser = makeTestParserFromRep(rep, value); - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to parse " + rep.name() + " from stream!"); try { - LookerRemoteMeta.deserializeValue(rep, parser); + JsonParser parser = makeTestParserFromValue(value); + + // should throw an IOException + LookerRemoteMeta.deserializeValue(parser, makeDummyMetadata(rep)); + fail("Should have thrown an IOException!"); + } catch (IOException e) { - fail(); + assertThat(e.getMessage(), is("Unable to parse " + rep.name() + " from stream!")); } }); } @@ -152,9 +164,12 @@ public void deserializeValueThrowsErrorOnUnsupportedType() { @Test public void deserializeValueWorksForSupportedTypes() { supportedRepValues.forEach((rep, value) -> { - JsonParser parser = makeTestParserFromRep(rep, value); try { - assertThat(value, is(equalTo(LookerRemoteMeta.deserializeValue(rep, parser)))); + JsonParser parser = makeTestParserFromValue(value); + Object deserializedValue = LookerRemoteMeta.deserializeValue(parser, + makeDummyMetadata(rep)); + + assertThat(value, is(equalTo(deserializedValue))); } catch (IOException e) { fail(e.getMessage()); } @@ -163,23 +178,21 @@ public void deserializeValueWorksForSupportedTypes() { @Ignore @Test - public void testIt() throws SQLException, InterruptedException, IOException { + public void testIt() throws SQLException, IOException { Connection connection = DriverManager.getConnection(LookerTestCommon.getUrl(), LookerTestCommon.getBaseProps()); ResultSet models = connection.getMetaData().getSchemas(); while (models.next()) { System.out.println(models.getObject(1)); } - - String sql = - "SELECT `orders.status`" - + "FROM `thelook`.`order_items`"; + String sql = "SELECT 'hello world', AGGREGATE(`million_users.avg_id`) as woot FROM `thelook`" + + ".`million_users` LIMIT 5"; ResultSet test = connection.createStatement().executeQuery(sql); int i = 0; PrintWriter writer = new PrintWriter("the-file-name.txt", "UTF-8"); while (test.next()) { i++; - writer.println(i + ": " + test.getObject(1)); + writer.println(i + ": " + test.getObject(2)); } writer.close(); System.out.println("END !!!!!");