Skip to content

Commit

Permalink
🐛 300-page-gate issue for Tickets Stream in Freshdesk source connector (
Browse files Browse the repository at this point in the history
airbytehq#4002)

airbytehq#4002 - 300-page-gate issue for Tickets Stream in Freshdesk source connector

Co-authored-by: Oleksandr Bazarnov <[email protected]>
  • Loading branch information
bazarnov and bazarnov authored Jun 25, 2021
1 parent 2ccb1c2 commit 62f478e
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ __pycache__
.venv
.mypy_cache
.ipynb_checkpoints
.pytest_

# dbt
profiles.yml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "ec4b9503-13cb-48ab-a4ab-6ade4be46567",
"name": "Freshdesk",
"dockerRepository": "airbyte/source-freshdesk",
"dockerImageTag": "0.2.4",
"dockerImageTag": "0.2.5",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-freshdesk",
"icon": "freshdesk.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
- sourceDefinitionId: ec4b9503-13cb-48ab-a4ab-6ade4be46567
name: Freshdesk
dockerRepository: airbyte/source-freshdesk
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
documentationUrl: https://hub.docker.com/r/airbyte/source-freshdesk
icon: freshdesk.svg
- sourceDefinitionId: 396e4ca3-8a97-4b85-aa4e-c9d8c2d5f992
Expand Down
4 changes: 4 additions & 0 deletions airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Changelog

## 0.2.4
Fix the issue when server doesn't allow the client to fetch more than 300 pages from Tickets Stream:
`Validation failed: [{'field': 'page', 'message': 'You cannot access tickets beyond the 300th page. Please provide a smaller page number.', 'code': 'invalid_value'}]`

## 0.2.3
Fix discovery and set default cursor field as "updated_at"
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ RUN pip install .

ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh"

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/source-freshdesk
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-freshdesk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ For information about how to use this connector within Airbyte, see [the documen
#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
python3 -m venv .venv
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-freshdesk/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"base-python",
"backoff==1.10.0",
"requests==2.25.1",
"pendulum==1.2.0",
"pendulum==2.1.2",
]

TEST_REQUIREMENTS = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@

class API:
def __init__(
self, domain: str, api_key: str, requests_per_minute: int = None, verify: bool = True, proxies: MutableMapping[str, Any] = None
self,
domain: str,
api_key: str,
requests_per_minute: int = None,
verify: bool = True,
proxies: MutableMapping[str, Any] = None,
):
"""Basic HTTP interface to read from endpoints"""
self._api_prefix = f"https://{domain.rstrip('/')}/api/v2/"
Expand Down Expand Up @@ -141,12 +146,21 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
"""Read using getter"""
params = params or {}

for page in range(1, self.maximum_page):
batch = list(getter(params={**params, "per_page": self.result_return_limit, "page": page}))
batch = list(
getter(
params={
**params,
"per_page": self.result_return_limit,
"page": page,
}
)
)
yield from batch

if len(batch) < self.result_return_limit:
break
return iter(())


class IncrementalStreamAPI(StreamAPI, ABC):
Expand Down Expand Up @@ -266,6 +280,66 @@ def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
params = {"include": "description"}
yield from self.read(partial(self._api_get, url="tickets"), params=params)

@staticmethod
def get_tickets(
result_return_limit: int, getter: Callable, params: Mapping[str, Any] = None, ticket_paginate_limit: int = 300
) -> Iterator:
"""
Read using getter
This block extends TicketsAPI Stream to overcome '300 page' server error.
Since the TicketsAPI Stream list has a 300 page pagination limit, after 300 pages, update the parameters with
query using 'updated_since' = last_record, if there is more data remaining.
"""
params = params or {}

# Start page
page = 1
# Initial request parameters
params = {
**params,
"order_type": "asc", # ASC order, to get the old records first
"order_by": "updated_at",
"per_page": result_return_limit,
}

while True:
params["page"] = page
batch = list(getter(params=params))
yield from batch

if len(batch) < result_return_limit:
return iter(())

# checkpoint & switch the pagination
if page == ticket_paginate_limit:
# get last_record from latest batch, pos. -1, because of ACS order of records
last_record_updated_at = batch[-1]["updated_at"]
page = 0 # reset page counter
last_record_updated_at = pendulum.parse(last_record_updated_at)
# updating request parameters with last_record state
params["updated_since"] = last_record_updated_at
# Increment page
page += 1

# Override the super().read() method with modified read for tickets
def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
"""Read using getter, patched to respect current state"""
params = params or {}
params = {**params, **self._state_params()}
latest_cursor = None
for record in self.get_tickets(self.result_return_limit, getter, params):
cursor = pendulum.parse(record[self.state_pk])
# filter out records older then state
if self._state and self._state >= cursor:
continue
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record

if latest_cursor:
logger.info(f"Advancing bookmark for {self.name} stream from {self._state} to {latest_cursor}")
self._state = max(latest_cursor, self._state) if self._state else latest_cursor


class TimeEntriesAPI(ClientIncrementalStreamAPI):
def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import pendulum
from source_freshdesk.api import TicketsAPI


class Test300PageLimit:

tickets_input = [
{"id": 1, "updated_at": "2018-01-02T00:00:00Z"},
{"id": 2, "updated_at": "2018-02-02T00:00:00Z"},
{"id": 3, "updated_at": "2018-03-02T00:00:00Z"},
{"id": 4, "updated_at": "2019-01-03T00:00:00Z"},
{"id": 5, "updated_at": "2019-02-03T00:00:00Z"},
{"id": 6, "updated_at": "2019-03-03T00:00:00Z"},
]

expected_output = [
{"id": 1, "updated_at": "2018-01-02T00:00:00Z"},
{"id": 2, "updated_at": "2018-02-02T00:00:00Z"},
{"id": 2, "updated_at": "2018-02-02T00:00:00Z"}, # duplicate
{"id": 3, "updated_at": "2018-03-02T00:00:00Z"},
{"id": 3, "updated_at": "2018-03-02T00:00:00Z"}, # duplicate
{"id": 4, "updated_at": "2019-01-03T00:00:00Z"},
{"id": 4, "updated_at": "2019-01-03T00:00:00Z"}, # duplicate
{"id": 5, "updated_at": "2019-02-03T00:00:00Z"},
{"id": 5, "updated_at": "2019-02-03T00:00:00Z"}, # duplicate
{"id": 6, "updated_at": "2019-03-03T00:00:00Z"},
{"id": 6, "updated_at": "2019-03-03T00:00:00Z"}, # duplicate
]

# Mocking the getter: Callable to produce the server output
def _getter(self, params, **args):

tickets_stream = self.tickets_input
updated_since = params.get("updated_since", None)

if updated_since:
tickets_stream = filter(lambda ticket: pendulum.parse(ticket["updated_at"]) >= updated_since, self.tickets_input)

start_from = (params["page"] - 1) * params["per_page"]
output = list(tickets_stream)[start_from : start_from + params["per_page"]]

return output

def test_not_all_records(self):
"""
TEST 1 - not all records are retrieved
During test1 the tickets_stream changes the state of parameters on page: 2,
by updating the params:
`params["order_by"] = "updated_at"`
`params["updated_since"] = last_record`
continues to fetch records from the source, using new cycle, and so on.
NOTE:
After switch of the state on ticket_paginate_limit = 2, is this example, we will experience the
records duplication, because of the last_record state, starting at the point
where we stoped causes the duplication of the output. The solution for this is to add at least 1 second to the
last_record state. The DBT normalization should handle this for the end user, so the duplication issue is not a
blocker in such cases.
Main pricipal here is: airbyte is at-least-once delivery, but skipping records is data loss.
"""

# INT value of page number where the switch state should be triggered.
# in this test case values from: 1 - 4, assuming we want to switch state on this page.
ticket_paginate_limit = 2
# This parameter mocks the "per_page" parameter in the API Call
result_return_limit = 1
# Calling the TicketsAPI.get_tickets method directly from the module
test1 = list(
TicketsAPI.get_tickets(
result_return_limit=result_return_limit, getter=self._getter, ticket_paginate_limit=ticket_paginate_limit
)
)
# We're expecting 6 records to return from the tickets_stream
assert self.expected_output == test1

0 comments on commit 62f478e

Please sign in to comment.