Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,14 @@ class HybridSourceSplitEnumeratorTest {
private HybridSourceSplit splitFromSource1;

private void setupEnumeratorAndTriggerSourceSwitch() {
setupEnumeratorAndTriggerSourceSwitch(
HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build());
}

private HybridSourceSplitEnumerator setupEnumeratorAndTriggerSourceSwitch(
HybridSource<Integer> hybridSource) {
context = new MockSplitEnumeratorContext<>(2);
source = HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
source = hybridSource;

enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
enumerator.start();
Expand Down Expand Up @@ -86,6 +92,7 @@ private void setupEnumeratorAndTriggerSourceSwitch() {
assertThat(splitFromSource1.sourceIndex()).isEqualTo(1);
enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK1));
assertThat(getCurrentSourceIndex(enumerator)).as("reader without assignment").isEqualTo(1);
return enumerator;
}

@Test
Expand Down Expand Up @@ -232,6 +239,28 @@ void testRestoreEnumerator() throws Exception {
.hasSize(1);
}

@Test
public void testRestoreEnumeratorWithSwitchContextSource() throws Exception {
HybridSource<Integer> hybridSource =
HybridSource.builder(MOCK_SOURCE)
.addSource(
switchContext -> {
assertThat(switchContext.getPreviousEnumerator())
.describedAs(
"Previous enumerator is null, cannot derive start position for next source")
.isNotNull();
return MOCK_SOURCE;
},
MOCK_SOURCE.getBoundedness())
.build();
enumerator = setupEnumeratorAndTriggerSourceSwitch(hybridSource);
HybridSourceEnumeratorState enumeratorState = enumerator.snapshotState(0);
assertThat(enumeratorState.getCurrentSourceIndex()).isEqualTo(1);
enumerator =
(HybridSourceSplitEnumerator) source.restoreEnumerator(context, enumeratorState);
enumerator.start();
}

@Test
void testRestoreEnumeratorAfterFirstSourceWithoutRestoredSplits() throws Exception {
setupEnumeratorAndTriggerSourceSwitch();
Expand Down