Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Adding batch query interface through CLI #370

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public boolean run()
boolean isFromFile = !isNullOrEmpty(cliOptions.sqlFile);

String query = cliOptions.execute;

if (hasQuery) {
query += ";";
}
Expand Down
3 changes: 3 additions & 0 deletions presto-cli/src/main/java/io/prestosql/cli/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ public class ClientOptions
@Option(name = "--max-batch-process-size", title = "Maximum Batch Process Size (Rows)", description = "Maximum Batch Process Size as the number of Rows which can be processed")
public String maxBatchProcessSize = "50000000";

@Option(name = {"-b", "--batch"}, title = "batch query", description = "Execute batch query")
public String batchQuery;

public enum OutputFormat
{
ALIGNED,
Expand Down
15 changes: 14 additions & 1 deletion presto-cli/src/main/java/io/prestosql/cli/Console.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public boolean run()
boolean hasQuery = !isNullOrEmpty(clientOptions.execute);
boolean isFromFile = !isNullOrEmpty(clientOptions.file);
initializeLogging(clientOptions.logLevelsFile);
boolean hasBatchQuery = !isNullOrEmpty(clientOptions.batchQuery);

String query = clientOptions.execute;
if (hasQuery) {
Expand All @@ -103,6 +104,9 @@ public boolean run()
if (hasQuery) {
throw new RuntimeException("both --execute and --file specified");
}
if (hasBatchQuery) {
throw new RuntimeException("both --batch and --file specified");
}
try {
query = asCharSource(new File(clientOptions.file), UTF_8).read();
hasQuery = true;
Expand All @@ -112,6 +116,14 @@ public boolean run()
}
}

if (hasBatchQuery) {
if (hasQuery) {
throw new RuntimeException("both --execute and --batch specified");
}
query = clientOptions.batchQuery;
hasQuery = true;
}

// abort any running query if the CLI is terminated
AtomicBoolean exiting = new AtomicBoolean();
ThreadInterruptor interruptor = new ThreadInterruptor();
Expand Down Expand Up @@ -140,7 +152,8 @@ public boolean run()
Optional.ofNullable(clientOptions.krb5ConfigPath),
Optional.ofNullable(clientOptions.krb5KeytabPath),
Optional.ofNullable(clientOptions.krb5CredentialCachePath),
!clientOptions.krb5DisableRemoteServiceHostnameCanonicalization)) {
!clientOptions.krb5DisableRemoteServiceHostnameCanonicalization,
hasBatchQuery)) {
if (hasQuery) {
return executeCommand(
queryRunner,
Expand Down
7 changes: 5 additions & 2 deletions presto-cli/src/main/java/io/prestosql/cli/QueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class QueryRunner
{
private final AtomicReference<ClientSession> session;
private final boolean debug;
private final boolean isBatchQuery;
private final OkHttpClient httpClient;
private final Consumer<OkHttpClient.Builder> sslSetup;
private CubeConsole cubeConsole;
Expand All @@ -67,10 +68,12 @@ public QueryRunner(
Optional<String> kerberosConfigPath,
Optional<String> kerberosKeytabPath,
Optional<String> kerberosCredentialCachePath,
boolean kerberosUseCanonicalHostname)
boolean kerberosUseCanonicalHostname,
boolean isBatchQuery)
{
this.session = new AtomicReference<>(requireNonNull(session, "session is null"));
this.debug = debug;
this.isBatchQuery = isBatchQuery;

this.sslSetup = builder -> setupSsl(builder, keystorePath, keystorePassword, truststorePath, truststorePassword);

Expand Down Expand Up @@ -153,7 +156,7 @@ private StatementClient startInternalQuery(ClientSession session, String query)
sslSetup.accept(builder);
OkHttpClient client = builder.build();

return newStatementClient(client, session, query);
return newStatementClient(client, session, query, isBatchQuery);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ static QueryRunner createQueryRunner(ClientSession clientSession)
Optional.empty(),
Optional.empty(),
Optional.empty(),
false,
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public final class PrestoHeaders
public static final String PRESTO_PAGE_TOKEN = "X-Presto-Page-Sequence-Id";
public static final String PRESTO_PAGE_NEXT_TOKEN = "X-Presto-Page-End-Sequence-Id";
public static final String PRESTO_BUFFER_COMPLETE = "X-Presto-Buffer-Complete";
public static final String PRESTO_BATCH_QUERY = "X-Presto-Is_Batch_Query";

private PrestoHeaders() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ private StatementClientFactory() {}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query)
{
return new StatementClientV1(httpClient, session, query);
return new StatementClientV1(httpClient, session, query, false);
}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query, boolean isBatchQuery)
{
return new StatementClientV1(httpClient, session, query, isBatchQuery);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XPCDP_VALUE;
import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XXP;
import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XXP_VALUE;
import static io.prestosql.client.PrestoHeaders.PRESTO_BATCH_QUERY;
import static java.lang.String.format;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
Expand Down Expand Up @@ -97,10 +98,10 @@ class StatementClientV1
private final String user;
private final String clientCapabilities;
private final boolean timeInMilliseconds;

private boolean isBatchQuery;
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query, boolean isBatchQuery)
{
requireNonNull(httpClient, "httpClient is null");
requireNonNull(session, "session is null");
Expand All @@ -113,6 +114,7 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
this.user = session.getUser();
this.clientCapabilities = Joiner.on(",").join(ClientCapabilities.values());
this.timeInMilliseconds = session.isTimeInMilliseconds();
this.isBatchQuery = isBatchQuery;

Request request = buildQueryRequest(session, query);

Expand Down Expand Up @@ -233,6 +235,9 @@ private Request buildQueryRequest(ClientSession session, String query)
else {
builder.addHeader(HTTP_SECURITY_XXP, HTTP_SECURITY_XXP_VALUE);
}
if (isBatchQuery) {
builder.addHeader(PRESTO_BATCH_QUERY, "1");
}

return builder.build();
}
Expand Down
113 changes: 111 additions & 2 deletions presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.prestosql.spi.resourcegroups.SelectionContext;
import io.prestosql.spi.resourcegroups.SelectionCriteria;
import io.prestosql.spi.service.PropertyService;
import io.prestosql.sql.tree.Statement;
import io.prestosql.statestore.SharedQueryState;
import io.prestosql.statestore.StateCacheStore;
import io.prestosql.statestore.StateFetcher;
Expand All @@ -57,10 +58,12 @@
import javax.inject.Inject;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -170,7 +173,7 @@ public QueryId createQueryId()
return queryIdGenerator.createNextQueryId();
}

public ListenableFuture<?> createQuery(QueryId queryId, String slug, SessionContext sessionContext, String query)
public ListenableFuture<?> createQuery(QueryId queryId, String slug, SessionContext sessionContext, String query, boolean isBatchQuery)
{
requireNonNull(queryId, "queryId is null");
requireNonNull(sessionContext, "sessionFactory is null");
Expand All @@ -181,7 +184,12 @@ public ListenableFuture<?> createQuery(QueryId queryId, String slug, SessionCont
DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
queryExecutor.execute(() -> {
try {
createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
if (!isBatchQuery) {
createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
}
else {
createBatchQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
}
}
finally {
queryCreationFuture.set(null);
Expand Down Expand Up @@ -279,6 +287,107 @@ private <C> void createQueryInternal(QueryId queryId, String slug, SessionContex
}
}

private <C> void createBatchQueryInternal(QueryId queryId, String slug, SessionContext sessionContext, String inputQuery, ResourceGroupManager<C> resourceGroupManager)
{
String query = inputQuery;
Session session = null;
DispatchQuery dispatchQuery = null;
List<String> queryList = new ArrayList<>();
List<PreparedQuery> preparedQueryList = new ArrayList<>();
boolean isTransactionControlStatement = false;

try {
if (query.length() > maxQueryLength) {
int queryLength = query.length();
query = query.substring(0, maxQueryLength);
throw new PrestoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
}

// decode session
session = sessionSupplier.createSession(queryId, sessionContext);
StringTokenizer tokenizer = new StringTokenizer(query, ";");
while (tokenizer.hasMoreTokens()) {
String curQuery = tokenizer.nextToken();
queryList.add(curQuery);
// prepare query
preparedQueryList.add(queryPreparer.prepareQuery(session, curQuery));
}

// select resource group
SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
sessionContext.getIdentity().getPrincipal().isPresent(),
sessionContext.getIdentity().getUser(),
Optional.ofNullable(sessionContext.getSource()),
sessionContext.getClientTags(),
sessionContext.getResourceEstimates(),
Optional.empty()));

// apply system default session properties (does not override user set properties)
session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, Optional.empty(), selectionContext.getResourceGroupId());

// Check if any query is transaction control statement
for (PreparedQuery preparedQuery : preparedQueryList) {
Statement statement = preparedQuery.getStatement();
isTransactionControlStatement = isTransactionControlStatement(statement);
if (isTransactionControlStatement) {
break;
}
}
// mark existing transaction as active
transactionManager.activateTransaction(session, isTransactionControlStatement, accessControl);

dispatchQuery = dispatchQueryFactory.createDispatchQuery(
session,
queryList,
preparedQueryList,
slug,
selectionContext.getResourceGroupId(),
resourceGroupManager,
isTransactionControlStatement);

boolean queryAdded = queryCreated(dispatchQuery);
if (queryAdded && !dispatchQuery.isDone()) {
try {
resourceGroupManager.submit(dispatchQuery, selectionContext, queryExecutor);

if (PropertyService.getBooleanProperty(HetuConstant.MULTI_COORDINATOR_ENABLED) && stateUpdater != null) {
stateUpdater.registerQuery(StateStoreConstants.QUERY_STATE_COLLECTION_NAME, dispatchQuery);
}

if (LOG.isDebugEnabled()) {
long now = System.currentTimeMillis();
LOG.debug("query:%s submission started at %s, ended at %s, total time use: %sms",
dispatchQuery.getQueryId(),
new SimpleDateFormat("HH:mm:ss:SSS").format(dispatchQuery.getCreateTime().toDate()),
new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(now)),
now - dispatchQuery.getCreateTime().getMillis());
}
}
catch (Throwable e) {
// dispatch query has already been registered, so just fail it directly
dispatchQuery.fail(e);
}
}
}
catch (Throwable throwable) {
// creation must never fail, so register a failed query in this case
if (dispatchQuery == null) {
if (session == null) {
session = Session.builder(new SessionPropertyManager())
.setQueryId(queryId)
.setIdentity(sessionContext.getIdentity())
.setSource(sessionContext.getSource())
.build();
}
DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, Optional.empty(), throwable);
queryCreated(failedDispatchQuery);
}
else {
dispatchQuery.fail(throwable);
}
}
}

private boolean queryCreated(DispatchQuery dispatchQuery)
{
boolean queryAdded = queryTracker.addQuery(dispatchQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.prestosql.execution.resourcegroups.ResourceGroupManager;
import io.prestosql.spi.resourcegroups.ResourceGroupId;

import java.util.List;

public interface DispatchQueryFactory
{
DispatchQuery createDispatchQuery(
Expand All @@ -27,4 +29,13 @@ DispatchQuery createDispatchQuery(
String slug,
ResourceGroupId resourceGroup,
ResourceGroupManager resourceGroupManager);

DispatchQuery createDispatchQuery(
Session session,
List<String> queryList,
List<PreparedQuery> preparedQueryList,
String slug,
ResourceGroupId resourceGroup,
ResourceGroupManager resourceGroupManager,
boolean isTransactionControlStatement);
}
Loading