Skip to content

Commit

Permalink
Source Facebook Marketing: fix permission error (#31132)
Browse files Browse the repository at this point in the history
Co-authored-by: roman-yermilov-gl <[email protected]>
  • Loading branch information
roman-yermilov-gl and roman-yermilov-gl authored Oct 11, 2023
1 parent 846d9a6 commit 865ff35
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=1.1.14
LABEL io.airbyte.version=1.1.15
LABEL io.airbyte.name=airbyte/source-facebook-marketing

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 1.1.14
dockerImageTag: 1.1.15
dockerRepository: airbyte/source-facebook-marketing
githubIssueLabel: source-facebook-marketing
icon: facebook.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ def read_records(
job = stream_slice["insight_job"]
try:
for obj in job.get_result():
yield obj.export_all_data()
data = obj.export_all_data()
if self._response_data_is_valid(data):
yield data
except FacebookBadObjectError as e:
raise AirbyteTracedException(
message=f"API error occurs on Facebook side during job: {job}, wrong (empty) response received with errors: {e} "
Expand Down Expand Up @@ -221,6 +223,12 @@ def check_breakdowns(self):
}
self._api.account.get_insights(params=params, is_async=False)

def _response_data_is_valid(self, data: Iterable[Mapping[str, Any]]) -> bool:
"""
Ensure data contains all the fields specified in self.breakdowns
"""
return all([breakdown in data for breakdown in self.breakdowns])

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from facebook_business.adobjects.adaccount import AdAccount as FBAdAccount
from facebook_business.adobjects.adimage import AdImage
from facebook_business.adobjects.user import User
from facebook_business.exceptions import FacebookRequestError

from .base_insight_streams import AdsInsights
from .base_streams import FBMarketingIncrementalStream, FBMarketingReversedIncrementalStream, FBMarketingStream
Expand Down Expand Up @@ -187,15 +188,25 @@ def fields(self) -> List[str]:
# https://developers.facebook.com/docs/marketing-apis/guides/javascript-ads-dialog-for-payments/
# To access "funding_source_details", the user making the API call must have a MANAGE task permission for
# that specific ad account.
if "funding_source_details" in properties and "MANAGE" not in self.get_task_permissions():
permissions = self.get_task_permissions()
if "funding_source_details" in properties and "MANAGE" not in permissions:
properties.remove("funding_source_details")
if "is_prepay_account" in properties and "MANAGE" not in self.get_task_permissions():
if "is_prepay_account" in properties and "MANAGE" not in permissions:
properties.remove("is_prepay_account")
return properties

def list_objects(self, params: Mapping[str, Any]) -> Iterable:
"""noop in case of AdAccount"""
return [FBAdAccount(self._api.account.get_id()).api_get(fields=self.fields)]
fields = self.fields
try:
return [FBAdAccount(self._api.account.get_id()).api_get(fields=fields)]
except FacebookRequestError as e:
# This is a workaround for cases when account seem have all the required permissions
# but despite of that is not allowed to get `owner` field. See (https://github.com/airbytehq/oncall/issues/3167)
if e.api_error_code() == 200 and e.api_error_message() == "(#200) Requires business_management permission to manage the object":
fields.remove("owner")
return [FBAdAccount(self._api.account.get_id()).api_get(fields=fields)]
raise e


class Images(FBMarketingReversedIncrementalStream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,20 @@ def test_level_custom(self, api):
)

assert stream.level == "adset"

def test_breackdowns_fields_present_in_response_data(self, api):
stream = AdsInsights(
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
breakdowns=["age", "gender"],
insights_lookback_window=28,
)

data = {"age": "0-100", "gender": "male"}

assert stream._response_data_is_valid(data)

data = {"id": "0000001", "name": "Pipenpodl Absakopalis"}

assert not stream._response_data_is_valid(data)
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from facebook_business import FacebookAdsApi, FacebookSession
from source_facebook_marketing.api import API
from source_facebook_marketing.streams import AdCreatives, AdsInsights
from source_facebook_marketing.streams import AdAccount, AdCreatives, AdsInsights

FB_API_VERSION = FacebookAdsApi.API_VERSION

account_id = "unknown_account"
some_config = {"start_date": "2021-01-23T00:00:00Z", "account_id": account_id, "access_token": "unknown_token"}
act_url = f"{FacebookSession.GRAPH}/{FB_API_VERSION}/act_{account_id}/"
base_url = f"{FacebookSession.GRAPH}/{FB_API_VERSION}/"
act_url = f"{base_url}act_{account_id}/"

ad_account_response = {
"json": {
Expand Down Expand Up @@ -105,6 +106,8 @@
# Re-authenticate (for cloud) or refresh access token (for oss) and check if all required permissions are granted
),
(
# One of possible reasons why this error happen is an attempt to access `owner` field:
# GET /act_<account-id>?fields=<field-1>,owner,...<field-n>
"error_403_requires_permission",
"Credentials don't have enough permissions. Re-authenticate if FB oauth is used or refresh access token with all required permissions.",
{
Expand Down Expand Up @@ -371,3 +374,40 @@ def test_config_error_insights_during_actual_nodes_read(self, requests_mock, nam
assert isinstance(error, AirbyteTracedException)
assert error.failure_type == FailureType.config_error
assert friendly_msg in error.message

def test_adaccount_list_objects_retry(self, requests_mock):
"""
Sometimes we get an error: "Requires business_management permission to manage the object" when account has all the required permissions:
[
'ads_management',
'ads_read',
'business_management',
'public_profile'
]
As a workaround for this case we can retry the API call excluding `owner` from `?fields=` GET query param.
"""
api = API(account_id=some_config["account_id"], access_token=some_config["access_token"], page_size=100)
stream = AdAccount(api=api)

business_user = {"account_id": account_id, "business": {"id": "1", "name": "TEST"}}
requests_mock.register_uri("GET", f"{base_url}me/business_users", status_code=200, json=business_user)

assigend_users = {"account_id": account_id, "tasks": ["TASK"]}
requests_mock.register_uri("GET", f"{act_url}assigned_users", status_code=200, json=assigend_users)

responses = [
{
"status_code": 403,
"json": {
"message": "(#200) Requires business_management permission to manage the object",
"type": "OAuthException",
"code": 200,
"fbtrace_id": "AOm48i-YaiRlzqnNEnECcW8",
},
},
{"status_code": 200, "json": {"account_id": account_id}},
]
requests_mock.register_uri("GET", f"{act_url}", responses)

record_gen = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=None, stream_state={})
assert list(record_gen) == [{"account_id": "unknown_account", "id": "act_unknown_account"}]
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.1.15 | 2023-10-06 | [31132](https://github.com/airbytehq/airbyte/pull/31132) | Fix permission error for `AdAccount` stream |
| 1.1.14 | 2023-09-26 | [30758](https://github.com/airbytehq/airbyte/pull/30758) | Exception should not be raises if a stream is not found |
| 1.1.13 | 2023-09-22 | [30706](https://github.com/airbytehq/airbyte/pull/30706) | Performance testing - include socat binary in docker image |
| 1.1.12 | 2023-09-22 | [30655](https://github.com/airbytehq/airbyte/pull/30655) | Updated doc; improved schema for custom insight streams; updated SAT or custom insight streams; removed obsolete optional max_batch_size option from spec |
Expand Down

0 comments on commit 865ff35

Please sign in to comment.