Skip to content

Commit

Permalink
CAT: fix incremental by running tests per stream
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-yermilov-gl committed Apr 4, 2024
1 parent 4ed294b commit 4cfe937
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ class SuggestedStreamsConfiguration(BaseConfig):
)


class TestWithStateProgressionConfiguration(BaseConfig):
    """Per-stream opt-in for granular incremental testing across multiple state checkpoints."""

    # Name of the stream this configuration applies to.
    name: str
    # NOTE(review): despite the name, this acts as a divisor of the total state
    # count (sample_rate = total_states // fraction_of_batches), i.e. roughly
    # how many intermediate checkpoints get exercised — see the formula below.
    fraction_of_batches: Optional[int] = Field(
        default=3,
        description=(
            "To avoid spamming APIs, we only test a fraction of batches, skipping the first and last states to avoid corner cases. "
            "The formula we use is: `state_num % (total_states // fraction_of_batches)`. "
            "For instance, if there are 10 state messages, we would execute tests solely on the third and sixth states."
        ),
    )


class UnsupportedFileTypeConfig(BaseConfig):
extension: str
bypass_reason: Optional[str] = Field(description="Reason why this type is considered unsupported.")
Expand Down Expand Up @@ -212,6 +224,9 @@ class IncrementalConfig(BaseConfig):
skip_comprehensive_incremental_tests: Optional[bool] = Field(
description="Determines whether to skip more granular testing for incremental syncs", default=False
)
test_with_state_progression: List[TestWithStateProgressionConfiguration] = Field(
default_factory=list, description="Test with states progression"
)

class Config:
    # Pydantic v1 option: try every member of a Union for the best match
    # instead of coercing to the first one that validates.
    smart_union = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,70 +170,97 @@ async def test_read_sequential_slices(
pytest.skip("Skipping new incremental test based on acceptance-test-config.yml")
return

output_1 = await docker_runner.call_read(connector_config, configured_catalog_for_incremental)
records_1 = filter_output(output_1, type_=Type.RECORD)
states_1 = filter_output(output_1, type_=Type.STATE)
state_progression_stream_names = {
state_progression_stream.name: state_progression_stream for state_progression_stream in inputs.test_with_state_progression
}
for stream in configured_catalog_for_incremental.streams:
is_testing_state_progression = stream.stream.name in state_progression_stream_names

# We sometimes have duplicate identical state messages in a stream which we can filter out to speed things up
unique_state_messages = [message for index, message in enumerate(states_1) if message not in states_1[:index]]
configured_catalog_for_incremental_per_stream = ConfiguredAirbyteCatalog(streams=[stream])

# Important!
output_1 = await docker_runner.call_read(connector_config, configured_catalog_for_incremental_per_stream)

# There is only a small subset of assertions we can make
# in the absence of enforcing that all connectors return 3 or more state messages
# during the first read.
records_1 = filter_output(output_1, type_=Type.RECORD)
# If the output of a full read is empty, there is no reason to iterate over its state.
# So, reading from any checkpoint of an empty stream will also produce nothing.
if len(records_1) == 0:
continue

# To learn more: https://github.com/airbytehq/airbyte/issues/29926
if len(unique_state_messages) < 3:
pytest.skip("Skipping test because there are not enough state messages to test with")
return
states_1 = filter_output(output_1, type_=Type.STATE)
# We sometimes have duplicate identical state messages in a stream which we can filter out to speed things up
unique_state_messages = [message for index, message in enumerate(states_1) if message not in states_1[:index]]
if not is_testing_state_progression:
unique_state_messages = unique_state_messages[-1:]

assert records_1, "First Read should produce at least one record"
# Important!

# For legacy state format, the final state message contains the final state of all streams. For per-stream state format,
# the complete final state of streams must be assembled by going through all prior state messages received
is_per_stream = is_per_stream_state(states_1[-1])

# To avoid spamming APIs we only test a fraction of batches (10%) and enforce a minimum of 10 tested
min_batches_to_test = 5
sample_rate = len(unique_state_messages) // min_batches_to_test
# There is only a small subset of assertions we can make
# in the absence of enforcing that all connectors return 3 or more state messages
# during the first read.

mutating_stream_name_to_per_stream_state = dict()
for idx, state_message in enumerate(unique_state_messages):
assert state_message.type == Type.STATE

# if first state message, skip
# this is because we cannot assert if the first state message will result in new records
# as in this case it is possible for a connector to return an empty state message when it first starts.
# e.g. if the connector decides it wants to let the caller know that it has started with an empty state.
if idx == 0:
continue
# To learn more: https://github.com/airbytehq/airbyte/issues/29926
if len(unique_state_messages) == 0:
pytest.skip("Skipping test because there are not enough state messages to test with")
return

# if last state message, skip
# this is because we cannot assert if the last state message will result in new records
# as in this case it is possible for a connector to return a previous state message.
# e.g. if the connector is using pagination and the last page is only partially full
if idx == len(unique_state_messages) - 1:
if len(unique_state_messages) < 3 and is_testing_state_progression:
continue

# if batching required, and not a sample, skip
if len(unique_state_messages) >= min_batches_to_test and idx % sample_rate != 0:
continue
# For legacy state format, the final state message contains the final state of all streams. For per-stream state format,
# the complete final state of streams must be assembled by going through all prior state messages received
is_per_stream = is_per_stream_state(states_1[-1])

state_input, mutating_stream_name_to_per_stream_state = self.get_next_state_input(
state_message, mutating_stream_name_to_per_stream_state, is_per_stream
# To avoid spamming APIs we only test a fraction of batches (2 or 3 states by default)
min_batches_to_test = (
3 if not is_testing_state_progression else state_progression_stream_names[stream.stream.name].fraction_of_batches
)
sample_rate = len(unique_state_messages) // min_batches_to_test

mutating_stream_name_to_per_stream_state = dict()
for idx, state_message in enumerate(unique_state_messages):
assert state_message.type == Type.STATE

is_first_state_message = idx == 0
is_last_state_message = idx == len(unique_state_messages) - 1

if is_testing_state_progression:
# if first state message, skip
# this is because we cannot assert if the first state message will result in new records
# as in this case it is possible for a connector to return an empty state message when it first starts.
# e.g. if the connector decides it wants to let the caller know that it has started with an empty state.
if is_first_state_message:
continue

# if last state message, skip
# this is because we cannot assert if the last state message will result in new records
# as in this case it is possible for a connector to return a previous state message.
# e.g. if the connector is using pagination and the last page is only partially full
if is_last_state_message:
continue

# if batching required, and not a sample, skip
if len(unique_state_messages) >= min_batches_to_test and idx % sample_rate != 0:
continue

state_input, mutating_stream_name_to_per_stream_state = self.get_next_state_input(
state_message, mutating_stream_name_to_per_stream_state, is_per_stream
)

output_N = await docker_runner.call_read_with_state(connector_config, configured_catalog_for_incremental, state=state_input)
records_N = filter_output(output_N, type_=Type.RECORD)
assert (
records_N
), f"Read {idx + 2} of {len(unique_state_messages)} should produce at least one record.\n\n state: {state_input} \n\n records_{idx + 2}: {records_N}"

diff = naive_diff_records(records_1, records_N)
assert (
diff
), f"Records for subsequent reads with new state should be different.\n\n records_1: {records_1} \n\n state: {state_input} \n\n records_{idx + 2}: {records_N} \n\n diff: {diff}"
output_N = await docker_runner.call_read_with_state(
connector_config, configured_catalog_for_incremental_per_stream, state=state_input
)
records_N = filter_output(output_N, type_=Type.RECORD)

assert (
# We assume that the output may be empty when we read the latest state, or it must produce some data if we are in the middle of our progression
records_N
or is_last_state_message
), f"Read {idx + 2} of {len(unique_state_messages)} should produce at least one record.\n\n state: {state_input} \n\n records_{idx + 2}: {records_N}"

diff = naive_diff_records(records_1, records_N)
assert (
diff
), f"Records for subsequent reads with new state should be different.\n\n records_1: {records_1} \n\n state: {state_input} \n\n records_{idx + 2}: {records_N} \n\n diff: {diff}"

async def test_state_with_abnormally_large_values(
self, connector_config, configured_catalog, future_state, docker_runner: ConnectorRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
SyncMode,
Type,
)
from connector_acceptance_test.config import Config, EmptyStreamConfiguration, IncrementalConfig
from connector_acceptance_test.config import Config, EmptyStreamConfiguration, IncrementalConfig, TestWithStateProgressionConfiguration
from connector_acceptance_test.tests import test_incremental
from connector_acceptance_test.tests.test_incremental import TestIncremental as _TestIncremental
from connector_acceptance_test.tests.test_incremental import future_state_configuration_fixture, future_state_fixture
Expand Down Expand Up @@ -224,27 +224,11 @@ async def test_incremental_two_sequential_reads(
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
[],
[]
],
pytest.raises(AssertionError, match="First Read should produce at least one record"),
id="test_incremental_no_record_on_first_read_raises_error",
does_not_raise(),
id="test_incremental_no_record_on_first_read_skips_stream",
),
pytest.param(
[
Expand Down Expand Up @@ -428,27 +412,6 @@ async def test_incremental_two_sequential_reads(
does_not_raise(),
id="test_incremental_with_multiple_streams",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": None},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-07"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-08"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
],
[
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-09"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-10"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-11"}},
{"type": Type.RECORD, "name": "test_stream", "data": {"date": "2022-05-12"}},
{"type": Type.STATE, "name": "test_stream", "stream_state": {"date": "2022-05-13"}},
],
],
does_not_raise(),
id="test_incremental_with_none_state",
),
pytest.param(
[
{"type": Type.STATE, "name": "test_stream", "stream_state": {}},
Expand All @@ -458,6 +421,7 @@ async def test_incremental_two_sequential_reads(
],
[
[],
[],
],
does_not_raise(),
id="test_incremental_with_empty_second_read",
Expand Down Expand Up @@ -605,9 +569,10 @@ async def test_state_skip_test(mocker):
docker_runner_mock.call_read = mocker.AsyncMock(return_value=call_read_output_messages)

t = _TestIncremental()
incremental_config = IncrementalConfig(test_with_state_progression=[TestWithStateProgressionConfiguration(name="test_stream")])
with patch.object(pytest, "skip", return_value=None):
await t.test_read_sequential_slices(
inputs=IncrementalConfig(),
inputs=incremental_config,
connector_config=MagicMock(),
configured_catalog_for_incremental=ConfiguredAirbyteCatalog(
streams=[
Expand Down

0 comments on commit 4cfe937

Please sign in to comment.