Skip to content

Commit

Permalink
🐛 Source Shopify: fix one-time retry after Internal Server Error
Browse files Browse the repository at this point in the history
…for BULK streams (#37468)
  • Loading branch information
bazarnov authored Apr 22, 2024
1 parent 5f1e4e6 commit 01381ae
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.0.5
dockerImageTag: 2.0.6
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
githubIssueLabel: source-shopify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.0.5"
version = "2.0.6"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class ShopifyBulkManager:
job_should_revert_slice: bool = field(init=False, default=False)
# running job log counter
log_job_state_msg_count: int = field(init=False, default=0)
# one time retryable error counter
_one_time_error_retried: bool = field(init=False, default=False)

@property
def tools(self) -> BulkTools:
Expand Down Expand Up @@ -185,6 +187,8 @@ def __reset_state(self) -> None:
self.job_self_canceled = False
# set the running job message counter to default
self.log_job_state_msg_count = 0
# set one time retry flag to default
self._one_time_error_retried = False

def job_completed(self) -> bool:
return self.job_state == ShopifyBulkStatus.COMPLETED.value
Expand Down Expand Up @@ -306,6 +310,15 @@ def job_check_for_errors(self, response: requests.Response) -> Union[AirbyteTrac
f"Couldn't check the `response` for `errors`, status: {response.status_code}, response: `{response.text}`. Trace: {repr(e)}."
)

def job_one_time_retry_error(self, response: requests.Response, exception: Exception) -> Optional[requests.Response]:
if not self._one_time_error_retried:
request = response.request
self.logger.info(f"Stream: `{self.stream_name}`, retrying `Bad Request`: {request.body}. Error: {repr(exception)}.")
self._one_time_error_retried = True
return self.job_retry_request(request)
else:
self.on_job_with_errors(self.job_check_for_errors(response))

def job_track_running(self) -> Union[AirbyteTracedException, requests.Response]:
# format Job state check args
status_args = self.job_get_request_args(ShopifyBulkTemplates.status)
Expand All @@ -322,19 +335,19 @@ def job_track_running(self) -> Union[AirbyteTracedException, requests.Response]:
else:
# execute ERRORS scenario
self.on_job_with_errors(errors)
except ShopifyBulkExceptions.BulkJobBadResponse as e:
request = response.request
self.logger.info(f"Stream: `{self.stream_name}`, retrying Bad Request: {request.body}. Error: {repr(e)}.")
return self.job_retry_request(request)
except (
ShopifyBulkExceptions.BulkJobBadResponse,
ShopifyBulkExceptions.BulkJobUnknownError,
) as error:
return self.job_one_time_retry_error(response, error)

def job_check_state(self) -> Optional[str]:
response: Optional[requests.Response] = None
while not self.job_completed():
if self.job_canceled():
response = None
break
else:
response = self.job_track_running()

# return `job_result_url` when status is `COMPLETED`
return self.job_get_result(response)

Expand Down Expand Up @@ -430,6 +443,7 @@ def job_check(self, created_job_response: requests.Response) -> Optional[str]:
ShopifyBulkExceptions.BulkJobFailed,
ShopifyBulkExceptions.BulkJobTimout,
ShopifyBulkExceptions.BulkJobAccessDenied,
# this one is one-time retriable
ShopifyBulkExceptions.BulkJobUnknownError,
) as bulk_job_error:
raise bulk_job_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import requests
from source_shopify.shopify_graphql.bulk.exceptions import ShopifyBulkExceptions
from source_shopify.shopify_graphql.bulk.job import ShopifyBulkStatus
from source_shopify.streams.base_streams import IncrementalShopifyGraphQlBulkStream
from source_shopify.streams.streams import (
Collections,
CustomerAddress,
Expand Down Expand Up @@ -119,28 +118,21 @@ def test_job_retry_on_concurrency(request, requests_mock, bulk_job_response, con


@pytest.mark.parametrize(
"job_response, error_type, patch_healthcheck, expected",
"job_response, error_type, expected",
[
(
"bulk_job_completed_response",
None,
False,
"bulk-123456789.jsonl",
),
("bulk_job_failed_response", ShopifyBulkExceptions.BulkJobFailed, False, "exited with FAILED"),
("bulk_job_timeout_response", ShopifyBulkExceptions.BulkJobTimout, False, "exited with TIMEOUT"),
("bulk_job_access_denied_response", ShopifyBulkExceptions.BulkJobAccessDenied, False, "exited with ACCESS_DENIED"),
("bulk_successful_response_with_errors", ShopifyBulkExceptions.BulkJobUnknownError, True, "Could not validate the status of the BULK Job"),
("bulk_job_completed_response", None, "bulk-123456789.jsonl"),
("bulk_job_failed_response", ShopifyBulkExceptions.BulkJobFailed, "exited with FAILED"),
("bulk_job_timeout_response", ShopifyBulkExceptions.BulkJobTimout, "exited with TIMEOUT"),
("bulk_job_access_denied_response", ShopifyBulkExceptions.BulkJobAccessDenied, "exited with ACCESS_DENIED"),
],
ids=[
"completed",
"failed",
"timeout",
"access_denied",
"success with errors (edge)",
],
)
def test_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, patch_healthcheck, expected) -> None:
def test_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
stream = MetafieldOrders(auth_config)
# modify the sleep time for the test
stream.job_manager.concurrent_max_retry = 1
Expand All @@ -151,8 +143,6 @@ def test_job_check(mocker, request, requests_mock, job_response, auth_config, er
# patching the method to get the right ID checks
if job_id:
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
if patch_healthcheck:
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_healthcheck", value=job_response)
# mocking the response for STATUS CHECKS
requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
test_job_status_response = requests.post(stream.job_manager.base_url)
Expand All @@ -167,6 +157,40 @@ def test_job_check(mocker, request, requests_mock, job_response, auth_config, er
requests_mock.get(job_result_url, json=request.getfixturevalue(job_response))
result = stream.job_manager.job_check(test_job_status_response)
assert expected == result


@pytest.mark.parametrize(
"job_response, error_type, expected",
[
(
"bulk_successful_response_with_errors",
ShopifyBulkExceptions.BulkJobUnknownError,
"Could not validate the status of the BULK Job",
),
],
ids=[
"success with errors (edge)",
],
)
def test_one_time_retry_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
stream = MetafieldOrders(auth_config)
# modify the sleep time for the test
stream.job_manager.concurrent_max_retry = 1
stream.job_manager.concurrent_interval_sec = 1
stream.job_manager.job_check_interval_sec = 1
# get job_id from FIXTURE
job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id")
# patching the method to get the right ID checks
if job_id:
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
# mocking the response for STATUS CHECKS
requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
test_job_status_response = requests.post(stream.job_manager.base_url)
with pytest.raises(error_type) as error:
stream.job_manager.job_check(test_job_status_response)
# The retried request should FAIL here, because we stil want to see the Exception raised
# We expect the call count to be 4 due to the status checks, the non-retried request would take 2 calls.
assert expected in repr(error.value) and requests_mock.call_count == 4


@pytest.mark.parametrize(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/shopify.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.6 | 2024-04-22 | [37468](https://github.com/airbytehq/airbyte/pull/37468) | Fixed one time retry for `Internal Server Error` for BULK streams |
| 2.0.5 | 2024-04-03 | [36788](https://github.com/airbytehq/airbyte/pull/36788) | Added ability to dynamically adjust the size of the `slice` |
| 2.0.4 | 2024-03-22 | [36355](https://github.com/airbytehq/airbyte/pull/36355) | Update CDK version to ensure Per-Stream Error Messaging and Record Counts In State (features were already there so just upping the version) |
| 2.0.3 | 2024-03-15 | [36170](https://github.com/airbytehq/airbyte/pull/36170) | Fixed the `STATE` messages emittion frequency for the `nested` sub-streams |
Expand Down

0 comments on commit 01381ae

Please sign in to comment.