From 62f478e75db197e461939e0e71b374b4e320568f Mon Sep 17 00:00:00 2001 From: Oleksandr Date: Fri, 25 Jun 2021 12:28:13 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20300-page-gate=20issue=20for=20Ti?= =?UTF-8?q?ckets=20Stream=20in=20Freshdesk=20source=20connector=20(#4002)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #4002 - 300-page-gate issue for Tickets Stream in Freshdesk source connector Co-authored-by: Oleksandr Bazarnov --- .gitignore | 1 + .../ec4b9503-13cb-48ab-a4ab-6ade4be46567.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../connectors/source-freshdesk/CHANGELOG.md | 4 + .../connectors/source-freshdesk/Dockerfile | 2 +- .../connectors/source-freshdesk/README.md | 2 +- .../connectors/source-freshdesk/setup.py | 2 +- .../source-freshdesk/source_freshdesk/api.py | 80 ++++++++++++++- .../unit_tests/test_300_page.py | 99 +++++++++++++++++++ 9 files changed, 186 insertions(+), 8 deletions(-) create mode 100644 airbyte-integrations/connectors/source-freshdesk/unit_tests/test_300_page.py diff --git a/.gitignore b/.gitignore index f05a8eb0f0bf..786088117349 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ __pycache__ .venv .mypy_cache .ipynb_checkpoints +.pytest_ # dbt profiles.yml diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ec4b9503-13cb-48ab-a4ab-6ade4be46567.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ec4b9503-13cb-48ab-a4ab-6ade4be46567.json index ae1d6ec993c6..0240cd98b266 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ec4b9503-13cb-48ab-a4ab-6ade4be46567.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ec4b9503-13cb-48ab-a4ab-6ade4be46567.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "ec4b9503-13cb-48ab-a4ab-6ade4be46567", "name": "Freshdesk", "dockerRepository": "airbyte/source-freshdesk", - "dockerImageTag": "0.2.4", + "dockerImageTag": "0.2.5", "documentationUrl": "https://hub.docker.com/r/airbyte/source-freshdesk", "icon": "freshdesk.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 879d67b5ab5f..4b25eb0bdcaa 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -151,7 +151,7 @@ - sourceDefinitionId: ec4b9503-13cb-48ab-a4ab-6ade4be46567 name: Freshdesk dockerRepository: airbyte/source-freshdesk - dockerImageTag: 0.2.4 + dockerImageTag: 0.2.5 documentationUrl: https://hub.docker.com/r/airbyte/source-freshdesk icon: freshdesk.svg - sourceDefinitionId: 396e4ca3-8a97-4b85-aa4e-c9d8c2d5f992 diff --git a/airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md b/airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md index d63383bccf25..1014f199cc26 100644 --- a/airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md +++ b/airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md @@ -1,4 +1,8 @@ # Changelog +## 0.2.4 +Fix the issue when server doesn't allow the client to fetch more than 300 pages from Tickets Stream: +`Validation failed: [{'field': 'page', 'message': 'You cannot access tickets beyond the 300th page. Please provide a smaller page number.', 'code': 'invalid_value'}]` + ## 0.2.3 Fix discovery and set default cursor field as "updated_at" diff --git a/airbyte-integrations/connectors/source-freshdesk/Dockerfile b/airbyte-integrations/connectors/source-freshdesk/Dockerfile index edc4e6172307..47720f9ce23d 100644 --- a/airbyte-integrations/connectors/source-freshdesk/Dockerfile +++ b/airbyte-integrations/connectors/source-freshdesk/Dockerfile @@ -14,5 +14,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh" -LABEL io.airbyte.version=0.2.4 +LABEL io.airbyte.version=0.2.5 LABEL io.airbyte.name=airbyte/source-freshdesk diff --git a/airbyte-integrations/connectors/source-freshdesk/README.md b/airbyte-integrations/connectors/source-freshdesk/README.md index fec91ca211a8..8d701db758cd 100644 --- a/airbyte-integrations/connectors/source-freshdesk/README.md +++ b/airbyte-integrations/connectors/source-freshdesk/README.md @@ -13,7 +13,7 @@ For information about how to use this connector within Airbyte, see [the documen #### Build & Activate Virtual Environment and install dependencies From this connector directory, create a virtual environment: ``` -python -m venv .venv +python3 -m venv .venv ``` This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your diff --git a/airbyte-integrations/connectors/source-freshdesk/setup.py b/airbyte-integrations/connectors/source-freshdesk/setup.py index 0088c8035980..d404beb4155d 100644 --- a/airbyte-integrations/connectors/source-freshdesk/setup.py +++ b/airbyte-integrations/connectors/source-freshdesk/setup.py @@ -30,7 +30,7 @@ "base-python", "backoff==1.10.0", "requests==2.25.1", - "pendulum==1.2.0", + "pendulum==2.1.2", ] TEST_REQUIREMENTS = [ diff --git a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/api.py b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/api.py index 311959137097..835a1ff035d7 100644 --- a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/api.py +++ b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/api.py @@ -45,7 +45,12 @@ class API: def __init__( - self, domain: str, api_key: str, requests_per_minute: int = None, verify: bool = True, proxies: MutableMapping[str, Any] = None + self, + domain: str, + api_key: str, + requests_per_minute: int = None, + verify: bool = True, + proxies: MutableMapping[str, Any] = None, ): """Basic HTTP interface to read from endpoints""" self._api_prefix = f"https://{domain.rstrip('/')}/api/v2/" @@ -141,12 +146,21 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]: def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: """Read using getter""" params = params or {} + for page in range(1, self.maximum_page): - batch = list(getter(params={**params, "per_page": self.result_return_limit, "page": page})) + batch = list( + getter( + params={ + **params, + "per_page": self.result_return_limit, + "page": page, + } + ) + ) yield from batch if len(batch) < self.result_return_limit: - break + return iter(()) class IncrementalStreamAPI(StreamAPI, ABC): @@ -266,6 +280,66 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]: params = {"include": "description"} yield from self.read(partial(self._api_get, url="tickets"), params=params) + @staticmethod + def get_tickets( + result_return_limit: int, getter: Callable, params: Mapping[str, Any] = None, ticket_paginate_limit: int = 300 + ) -> Iterator: + """ + Read using getter + + This block extends TicketsAPI Stream to overcome '300 page' server error. + Since the TicketsAPI Stream list has a 300 page pagination limit, after 300 pages, update the parameters with + query using 'updated_since' = last_record, if there is more data remaining. + """ + params = params or {} + + # Start page + page = 1 + # Initial request parameters + params = { + **params, + "order_type": "asc", # ASC order, to get the old records first + "order_by": "updated_at", + "per_page": result_return_limit, + } + + while True: + params["page"] = page + batch = list(getter(params=params)) + yield from batch + + if len(batch) < result_return_limit: + return iter(()) + + # checkpoint & switch the pagination + if page == ticket_paginate_limit: + # get last_record from latest batch, pos. -1, because of ACS order of records + last_record_updated_at = batch[-1]["updated_at"] + page = 0 # reset page counter + last_record_updated_at = pendulum.parse(last_record_updated_at) + # updating request parameters with last_record state + params["updated_since"] = last_record_updated_at + # Increment page + page += 1 + + # Override the super().read() method with modified read for tickets + def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: + """Read using getter, patched to respect current state""" + params = params or {} + params = {**params, **self._state_params()} + latest_cursor = None + for record in self.get_tickets(self.result_return_limit, getter, params): + cursor = pendulum.parse(record[self.state_pk]) + # filter out records older then state + if self._state and self._state >= cursor: + continue + latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor + yield record + + if latest_cursor: + logger.info(f"Advancing bookmark for {self.name} stream from {self._state} to {latest_cursor}") + self._state = max(latest_cursor, self._state) if self._state else latest_cursor + class TimeEntriesAPI(ClientIncrementalStreamAPI): def list(self, fields: Sequence[str] = None) -> Iterator[dict]: diff --git a/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_300_page.py b/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_300_page.py new file mode 100644 index 000000000000..26312d77cfa1 --- /dev/null +++ b/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_300_page.py @@ -0,0 +1,99 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +import pendulum +from source_freshdesk.api import TicketsAPI + + +class Test300PageLimit: + + tickets_input = [ + {"id": 1, "updated_at": "2018-01-02T00:00:00Z"}, + {"id": 2, "updated_at": "2018-02-02T00:00:00Z"}, + {"id": 3, "updated_at": "2018-03-02T00:00:00Z"}, + {"id": 4, "updated_at": "2019-01-03T00:00:00Z"}, + {"id": 5, "updated_at": "2019-02-03T00:00:00Z"}, + {"id": 6, "updated_at": "2019-03-03T00:00:00Z"}, + ] + + expected_output = [ + {"id": 1, "updated_at": "2018-01-02T00:00:00Z"}, + {"id": 2, "updated_at": "2018-02-02T00:00:00Z"}, + {"id": 2, "updated_at": "2018-02-02T00:00:00Z"}, # duplicate + {"id": 3, "updated_at": "2018-03-02T00:00:00Z"}, + {"id": 3, "updated_at": "2018-03-02T00:00:00Z"}, # duplicate + {"id": 4, "updated_at": "2019-01-03T00:00:00Z"}, + {"id": 4, "updated_at": "2019-01-03T00:00:00Z"}, # duplicate + {"id": 5, "updated_at": "2019-02-03T00:00:00Z"}, + {"id": 5, "updated_at": "2019-02-03T00:00:00Z"}, # duplicate + {"id": 6, "updated_at": "2019-03-03T00:00:00Z"}, + {"id": 6, "updated_at": "2019-03-03T00:00:00Z"}, # duplicate + ] + + # Mocking the getter: Callable to produce the server output + def _getter(self, params, **args): + + tickets_stream = self.tickets_input + updated_since = params.get("updated_since", None) + + if updated_since: + tickets_stream = filter(lambda ticket: pendulum.parse(ticket["updated_at"]) >= updated_since, self.tickets_input) + + start_from = (params["page"] - 1) * params["per_page"] + output = list(tickets_stream)[start_from : start_from + params["per_page"]] + + return output + + def test_not_all_records(self): + """ + TEST 1 - not all records are retrieved + + During test1 the tickets_stream changes the state of parameters on page: 2, + by updating the params: + `params["order_by"] = "updated_at"` + `params["updated_since"] = last_record` + continues to fetch records from the source, using new cycle, and so on. + + NOTE: + After switch of the state on ticket_paginate_limit = 2, is this example, we will experience the + records duplication, because of the last_record state, starting at the point + where we stoped causes the duplication of the output. The solution for this is to add at least 1 second to the + last_record state. The DBT normalization should handle this for the end user, so the duplication issue is not a + blocker in such cases. + Main pricipal here is: airbyte is at-least-once delivery, but skipping records is data loss. + """ + + # INT value of page number where the switch state should be triggered. + # in this test case values from: 1 - 4, assuming we want to switch state on this page. + ticket_paginate_limit = 2 + # This parameter mocks the "per_page" parameter in the API Call + result_return_limit = 1 + # Calling the TicketsAPI.get_tickets method directly from the module + test1 = list( + TicketsAPI.get_tickets( + result_return_limit=result_return_limit, getter=self._getter, ticket_paginate_limit=ticket_paginate_limit + ) + ) + # We're expecting 6 records to return from the tickets_stream + assert self.expected_output == test1