Skip to content

Commit

Permalink
Fix design issues in multi-sender MailboxSendNode
Browse files Browse the repository at this point in the history
  • Loading branch information
gortiz committed Nov 21, 2024
1 parent b857f1e commit 6b56d2e
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ public Integer next() {
};
}

/**
* returns true if this node sends to multiple receivers
*/
public boolean isMultiSend() {
return _receiverStages.cardinality() > 1;
}

@Deprecated
public int getReceiverStageId() {
Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.blocks;

import com.google.common.collect.Iterators;
import java.util.Iterator;
import org.apache.pinot.common.datablock.BaseDataBlock;

Expand All @@ -28,6 +29,7 @@
* underlying transport.
*/
public interface BlockSplitter {
BlockSplitter NO_OP = (block, type, maxBlockSize) -> Iterators.singletonIterator(block);

/**
* @return a list of blocks that was split from the original {@code block}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.routing.MailboxInfo;
import org.apache.pinot.query.routing.RoutingInfo;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
Expand Down Expand Up @@ -65,9 +66,7 @@ public class MailboxSendOperator extends MultiStageOperator {

// TODO: Support sort on sender
public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator input, MailboxSendNode node) {
this(context, input,
statMap -> getBlockExchange(context, node.getReceiverStageIds(), node.getDistributionType(), node.getKeys(),
statMap));
this(context, input, statMap -> getBlockExchange(context, node, statMap));
_statMap.merge(StatKey.STAGE, context.getStageId());
_statMap.merge(StatKey.PARALLELISM, 1);
}
Expand All @@ -80,28 +79,64 @@ public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator i
_exchange = exchangeFactory.apply(_statMap);
}

private static BlockExchange getBlockExchange(OpChainExecutionContext context, Iterable<Integer> receiverStageIds,
RelDistribution.Type distributionType, List<Integer> keys, StatMap<StatKey> statMap) {
/**
* Creates a {@link BlockExchange} for the given {@link MailboxSendNode}.
*
* In normal cases, where the sender sends data to a single receiver stage, this method just delegates on
* {@link #getBlockExchange(OpChainExecutionContext, int, RelDistribution.Type, List, StatMap, BlockSplitter)}.
*
* In case of a multi-sender node, this method creates a two steps exchange:
* <ol>
* <li>One inner exchange is created for each receiver stage, using the method mentioned above and keeping the
* distribution type specified in the {@link MailboxSendNode}.</li>
* <li>Then, a single outer broadcast exchange is created to fan out the data to all the inner exchanges.</li>
* </ol>
*
* @see BlockExchange#asSendingMailbox()
*/
private static BlockExchange getBlockExchange(OpChainExecutionContext ctx, MailboxSendNode node,
StatMap<StatKey> statMap) {
BlockSplitter mainSplitter = TransferableBlockUtils::splitBlock;
if (!node.isMultiSend()) {
// it is guaranteed that there is exactly one receiver stage
int receiverStageId = node.getReceiverStageIds().iterator().next();
return getBlockExchange(ctx, receiverStageId, node.getDistributionType(), node.getKeys(), statMap, mainSplitter);
}
List<SendingMailbox> perStageSendingMailboxes = new ArrayList<>();
// The inner splitter is a NO_OP because the outer splitter will take care of splitting the blocks
BlockSplitter innerSplitter = BlockSplitter.NO_OP;
for (int receiverStageId : node.getReceiverStageIds()) {
BlockExchange blockExchange =
getBlockExchange(ctx, receiverStageId, node.getDistributionType(), node.getKeys(), statMap, innerSplitter);
perStageSendingMailboxes.add(blockExchange.asSendingMailbox());
}
return BlockExchange.getExchange(perStageSendingMailboxes, RelDistribution.Type.BROADCAST_DISTRIBUTED,
Collections.emptyList(), mainSplitter);
}

/**
* Creates a {@link BlockExchange} that sends data to the given receiver stage.
*
* In case of a multi-sender node, this method will be called for each receiver stage.
*/
private static BlockExchange getBlockExchange(OpChainExecutionContext context, int receiverStageId,
RelDistribution.Type distributionType, List<Integer> keys, StatMap<StatKey> statMap, BlockSplitter splitter) {
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(distributionType), "Unsupported distribution type: %s",
distributionType);
MailboxService mailboxService = context.getMailboxService();
long requestId = context.getRequestId();
long deadlineMs = context.getDeadlineMs();

List<RoutingInfo> routingInfos = new ArrayList<>();
for (Integer receiverStageId : receiverStageIds) {
List<MailboxInfo> mailboxInfos =
context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
List<RoutingInfo> stageRoutingInfos =
List<MailboxInfo> mailboxInfos =
context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
List<RoutingInfo> routingInfos =
MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId,
mailboxInfos);
routingInfos.addAll(stageRoutingInfos);
}
List<SendingMailbox> sendingMailboxes = routingInfos.stream()
.map(v -> mailboxService.getSendingMailbox(v.getHostname(), v.getPort(), v.getMailboxId(), deadlineMs, statMap))
.collect(Collectors.toList());
statMap.merge(StatKey.FAN_OUT, sendingMailboxes.size());
return BlockExchange.getExchange(sendingMailboxes, distributionType, keys, TransferableBlockUtils::splitBlock);
return BlockExchange.getExchange(sendingMailboxes, distributionType, keys, splitter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
Expand Down Expand Up @@ -69,10 +73,11 @@ protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter spl
* API to send a block to the destination mailboxes.
* @param block the block to be transferred
* @return true if all the mailboxes has been early terminated.
* @throws Exception when sending stream unexpectedly closed.
* @throws IOException when sending stream unexpectedly closed.
* @throws TimeoutException when sending stream timeout.
*/
public boolean send(TransferableBlock block)
throws Exception {
throws IOException, TimeoutException {
if (block.isErrorBlock()) {
// Send error block to all mailboxes to propagate the error
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
Expand Down Expand Up @@ -110,7 +115,7 @@ public boolean send(TransferableBlock block)
}

protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block)
throws Exception {
throws IOException, TimeoutException {
if (block.isEndOfStreamBlock()) {
sendingMailbox.send(block);
sendingMailbox.complete();
Expand All @@ -125,7 +130,7 @@ protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block)
}

protected abstract void route(List<SendingMailbox> destinations, TransferableBlock block)
throws Exception;
throws IOException, TimeoutException;

// Called when the OpChain gracefully returns.
// TODO: This is a no-op right now.
Expand All @@ -137,4 +142,53 @@ public void cancel(Throwable t) {
sendingMailbox.cancel(t);
}
}

public SendingMailbox asSendingMailbox() {
return new BlockExchangeSendingMailbox();
}

/**
* A mailbox that sends data blocks to a {@link org.apache.pinot.query.runtime.operator.exchange.BlockExchange}.
*
* BlockExchanges send data to a list of {@link SendingMailbox}es, which are responsible for sending the data
* to the corresponding {@link ReceivingMailbox}es. This class applies the decorator pattern to expose a BlockExchange
* as a SendingMailbox, open the possibility of having a BlockExchange as a destination for another BlockExchange.
*
* This is useful for example when a send operator has to send data to more than one stage. We need to broadcast the
* data to all the stages (the first BlockExchange). Then for each stage, we need to send the data to the
* corresponding workers (the inner BlockExchange). The inner BlockExchange may send data using a different
* distribution strategy.
*
* @see MailboxSendNode#isMultiSend()}
*/
private class BlockExchangeSendingMailbox implements SendingMailbox {
private boolean _earlyTerminated = false;
private boolean _completed = false;

@Override
public void send(TransferableBlock block)
throws IOException, TimeoutException {
_earlyTerminated = BlockExchange.this.send(block);
}

@Override
public void complete() {
_completed = true;
}

@Override
public void cancel(Throwable t) {
BlockExchange.this.cancel(t);
}

@Override
public boolean isTerminated() {
return _completed;
}

@Override
public boolean isEarlyTerminated() {
return _earlyTerminated;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
Expand All @@ -35,7 +37,7 @@ protected BroadcastExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter

@Override
protected void route(List<SendingMailbox> destinations, TransferableBlock block)
throws Exception {
throws IOException, TimeoutException {
for (SendingMailbox mailbox : destinations) {
sendBlock(mailbox, block);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.EmptyKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
Expand All @@ -42,7 +44,7 @@ class HashExchange extends BlockExchange {

@Override
protected void route(List<SendingMailbox> destinations, TransferableBlock block)
throws Exception {
throws IOException, TimeoutException {
int numMailboxes = destinations.size();
if (numMailboxes == 1 || _keySelector == EmptyKeySelector.INSTANCE) {
sendBlock(destinations.get(0), block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pinot.query.runtime.operator.exchange;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.function.IntFunction;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
Expand Down Expand Up @@ -48,7 +50,7 @@ class RandomExchange extends BlockExchange {

@Override
protected void route(List<SendingMailbox> destinations, TransferableBlock block)
throws Exception {
throws IOException, TimeoutException {
int destinationIdx = _rand.apply(destinations.size());
sendBlock(destinations.get(destinationIdx), block);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.pinot.query.runtime.operator.exchange;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.query.mailbox.InMemorySendingMailbox;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
Expand All @@ -41,7 +43,7 @@ class SingletonExchange extends BlockExchange {

@Override
protected void route(List<SendingMailbox> sendingMailboxes, TransferableBlock block)
throws Exception {
throws IOException, TimeoutException {
sendBlock(sendingMailboxes.get(0), block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
Expand Down Expand Up @@ -176,7 +178,7 @@ protected TestBlockExchange(List<SendingMailbox> destinations, BlockSplitter spl

@Override
protected void route(List<SendingMailbox> destinations, TransferableBlock block)
throws Exception {
throws IOException, TimeoutException {
for (SendingMailbox mailbox : destinations) {
sendBlock(mailbox, block);
}
Expand Down

0 comments on commit 6b56d2e

Please sign in to comment.