diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java index 96223d4a95..29d283b8be 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java @@ -44,7 +44,7 @@ public class RemoteMeta extends MetaImpl { final Map propsMap = new HashMap<>(); private Map databaseProperties; - public RemoteMeta(AvaticaConnection connection, Service service) { + protected RemoteMeta(AvaticaConnection connection, Service service) { super(connection); this.service = service; } diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/looker/FrameEnvelope.java b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerFrameEnvelope.java similarity index 70% rename from core/src/main/java/org/apache/calcite/avatica/remote/looker/FrameEnvelope.java rename to core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerFrameEnvelope.java index 63b6102b87..3603982a5f 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/looker/FrameEnvelope.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerFrameEnvelope.java @@ -23,33 +23,33 @@ * {@link LookerRemoteMeta#stmtQueueMap} to hold complete Frames and present exceptions when * consumers are ready to encounter them. */ -class FrameEnvelope { +class LookerFrameEnvelope { final Frame frame; final Exception exception; - private FrameEnvelope(/*@Nullable*/ Frame frame, /*@Nullable*/ Exception exception) { + private LookerFrameEnvelope(/*@Nullable*/ Frame frame, /*@Nullable*/ Exception exception) { this.frame = frame; this.exception = exception; } /** - * Constructs a FrameEnvelope with a {@link Frame}. + * Constructs a LookerFrameEnvelope with a {@link Frame}. */ - public static FrameEnvelope ok(long offset, boolean done, Iterable rows) { + public static LookerFrameEnvelope ok(long offset, boolean done, Iterable rows) { Frame frame = new Frame(offset, done, rows); - return new FrameEnvelope(frame, null); + return new LookerFrameEnvelope(frame, null); } /** - * Constructs a FrameEnvelope to hold an exception + * Constructs a LookerFrameEnvelope to hold an exception */ - public static FrameEnvelope error(Exception e) { - return new FrameEnvelope(null, e); + public static LookerFrameEnvelope error(Exception e) { + return new LookerFrameEnvelope(null, e); } /** - * Whether this FrameEnvelope holds an exception. If true, the envelope holds no {@link Frame}. + * Whether this LookerFrameEnvelope holds an exception. If true, the envelope holds no {@link Frame}. */ public boolean hasException() { return this.exception != null; diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMeta.java b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMeta.java index 01046216d4..ab68dfb8c0 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMeta.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerRemoteMeta.java @@ -82,7 +82,7 @@ private LookerSDK getSdk() { * {@code FrameEnvelopes}s that belong to a running statement. See {@link #prepareStreamingThread} * for more details. */ - final ConcurrentMap> stmtQueueMap = + final ConcurrentMap> stmtQueueMap = new ConcurrentHashMap<>(); /** @@ -120,17 +120,21 @@ public static final LookerFrame create(Long sqlInterfaceQueryId) { */ private void trustAllHosts(HttpsURLConnection connection) { // Create a trust manager that does not validate certificate chains - TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() { - public java.security.cert.X509Certificate[] getAcceptedIssuers() { - return null; - } - - public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { - } - - public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { - } - }}; + TrustManager[] trustAllCerts = new TrustManager[]{ + new X509TrustManager() { + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return null; + } + + public void checkClientTrusted(java.security.cert.X509Certificate[] certs, + String authType) { + } + + public void checkServerTrusted(java.security.cert.X509Certificate[] certs, + String authType) { + } + } + }; // Create all-trusting host name verifier HostnameVerifier trustAllHostNames = (hostname, session) -> true; try { @@ -187,10 +191,10 @@ protected InputStream makeRunQueryRequest(String url) throws IOException { } /** - * Prepares a thread to stream a query response into a series of {@link FrameEnvelope}s. + * Prepares a thread to stream a query response into a series of {@link LookerFrameEnvelope}s. */ protected Thread prepareStreamingThread(String baseUrl, Signature signature, int fetchSize, - BlockingQueue frameQueue) throws IOException { + BlockingQueue frameQueue) throws IOException { InputStream in = makeRunQueryRequest(baseUrl); LookerResponseParser parser = new LookerResponseParser(frameQueue); @@ -204,10 +208,10 @@ public Frame fetch(final StatementHandle h, final long offset, final int fetchMa // If this statement was initiated as a LookerFrame then it will have an entry in the queue map if (stmtQueueMap.containsKey(h.id)) { try { - BlockingQueue queue = stmtQueueMap.get(h.id); + BlockingQueue queue = stmtQueueMap.get(h.id); // `take` blocks until there is an entry in the queue - FrameEnvelope nextEnvelope = queue.take(); + LookerFrameEnvelope nextEnvelope = queue.take(); // remove the statement from the map if it has an exception, or it is the last frame if (nextEnvelope.hasException()) { @@ -227,7 +231,7 @@ public Frame fetch(final StatementHandle h, final long offset, final int fetchMa /** * Creates a streaming iterable that parses a JSON {@link InputStream} into a series of - * {@link FrameEnvelope}s. + * {@link LookerFrameEnvelope}s. */ @Override public Iterable createIterable(StatementHandle h, QueryState state, Signature signature, @@ -246,7 +250,7 @@ public Iterable createIterable(StatementHandle h, QueryState state, Sign } // setup queue to place complete frames - BlockingQueue frameQueue = new ArrayBlockingQueue(DEFAULT_FRAME_QUEUE_SIZE); + BlockingQueue frameQueue = new ArrayBlockingQueue(DEFAULT_FRAME_QUEUE_SIZE); // update map so this statement is associated with a queue stmtQueueMap.put(stmt.handle.id, frameQueue); diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerResponseParser.java b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerResponseParser.java index b0eec02153..73f1e50b5b 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerResponseParser.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/looker/LookerResponseParser.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.concurrent.BlockingQueue; +import static com.fasterxml.jackson.core.JsonToken.VALUE_NULL; + public class LookerResponseParser { /** @@ -39,9 +41,9 @@ public class LookerResponseParser { private static final String ROWS_KEY = "rows"; private static final String VALUE_KEY = "value"; - private final BlockingQueue queue; + private final BlockingQueue queue; - LookerResponseParser(BlockingQueue queue) { + LookerResponseParser(BlockingQueue queue) { assert queue != null : "null queue!"; this.queue = queue; @@ -62,6 +64,10 @@ public class LookerResponseParser { */ static Object deserializeValue(JsonParser parser, ColumnMetaData columnMetaData) throws IOException { + // don't attempt to parse null values + if (parser.currentToken() == VALUE_NULL) { + return null; + } switch (columnMetaData.type.rep) { case PRIMITIVE_BOOLEAN: case BOOLEAN: @@ -125,7 +131,7 @@ private void seekToValue(JsonParser parser) throws IOException { private void putExceptionOrFail(Exception e) { try { // `put` blocks until there is room on the queue but needs a catch - queue.put(FrameEnvelope.error(e)); + queue.put(LookerFrameEnvelope.error(e)); } catch (InterruptedException ex) { throw new RuntimeException(ex); } @@ -133,7 +139,7 @@ private void putExceptionOrFail(Exception e) { /** * Takes an input stream from a Looker query request and parses it into a series of - * {@link FrameEnvelope}s. Each FrameEnvelope is enqueued in the {@link #queue} of the parser. + * {@link LookerFrameEnvelope}s. Each LookerFrameEnvelope is enqueued in the {@link #queue} of the parser. * * @param in the {@link InputStream} to parse. * @param signature the {@link Signature} for the statement. Needed to access column metadata @@ -167,7 +173,7 @@ public void parseStream(InputStream in, Signature signature, int fetchSize) { if (parser.isClosed()) { // the stream is closed - all rows should be accounted for currentOffset += rowsRead; - queue.put(FrameEnvelope.ok(currentOffset, /*done=*/true, rows)); + queue.put(LookerFrameEnvelope.ok(currentOffset, /*done=*/true, rows)); return; } @@ -186,7 +192,7 @@ public void parseStream(InputStream in, Signature signature, int fetchSize) { } // we fetched the allowed number of rows so add the complete frame to the queue currentOffset += rowsRead; - queue.put(FrameEnvelope.ok(currentOffset, /*done=*/false, rows)); + queue.put(LookerFrameEnvelope.ok(currentOffset, /*done=*/false, rows)); } } catch (Exception e) { // enqueue all exceptions for the main thread to report diff --git a/core/src/test/java/org/apache/calcite/avatica/remote/looker/LookerResponseParserTest.java b/core/src/test/java/org/apache/calcite/avatica/remote/looker/LookerResponseParserTest.java index 2162c7ff66..e8478d9ebe 100644 --- a/core/src/test/java/org/apache/calcite/avatica/remote/looker/LookerResponseParserTest.java +++ b/core/src/test/java/org/apache/calcite/avatica/remote/looker/LookerResponseParserTest.java @@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -165,4 +166,18 @@ public void deserializeValueWorksForSupportedTypes() { } }); } + + @Test + public void returnsNullIfValueIsNull() { + try { + JsonParser parser = makeTestParserFromValue(null); + Object deserializedValue = LookerResponseParser.deserializeValue(parser, + makeDummyMetadata(Rep.DOUBLE)); + + assertNull(deserializedValue); + + } catch (IOException e) { + fail("Should not throw an exception!"); + } + } } diff --git a/libs/looker-kotlin-sdk-f91f8ad.jar b/libs/looker-kotlin-sdk-f91f8ad.jar index d16b14fd56..1625e286ca 100644 Binary files a/libs/looker-kotlin-sdk-f91f8ad.jar and b/libs/looker-kotlin-sdk-f91f8ad.jar differ