diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index ea92bac5..dfadc3da 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -1901,3 +1901,145 @@ def validate_refs(yaml_file: str) -> List[str]: / "airbyte_cdk/sources/declarative/declarative_component_schema.yaml" ) assert not validate_refs(yaml_file_path) + + +@pytest.mark.parametrize( + "test_name, manifest, pages, expected_states_qty", + [ + ( + "test_with_pagination_and_partition_router", + { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Rates", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + "partition": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.apilayer.com", + "path": "/exchangerates_data/latest", + "http_method": "GET", + "request_parameters": {}, + "request_headers": {}, + "request_body_json": {}, + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "partition_router": { + "type": "ListPartitionRouter", + "values": ["0", "1"], + "cursor_field": "partition", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, + }, + "paginator": { + "type": "DefaultPaginator", + "page_size": 2, + "page_size_option": { + "inject_into": "request_parameter", + "field_name": "page_size", + }, + "page_token_option": {"inject_into": "path", "type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "page_size": 2, + }, + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%S.%fZ"], + "datetime_format": "%Y-%m-%dT%H:%M:%S.%fZ", + "cursor_field": "updated_at", + "start_datetime": { + "datetime": "{{ config.get('start_date', '2020-10-16T00:00:00.000Z') }}" + }, + }, + } + ], + "spec": { + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["api_key"], + "properties": { + "api_key": { + "type": "string", + "title": "API Key", + "airbyte_secret": True, + }, + "start_date": { + "title": "Start Date", + "description": "UTC date and time in the format YYYY-MM-DDTHH:MM:SS.000Z. During incremental sync, any data generated before this date will not be replicated. If left blank, the start date will be set to 2 years before the present date.", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", + "pattern_descriptor": "YYYY-MM-DDTHH:MM:SS.000Z", + "examples": ["2020-11-16T00:00:00.000Z"], + "type": "string", + "format": "date-time", + }, + }, + "additionalProperties": True, + }, + "documentation_url": "https://example.org", + "type": "Spec", + }, + }, + ( + _create_page( + { + "rates": [ + {"ABC": 0, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"}, + {"AED": 1, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"}, + ], + "_metadata": {"next": "next"}, + } + ), + _create_page( + { + "rates": [ + {"USD": 3, "partition": 0, "updated_at": "2020-11-16T00:00:00.000Z"} + ], + "_metadata": {}, + } + ), + _create_page( + { + "rates": [ + {"ABC": 2, "partition": 1, "updated_at": "2020-11-16T00:00:00.000Z"} + ], + "_metadata": {}, + } + ), + ), + 2, + ), + ], +) +def test_slice_checkpoint(test_name, manifest, pages, expected_states_qty): + _stream_name = "Rates" + with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages): + states = [message.state for message in _run_read(manifest, _stream_name) if message.state] + assert len(states) == expected_states_qty