
[Spool] Actual implementation #14507

Open
gortiz wants to merge 21 commits into master
Conversation

@gortiz gortiz commented Nov 20, 2024

This PR is a continuation of #14495 and the next step on #14196.

While previous PRs focused on the machinery to find the common stages and replace them in the plan, this one focuses on sending these plans to the servers and on how servers deal with them.

The main change is that the send operator may now need to send to multiple stages. That implies a change in the protocol, which I made backward compatible by deprecating the old attribute and adding a new repeated one. A deprecated attribute can still be used and its id remains reserved in protobuf. Servers still recognize the old attribute (required in case a new server talks to an old broker), but they first try to read the new attribute (which will be null/empty if sent by an old broker) and fall back to the old attribute when it is empty.
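
As a rough illustration of that fallback, here is a minimal sketch with hypothetical type and getter names (following the usual protobuf-java conventions, not necessarily the ones generated for this PR); the actual diff doing this is quoted further down in the review:

import java.util.List;

// Hypothetical sketch of the read-side compatibility fallback described above.
// "ProtoSendNode" is a stand-in for the generated protobuf message class; the getter
// names assume standard protobuf-java conventions and may not match the real ones.
final class ReceiverStageIdFallbackSketch {
  interface ProtoSendNode {
    List<Integer> getReceiverStageIdsList(); // new repeated attribute
    int getReceiverStageId();                // old, now-deprecated attribute
  }

  static List<Integer> resolveReceiverStageIds(ProtoSendNode node) {
    List<Integer> receiverIds = node.getReceiverStageIdsList();
    if (receiverIds == null || receiverIds.isEmpty()) {
      // Empty means the request came from a broker that has not been upgraded yet,
      // so fall back to the deprecated singular attribute.
      return List.of(node.getReceiverStageId());
    }
    return receiverIds;
  }
}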

To understand the change in the operators, it is important to understand the topology used by BlockExchange and Mailbox. Mailboxes receive blocks and send them to exactly one worker, either through memory or gRPC. BlockExchanges decide how to distribute blocks among the different mailboxes: for example, BroadcastExchange sends each block to all mailboxes, while RandomExchange sends it to a single random mailbox.

With multi-sender operators we need to broadcast the blocks to multiple stages, each of which may then need to apply a further distribution across its workers. What this PR does is create a new layer of BlockExchanges when a sender is a multi-sender. The root of this layer is always a standard BroadcastExchange, but it sends to a new type of SendingMailbox: a BlockExchangeSendingMailbox. This new mailbox is just an adapter that wraps a BlockExchange and exposes it as a SendingMailbox. The BlockExchange used in this second layer is the one normally used for each stage. Remember that this distribution must be the same for all stages in the same multi-sender.
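
As a rough sketch of how these pieces compose, with heavily simplified, hypothetical interfaces and class bodies (the real Pinot classes have different signatures and much more logic):

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Deliberately simplified, hypothetical sketch of the roles described above.
final class MultiSenderSketch {
  static final class Block { /* stand-in for TransferableBlock; payload omitted */ }

  // A mailbox delivers blocks to exactly one receiving worker, in memory or over gRPC.
  interface SendingMailbox {
    void send(Block block) throws IOException, TimeoutException;
  }

  // A BlockExchange decides how a block is distributed across its mailboxes.
  interface BlockExchange {
    void send(Block block) throws IOException, TimeoutException;
  }

  // Broadcast: every mailbox gets every block. This is always the root of the
  // extra layer created for a multi-sender.
  static final class BroadcastExchange implements BlockExchange {
    private final List<SendingMailbox> _mailboxes;

    BroadcastExchange(List<SendingMailbox> mailboxes) {
      _mailboxes = mailboxes;
    }

    @Override
    public void send(Block block) throws IOException, TimeoutException {
      for (SendingMailbox mailbox : _mailboxes) {
        mailbox.send(block);
      }
    }
  }

  // The adapter described above, sketched: it wraps the per-stage BlockExchange
  // (hash, random, broadcast, ...) and exposes it as a SendingMailbox, so the root
  // BroadcastExchange can treat each receiving stage as if it were a single mailbox.
  static final class BlockExchangeSendingMailbox implements SendingMailbox {
    private final BlockExchange _perStageExchange;

    BlockExchangeSendingMailbox(BlockExchange perStageExchange) {
      _perStageExchange = perStageExchange;
    }

    @Override
    public void send(Block block) throws IOException, TimeoutException {
      // Delegate: the wrapped exchange applies the stage's own distribution across its workers.
      _perStageExchange.send(block);
    }
  }
}

In other words, a multi-sender is wired as one BroadcastExchange whose mailboxes are BlockExchangeSendingMailbox instances, one per receiving stage, each wrapping the exchange that stage would normally use.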

As an example of how it works, you can run the following in ColocatedJoinEngineQuickStart

SET useSpools = true;
explain implementation plan for
select * from userAttributes as a1
join userGroups as a2
on a1.userUUID = a2.userUUID
join userAttributes as a3
on a1.userUUID = a3.userUUID
limit 10

which returns:

[0]@192.168.1.42:46155|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)
├── [1]@192.168.1.42:44265|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46155|[0]} (Subtree Omitted)
├── [1]@192.168.1.42:34477|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46155|[0]} (Subtree Omitted)
├── [1]@192.168.1.42:44185|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46155|[0]} (Subtree Omitted)
└── [1]@192.168.1.42:34303|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46155|[0]}
    └── [1]@192.168.1.42:34303|[3] SORT LIMIT 10
        └── [1]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
            ├── [2]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:37717|[3],[1]@192.168.1.42:43409|[1],[1]@192.168.1.42:43851|[2],[1]@192.168.1.42:46503|[0]} (Subtree Omitted)
            ├── [2]@192.168.1.42:34477|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:37717|[3],[1]@192.168.1.42:43409|[1],[1]@192.168.1.42:43851|[2],[1]@192.168.1.42:46503|[0]} (Subtree Omitted)
            ├── [2]@192.168.1.42:44185|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:37717|[3],[1]@192.168.1.42:43409|[1],[1]@192.168.1.42:43851|[2],[1]@192.168.1.42:46503|[0]} (Subtree Omitted)
            └── [2]@192.168.1.42:34303|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:37717|[3],[1]@192.168.1.42:43409|[1],[1]@192.168.1.42:43851|[2],[1]@192.168.1.42:46503|[0]}
                └── [2]@192.168.1.42:34303|[3] SORT LIMIT 10
                    └── [2]@192.168.1.42:34303|[3] JOIN
                        ├── [2]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
                        │   ├── [3]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@192.168.1.42:46503|[0]} (Subtree Omitted)
                        │   ├── [3]@192.168.1.42:34477|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@192.168.1.42:43409|[1]} (Subtree Omitted)
                        │   ├── [3]@192.168.1.42:44185|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@192.168.1.42:43851|[2]} (Subtree Omitted)
                        │   └── [3]@192.168.1.42:34303|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@192.168.1.42:37717|[3]}
                        │       └── [3]@192.168.1.42:34303|[3] JOIN
                        │           ├── [3]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
spool here ->           │           │   ├── [4]@192.168.1.42:44185|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@192.168.1.42:37717|[3],[2]@192.168.1.42:43409|[1],[2]@192.168.1.42:43851|[2],[2]@192.168.1.42:46503|[0],[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]} (Subtree Omitted)
                        │           │   └── [4]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@192.168.1.42:37717|[3],[2]@192.168.1.42:43409|[1],[2]@192.168.1.42:43851|[2],[2]@192.168.1.42:46503|[0],[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]}
                        │           │       └── [4]@192.168.1.42:44265|[0] PROJECT
                        │           │           └── [4]@192.168.1.42:44265|[0] TABLE SCAN (userAttributes) null
                        │           └── [3]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
                        │               ├── [5]@192.168.1.42:44185|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]} (Subtree Omitted)
                        │               └── [5]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]}
                        │                   └── [5]@192.168.1.42:44265|[0] PROJECT
                        │                       └── [5]@192.168.1.42:44265|[0] TABLE SCAN (userGroups) null
                        └── [2]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
spool here ->               ├── [4]@192.168.1.42:44185|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@192.168.1.42:37717|[3],[2]@192.168.1.42:43409|[1],[2]@192.168.1.42:43851|[2],[2]@192.168.1.42:46503|[0],[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]} (Subtree Omitted)
                            └── [4]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@192.168.1.42:37717|[3],[2]@192.168.1.42:43409|[1],[2]@192.168.1.42:43851|[2],[2]@192.168.1.42:46503|[0],[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]}
                                └── [4]@192.168.1.42:44265|[0] PROJECT
                                    └── [4]@192.168.1.42:44265|[0] TABLE SCAN (userAttributes) null

TODO:

  • Fix queries (right now this query returns an empty result, which is incorrect).
  • Automated tests

@gortiz gortiz changed the title from Spool3 to [Spool] Actual implementation on Nov 20, 2024
codecov-commenter commented Nov 20, 2024

Codecov Report

Attention: Patch coverage is 75.00000% with 59 lines in your changes missing coverage. Please review.

Project coverage is 64.02%. Comparing base (59551e4) to head (6907740).
Report is 1485 commits behind head on master.

Files with missing lines Patch % Lines
...hysical/colocated/GreedyShuffleRewriteVisitor.java 0.00% 19 Missing ⚠️
...query/runtime/operator/exchange/BlockExchange.java 48.38% 10 Missing and 6 partials ⚠️
...me/operator/utils/BlockingMultiStreamConsumer.java 0.00% 9 Missing ⚠️
...ery/planner/physical/MailboxAssignmentVisitor.java 90.62% 4 Missing and 2 partials ⚠️
...requesthandler/MultiStageBrokerRequestHandler.java 0.00% 2 Missing ⚠️
...inot/query/planner/serde/PlanNodeDeserializer.java 60.00% 1 Missing and 1 partial ⚠️
...apache/pinot/query/mailbox/GrpcSendingMailbox.java 50.00% 2 Missing ⚠️
...he/pinot/query/planner/logical/PlanFragmenter.java 85.71% 0 Missing and 1 partial ⚠️
.../pinot/query/planner/serde/PlanNodeSerializer.java 87.50% 0 Missing and 1 partial ⚠️
...he/pinot/query/mailbox/InMemorySendingMailbox.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14507      +/-   ##
============================================
+ Coverage     61.75%   64.02%   +2.27%     
- Complexity      207     1602    +1395     
============================================
  Files          2436     2707     +271     
  Lines        133233   149187   +15954     
  Branches      20636    22860    +2224     
============================================
+ Hits          82274    95519   +13245     
- Misses        44911    46656    +1745     
- Partials       6048     7012     +964     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 64.00% <75.00%> (+2.29%) ⬆️
java-21 63.89% <75.00%> (+2.26%) ⬆️
skip-bytebuffers-false 64.01% <75.00%> (+2.27%) ⬆️
skip-bytebuffers-true 63.87% <75.00%> (+36.14%) ⬆️
temurin 64.02% <75.00%> (+2.27%) ⬆️
unittests 64.02% <75.00%> (+2.27%) ⬆️
unittests1 56.36% <74.35%> (+9.47%) ⬆️
unittests2 34.42% <3.38%> (+6.69%) ⬆️

Flags with carried forward coverage won't be shown.


@gortiz gortiz marked this pull request as ready for review December 17, 2024 07:55
@gortiz gortiz mentioned this pull request Dec 17, 2024
gortiz commented Dec 17, 2024

I think this PR is ready for review.

Map<Integer, QueryServerInstance> receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap();
Map<Integer, Map<Integer, MailboxInfos>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap();
Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap();
for (Integer receiverStageId : sendNode.getReceiverStageIds()) {
I recommend reviewing this class with the hide-whitespace option in GitHub.

Comment on lines +123 to +125
if (protoReceiverIds == null || protoReceiverIds.isEmpty()) {
// This should only happen if a not updated broker sends the request
receiverIds = List.of(protoMailboxSendNode.getReceiverStageId());
This (and the change in the proto) is what keeps backward compatibility.


@Override
public String toString() {
return "g" + _id;
I've added some toString implementations to mailboxes to make them easier to understand in debug/trace logs.

Comment on lines +98 to +100
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Sending EOS metadata. Only mailbox #{} will get stats", mailboxIdToSendMetadata);
}
Remember that this verbose way of logging (first check, then log) is used to avoid the unnecessary varargs array allocation in LOGGER.trace(String, Object...) when the log is not going to be printed. Given this is the hot path, we are extra careful not to introduce extra allocations.

*/
public boolean send(TransferableBlock block)
throws Exception {
throws IOException, TimeoutException {
This is more precise, and IIRC we need to narrow the exception types in order to create the BlockExchangeSendingMailbox.

gortiz commented Dec 17, 2024

Improve PR description

@yashmayya yashmayya left a comment

I think it might be very useful to have a couple of integration tests for some basic spool use cases (verifying both the execution output and, potentially, the plan).

@@ -207,13 +207,17 @@ protected TransferableBlock getNextBlock()
buildBroadcastHashTable();
}
if (_upstreamErrorBlock != null) {
LOGGER.trace("Returning upstream error block for join operator");
Are all the changes in this file unrelated to spools?

Yes, all the changes are just new logs I've added while debugging some blocking issues. These logs are disabled by default and should be easily removed by the JIT given they are not allocating. I recommend keeping them in case we need to debug something in the future.

Comment on lines +215 to +218
if (context.isJoinStage(receiverStageId)) {
sendsToJoin = true;
break;
}
We're only checking if at least one receiver stage is a join stage here?

LOGGER.debug("==[RECEIVE]== EOS received : " + _id + " in mailbox: " + removed.getId()
+ " (" + _mailboxes.size() + " mailboxes alive)");
+ " (" + ids + " mailboxes alive)");
Suggested change
+ " (" + ids + " mailboxes alive)");
+ " (Mailboxes alive: " + ids + ")");

@yashmayya yashmayya added the multi-stage (Related to the multi-stage query engine), feature, and release-notes (Referenced by PRs that need attention when compiling the next release notes) labels Dec 23, 2024
gortiz commented Dec 23, 2024

We have some tests that verify that, but in different files. For example:

  • pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json verifies the plan.
  • pinot-query-runtime/src/test/resources/queries/Spool.json verifies the result.

TBH there is no single test that verifies via the plan that spools are used AND that the result is correct (i.e. comparing with H2), although the query in these two cases is the same.

Labels: feature, multi-stage (Related to the multi-stage query engine), release-notes (Referenced by PRs that need attention when compiling the next release notes)

3 participants