Skip to content

Commit

Permalink
SQL Interface API endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
tjbanghart committed Sep 5, 2023
1 parent 1e914b6 commit 1590447
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 97 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ allprojects {
dependencies {
"implementation"(platform(project(":bom")))
// Add the locally bundled LookerSDK fat jar
"implementation"(files("../libs/looker-kotlin-sdk-all.jar"))
"implementation"(files("../libs/looker-kotlin-sdk-f91f8ad.jar"))
}
}
if (!skipAutostyle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,8 +778,7 @@ public Meta getMeta(AvaticaConnection connection) {
* @param <T> The return type from {@code call}.
*/
public interface CallableWithoutException<T> {
// TODO: whoops, broke the "WithoutException" part. Let's make sure we handle errors better.
T call() throws SQLException;
T call();
}

/**
Expand All @@ -802,9 +801,6 @@ public <T> T invokeWithRetries(CallableWithoutException<T> callable) {
continue;
}
throw e;
// TODO: don't do this - raise a proper AvaticaRuntimeException for callables
} catch (Exception e) {
throw new RuntimeException(e);
}
}
if (null != lastException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,52 @@ public LookerRemoteMeta(AvaticaConnection connection, Service service) {
assert service.getClass() == LookerRemoteService.class;
}

/**
* Calls the correct method to read the current value on the stream. The {@code get} methods do
* not advance the current token, so they can be called multiple times without changing the state
* of the parser.
*
* @param columnTypeRep the internal Avatica representation for this value. It is important to
* use the {@link Rep} rather than the type name since Avatica represents most datetime values
* as milliseconds since epoch via {@code long}s or {@code int}s.
* @param parser a JsonParser whose current token is a value from the JSON response. Callers
* must ensure that the parser is ready to consume a value token. This method does not change
* the state of the parser.
* @return the parsed value.
*/
static Object deserializeValue(Rep columnTypeRep, JsonParser parser) throws IOException {
switch (columnTypeRep) {
case PRIMITIVE_BOOLEAN:
case BOOLEAN:
return parser.getBooleanValue();
case PRIMITIVE_BYTE:
case BYTE:
return parser.getByteValue();
case STRING:
return parser.getValueAsString();
case PRIMITIVE_SHORT:
case SHORT:
return parser.getShortValue();
case PRIMITIVE_INT:
case INTEGER:
return parser.getIntValue();
case PRIMITIVE_LONG:
case LONG:
return parser.getLongValue();
case PRIMITIVE_FLOAT:
case FLOAT:
return parser.getFloatValue();
case PRIMITIVE_DOUBLE:
case DOUBLE:
return parser.getDoubleValue();
case NUMBER:
// NUMBER is represented as BigDecimal
return parser.getDecimalValue();
default:
throw new RuntimeException("Unable to parse " + columnTypeRep + " from stream!");
}
}

/**
* An initially empty frame specific to Looker result sets. The {@code statementSlug} is used to
* begin a streaming query.
Expand All @@ -79,22 +125,21 @@ static class LookerFrame extends Frame {
* A unique ID for the current SQL statement to run. Prepared and set during
* {@link LookerRemoteService#apply(PrepareAndExecuteRequest)}.
*/
public final String statementSlug;
public final Long statementId;

private LookerFrame(long offset, boolean done, Iterable<Object> rows, String statementSlug) {
private LookerFrame(long offset, boolean done, Iterable<Object> rows, Long statementId) {
super(offset, done, rows);
this.statementSlug = statementSlug;
this.statementId = statementId;
}

/**
* Creates a {@code LookerFrame} for the statement slug
*
* @param statementSlug slug for the prepared statement generated by a Looker instance.
* @param statementId id for the prepared statement generated by a Looker instance.
* @return the {@code firstFrame} for the result set.
*/
public static final LookerFrame create(String statementSlug) {
assert null != statementSlug;
return new LookerFrame(0, false, Collections.emptyList(), statementSlug);
public static final LookerFrame create(Long statementId) {
return new LookerFrame(0, false, Collections.emptyList(), statementId);
}
}

Expand Down Expand Up @@ -159,7 +204,7 @@ private void streamResponse(String url, OutputStream outputStream) throws IOExce
// timeout is given as seconds
int timeout = sdkTransport.getOptions().getTimeout() * 1000;
connection.setReadTimeout(timeout);
connection.setRequestMethod("POST");
connection.setRequestMethod("GET");
connection.setRequestProperty("Accept", "application/json");
connection.setDoOutput(true);
// Set the auth header as the SDK would
Expand Down Expand Up @@ -216,7 +261,7 @@ public Iterable<Object> createIterable(StatementHandle h, QueryState state, Sign
if (LookerFrame.class.isAssignableFrom(firstFrame.getClass())) {
try {
LookerFrame lookerFrame = (LookerFrame) firstFrame;
String endpoint = LookerSdkFactory.queryEndpoint(lookerFrame.statementSlug);
String endpoint = LookerSdkFactory.queryEndpoint(lookerFrame.statementId);
// set up in/out streams
PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);
Expand Down Expand Up @@ -281,50 +326,6 @@ public class LookerIterator extends FetchIterator implements Iterator<Object> {
super(stmt, state, firstFrame);
}

/**
* Calls the correct method to read the current value on the stream. The {@code get} methods
* do not advance the current token, so they can be called multiple times without changing the
* state of the parser.
*
* @param columnTypeRep the internal Avatica representation for this value. It is
* important to use the {@link Rep} rather than the type name since Avatica represents
* most datetime values as milliseconds since epoch via {@code long}s or {@code int}s.
* @return the parsed value.
*/
private Object deserializeValue(Rep columnTypeRep) throws IOException {
switch (columnTypeRep) {
case PRIMITIVE_BOOLEAN:
case BOOLEAN:
return parser.getBooleanValue();
case PRIMITIVE_BYTE:
case BYTE:
return parser.getByteValue();
case PRIMITIVE_CHAR:
case CHARACTER:
case STRING:
return parser.getValueAsString();
case PRIMITIVE_SHORT:
case SHORT:
return parser.getShortValue();
case PRIMITIVE_INT:
case INTEGER:
return parser.getIntValue();
case PRIMITIVE_LONG:
case LONG:
return parser.getLongValue();
case PRIMITIVE_FLOAT:
case FLOAT:
return parser.getFloatValue();
case PRIMITIVE_DOUBLE:
case DOUBLE:
return parser.getDoubleValue();
case NUMBER:
return parser.getBigIntegerValue();
default:
throw new RuntimeException("Unable to parse " + columnTypeRep + "from stream!");
}
}

private void seekToRows() throws IOException {
while (parser.nextToken() != null && !ROWS_KEY.equals(parser.currentName())) {
// move position to start of `rows`
Expand Down Expand Up @@ -361,7 +362,8 @@ public Frame doFetch(StatementHandle h, long currentOffset, int fetchSize) {
return new Frame(currentOffset + rowsRead, true, rows);
}
// add the value to the column list
Object value = deserializeValue(signature.columns.get(i).type.rep);
Object value = LookerRemoteMeta.deserializeValue(signature.columns.get(i).type.rep,
parser);
columns.add(value);
}
rows.add(columns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@

import com.looker.sdk.JdbcInterface;
import com.looker.sdk.LookerSDK;
import com.looker.sdk.SqlQuery;
import com.looker.sdk.SqlQueryCreate;
import com.looker.sdk.SqlInterfaceQuery;
import com.looker.sdk.WriteSqlInterfaceQueryCreate;

import java.io.IOException;
import java.util.Arrays;

import static org.apache.calcite.avatica.remote.looker.utils.LookerSdkFactory.safeSdkCall;
Expand Down Expand Up @@ -75,24 +76,16 @@ public String apply(String request) {
@Override
public ExecuteResponse apply(PrepareAndExecuteRequest request) {
assert null != sdk;
// TODO: b/288031194 - Remove this stubbed query once the Looker SQL endpoints exist.
// For dev we first prepare the query to get a signature and then create a query to run.
String prepSql = "SELECT\n" + " (FORMAT_DATE('%F %T', `order_items.created_date` )) AS "
+ "order_items_created_time, 'AHHHH' as testy, 10000 as num\n"
+ "FROM `thelook`.`order_items`\n" + " AS order_items\n" + "GROUP BY\n" + " 1\n"
+ "ORDER BY\n" + " 1 DESC";
PrepareRequest prepareRequest = new PrepareRequest("looker-adapter", prepSql, -1);
PrepareResponse prepare = super.apply(prepareRequest);
SqlQuery query = safeSdkCall(() -> {
SqlQueryCreate sqlQuery = new SqlQueryCreate(
/* connection_name=*/ null,
/* connection_id=*/ null,
/* model_name=*/ "thelook",
/* sql=*/ request.sql,
/* vis_config=*/ null);
return sdk.create_sql_query(sqlQuery);
});
return lookerExecuteResponse(request, prepare.statement.signature,
LookerFrame.create(query.getSlug()));
WriteSqlInterfaceQueryCreate queryRequest = new WriteSqlInterfaceQueryCreate(
request.sql, /*jdbcClient=*/true);
SqlInterfaceQuery preparedQuery = safeSdkCall(
() -> sdk.create_sql_interface_query(queryRequest));
Signature signature;
try {
signature = JsonService.MAPPER.readValue(preparedQuery.getSignature(), Signature.class);
} catch (IOException e) {
throw handle(e);
}
return lookerExecuteResponse(request, signature, LookerFrame.create(preparedQuery.getId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private LookerSdkFactory() {
}

private static final String RESULT_FORMAT = "json_bi";
private static final String QUERY_ENDPOINT = "/api/4.0/sql_queries/%s/run/%s";
private static final String QUERY_ENDPOINT = "/api/4.0/sql_interface_queries/%s/run/%s";
/**
* Default buffer size. Could probably be more or less. 1024 chosen for now.
*/
Expand All @@ -68,14 +68,14 @@ public interface LookerSDKCall {
* resolved we should do the same. RuntimeExceptions do not have to be part of the method
* signature so it does make things nicer to work with.
*/
public static RuntimeException handle(SQLException e) {
return new RuntimeException(e);
public static RuntimeException handle(String errorMessage) {
return new RuntimeException(errorMessage);
}

/**
* Makes the API endpoint to run a previously made query.
*/
public static String queryEndpoint(String id) {
public static String queryEndpoint(Long id) {
return String.format(Locale.ROOT, QUERY_ENDPOINT, TransportKt.encodeParam(id), RESULT_FORMAT);
}

Expand All @@ -88,7 +88,7 @@ public static <T> T safeSdkCall(LookerSDKCall sdkCall) {
} catch (Error e) {
SDKErrorInfo error = parseSDKError(e.toString());
// TODO: Get full errors from error.errors array
throw handle(new SQLException(error.getMessage()));
throw handle(error.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.avatica.remote.looker;

import org.apache.calcite.avatica.AvaticaConnection;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLInvalidAuthorizationSpecException;
import java.util.Properties;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class DriverTest {
@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void lookerDriverIsRegistered() throws SQLException {
Driver driver = DriverManager.getDriver("jdbc:looker:url=foobar.com");
assertThat(driver, is(instanceOf(org.apache.calcite.avatica.remote.looker.Driver.class)));
}

@Test
public void driverThrowsAuthExceptionForBlankProperties() throws SQLException {
Properties props = new Properties();
Driver driver = DriverManager.getDriver("jdbc:looker:url=foobar.com");
thrown.expect(SQLInvalidAuthorizationSpecException.class);
thrown.expectMessage("Missing either API3 credentials or access token");
driver.connect("jdbc:looker:url=foobar.com", props);
}

@Test
public void createsAvaticaConnections() throws SQLException {
Properties props = new Properties();
props.put("token", "foobar");
Driver driver = DriverManager.getDriver("jdbc:looker:url=foobar.com");
Connection connection = driver.connect("jdbc:looker:url=foobar.com", props);
assertThat(connection, is(instanceOf(AvaticaConnection.class)));
}
}
Loading

0 comments on commit 1590447

Please sign in to comment.