diff --git a/crates/sparrow-runtime/src/execute/operation/shift_to.rs b/crates/sparrow-runtime/src/execute/operation/shift_to.rs index 4ae0177eb..b8c3a85e2 100644 --- a/crates/sparrow-runtime/src/execute/operation/shift_to.rs +++ b/crates/sparrow-runtime/src/execute/operation/shift_to.rs @@ -514,8 +514,15 @@ impl ShiftToColumnOperation { Err(not_found) => not_found, }; let (prefix, suffix) = pending.split(split_length)?; - // TODO: What if suffix is empty? - self.pending = Some(suffix); + + if suffix.len() == 0 { + // If the suffix is empty, set the pending batch to empty. + // This ensures that we won't try to send an empty batch if + // we flush the pending set on the next iteration. + self.pending = None; + } else { + self.pending = Some(suffix); + } // The subsort column can naively monotonically increase and preserve the // uniqueness invariant. @@ -572,8 +579,8 @@ impl ShiftToColumnOperation { // so we can output the pending batch. The next call to `try_next` should // see that `computed.pending.take()` returns `None`, indicating there is // nothing else. - let result = self.pending.take(); + // Update the subsort column to be monotonically increasing from 0, // to match the pattern of the merged results. We don't update the // subsort column in the pending batch, as it helps keep the correct diff --git a/python/pytests/golden/shift_by_test/test_filter_to_shift.jsonl b/python/pytests/golden/shift_by_test/test_filter_to_shift.jsonl new file mode 100644 index 000000000..c21ee9fb4 --- /dev/null +++ b/python/pytests/golden/shift_by_test/test_filter_to_shift.jsonl @@ -0,0 +1,3 @@ +{"_time":"2023-08-11T14:03:35.000000000","_key":"Project","thread_ts":null,"ts":1691762610.0,"channel":"Project"} +{"_time":"2023-08-11T14:03:45.000000000","_key":"Project","thread_ts":null,"ts":1691762620.0,"channel":"Project"} +{"_time":"2023-08-11T14:03:55.000000000","_key":"Project","thread_ts":null,"ts":1691762630.0,"channel":"Project"} diff --git a/python/pytests/shift_by_test.py b/python/pytests/shift_by_test.py index 95be876fd..183013e97 100644 --- a/python/pytests/shift_by_test.py +++ b/python/pytests/shift_by_test.py @@ -20,6 +20,23 @@ async def source() -> kd.sources.CsvString: ) +@pytest.fixture(scope="module") +async def filter_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "thread_ts,ts,channel", + "null,1691762610.0,Project", + "null,1691762620.0,Project", + "null,1691762630.0,Project", + "1691762650.0,1691762650.0,Project", + "1691762650.0,1691762660.0,Project", + ] + ) + return await kd.sources.CsvString.create( + content, time_column="ts", key_column="channel", time_unit="s" + ) + + async def test_shift_by_timedelta(source, golden) -> None: time = source.col("time") golden.jsonl( @@ -55,3 +72,10 @@ async def test_shift_collect(source, golden) -> None: } ) ) + + +# Regression test for https://github.com/kaskada-ai/kaskada/issues/726 +async def test_filter_to_shift(filter_source, golden) -> None: + messages = filter_source.filter(filter_source.col("thread_ts").is_null()) + messages = messages.shift_by(timedelta(seconds=5)) + golden.jsonl(messages)