Skip to content

Commit

Permalink
Handle null values in stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tjbanghart committed Sep 14, 2023
1 parent 3339a94 commit 24e2532
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class RemoteMeta extends MetaImpl {
final Map<String, ConnectionPropertiesImpl> propsMap = new HashMap<>();
private Map<DatabaseProperty, Object> databaseProperties;

public RemoteMeta(AvaticaConnection connection, Service service) {
protected RemoteMeta(AvaticaConnection connection, Service service) {
super(connection);
this.service = service;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,33 @@
* {@link LookerRemoteMeta#stmtQueueMap} to hold complete Frames and present exceptions when
* consumers are ready to encounter them.
*/
class FrameEnvelope {
class LookerFrameEnvelope {

final Frame frame;
final Exception exception;

private FrameEnvelope(/*@Nullable*/ Frame frame, /*@Nullable*/ Exception exception) {
private LookerFrameEnvelope(/*@Nullable*/ Frame frame, /*@Nullable*/ Exception exception) {
this.frame = frame;
this.exception = exception;
}

/**
* Constructs a FrameEnvelope with a {@link Frame}.
* Constructs a LookerFrameEnvelope with a {@link Frame}.
*/
public static FrameEnvelope ok(long offset, boolean done, Iterable<Object> rows) {
public static LookerFrameEnvelope ok(long offset, boolean done, Iterable<Object> rows) {
Frame frame = new Frame(offset, done, rows);
return new FrameEnvelope(frame, null);
return new LookerFrameEnvelope(frame, null);
}

/**
* Constructs a FrameEnvelope to hold an exception
* Constructs a LookerFrameEnvelope to hold an exception
*/
public static FrameEnvelope error(Exception e) {
return new FrameEnvelope(null, e);
public static LookerFrameEnvelope error(Exception e) {
return new LookerFrameEnvelope(null, e);
}

/**
* Whether this FrameEnvelope holds an exception. If true, the envelope holds no {@link Frame}.
* Whether this LookerFrameEnvelope holds an exception. If true, the envelope holds no {@link Frame}.
*/
public boolean hasException() {
return this.exception != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private LookerSDK getSdk() {
* {@code FrameEnvelopes}s that belong to a running statement. See {@link #prepareStreamingThread}
* for more details.
*/
final ConcurrentMap<Integer, BlockingQueue<FrameEnvelope>> stmtQueueMap =
final ConcurrentMap<Integer, BlockingQueue<LookerFrameEnvelope>> stmtQueueMap =
new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -120,17 +120,21 @@ public static final LookerFrame create(Long sqlInterfaceQueryId) {
*/
private void trustAllHosts(HttpsURLConnection connection) {
// Create a trust manager that does not validate certificate chains
TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}

public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
}

public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
}
}};
TrustManager[] trustAllCerts = new TrustManager[]{
new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}

public void checkClientTrusted(java.security.cert.X509Certificate[] certs,
String authType) {
}

public void checkServerTrusted(java.security.cert.X509Certificate[] certs,
String authType) {
}
}
};
// Create all-trusting host name verifier
HostnameVerifier trustAllHostNames = (hostname, session) -> true;
try {
Expand Down Expand Up @@ -187,10 +191,10 @@ protected InputStream makeRunQueryRequest(String url) throws IOException {
}

/**
* Prepares a thread to stream a query response into a series of {@link FrameEnvelope}s.
* Prepares a thread to stream a query response into a series of {@link LookerFrameEnvelope}s.
*/
protected Thread prepareStreamingThread(String baseUrl, Signature signature, int fetchSize,
BlockingQueue<FrameEnvelope> frameQueue) throws IOException {
BlockingQueue<LookerFrameEnvelope> frameQueue) throws IOException {

InputStream in = makeRunQueryRequest(baseUrl);
LookerResponseParser parser = new LookerResponseParser(frameQueue);
Expand All @@ -204,10 +208,10 @@ public Frame fetch(final StatementHandle h, final long offset, final int fetchMa
// If this statement was initiated as a LookerFrame then it will have an entry in the queue map
if (stmtQueueMap.containsKey(h.id)) {
try {
BlockingQueue<FrameEnvelope> queue = stmtQueueMap.get(h.id);
BlockingQueue<LookerFrameEnvelope> queue = stmtQueueMap.get(h.id);

// `take` blocks until there is an entry in the queue
FrameEnvelope nextEnvelope = queue.take();
LookerFrameEnvelope nextEnvelope = queue.take();

// remove the statement from the map if it has an exception, or it is the last frame
if (nextEnvelope.hasException()) {
Expand All @@ -227,7 +231,7 @@ public Frame fetch(final StatementHandle h, final long offset, final int fetchMa

/**
* Creates a streaming iterable that parses a JSON {@link InputStream} into a series of
* {@link FrameEnvelope}s.
* {@link LookerFrameEnvelope}s.
*/
@Override
public Iterable<Object> createIterable(StatementHandle h, QueryState state, Signature signature,
Expand All @@ -246,7 +250,7 @@ public Iterable<Object> createIterable(StatementHandle h, QueryState state, Sign
}

// setup queue to place complete frames
BlockingQueue<FrameEnvelope> frameQueue = new ArrayBlockingQueue(DEFAULT_FRAME_QUEUE_SIZE);
BlockingQueue<LookerFrameEnvelope> frameQueue = new ArrayBlockingQueue(DEFAULT_FRAME_QUEUE_SIZE);

// update map so this statement is associated with a queue
stmtQueueMap.put(stmt.handle.id, frameQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;

import static com.fasterxml.jackson.core.JsonToken.VALUE_NULL;

public class LookerResponseParser {

/**
Expand All @@ -39,9 +41,9 @@ public class LookerResponseParser {
private static final String ROWS_KEY = "rows";
private static final String VALUE_KEY = "value";

private final BlockingQueue<FrameEnvelope> queue;
private final BlockingQueue<LookerFrameEnvelope> queue;

LookerResponseParser(BlockingQueue<FrameEnvelope> queue) {
LookerResponseParser(BlockingQueue<LookerFrameEnvelope> queue) {
assert queue != null : "null queue!";

this.queue = queue;
Expand All @@ -62,6 +64,10 @@ public class LookerResponseParser {
*/
static Object deserializeValue(JsonParser parser, ColumnMetaData columnMetaData)
throws IOException {
// don't attempt to parse null values
if (parser.currentToken() == VALUE_NULL) {
return null;
}
switch (columnMetaData.type.rep) {
case PRIMITIVE_BOOLEAN:
case BOOLEAN:
Expand Down Expand Up @@ -125,15 +131,15 @@ private void seekToValue(JsonParser parser) throws IOException {
private void putExceptionOrFail(Exception e) {
try {
// `put` blocks until there is room on the queue but needs a catch
queue.put(FrameEnvelope.error(e));
queue.put(LookerFrameEnvelope.error(e));
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}

/**
* Takes an input stream from a Looker query request and parses it into a series of
* {@link FrameEnvelope}s. Each FrameEnvelope is enqueued in the {@link #queue} of the parser.
* {@link LookerFrameEnvelope}s. Each LookerFrameEnvelope is enqueued in the {@link #queue} of the parser.
*
* @param in the {@link InputStream} to parse.
* @param signature the {@link Signature} for the statement. Needed to access column metadata
Expand Down Expand Up @@ -167,7 +173,7 @@ public void parseStream(InputStream in, Signature signature, int fetchSize) {
if (parser.isClosed()) {
// the stream is closed - all rows should be accounted for
currentOffset += rowsRead;
queue.put(FrameEnvelope.ok(currentOffset, /*done=*/true, rows));
queue.put(LookerFrameEnvelope.ok(currentOffset, /*done=*/true, rows));
return;
}

Expand All @@ -186,7 +192,7 @@ public void parseStream(InputStream in, Signature signature, int fetchSize) {
}
// we fetched the allowed number of rows so add the complete frame to the queue
currentOffset += rowsRead;
queue.put(FrameEnvelope.ok(currentOffset, /*done=*/false, rows));
queue.put(LookerFrameEnvelope.ok(currentOffset, /*done=*/false, rows));
}
} catch (Exception e) {
// enqueue all exceptions for the main thread to report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -165,4 +166,18 @@ public void deserializeValueWorksForSupportedTypes() {
}
});
}

@Test
public void returnsNullIfValueIsNull() {
try {
JsonParser parser = makeTestParserFromValue(null);
Object deserializedValue = LookerResponseParser.deserializeValue(parser,
makeDummyMetadata(Rep.DOUBLE));

assertNull(deserializedValue);

} catch (IOException e) {
fail("Should not throw an exception!");
}
}
}
Binary file modified libs/looker-kotlin-sdk-f91f8ad.jar
Binary file not shown.

0 comments on commit 24e2532

Please sign in to comment.