Skip to content

Commit

Permalink
Source Facebook Marketing: fix job timeout (airbytehq#33828)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-yermilov-gl authored and jatinyadav-cc committed Feb 26, 2024
1 parent d9eb3ee commit 0ba82b6
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ acceptance_tests:
tests:
- spec_path: "integration_tests/spec.json"
backward_compatibility_tests_config:
disable_for_version: "1.1.12"
previous_connector_version: "1.1.11"
disable_for_version: "1.2.2"
previous_connector_version: "1.2.1"
connection:
tests:
- config_path: "secrets/config.json"
Expand All @@ -21,8 +21,8 @@ acceptance_tests:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "1.1.12"
previous_connector_version: "1.1.11"
disable_for_version: "1.2.2"
previous_connector_version: "1.2.1"
basic_read:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@
"mininum": 1,
"exclusiveMinimum": 0,
"type": "integer"
},
"insights_job_timeout": {
"title": "Custom Insights Job Timeout",
"description": "The insights job timeout",
"default": 60,
"maximum": 60,
"mininum": 10,
"exclusiveMinimum": 0,
"type": "integer"
}
},
"required": ["name"]
Expand All @@ -347,6 +356,16 @@
"exclusiveMinimum": 0,
"type": "integer"
},
"insights_job_timeout": {
"title": "Insights Job Timeout",
"description": "Insights Job Timeout establishes the maximum amount of time (in minutes) of waiting for the report job to complete. When timeout is reached the job is considered failed and we are trying to request smaller amount of data by breaking the job to few smaller ones. If you definitely know that 60 minutes is not enough for your report to be processed then you can decrease the timeout value, so we start breaking job to smaller parts faster.",
"default": 60,
"order": 9,
"maximum": 60,
"mininum": 10,
"exclusiveMinimum": 0,
"type": "integer"
},
"action_breakdowns_allow_empty": {
"title": "Action Breakdowns Allow Empty",
"description": "Allows action_breakdowns to be an empty list",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 1.2.1
dockerImageTag: 1.2.2
dockerRepository: airbyte/source-facebook-marketing
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
githubIssueLabel: source-facebook-marketing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
report_start_date = config.start_date or pendulum.now().add(years=-2)

insights_args = dict(
api=api, start_date=report_start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window
api=api,
start_date=report_start_date,
end_date=config.end_date,
insights_lookback_window=config.insights_lookback_window,
insights_job_timeout=config.insights_job_timeout,
)
streams = [
AdAccount(api=api),
Expand Down Expand Up @@ -281,6 +285,7 @@ def get_custom_insights_streams(self, api: API, config: ConnectorConfig) -> List
start_date=insight.start_date or config.start_date or pendulum.now().add(years=-2),
end_date=insight.end_date or config.end_date,
insights_lookback_window=insight.insights_lookback_window or config.insights_lookback_window,
insights_job_timeout=insight.insights_job_timeout or config.insights_job_timeout,
level=insight.level,
)
streams.append(stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ class Config:
mininum=1,
default=28,
)
insights_job_timeout: Optional[PositiveInt] = Field(
title="Custom Insights Job Timeout",
description="The insights job timeout",
maximum=60,
mininum=10,
default=60,
)


class ConnectorConfig(BaseConfig):
Expand Down Expand Up @@ -202,6 +209,20 @@ class Config:
default=28,
)

insights_job_timeout: Optional[PositiveInt] = Field(
title="Insights Job Timeout",
order=9,
description=(
"Insights Job Timeout establishes the maximum amount of time (in minutes) of waiting for the report job to complete. "
"When timeout is reached the job is considered failed and we are trying to request smaller amount of data by breaking the job to few smaller ones. "
"If you definitely know that 60 minutes is not enough for your report to be processed then you can decrease the timeout value, "
"so we start breaking job to smaller parts faster."
),
maximum=60,
mininum=10,
default=60,
)

action_breakdowns_allow_empty: bool = Field(
description="Allows action_breakdowns to be an empty list",
default=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from facebook_business.adobjects.campaign import Campaign
from facebook_business.adobjects.objectparser import ObjectParser
from facebook_business.api import FacebookAdsApi, FacebookAdsApiBatch, FacebookBadObjectError, FacebookResponse
from pendulum.duration import Duration
from source_facebook_marketing.streams.common import retry_pattern

from ..utils import validate_start_date
Expand Down Expand Up @@ -189,10 +190,9 @@ def __str__(self) -> str:
class InsightAsyncJob(AsyncJob):
"""AsyncJob wraps FB AdReport class and provides interface to restart/retry the async job"""

job_timeout = pendulum.duration(hours=1)
page_size = 100

def __init__(self, edge_object: Union[AdAccount, Campaign, AdSet, Ad], params: Mapping[str, Any], **kwargs):
def __init__(self, edge_object: Union[AdAccount, Campaign, AdSet, Ad], params: Mapping[str, Any], job_timeout: Duration, **kwargs):
"""Initialize
:param api: FB API
Expand All @@ -205,6 +205,7 @@ def __init__(self, edge_object: Union[AdAccount, Campaign, AdSet, Ad], params: M
"since": self._interval.start.to_date_string(),
"until": self._interval.end.to_date_string(),
}
self._job_timeout = job_timeout

self._edge_object = edge_object
self._job: Optional[AdReportRun] = None
Expand Down Expand Up @@ -251,7 +252,12 @@ def _split_by_edge_class(self, edge_class: Union[Type[Campaign], Type[AdSet], Ty
ids = set(row[pk_name] for row in result)
logger.info(f"Got {len(ids)} {pk_name}s for period {self._interval}: {ids}")

jobs = [InsightAsyncJob(api=self._api, edge_object=edge_class(pk), params=self._params, interval=self._interval) for pk in ids]
jobs = [
InsightAsyncJob(
api=self._api, edge_object=edge_class(pk), params=self._params, interval=self._interval, job_timeout=self._job_timeout
)
for pk in ids
]
return jobs

def start(self):
Expand Down Expand Up @@ -335,8 +341,8 @@ def _check_status(self) -> bool:
percent = self._job["async_percent_completion"]
logger.info(f"{self}: is {percent} complete ({job_status})")

if self.elapsed_time > self.job_timeout:
logger.info(f"{self}: run more than maximum allowed time {self.job_timeout}.")
if self.elapsed_time > self._job_timeout:
logger.info(f"{self}: run more than maximum allowed time {self._job_timeout}.")
self._finish_time = pendulum.now()
self._failed = True
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
action_report_time: str = "mixed",
time_increment: Optional[int] = None,
insights_lookback_window: int = None,
insights_job_timeout: int = 60,
level: str = "ad",
**kwargs,
):
Expand All @@ -82,6 +83,7 @@ def __init__(
self.action_report_time = action_report_time
self._new_class_name = name
self._insights_lookback_window = insights_lookback_window
self._insights_job_timeout = insights_job_timeout
self.level = level

# state
Expand Down Expand Up @@ -110,6 +112,10 @@ def insights_lookback_period(self):
"""
return pendulum.duration(days=self._insights_lookback_window)

@property
def insights_job_timeout(self):
return pendulum.duration(minutes=self._insights_job_timeout)

def list_objects(self, params: Mapping[str, Any]) -> Iterable:
"""Because insights has very different read_records we don't need this method anymore"""

Expand Down Expand Up @@ -209,7 +215,9 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
continue
ts_end = ts_start + pendulum.duration(days=self.time_increment - 1)
interval = pendulum.Period(ts_start, ts_end)
yield InsightAsyncJob(api=self._api.api, edge_object=self._api.account, interval=interval, params=params)
yield InsightAsyncJob(
api=self._api.api, edge_object=self._api.account, interval=interval, params=params, job_timeout=self.insights_job_timeout
)

def check_breakdowns(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def job_fixture(api, account):
}
interval = pendulum.Period(pendulum.Date(2019, 1, 1), pendulum.Date(2019, 1, 1))

return InsightAsyncJob(edge_object=account, api=api, interval=interval, params=params)
return InsightAsyncJob(edge_object=account, api=api, interval=interval, params=params, job_timeout= pendulum.duration(minutes=60))


@pytest.fixture(name="grouped_jobs")
Expand Down Expand Up @@ -207,7 +207,7 @@ def test_update_job(self, started_job, adreport):
adreport.api_get.assert_called_once()

def test_update_job_expired(self, started_job, adreport, mocker):
mocker.patch.object(started_job, "job_timeout", new=pendulum.Duration())
mocker.patch.object(started_job, "_job_timeout", new=pendulum.Duration())

started_job.update_job()
assert started_job.failed
Expand Down Expand Up @@ -285,6 +285,7 @@ def test_str(self, api, account):
api=api,
params={"breakdowns": [10, 20]},
interval=interval,
job_timeout=pendulum.duration(minutes=60)
)

assert str(job) == f"InsightAsyncJob(id=<None>, {account}, time_range=<Period [2010-01-01 -> 2011-01-01]>, breakdowns=[10, 20])"
Expand Down Expand Up @@ -335,7 +336,7 @@ def test_split_job(self, mocker, api, edge_class, next_edge_class, id_field):
today = pendulum.today().date()
start, end = today - pendulum.duration(days=365 * 3 + 20), today - pendulum.duration(days=365 * 3 + 10)
params = {"time_increment": 1, "breakdowns": []}
job = InsightAsyncJob(api=api, edge_object=edge_class(1), interval=pendulum.Period(start, end), params=params)
job = InsightAsyncJob(api=api, edge_object=edge_class(1), interval=pendulum.Period(start, end), params=params, job_timeout=pendulum.duration(minutes=60))
mocker.patch.object(edge_class, "get_insights", return_value=[{id_field: 1}, {id_field: 2}, {id_field: 3}])

small_jobs = job.split_job()
Expand Down Expand Up @@ -365,7 +366,7 @@ def test_split_job_smallest(self, mocker, api):
"""Test that split will correctly downsize edge_object"""
interval = pendulum.Period(pendulum.Date(2010, 1, 1), pendulum.Date(2010, 1, 10))
params = {"time_increment": 1, "breakdowns": []}
job = InsightAsyncJob(api=api, edge_object=Ad(1), interval=interval, params=params)
job = InsightAsyncJob(api=api, edge_object=Ad(1), interval=interval, params=params, job_timeout=pendulum.duration(minutes=60))

with pytest.raises(ValueError, match="The job is already splitted to the smallest size."):
job.split_job()
Expand Down
2 changes: 2 additions & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ You can use the [Access Token Tool](https://developers.facebook.com/tools/access

7. (Optional) For **Page Size of Requests**, you can specify the number of records per page for paginated responses. Most users do not need to set this field unless specific issues arise or there are unique use cases that require tuning the connector's settings. The default value is set to retrieve 100 records per page.
8. (Optional) For **Insights Window Lookback**, you may set a window in days to revisit data during syncing to capture updated conversion data from the API. Facebook allows for attribution windows of up to 28 days, during which time a conversion can be attributed to an ad. If you have set a custom attribution window in your Facebook account, please set the same value here. Otherwise, you may leave it at the default value of 28. For more information on action attributions, please refer to [the Meta Help Center](https://www.facebook.com/business/help/458681590974355?id=768381033531365).
8. (Optional) For **Insights Job Timeout**, you may set a custom value in range from 10 to 60. It establishes the maximum amount of time (in minutes) of waiting for the report job to complete.
9. Click **Set up source** and wait for the tests to complete.

<HideInUI>
Expand Down Expand Up @@ -202,6 +203,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.2.2 | 2024-01-02 | [33828](https://github.com/airbytehq/airbyte/pull/33828) | Add insights job timeout to be an option, so a user can specify their own value |
| 1.2.1 | 2023-11-22 | [32731](https://github.com/airbytehq/airbyte/pull/32731) | Removed validation that blocked personal ad accounts during `check` |
| 1.2.0 | 2023-10-31 | [31999](https://github.com/airbytehq/airbyte/pull/31999) | Extend the `AdCreatives` stream schema |
| 1.1.17 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image |
Expand Down

0 comments on commit 0ba82b6

Please sign in to comment.