
CAT: fix incremental by running tests per stream #36814

Merged (21 commits) on Jun 3, 2024

Conversation

roman-yermilov-gl
Contributor

@roman-yermilov-gl roman-yermilov-gl commented Apr 3, 2024

What

While fixing the Source Hubspot CATs (7200 timeouts), I encountered an issue with test_read_sequential_slices test related to records comparison. The diff function accepts two sets of 6000 records, and it takes an exceptionally long time to complete the operation. It turns out that the issue occurred because stream slicing is not organized optimally, leading to another significant problem: the current implementation tests the state changes only for the first stream and neglects the next ones.

Example:

Given 2 streams (where the second is empty), we perform a full refresh read which returns 1000 records for the first stream and 0 records for the second stream, along with some state messages. Here's how we run the test:

Some iterations testing the first stream:

  1. The test extracts the n-th state message containing the updated checkpoint for the first stream and conducts another read using this state and the original catalog (the same as used for the initial read operation). The new output consists of 10 records of the first stream and 0 records of the second stream.
  2. It then verifies that some records were returned and that they differ from the original output.
  3. The check is successful, and the first iteration is completed.
  4. Repeat from step 1 until we run out of state messages with unique checkpoints for the first stream.

The first issue becomes clear after analyzing these steps: the state is updated only for one stream, yet we use the original catalog which includes both streams. Consequently, we perform as many unnecessary full-catalog reads as there are state messages (divided by some sampling rate).
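The wasted-read pattern described above can be sketched as follows (an illustrative toy loop, not the actual CAT code; `read` is a stand-in for a full connector read):

```python
def run_old_incremental_test(read, catalog, state_messages, rate=1):
    """Sketch of the old loop: every sampled state message triggers a
    re-read of the *entire* catalog, even though only the first stream's
    checkpoint changed."""
    reads = 0
    for i, state in enumerate(state_messages):
        if i % rate != 0:
            continue
        read(catalog, state)  # full catalog: both streams are read again
        reads += 1
    return reads

# 10 state messages for stream A alone still cause 5 full-catalog reads
# at a sampling rate of 2, each one re-reading the empty second stream.
run_old_incremental_test(lambda c, s: None, ["stream_a", "stream_b"], list(range(10)), rate=2)
```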

The following iterations are intended to test the second stream, but they are flawed:

  1. The test retrieves the n-th + rate state message containing the updated checkpoint for both streams and performs another read using this state and the original catalog. The new output mirrors the same changes as before: 10 records from the first stream and 0 records from the second stream
  2. It then verifies that some records were returned and that they differ from the original output.
  3. The check appears successful, but it's wrong because we know that the second stream is empty.
| Read Iteration | State checkpoints | Stream A | Stream B | Assert any records | Assert Diff (initial vs n) |
|---|---|---|---|---|---|
| 0 (initial) | - | [{"id": 100}, ..., {"id": 1000}] | [] | True | - |
| 1 | stream A: timestamp 5 | [{"id": 500}, ..., {"id": 1000}] | [] | True | True |
| 2 | stream A: timestamp 10 | [{"id": 900}, ..., {"id": 1000}] | [] | True | True |
| 3 | stream A: timestamp 10; stream B: timestamp 1 | [{"id": 900}, ..., {"id": 1000}] | [] | True | True |

So why did the test pass?
This iteration reveals that for the second stream, we're checking the same updates as during the last iteration made for the latest checkpoint of the first stream:

  1. Why the Assert any records check is always true: This occurs because we always read the entire catalog with all streams, and as long as Stream A has any changes, the entire output also contains changes, but we can't ascertain which streams have changed.
  2. Why the Assert Diff check is always true: Once again, this happens because we compare the output of all the streams present in the catalog with the entire output of the initial read. As long as there are any changes in Stream A, the test detects differences.

Short summary

  1. We are testing nothing except the first stream of the catalog.
  2. We are comparing two full read outputs; running a diff on datasets of 6k and 5.9k rows may take an eternity.
  3. We are performing read operations for streams that have no state changes.

How

My proposal is to modify this test to run per stream: we run an initial read and subsequent reads with state for each individual stream, which gives us a clear picture of what exactly we are testing. While running the test on a real connector, it became clear that we need configuration to select the execution strategy. Specifically, for HubSpot, most streams lack enough states (fewer than 3) for this test to run. However, we still want to test with at least the latest state to verify the functionality of the stream. So, by default, we will run the test as we have done previously but per stream, while offering configuration options to alter the test's behavior:

checkpointing_strategy:
  strategy: use_state_variation
  streams:
    - name: email_events
      strategy: use_latest_state

This configuration defines both a common strategy and strategies specific to individual streams.
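A resolver for this configuration might look like the following sketch (the field names mirror the YAML example above; the function itself is hypothetical, not the CAT implementation):

```python
def resolve_strategy(config, stream_name):
    """Per-stream strategy wins over the common one; the default is
    use_state_variation, matching the previous per-stream behaviour."""
    default = config.get("strategy", "use_state_variation")
    for stream in config.get("streams", []):
        if stream.get("name") == stream_name:
            return stream.get("strategy", default)
    return default

config = {
    "strategy": "use_state_variation",
    "streams": [{"name": "email_events", "strategy": "use_latest_state"}],
}
resolve_strategy(config, "email_events")  # -> "use_latest_state"
resolve_strategy(config, "contacts")      # -> "use_state_variation"
```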

Test results

This test was run against the critical connectors list, and some of them failed due to the reasons described below.

The list:

Source Hubspot
Source Salesforce
Source Google Ads
Source Amazon S3
Source Stripe
Source Shopify
Source Facebook Marketing
Source Google Analytics
Source Google Search Console
Source Zendesk Support
Source Amazon Ads
Source Google Sheets
Source Intercom
Source Linkedin Ads
Source Mixpanel
Source Jira
Source Airtable
Source Github
Source Amplitude
Source Slack
Source Notion

The errors:

Source Stripe:

  • SetupAttempts - The test is failing and requires investigation to determine why checkpointing is not working correctly.
    To reproduce: read all the data, then take any checkpoint and run another read with state. Ensure that no records are produced.

Source Facebook Marketing:

  • AdsInsights - The test is failing and requires investigation to determine why checkpointing is not working correctly.
    To reproduce: read all the data, then take any checkpoint and run another read with state. Ensure that all the records from the full incremental read are present.

Source Zendesk Support:

  • TicketMetrics - The test is failing and requires investigation to determine why checkpointing is not working correctly.
    To reproduce: read all the data, then take the state checkpoint from 2022-07-18T16:59:22Z and run another read with the state applied. Ensure that the second read has fewer records than the number of records produced after the given state in the first read.

Investigation:

When examining the initial read, I noticed something strange with record ID 5163373144591: there are state checkpoints before this record that point to a much newer date than the record's cursor field. Additionally, this record is not present in the second read when the state checkpoint 2022-07-18T16:59:22Z is applied, which is correct behavior for the second read but causes the test to fail.

A part of output from the initial read

...
record -> {"id": 5455947694223, "ticket_id": 143, "created_at": "2022-09-06T19:51:07Z", "updated_at": "2022-09-06T19:51:07Z"}
state  -> {"stream_state": {"updated_at": "2022-09-06T19:51:07Z"}, "sourceStats": {"recordCount": 1.0}}
record -> {"id": 5527080505487, "ticket_id": 144, "created_at": "2022-09-19T14:43:42Z", "updated_at": "2022-09-19T14:43:42Z"}
state  -> {"stream_state": {"updated_at": "2022-09-19T14:43:42Z"}, "sourceStats": {"recordCount": 1.0}}
state  -> {"stream_state": {"updated_at": "2022-09-19T14:43:42Z"}, "sourceStats": {"recordCount": 0.0}}
state  -> {"stream_state": {"updated_at": "2022-09-19T14:43:42Z"}, "sourceStats": {"recordCount": 0.0}}

# !!! This is a record (1 of 3) that is not present in the second read but, for some reason, is present in the first output after the state checkpoint, which is newer than the record's cursor
record -> {"id": 5163373144591, "ticket_id": 130, "created_at": "2022-07-18T16:39:49Z", "updated_at": "2022-07-18T16:39:49Z"}
state  -> {"stream_state": {"updated_at": "2022-09-19T14:43:42Z"}, "sourceStats": {"recordCount": 1.0}}

Source Jira:

  • SprintIssues: The test is failing and requires investigation to determine why checkpointing is not working correctly.
    To reproduce: read all the data, then take the state checkpoint from the example below and run another read with state. Ensure that the number of records differs (30 from the first read vs 25 from the read with state applied) but there is no difference in record values. This can only happen when the first read contains duplicates, so both outputs match entirely by value while differing in count.

State:

[{'type': 'STREAM', 'stream': {'stream_descriptor': {'name': 'sprint_issues'}, 'stream_state': {'states': [{'partition': {'parent_slice': {'board_id': 1, 'parent_slice': {}}, 'sprint_id': 2}, 'cursor': {'updated': '2023-10-12T13:30:02.307-0700'}}, {'partition': {'parent_slice': {'board_id': 1, 'parent_slice': {}}, 'sprint_id': 3}, 'cursor': {'updated': '2023-04-05T04:57:26.258-0700'}}, {'partition': {'parent_slice': {'board_id': 1, 'parent_slice': {}}, 'sprint_id': 4}, 'cursor': {'updated': '2023-04-05T04:57:48.978-0700'}}]}}}]

Duplicate ids: 2-10012 , 2-10019, ...
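The duplicate signature described above (outputs match by value but differ in count) can be detected with a small sketch like this (illustrative helper, not the CAT implementation; the sample ids follow the ones listed above):

```python
from collections import Counter

def same_values_different_counts(records_a, records_b):
    """True when two outputs contain exactly the same record values but
    with different multiplicities -- the signature of duplicates in one
    of the reads."""
    key = lambda r: tuple(sorted(r.items()))
    ca, cb = Counter(map(key, records_a)), Counter(map(key, records_b))
    return set(ca) == set(cb) and ca != cb

first_read = [{"id": "2-10012"}, {"id": "2-10012"}, {"id": "2-10019"}]
second_read = [{"id": "2-10012"}, {"id": "2-10019"}]
same_values_different_counts(first_read, second_read)  # -> True
```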

(cc: @bazarnov as I think the test results will also be interesting for you)

@roman-yermilov-gl roman-yermilov-gl self-assigned this Apr 3, 2024
@roman-yermilov-gl roman-yermilov-gl force-pushed the ryermilov/cat-fix-incremental-test branch from 7491ba8 to 4cfe937 Compare April 4, 2024 13:37
for idx, state_message in enumerate(unique_state_messages):
assert state_message.type == Type.STATE
states_1 = filter_output(output_1, type_=Type.STATE)
# We sometimes have duplicate identical state messages in a stream which we can filter out to speed things up
Contributor:

I see this as another signal we should filter out duplicate state messages @brianjlai - two consumers need to filter out the messages (platform and CATs)
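A dedup along the lines suggested here could be sketched as follows (illustrative only; the real platform/CAT filtering may differ):

```python
def drop_consecutive_duplicate_states(state_messages):
    """Keep the first of any run of identical consecutive state messages,
    preserving order. In practice the sourceStats.recordCount of dropped
    duplicates would need to be summed into the kept message."""
    result = []
    for state in state_messages:
        if not result or result[-1] != state:
            result.append(state)
    return result

drop_consecutive_duplicate_states(
    [{"updated_at": "A"}, {"updated_at": "A"}, {"updated_at": "B"}]
)  # -> [{"updated_at": "A"}, {"updated_at": "B"}]
```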

Contributor:

interesting yeah. i mean its something we can certainly do. The platform is resilient to duplicate states now, but i'm sure they're not opposed to us sending them less

if len(unique_state_messages) < 3:
continue

# For legacy state format, the final state message contains the final state of all streams. For per-stream state format,
Contributor:

can we remove support for legacy state format? It's not supported by the CDK anymore

Contributor:

I wonder if the problem highlighted by this PR started to happen when we actually started to use per stream state message.

Contributor Author:

can we remove support for legacy state format? It's not supported by the CDK anymore - yes we can. I will remove it

Contributor Author:

@alafanechere We have encountered such problems periodically before, but I can't specify what was the reason

Comment on lines 249 to 253
assert (
# We assume that the output may be empty when we read the latest state, or it must produce some data if we are in the middle of our progression
records_N
or is_last_state_message
), f"Read {idx + 2} of {len(unique_state_messages)} should produce at least one record.\n\n state: {state_input} \n\n records_{idx + 2}: {records_N}"
Contributor:

Can we count the number of records in between state message from output_1 and check that records_N matches this count?

Contributor Author:

Yes we can, and we should also stop using the incorrect {idx + 2} calculation. I am going to fix that as well.
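The reviewer's suggestion (derive the expected count from the records between state messages in output_1) could be sketched like this for a single-stream read (a toy message representation, not the Airbyte protocol types):

```python
def records_after_each_state(messages):
    """For each state message in a single-stream read, count the records
    that appear *after* it; that is the number of records a read resumed
    from that state should at least reproduce."""
    record_positions = [i for i, (kind, _) in enumerate(messages) if kind == "record"]
    expected = []
    for i, (kind, _) in enumerate(messages):
        if kind == "state":
            expected.append(sum(1 for p in record_positions if p > i))
    return expected

messages = [
    ("record", 1),
    ("state", "s1"),
    ("record", 2),
    ("record", 3),
    ("state", "s2"),
]
records_after_each_state(messages)  # -> [2, 0]
```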

roman-yermilov-gl and others added 3 commits May 2, 2024 22:40
- unique states function improved
- records calculated based on sourceStats.recordCount
- test only per state
- update unit tests
@roman-yermilov-gl roman-yermilov-gl marked this pull request as ready for review May 2, 2024 21:07
@roman-yermilov-gl roman-yermilov-gl requested review from a team, alafanechere and girarda May 2, 2024 21:07
roman-yermilov-gl and others added 3 commits May 22, 2024 12:07
- avoid using states if two or more consecutive states don't change the output
- if data is spread over time, we may have multiple identical states with different record counts, so we should sum them
@roman-yermilov-gl
Copy link
Contributor Author

The current implementation has been tested against multiple connectors to ensure it works correctly

raise Exception("`states` expected to be a non empty list.")

# Function to extract the record count from a state object
get_record_count = operator.attrgetter("state.sourceStats.recordCount")
Contributor:

nit: is this the simplest implementation?

why can't we do something like

sum([state.sourceStats.recordCount for state in states])

In general, and in this case too, I think list comprehensions are easier to reason about than mapping functions

Contributor Author:

I have prepared a function for this and wanted to reuse it, so we have a single point where we get the record count. After refactoring, get_record_count became a method and was reused multiple times in various methods.

total_count = sum(map(get_record_count, states))

# Create an iterator of states and their reverse cumulative expected record counts
expected_record_count_per_state = zip(
Contributor:

It's not clear to me why we need to create this mapping upfront. Why can't we refer to state.sourceStats.recordCount directly from _get_expected_record_count_per_state

current_state = next(state_messages)
record_count = current_state.sourceStats.recordCount

Removing the upfront computation would make the flow easier to parse

Contributor Author:

But how can I predict how many records I expect to receive after a certain state is applied? sourceStats.recordCount is not the number I need because it only shows the number of records received between the previous state checkpoint and the current one.
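Since sourceStats.recordCount only covers the interval since the previous checkpoint, the count expected after applying a given state is a reverse cumulative sum, which could be computed roughly like this (a sketch under that assumption, not the actual method):

```python
from itertools import accumulate

def expected_record_count_per_state(record_counts):
    """Given sourceStats.recordCount for each state message (records
    emitted since the previous checkpoint), return how many records a
    read resumed from each state should produce: everything counted
    after that checkpoint."""
    total = sum(record_counts)
    return [total - cumulative for cumulative in accumulate(record_counts)]

expected_record_count_per_state([3, 2, 5])  # -> [7, 5, 0]
```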

@roman-yermilov-gl roman-yermilov-gl requested a review from girarda May 29, 2024 12:15
result.append(states[next_idx])
current_idx = next_idx

return list(self.get_expected_record_count_per_state(result))
Contributor:

Why do we need this method? Can we just filter out states without records?

Contributor Author:

refactored

assert (
# We assume that the output may be empty when we read the latest state, or it must produce some data if we are in the middle of our progression
len(records_N) >= expected_records_count
or idx == len(states_with_expected_record_count) - 1
Contributor:

Why do we need to check if this is the last state? The expected record count should be correct for each state

Contributor Author:

removed

), f"Records for subsequent reads with new state should be different.\n\n records_1: {records_1} \n\n state: {state_input} \n\n records_{idx + 2}: {records_N} \n\n diff: {diff}"
diff = naive_diff_records(records_1, records_N)
assert (
diff
Contributor:

Do we need to check for diff here? An example of correct read results where the record filter is inclusive from both sides, for which tests fail:

[
    {"type": "STATE", "name": "test_stream", "stream_state": {}, "sourceStats": {"recordCount": 0.0}},
    {"type": "RECORD", "name": "test_stream", "data": {"date": "2022-05-08"}},
    {"type": "STATE", "name": "test_stream", "stream_state": {"date": "2022-05-08"}, "sourceStats": {"recordCount": 1.0}},
    {"type": "RECORD", "name": "test_stream", "data": {"date": "2022-05-08"}},
    {"type": "RECORD", "name": "test_stream", "data": {"date": "2022-05-09"}},
    {"type": "RECORD", "name": "test_stream", "data": {"date": "2022-05-10"}},
    {"type": "STATE", "name": "test_stream", "stream_state": {"date": "2022-05-10"}, "sourceStats": {"recordCount": 2.0}},
    {"type": "RECORD", "name": "test_stream", "data": {"date": "2022-05-10"}},
    {"type": "RECORD", "name": "test_stream", "data": {"date": "2022-05-11"}},
    {"type": "RECORD", "name": "test_stream", "data": {"date": "2022-05-12"}},
    {"type": "STATE", "name": "test_stream", "stream_state": {"date": "2022-05-12"}, "sourceStats": {"recordCount": 3.0}}
]

Contributor Author:

I think we need it because we can't rely only on record count
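A minimal illustration of why the record count alone is insufficient (naive_diff below is a simplified stand-in for naive_diff_records, with hypothetical data):

```python
def naive_diff(records_a, records_b):
    """Records present in one output but not the other, in either direction."""
    return [r for r in records_a if r not in records_b] + \
           [r for r in records_b if r not in records_a]

# Two reads with equal record counts but different records: a count-only
# check would pass, while the diff check catches the discrepancy.
read_1 = [{"id": 1}, {"id": 2}]
read_2 = [{"id": 1}, {"id": 3}]
assert len(read_1) == len(read_2)   # record counts match
assert naive_diff(read_1, read_2)   # but the records differ
```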

Contributor:

can you add a comment documenting this rationale @roman-yermilov-gl ?

Contributor:

I created the issue for further investigation: https://github.com/airbytehq/airbyte-internal-issues/issues/8056

Contributor Author:

@tolik0 is going to prepare a new ticket to investigate this rationale. In the scope of this ticket, he will also do measurements and possibly make improvements for connectors that run slowly due to the diff operation

@girarda (Contributor) left a comment:

just small nits. this looks great!


# Calculates the expected record count per state based on the total record count and distribution across states.
# The expected record count is the number of records we expect to receive when applying a specific state checkpoint.
unique_non_zero_state_messages_with_record_count = zip(
Contributor:

nit: let's call this unique_non_zero_state_messages_with_record_count for clarity

return list(unique_non_zero_state_messages_with_record_count)

def _states_with_expected_record_count_batch_selector(
self, unique_state_messages_with_record_count: List[Tuple[AirbyteMessage, float]]
Contributor:

nit: let's call the parameter unique_non_zero_state_messages_with_record_count for clarity

), f"Records for subsequent reads with new state should be different.\n\n records_1: {records_1} \n\n state: {state_input} \n\n records_{idx + 2}: {records_N} \n\n diff: {diff}"
diff = naive_diff_records(records_1, records_N)
assert (
diff
Contributor:

can you add a comment documenting this rationale @roman-yermilov-gl ?

@roman-yermilov-gl roman-yermilov-gl merged commit 1f325ec into master Jun 3, 2024
32 checks passed
@roman-yermilov-gl roman-yermilov-gl deleted the ryermilov/cat-fix-incremental-test branch June 3, 2024 11:55