Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Cursors through API Query Params #14110

Merged
merged 61 commits into from
Dec 24, 2024
Merged

Conversation

vrajat
Copy link
Collaborator

@vrajat vrajat commented Sep 30, 2024

Cursor support will allow Pinot clients to consume query results in smaller chunks. This feature allows clients to work with lesser resources esp. memory. Application logic is simpler with cursors. For example an app UI paginates through results in a table or a graph. Cursor support has been implemented using APIs.

Design Doc

Implementation for #13185

API

POST /query/sql

A new broker API parameter has been added to trigger pagination.
The API accepts the following new optional query parameters:

  • getCursor(boolean):
  • numRows (int): The number of rows to return in the first page.

The response contains the following extra fields:

Field Description
brokerHost hostname of the processing broker
brokerPort port of the processing broker
offset starting offset of the result table slice in the cursor response
numRows Number of rows in result table slice in the cursor response
cursorResultWriteTimeMs Time taken to write the query response to ResponseStore
cursorFetchTimeMs Time taken to read a slice of the result table from ResponseStore
submissionTime Unix timestamp when the query was submitted
expirationTime Unix timestamp when the response can be deleted from the ResponseStore
bytesWritten number of bytes written to the response store when storing the result table

GET /resultStore/{requestId}/results

This is a broker API that can be used to iterate over the result set of a query submitted using the above API.
The API accepts the following query parameters:

  • offset (int) (required): The start offset of the page of results.
  • numRows (int) (optional): The number of rows in the page. By default it will use the default size.

GET /resultStore/{requestId}/

Returns the BrokerResponse metadata of the query.

GET /resultStore

Lists all the requestIds of all the query results available in the response store.

DELETE /resultStore/{requestId}/

Delete the results of a query.
The API accepts the following query parameters:

  • requestId (required)

SPI

The PR implements a FileSystem ResponseStore and a JSON ResponseSerde.

The feature provides two SPIs to extend the feature to support other implementations:

  • ResponseSerde: Serialize/Deserialize the response.
  • ResponseStore: Store responses in a storage system.
    Both SPIs use Java SPI and the default ServiceLoader to find implementation of the SPIs. All implementation should be annotated with AutoService to help generate files for discovering the implementations.

Configuration

ResponseStore

Configuration Default Description
pinot.broker.cursor.response.store.type file The protocol to use for storage

File Response Store

Configuration Default Description
pinot.broker.cursor.response.store.file.data.dir /tmp/pinot/broker/response_store/data Location where result files will be stored.
pinot.broker.cursor.response.store.file.data.dir file:///tmp/pinot/broker/response_store/data Location where temporary files will be created.

Miscellaneous

Configuration Default Description
pinot.broker.cursor.fetch.rows 10000 The result size if numRows is not specified in the API call.
pinot.broker.cursor.response.store.expiration 1h The time before a query result will be deleted.
controller.cluster.response.store.cleaner.frequencyPeriod 1h The frequency of the periodic task that deletes expired query results
controller.cluster.response.store.cursor.cleaner.initialDelay random The initial delay before the first run of the periodic task.

tags: feature, multi-stage, release-notes

@codecov-commenter
Copy link

codecov-commenter commented Sep 30, 2024

Codecov Report

Attention: Patch coverage is 14.06250% with 385 lines in your changes missing coverage. Please review.

Project coverage is 63.90%. Comparing base (59551e4) to head (ce3c4d7).
Report is 1502 commits behind head on master.

Files with missing lines Patch % Lines
...g/apache/pinot/broker/cursors/FsResponseStore.java 14.28% 76 Missing and 2 partials ⚠️
...pinot/controller/cursors/ResponseStoreCleaner.java 18.88% 71 Missing and 2 partials ⚠️
...t/common/response/broker/CursorResponseNative.java 0.00% 67 Missing ⚠️
...he/pinot/common/cursors/AbstractResponseStore.java 10.44% 60 Missing ⚠️
...ot/broker/api/resources/ResponseStoreResource.java 0.00% 51 Missing ⚠️
...apache/pinot/spi/cursors/ResponseStoreService.java 0.00% 22 Missing ⚠️
...r/requesthandler/BrokerRequestHandlerDelegate.java 11.76% 15 Missing ⚠️
...pinot/broker/api/resources/PinotClientRequest.java 10.00% 8 Missing and 1 partial ⚠️
...apache/pinot/broker/cursors/JsonResponseSerde.java 25.00% 3 Missing ⚠️
...e/pinot/common/utils/config/QueryOptionsUtils.java 0.00% 3 Missing ⚠️
... and 3 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14110      +/-   ##
============================================
+ Coverage     61.75%   63.90%   +2.14%     
- Complexity      207     1608    +1401     
============================================
  Files          2436     2713     +277     
  Lines        133233   149658   +16425     
  Branches      20636    22908    +2272     
============================================
+ Hits          82274    95634   +13360     
- Misses        44911    47012    +2101     
- Partials       6048     7012     +964     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.87% <14.06%> (+2.16%) ⬆️
java-21 63.75% <14.06%> (+2.12%) ⬆️
skip-bytebuffers-false 63.89% <14.06%> (+2.15%) ⬆️
skip-bytebuffers-true 63.71% <14.06%> (+35.98%) ⬆️
temurin 63.90% <14.06%> (+2.14%) ⬆️
unittests 63.89% <14.06%> (+2.15%) ⬆️
unittests1 56.26% <3.03%> (+9.37%) ⬆️
unittests2 34.41% <14.06%> (+6.68%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@vrajat vrajat force-pushed the cursors branch 2 times, most recently from 12a6250 to d46760b Compare October 17, 2024 05:32
@vrajat vrajat changed the title Cursors Add support for Cursors through API Query Params Oct 17, 2024
@vrajat vrajat added feature multi-stage Related to the multi-stage query engine labels Oct 17, 2024
@vrajat vrajat marked this pull request as ready for review October 17, 2024 16:27
@@ -100,6 +103,9 @@
public class PinotClientRequest {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotClientRequest.class);

@Inject
PinotConfiguration _brokerConf;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to inject the broker config here as it's already in the BaseBrokerRequestHandler class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no getter right now to get the config. I can add that. However is that a better option than injecting the config ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However is that a better option than injecting the config ?

Probably not. It doesn't make sense to add that into the interface and we don't want to depend on the base class.

private static final Logger LOGGER = LoggerFactory.getLogger(ResponseStoreResource.class);

@Inject
private PinotConfiguration _brokerConf;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, you might not need to inject the broker config here.

}

response.setResultTable(
new ResultTable(resultTable.getDataSchema(), resultTable.getRows().subList(offset, sliceEnd)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean every time a read request will fetch the whole result table into memory first and then pick the page from the memory?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation does read the whole result table into memory. Does LinkedIn implementation support seeking within the result table ? Do you have suggestions on how can the interface be changed to also support seek ?

x -> new InstanceInfo(x.getInstanceName(), x.getHostName(), Integer.parseInt(x.getPort()))));

try {
Map<String, List<CursorResponseNative>> brokerResponses = getAllQueryResults(brokers, Collections.emptyMap());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, it seems every controller will try to fetch all the results from all the brokers, is it kind of a waste of resources to do that? Could we distribute the cleanup workloads to controllers instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed that only the lead controller will run the periodic tasks. Right now, the controller does not have any information about ResponseStores. Only the broker has information.

The major point to discuss - is it a requirement that a ResponseStore should be accessible from all nodes ? Then the default implementation of using the broker local filesystem has to be changed.

The current perioidc task and cleaner assumes that a only a specific broker knows about the responses it has stored.

Comment on lines 105 to 107
@ManualAuthorization
public BrokerResponse getSqlQueryMetadata(
@ApiParam(value = "Request ID of the query", required = true) @PathParam("requestId") String requestId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this @ManualAuthorization means that anyone can get the metadata of any known request id?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've messed up my git branches. I have the code in a branch. I'll merge the code.

Comment on lines +133 to +138
@ManualAuthorization
public void getSqlQueryResult(
Copy link
Contributor

@gortiz gortiz Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we check here that only the user that opened the cursor can read from it? IICU that is one of the requirements in the design document

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've messed up my git branches. I have the code in a branch. I'll merge the code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've merged support for authorization. My initial plan was to store the user who submitted the query. However that will require a lot of API changes in access control classes. Instead the response store contains the list of tables queried and that is used to authorize. So two users who have access to the same tables can read each other's cursors. I have not added tests yet as I havent found the infra to test these cases.

Copy link
Contributor

@gortiz gortiz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review

URI dataFile = combinePath(queryDir, String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
try {
_responseSerde.serialize(resultTable, Files.newOutputStream(tempResultTableFile));
pinotFS.copyFromLocalFile(tempResultTableFile.toFile(), dataFile);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: couldn't you just use pinotFS.move? In this specific case, that could be implemented very cheap in LocalPinotFS (although I can see it is not)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That wont work. e.g. in S3PinotFs, move eventually calls copyObject which assumes that the src is already in S3. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! is this class used for all PinotFs? I was thinking it was used for local fs 🤦

Path tempResultTableFile = getTempPath(_localTempDir, "resultTable", requestId);
URI dataFile = combinePath(queryDir, String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
try {
_responseSerde.serialize(resultTable, Files.newOutputStream(tempResultTableFile));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This output stream should also be closed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dug in deeper why this worked without a close. Seems like Jackson may or may not decide to close the stream.

Method that can be used to serialize any Java value as JSON output, using output stream provided (using encoding JsonEncoding.UTF8.
Note: method does not close the underlying stream explicitly here; however, JsonFactor this mapper uses may choose to close the stream depending on its settings (by default, it will try to close it when JsonGenerator.

Ref: https://fasterxml.github.io/jackson-databind/javadoc/2.8/com/fasterxml/jackson/databind/ObjectWriter.html#writeValue(java.io.OutputStream,%20java.lang.Object)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a close.

doClean(cleanAtMs);
}

public void doClean(long currentTime) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this method declares brokerResponses and responses variables. The first is the list of cursors open on each broker, which the method iterates to then ask each broker to delete it if needed. responses store the result of that second request to brokers.

It may be subjective, but the names are not very clear. Both are responses from browsers. I would appreciate to look for better names, like knownCursors and cleanResponses or something like that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +63 to +70
private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
private static final String DELETE_QUERY_RESULT = "%s://%s:%d/responseStore/%s";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two constants are templates that are used to build the URI we are going to call to get/delete cursors. I don't think it is very useful to declare them as a constant 60-90 lines above the single place they are going to be used. Instead I would prefer to see the template directly where we use them, specially if they are going to be used only once.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to turn this into a constant within the fn. ? 60-90 is an arbitrary distance. I can move the fn. up. With editor shortcuts, it should not be hard to move between the definition and usage ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer this to be directly defined (as a literal or as a variable) in the method, but this is nit

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I searched for this regex in Intellij final String .* = ".*" and I could not find instances where a literal is defined in a fn. There is one in a test. Do you have examples ?

Copy link
Contributor

@gortiz gortiz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review

Copy link
Contributor

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! I only have some minor, mostly nitpicky comments around the tests 🙂

Thanks for your patience on this one!

Comment on lines +208 to +218
protected Object[][] getPageSizesAndQueryEngine() {
return new Object[][]{
{false, 2}, {false, 3}, {false, 10}, {false, 0}, //0 trigger default behaviour
{true, 2}, {true, 3}, {true, 10}, {true, 0} //0 trigger default behaviour
};
}

@DataProvider(name = "pageSizeAndQueryEngineProvider")
public Object[][] pageSizeAndQueryEngineProvider() {
return getPageSizesAndQueryEngine();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can be a single method. Also the method name and actual returned values has the order flipped (query engine / page size).

JsonNode exception = pinotResponse.get("exceptions").get(0);
Assert.assertTrue(exception.get("message").asText().startsWith("QueryValidationError:"));
Assert.assertEquals(exception.get("errorCode").asInt(), 700);
Assert.assertTrue(pinotResponse.get("brokerId").asText().startsWith("Broker_"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can also verify that the get all response stores API returns 0 results in this case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Assert.assertNull(pinotResponse.get("resultTable"));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I meant something like this:

    List<CursorResponseNative> requestIds = JsonUtils.stringToObject(
        ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()), getHeaders()),
        new TypeReference<>() {
        });

    Assert.assertEquals(requestIds.size(), 0);

But it was just an optional suggestion anyway.

Comment on lines +71 to +72
// Used in tests to trigger the delete instead of waiting for the wall clock to move to an appropriate time.
public static final String CLEAN_AT_TIME = "response.store.cleaner.clean.at.ms";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming you were talking about CursorIntegrationTest::testResponseStoreCleaner here - doesn't that test only verify that at least one of the two response stores is deleted? In that case, why do we need this response.store.cleaner.clean.at.ms internal configuration? Wouldn't a short response store expiration value in the broker combined with a short frequency for the controller periodic job do the trick considering we're using a 100 second timeout for the wait condition anyway?

@yashmayya yashmayya merged commit 73843b5 into apache:master Dec 24, 2024
21 checks passed
zeronerdzerogeekzerocool pushed a commit to zeronerdzerogeekzerocool/pinot that referenced this pull request Feb 20, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation feature multi-stage Related to the multi-stage query engine release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants