Skip to content

Commit

Permalink
Use BlockingQueue of FrameEnvelopes rather than IO pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
tjbanghart committed Sep 8, 2023
1 parent 1590447 commit 78906b2
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet, QueryState
}

/** Creates a statement wrapper around an existing handle. */
public AvaticaStatement lookupStatement(Meta.StatementHandle h)
protected AvaticaStatement lookupStatement(Meta.StatementHandle h)
throws SQLException {
final AvaticaStatement statement = statementMap.get(h.id);
if (statement != null) {
Expand Down
17 changes: 5 additions & 12 deletions core/src/main/java/org/apache/calcite/avatica/MetaImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1527,12 +1527,12 @@ public Object next() {

/** Iterable that yields an iterator over rows coming from a sequence of
* {@link Meta.Frame}s. */
public class FetchIterable implements Iterable<Object> {
private class FetchIterable implements Iterable<Object> {
private final AvaticaStatement stmt;
private final QueryState state;
private final Frame firstFrame;

public FetchIterable(AvaticaStatement stmt, QueryState state, Frame firstFrame) {
private FetchIterable(AvaticaStatement stmt, QueryState state, Frame firstFrame) {
this.stmt = stmt;
this.state = state;
this.firstFrame = firstFrame;
Expand All @@ -1544,15 +1544,15 @@ public Iterator<Object> iterator() {
}

/** Iterator over rows coming from a sequence of {@link Meta.Frame}s. */
public class FetchIterator implements Iterator<Object> {
private class FetchIterator implements Iterator<Object> {
private final AvaticaStatement stmt;
private final QueryState state;
private final int fetchSize;
private Frame frame;
private Iterator<Object> rows;
private long currentOffset = 0;

public FetchIterator(AvaticaStatement stmt, QueryState state, Frame firstFrame) {
private FetchIterator(AvaticaStatement stmt, QueryState state, Frame firstFrame) {
this.stmt = stmt;
this.state = state;
int fetchRowCount;
Expand Down Expand Up @@ -1590,13 +1590,6 @@ public Object next() {
return o;
}

/** A helper method to call {@link Meta#fetch}. Pulled out from {@link #moveNext()} so that
* extending classes can override the fetch implementation without calling out to a Meta. */
protected Frame doFetch(StatementHandle h, long currentOffset, int fetchSize)
throws NoSuchStatementException, MissingResultsException {
return fetch(h, currentOffset, fetchSize);
}

void moveNext() {
for (;;) {
if (rows.hasNext()) {
Expand All @@ -1608,7 +1601,7 @@ void moveNext() {
}
try {
// currentOffset updated after element is read from `rows` iterator
frame = doFetch(stmt.handle, currentOffset, fetchSize);
frame = fetch(stmt.handle, currentOffset, fetchSize);
} catch (NoSuchStatementException e) {
resetStatement();
// re-fetch the batch where we left off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
* driver.
*/
public class RemoteMeta extends MetaImpl {
public final Service service;
final Service service;
final Map<String, ConnectionPropertiesImpl> propsMap = new HashMap<>();
private Map<DatabaseProperty, Object> databaseProperties;

public RemoteMeta(AvaticaConnection connection, Service service) {
protected RemoteMeta(AvaticaConnection connection, Service service) {
super(connection);
this.service = service;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.sql.SQLException;
Expand Down Expand Up @@ -147,7 +146,7 @@ protected static int p(int result, long v) {
name = "prepareAndExecuteBatch"),
@JsonSubTypes.Type(value = ExecuteBatchRequest.class, name = "executeBatch") })
abstract class Request extends Base {
abstract Response accept(Service service) throws IOException;
abstract Response accept(Service service);
abstract Request deserialize(Message genericMsg);
abstract Message serialize();
}
Expand Down Expand Up @@ -949,7 +948,7 @@ public PrepareAndExecuteRequest(
this.maxRowsInFirstFrame = maxRowsInFirstFrame;
}

@Override ExecuteResponse accept(Service service) throws IOException {
@Override ExecuteResponse accept(Service service) {
return service.apply(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.UnregisteredDriver;
import org.apache.calcite.avatica.remote.Service;
import org.apache.calcite.avatica.remote.looker.utils.LookerSdkFactory;

import com.looker.sdk.LookerSDK;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

/**
* JDBC Driver for Looker's SQL Interface. Communicates with a Looker instance via
* {@link LookerSDK}. Backed by Looker-specific {@link LookerRemoteMeta} and
* {@link LookerRemoteService}.
*
* Use 'jdbc:looker' as the protocol to select this over the default remote Avatica driver.
*/
public class Driver extends UnregisteredDriver {

static {
Expand Down Expand Up @@ -69,12 +75,14 @@ public Meta createMeta(AvaticaConnection connection) {
AvaticaConnection conn = (AvaticaConnection) super.connect(url, info);

if (conn == null) {
// It's not an url for our driver
// the URL did not match Looker's JDBC connection string prefix
return null;
}
Service service = conn.getService();

// the `looker` driver should always have a matching Service
Service service = conn.getService();
assert service instanceof LookerRemoteService;

// create and set LookerSDK for the connection
LookerSDK sdk = LookerSdkFactory.createSdk(conn.config().url(), info);
((LookerRemoteService) service).setSdk(sdk);
Expand Down
Loading

0 comments on commit 78906b2

Please sign in to comment.