From e728eddfa4ec99abe1208093ef55fcd5967c532c Mon Sep 17 00:00:00 2001 From: Olaf Hartig Date: Fri, 17 Mar 2023 22:42:04 +0100 Subject: [PATCH] addresses #212, but the relevant code paths are commented out at the moment (until we have time for some experimentation to better understand the behavior of it) --- ...ushBasedExecPlanTaskForBinaryOperator.java | 95 ++++++++++++++++++- .../PushBasedExecPlanTaskForNaryOperator.java | 94 +++++++++++++++++- 2 files changed, 180 insertions(+), 9 deletions(-) diff --git a/src/main/java/se/liu/ida/hefquin/engine/queryplan/executable/impl/pushbased/PushBasedExecPlanTaskForBinaryOperator.java b/src/main/java/se/liu/ida/hefquin/engine/queryplan/executable/impl/pushbased/PushBasedExecPlanTaskForBinaryOperator.java index 75509e724..35d70fbc4 100644 --- a/src/main/java/se/liu/ida/hefquin/engine/queryplan/executable/impl/pushbased/PushBasedExecPlanTaskForBinaryOperator.java +++ b/src/main/java/se/liu/ida/hefquin/engine/queryplan/executable/impl/pushbased/PushBasedExecPlanTaskForBinaryOperator.java @@ -40,12 +40,19 @@ protected ExecutableOperator getExecOp() { @Override protected void produceOutput( final IntermediateResultElementSink sink ) throws ExecOpExecutionException, ExecPlanTaskInputException, ExecPlanTaskInterruptionException { + if ( op.requiresCompleteChild1InputFirst() ) + produceOutputByConsumingInput1First(sink); + else + //produceOutputByConsumingBothInputsInParallel(sink); + produceOutputByConsumingInput1First(sink); + } - // Attention: the current implementation of this method ignores - // whether op.requiresCompleteChild1InputFirst() is true or - // false but, instead, simply consumes and pushes the complete - // child 1 input first (as would be required in case that the - // aforementioned function returns true). + /** + * Consumes the complete child 1 input first (and pushes that input to the + * operator {@link #op}), before moving on to the input from child 2. 
+ */ + protected void produceOutputByConsumingInput1First( final IntermediateResultElementSink sink ) + throws ExecOpExecutionException, ExecPlanTaskInputException, ExecPlanTaskInterruptionException { boolean input1Consumed = false; while ( ! input1Consumed ) { @@ -72,4 +79,82 @@ protected void produceOutput( final IntermediateResultElementSink sink ) } } + /** + * Aims to consume both inputs in parallel. + */ + protected void produceOutputByConsumingBothInputsInParallel( final IntermediateResultElementSink sink ) + throws ExecOpExecutionException, ExecPlanTaskInputException, ExecPlanTaskInterruptionException { + + boolean nextWaitForInput1 = true; // flag to switch between waiting for input 1 versus input 2 + boolean input1Consumed = false; + boolean input2Consumed = false; + while ( ! input1Consumed || ! input2Consumed ) { + // Before blindly asking any of the two inputs to give us its next + // IntermediateResultBlock (which may cause this thread to wait if + // no such block is available at the moment), let's first ask them + // if they currently have a block available. If so, request the next + // block from the input that says it has a block available. + boolean blockConsumed = false; + if ( ! input1Consumed && input1.hasNextIntermediateResultBlockAvailable() ) + { + // calling 'getNextIntermediateResultBlock()' should not cause this thread to wait + final IntermediateResultBlock nextInputBlock = input1.getNextIntermediateResultBlock(); + if ( nextInputBlock != null ) { + op.processBlockFromChild1(nextInputBlock, sink, execCxt); + } + + blockConsumed = true; + } + + if ( ! input2Consumed && input2.hasNextIntermediateResultBlockAvailable() ) + { + // calling 'getNextIntermediateResultBlock()' should not cause this thread to wait + final IntermediateResultBlock nextInputBlock = input2.getNextIntermediateResultBlock(); + if ( nextInputBlock != null ) { + op.processBlockFromChild2(nextInputBlock, sink, execCxt); + } + + blockConsumed = true; + } + + if ( ! 
blockConsumed ) { + // If none of the two inputs had a block available at the + // moment, we ask one of them to produce its next block, + // which may cause this thread to wait until that next + // block has been produced. To decide which of the two + // inputs we ask (and, then, wait for) we use a round + // robin approach (i.e., always switch between the two + // inputs). To this end, we use the 'nextWaitForInput1' + // flag: if that flag is true, we will next ask (and wait + // for) input 1; if that flag is false, we will next ask + // (and wait for) input 2. + if ( nextWaitForInput1 && ! input1Consumed ) { + // calling 'getNextIntermediateResultBlock()' may cause this thread to wait + final IntermediateResultBlock nextInputBlock = input1.getNextIntermediateResultBlock(); + if ( nextInputBlock != null ) { + op.processBlockFromChild1(nextInputBlock, sink, execCxt); + } + else { + op.wrapUpForChild1(sink, execCxt); + input1Consumed = true; + } + } + else if ( ! input2Consumed ) { + // calling 'getNextIntermediateResultBlock()' may cause this thread to wait + final IntermediateResultBlock nextInputBlock = input2.getNextIntermediateResultBlock(); + if ( nextInputBlock != null ) { + op.processBlockFromChild2(nextInputBlock, sink, execCxt); + } + else { + op.wrapUpForChild2(sink, execCxt); + input2Consumed = true; + } + } + // flip the 'nextWaitForInput1' flag so that, next time we + // have to wait, we will wait for the respective other input + nextWaitForInput1 = ! 
nextWaitForInput1; + } + } + } + } diff --git a/src/main/java/se/liu/ida/hefquin/engine/queryplan/executable/impl/pushbased/PushBasedExecPlanTaskForNaryOperator.java b/src/main/java/se/liu/ida/hefquin/engine/queryplan/executable/impl/pushbased/PushBasedExecPlanTaskForNaryOperator.java index 17ea23774..89f0a5d26 100644 --- a/src/main/java/se/liu/ida/hefquin/engine/queryplan/executable/impl/pushbased/PushBasedExecPlanTaskForNaryOperator.java +++ b/src/main/java/se/liu/ida/hefquin/engine/queryplan/executable/impl/pushbased/PushBasedExecPlanTaskForNaryOperator.java @@ -37,11 +37,19 @@ protected ExecutableOperator getExecOp() { @Override protected void produceOutput( final IntermediateResultElementSink sink ) throws ExecOpExecutionException, ExecPlanTaskInputException, ExecPlanTaskInterruptionException { + produceOutputByConsumingAllInputsInParallel(sink); + //produceOutputByConsumingInputsOneAfterAnother(sink); + } - // Attention: the current implementation of this method simply consumes - // and pushes the complete i-th child input first before moving on to - // the (i+1)-th child. Hence, with this implementation we do not - // actually benefit from the parallelization. + /** + * Consumes the complete i-th input first (and pushes that input to the + * operator {@link #op}), before moving on to the (i+1)-th input. Hence, + * this implementation does not consume the inputs in parallel. Instead, + * if one of the inputs requires a long time, no progress is made in + * parallel based on any of the other inputs. 
+ */ + protected void produceOutputByConsumingInputsOneAfterAnother( final IntermediateResultElementSink sink ) + throws ExecOpExecutionException, ExecPlanTaskInputException, ExecPlanTaskInterruptionException { for ( int i = 0; i < inputs.length; i++ ) { boolean inputConsumed = false; @@ -58,4 +66,82 @@ protected void produceOutput( final IntermediateResultElementSink sink ) } } + + /** + * Aims to consume all inputs in parallel: blocks that are already available + * are taken from any input; otherwise, the inputs are waited for in round-robin order. + */ + protected void produceOutputByConsumingAllInputsInParallel( final IntermediateResultElementSink sink ) + throws ExecOpExecutionException, ExecPlanTaskInputException, ExecPlanTaskInterruptionException { + + final boolean[] inputConsumed = new boolean[inputs.length]; + for ( int i = 0; i < inputs.length; i++ ) { inputConsumed[i] = false; } + + int indexOfNextInputToWaitFor = 0; + int numberOfInputsConsumed = 0; + while ( numberOfInputsConsumed < inputs.length ) { + // Before blindly asking any of the inputs to give us its next + // IntermediateResultBlock (which may cause this thread to wait + // if no such block is available at the moment), let's first ask + // them if they currently have a block available. If so, request + // the next block from the input that says it has a block available. + boolean blockConsumed = false; + for ( int i = 0; i < inputs.length; i++ ) { + if ( ! inputConsumed[i] && inputs[i].hasNextIntermediateResultBlockAvailable() ) { + // calling 'getNextIntermediateResultBlock()' should not cause this thread to wait + final IntermediateResultBlock nextInputBlock = inputs[i].getNextIntermediateResultBlock(); + if ( nextInputBlock != null ) { + op.processBlockFromXthChild(i, nextInputBlock, sink, execCxt); + } + + blockConsumed = true; + } + } + + if ( ! 
blockConsumed ) { + // If none of the inputs had a block available at the moment, + // we ask one of them to produce its next block, which may + // cause this thread to wait until that next block has been + // produced. To decide which of the inputs we ask (and, then, + // wait for) we use a round robin approach. To this end, we + // use the 'indexOfNextInputToWaitFor' pointer which we advance + // each time we leave this code block here. + + // First, we have to make sure that 'indexOfNextInputToWaitFor' + // points to an input that has not been consumed completely yet. + while ( inputConsumed[indexOfNextInputToWaitFor] == true ) { + indexOfNextInputToWaitFor = advanceIndexOfInput(indexOfNextInputToWaitFor); + } + + // Now we ask that input to produce its next block, which may + // cause this thread to wait. + final IntermediateResultBlock nextInputBlock = inputs[indexOfNextInputToWaitFor].getNextIntermediateResultBlock(); + if ( nextInputBlock != null ) { + op.processBlockFromXthChild(indexOfNextInputToWaitFor, nextInputBlock, sink, execCxt); + } + else { + op.wrapUpForXthChild(indexOfNextInputToWaitFor, sink, execCxt); + inputConsumed[indexOfNextInputToWaitFor] = true; + numberOfInputsConsumed++; + } + + // Finally, we advance the 'indexOfNextInputToWaitFor' pointer + // so that, next time we will have to wait, we will wait for + // the next input (rather than always waiting for the same + // input before moving on to the next input). + indexOfNextInputToWaitFor = advanceIndexOfInput(indexOfNextInputToWaitFor); + } + } + } + + /** + * Returns the given integer increased by one, unless such an + * increase results in an integer that is outside of the bounds + * of the {@link #inputs} array, in which case the function returns + * zero (effectively jumping back to the first index in the array). + */ + protected int advanceIndexOfInput( final int currentIndex ) { + final int i = currentIndex + 1; + return ( i < inputs.length ) ? i : 0; + } }