From 6d9139b223d61c125035d77cb9aa865863905c43 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 21 Dec 2023 04:30:08 +0200 Subject: [PATCH 1/8] Add possibility to sync all connected accounts --- .../source_google_ads/google_ads.py | 42 +++++++-- .../source_google_ads/models.py | 21 +++-- .../schemas/customer_client.json | 24 ++++++ .../source_google_ads/source.py | 60 +++++++++++-- .../source_google_ads/spec.json | 2 +- .../source_google_ads/streams.py | 86 +++++++++++++++++-- .../source-google-ads/unit_tests/common.py | 2 +- .../unit_tests/test_errors.py | 7 +- .../unit_tests/test_google_ads.py | 2 +- .../test_incremental_events_streams.py | 59 ++++++++++--- .../unit_tests/test_models.py | 4 +- .../unit_tests/test_source.py | 19 ++-- .../unit_tests/test_streams.py | 31 ++++--- 13 files changed, 289 insertions(+), 70 deletions(-) create mode 100644 airbyte-integrations/connectors/source-google-ads/source_google_ads/schemas/customer_client.json diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py index 05e8ef37a64f..04aebf548277 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py @@ -27,8 +27,28 @@ def __init__(self, credentials: MutableMapping[str, Any]): # `google-ads` library version `14.0.0` and higher requires an additional required parameter `use_proto_plus`. 
# More details can be found here: https://developers.google.com/google-ads/api/docs/client-libs/python/protobuf-messages credentials["use_proto_plus"] = True - self.client = self.get_google_ads_client(credentials) - self.ga_service = self.client.get_service("GoogleAdsService") + self.clients = {} + self.ga_services = {} + self.credentials = credentials + + self.clients["none"] = self.get_google_ads_client(credentials) + self.ga_services["none"] = self.clients["none"].get_service("GoogleAdsService") + + self.customer_service = self.clients["none"].get_service("CustomerService") + + def client(self, login_customer_id="none"): + if login_customer_id in self.clients: + return self.clients[login_customer_id] + new_creds = self.credentials.copy() + new_creds["login_customer_id"] = login_customer_id + self.clients[login_customer_id] = self.get_google_ads_client(new_creds) + return self.clients[login_customer_id] + + def ga_service(self, login_customer_id="none"): + if login_customer_id in self.ga_services: + return self.ga_services[login_customer_id] + self.ga_services[login_customer_id] = self.clients[login_customer_id].get_service("GoogleAdsService") + return self.ga_services[login_customer_id] @staticmethod def get_google_ads_client(credentials) -> GoogleAdsClient: @@ -38,6 +58,14 @@ def get_google_ads_client(credentials) -> GoogleAdsClient: message = "The authentication to Google Ads has expired. Re-authenticate to restore access to Google Ads." 
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error) from e + def get_accessible_accounts(self): + customer_resource_names = self.customer_service.list_accessible_customers().resource_names + logger.info(f"Found {len(customer_resource_names)} accessible accounts: {customer_resource_names}") + + for customer_resource_name in customer_resource_names: + customer_id = self.ga_service().parse_customer_path(customer_resource_name)["customer_id"] + yield customer_id + @backoff.on_exception( backoff.expo, (InternalServerError, ServerError, TooManyRequests), @@ -46,13 +74,13 @@ def get_google_ads_client(credentials) -> GoogleAdsClient: ), max_tries=5, ) - def send_request(self, query: str, customer_id: str) -> Iterator[SearchGoogleAdsResponse]: - client = self.client + def send_request(self, query: str, customer_id: str, login_customer_id: str = "none") -> Iterator[SearchGoogleAdsResponse]: + client = self.client(login_customer_id) search_request = client.get_type("SearchGoogleAdsRequest") search_request.query = query search_request.page_size = self.DEFAULT_PAGE_SIZE search_request.customer_id = customer_id - return [self.ga_service.search(search_request)] + return [self.ga_service(login_customer_id).search(search_request, timeout=300)] def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]: """ @@ -61,8 +89,8 @@ def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]: :return dict of fields type info. 
""" - ga_field_service = self.client.get_service("GoogleAdsFieldService") - request = self.client.get_type("SearchGoogleAdsFieldsRequest") + ga_field_service = self.client().get_service("GoogleAdsFieldService") + request = self.client().get_type("SearchGoogleAdsFieldsRequest") request.page_size = len(fields) fields_sql = ",".join([f"'{field}'" for field in fields]) request.query = f""" diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py index c11ffaf0c57c..cf9048a63b0f 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py @@ -15,16 +15,23 @@ class CustomerModel: id: str time_zone: Union[timezone, str] = "local" is_manager_account: bool = False + login_customer_id: str = None @classmethod - def from_accounts(cls, accounts: Iterable[Iterable[Mapping[str, Any]]]): + def from_accounts(cls, accounts: Iterable[Mapping[str, Any]], table_name: str = "customer") -> Iterable["CustomerModel"]: data_objects = [] - for account_list in accounts: - for account in account_list: - time_zone_name = account.get("customer.time_zone") - tz = Timezone(time_zone_name) if time_zone_name else "local" + for account in accounts: + time_zone_name = account.get(f"{table_name}.time_zone") + tz = Timezone(time_zone_name) if time_zone_name else "local" - data_objects.append( - cls(id=str(account["customer.id"]), time_zone=tz, is_manager_account=bool(account.get("customer.manager"))) + login_customer_id = account.get("login_customer_id") + + data_objects.append( + cls( + id=str(account[f"{table_name}.id"]), + time_zone=tz, + is_manager_account=bool(account.get(f"{table_name}.manager")), + login_customer_id=login_customer_id, ) + ) return data_objects diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/schemas/customer_client.json 
b/airbyte-integrations/connectors/source-google-ads/source_google_ads/schemas/customer_client.json new file mode 100644 index 000000000000..efb4bfd93f78 --- /dev/null +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/schemas/customer_client.json @@ -0,0 +1,24 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "customer_client.client_customer": { + "type": ["null", "boolean"] + }, + "customer_client.level": { + "type": ["null", "string"] + }, + "customer_client.id": { + "type": ["null", "integer"] + }, + "customer_client.manager": { + "type": ["null", "boolean"] + }, + "customer_client.time_zone": { + "type": ["null", "number"] + }, + "customer_client.status": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py index 1e23f20fe512..01c94a0699b5 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py @@ -47,6 +47,8 @@ ) from .utils import GAQL +logger = logging.getLogger("airbyte") + class SourceGoogleAds(AbstractSource): # Skip exceptions on missing streams @@ -65,6 +67,17 @@ def _validate_and_transform(config: Mapping[str, Any]): "https://developers.google.com/google-ads/api/fields/v15/query_validator" ) raise AirbyteTracedException(message=message, failure_type=FailureType.config_error) + + if "login_customer_id" in config and not config["login_customer_id"].strip(): + config.pop("login_customer_id") + + if config.get("login_customer_id") and not config.get("credentials"): + message = ( + f"The `login_customer_id` property should be used only with `customer_id`. " + f"Please delete it to sync all connected customers. Or add `customer_id` to sync only specific customers." 
+ ) + raise AirbyteTracedException(message=message, failure_type=FailureType.config_error) + return config @staticmethod @@ -75,7 +88,7 @@ def get_credentials(config: Mapping[str, Any]) -> MutableMapping[str, Any]: credentials.update(use_proto_plus=True) # https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid - if "login_customer_id" in config and config["login_customer_id"].strip(): + if config.get("login_customer_id"): credentials["login_customer_id"] = config["login_customer_id"] return credentials @@ -98,12 +111,38 @@ def get_incremental_stream_config(google_api: GoogleAds, config: Mapping[str, An ) return incremental_stream_config - @staticmethod - def get_account_info(google_api: GoogleAds, config: Mapping[str, Any]) -> Iterable[Iterable[Mapping[str, Any]]]: - dummy_customers = [CustomerModel(id=_id) for _id in config["customer_id"].split(",")] + def get_all_accounts(self, google_api: GoogleAds, customers: List[CustomerModel]) -> List[str]: + customer_clients_stream = CustomerClient(google_api, customers=customers) + for slice in customer_clients_stream.stream_slices(): + n = 0 + for record in customer_clients_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice): + n += 1 + if n == 50: + break + yield record + + def _get_all_accessible_accounts(self, google_api: GoogleAds, config: Mapping[str, Any]) -> Iterable[Iterable[Mapping[str, Any]]]: + customer_ids = [customer_id for customer_id in google_api.get_accessible_accounts()] + dummy_customers = [CustomerModel(id=_id, login_customer_id=_id) for _id in customer_ids] + + yield from self.get_all_accounts(google_api, dummy_customers) + + def _get_accounts_by_id(self, google_api, config): + customer_ids = config["customer_id"].split(",") + + dummy_customers = [CustomerModel(id=_id) for _id in customer_ids] accounts_stream = ServiceAccounts(google_api, customers=dummy_customers) for slice_ in accounts_stream.stream_slices(): - yield 
accounts_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_) + yield from accounts_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_) + + def get_customers(self, google_api, config): + if config.get("customer_id"): + accounts = self._get_accounts_by_id(google_api, config) + accounts = [a for a in accounts] + return CustomerModel.from_accounts(accounts) + else: + accounts = self._get_all_accessible_accounts(google_api, config) + return CustomerModel.from_accounts(accounts, "customer_client") @staticmethod def is_metrics_in_custom_query(query: GAQL) -> bool: @@ -149,8 +188,9 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> logger.info("Checking the config") google_api = GoogleAds(credentials=self.get_credentials(config)) - accounts = self.get_account_info(google_api, config) - customers = CustomerModel.from_accounts(accounts) + customers = self.get_customers(google_api, config) + logger.info(f"Found {len(customers)} customers: {[customer.id for customer in customers]}") + # Check custom query request validity by sending metric request with non-existent time window for customer in customers: for query in config.get("custom_queries_array", []): @@ -177,8 +217,10 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self._validate_and_transform(config) google_api = GoogleAds(credentials=self.get_credentials(config)) - accounts = self.get_account_info(google_api, config) - customers = CustomerModel.from_accounts(accounts) + + customers = self.get_customers(google_api, config) + logger.info(f"Found {len(customers)} customers: {[customer.id for customer in customers]}") + non_manager_accounts = [customer for customer in customers if not customer.is_manager_account] default_config = dict(api=google_api, customers=customers) incremental_config = self.get_incremental_stream_config(google_api, 
config, customers) diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json b/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json index b875b6d419d9..5267472ecfeb 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json @@ -4,7 +4,7 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Google Ads Spec", "type": "object", - "required": ["credentials", "customer_id"], + "required": ["credentials"], "additionalProperties": true, "properties": { "credentials": { diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py index 695ad285c8a6..cf1e1a81eb46 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py @@ -42,7 +42,7 @@ def parse_response(self, response: SearchPager, stream_slice: Optional[Mapping[s def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: for customer in self.customers: - yield {"customer_id": customer.id} + yield {"customer_id": customer.id, "login_customer_id": customer.login_customer_id} @generator_backoff( wait_gen=backoff.constant, @@ -54,8 +54,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite interval=1, ) @detached(timeout_minutes=5) - def request_records_job(self, customer_id, query, stream_slice): - response_records = self.google_ads_client.send_request(query=query, customer_id=customer_id) + def request_records_job(self, customer_id, login_customer_id, query, stream_slice): + response_records = self.google_ads_client.send_request(query=query, customer_id=customer_id, login_customer_id=login_customer_id) yield from 
self.parse_records_with_backoff(response_records, stream_slice) def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]: @@ -63,8 +63,10 @@ def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = No return [] customer_id = stream_slice["customer_id"] + login_customer_id = stream_slice["login_customer_id"] + try: - yield from self.request_records_job(customer_id, self.get_query(stream_slice), stream_slice) + yield from self.request_records_job(customer_id, login_customer_id, self.get_query(stream_slice), stream_slice) except (GoogleAdsException, Unauthenticated) as exception: traced_exception(exception, customer_id, self.CATCH_CUSTOMER_NOT_ENABLED_ERROR) except TimeoutError as exception: @@ -149,6 +151,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite ): if chunk: chunk["customer_id"] = customer.id + chunk["login_customer_id"] = customer.login_customer_id yield chunk def _update_state(self, customer_id: str, record: MutableMapping[str, Any]): @@ -228,6 +231,56 @@ def parse_response(self, response: SearchPager, stream_slice: Optional[Mapping[s yield record +class CustomerClient(GoogleAdsStream): + """ + Customer Client stream: https://developers.google.com/google-ads/api/fields/v15/customer_client + """ + + primary_key = ["customer_client.id"] + + def get_query(self, stream_slice: Mapping[str, Any] = None) -> str: + fields = GoogleAds.get_fields_from_schema(self.get_json_schema()) + table_name = get_resource_name(self.name) + + active_customers_condition = ["customer_client.status = 'ENABLED'"] + + query = GoogleAds.convert_schema_into_query(fields=fields, table_name=table_name, conditions=active_customers_condition) + return query + + def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]: + """ + This method is overridden to avoid using login_customer_id from 
dummy_customers. + + login_customer_id is used in the stream_slices to pass it to child customers, + but we don't need it here as this class iterates over customers accessible from user creds. + """ + if stream_slice is None: + return [] + + customer_id = stream_slice["customer_id"] + + try: + response_records = self.google_ads_client.send_request(self.get_query(stream_slice), customer_id=customer_id) + + yield from self.parse_records_with_backoff(response_records, stream_slice) + except GoogleAdsException as exception: + traced_exception(exception, customer_id, self.CATCH_CUSTOMER_NOT_ENABLED_ERROR) + + def parse_response(self, response: SearchPager, stream_slice: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping]: + """ + login_customer_id is populated to child customers if they are under a manager account + """ + records = [record for record in super().parse_response(response)] + + # read_records gets all customers connected to customer_id from stream_slice + # if the result is more than one customer, it's a manager; otherwise it is a client account for which we don't need login_customer_id + is_manager = len(records) > 1 + for record in records: + if is_manager: + record["login_customer_id"] = stream_slice["login_customer_id"] + yield record + + class CustomerLabel(GoogleAdsStream): """ Customer Label stream: https://developers.google.com/google-ads/api/fields/v15/customer_label @@ -589,7 +642,13 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite yield from slices_generator else: for customer in self.customers: - yield {"customer_id": customer.id, "updated_ids": set(), "deleted_ids": set(), "record_changed_time_map": dict()} + yield { + "customer_id": customer.id, + "login_customer_id": customer.login_customer_id, + "updated_ids": set(), + "deleted_ids": set(), + "record_changed_time_map": dict(), + } def _process_parent_record(self, parent_record: MutableMapping[str, Any], child_slice: MutableMapping[str, Any]) -> bool: 
"""Process a single parent_record and update the child_slice.""" @@ -613,7 +672,13 @@ def read_parent_stream( sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state.get(self.parent_stream_name) ): customer_id = parent_slice.get("customer_id") - child_slice = {"customer_id": customer_id, "updated_ids": set(), "deleted_ids": set(), "record_changed_time_map": dict()} + child_slice = { + "customer_id": customer_id, + "updated_ids": set(), + "deleted_ids": set(), + "record_changed_time_map": dict(), + "login_customer_id": parent_slice.get("login_customer_id"), + } if not self.get_current_state(customer_id): yield child_slice continue @@ -673,13 +738,20 @@ def _split_slice(child_slice: MutableMapping[str, Any], chunk_size: int = 10000) record_changed_time_map = child_slice["record_changed_time_map"] customer_id = child_slice["customer_id"] + login_customer_id = child_slice["login_customer_id"] # Split the updated_ids into chunks and yield them for i in range(0, len(updated_ids), chunk_size): chunk_ids = set(updated_ids[i : i + chunk_size]) chunk_time_map = {k: record_changed_time_map[k] for k in chunk_ids} - yield {"updated_ids": chunk_ids, "record_changed_time_map": chunk_time_map, "customer_id": customer_id, "deleted_ids": set()} + yield { + "updated_ids": chunk_ids, + "record_changed_time_map": chunk_time_map, + "customer_id": customer_id, + "deleted_ids": set(), + "login_customer_id": login_customer_id, + } def read_records( self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_slice: MutableMapping[str, Any] = None, **kwargs diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/common.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/common.py index d1cc1033e31b..31b7a0871d67 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/common.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/common.py @@ -44,7 +44,7 @@ def get_service(self, service): def 
load_from_dict(config, version=None): return MockGoogleAdsClient(config) - def send_request(self, query, customer_id): + def send_request(self, query, customer_id, login_customer_id="none"): yield from () diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py index ef599d97cbf8..20281e17849d 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py @@ -10,6 +10,7 @@ from airbyte_cdk import AirbyteLogger from airbyte_cdk.utils import AirbyteTracedException from source_google_ads.google_ads import GoogleAds +from source_google_ads.models import CustomerModel from source_google_ads.source import SourceGoogleAds from source_google_ads.streams import AdGroupLabel, Label, ServiceAccounts @@ -74,7 +75,7 @@ def test_read_record_error_handling(mocker, config, customers, cls, raise_expect context = pytest.raises(AirbyteTracedException) if raise_expected else does_not_raise() with context as exception: - for _ in stream.read_records(sync_mode=Mock(), stream_slice={"customer_id": "1234567890"}): + for _ in stream.read_records(sync_mode=Mock(), stream_slice={"customer_id": "1234567890", "login_customer_id": "none"}): pass if raise_expected: @@ -131,8 +132,8 @@ def test_read_record_error_handling(mocker, config, customers, cls, raise_expect def test_check_custom_queries(mocker, config, custom_query, is_manager_account, error_message, warning): config["custom_queries_array"] = [custom_query] mocker.patch( - "source_google_ads.source.SourceGoogleAds.get_account_info", - Mock(return_value=[[{"customer.manager": is_manager_account, "customer.time_zone": "Europe/Berlin", "customer.id": "8765"}]]), + "source_google_ads.source.SourceGoogleAds.get_customers", + Mock(return_value=[CustomerModel(is_manager_account=is_manager_account, time_zone="Europe/Berlin", 
id="8765")]), ) mocker.patch("source_google_ads.google_ads.GoogleAdsClient", return_value=MockGoogleAdsClient) source = SourceGoogleAds() diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py index ecb65916c544..f541fb8e9bb1 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py @@ -169,7 +169,7 @@ def test_get_fields_metadata(mocker): response = google_ads_client.get_fields_metadata(fields) # Get the mock service to check the request query - mock_service = google_ads_client.client.get_service("GoogleAdsFieldService") + mock_service = google_ads_client.client().get_service("GoogleAdsFieldService") # Assert the constructed request query expected_query = """ diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py index 929d5f22f29c..3357d8dddfb9 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py @@ -54,7 +54,7 @@ class MockGoogleAds(GoogleAds): def parse_single_result(self, schema, result): return result - def send_request(self, query: str, customer_id: str): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): if query == "query_parent": return mock_response_parent() else: @@ -64,7 +64,7 @@ def send_request(self, query: str, customer_id: str): def test_change_status_stream(config, customers): """ """ customer_id = next(iter(customers)).id - stream_slice = {"customer_id": customer_id} + stream_slice = {"customer_id": customer_id, "login_customer_id": "none"} google_api = 
MockGoogleAds(credentials=config["credentials"]) @@ -78,7 +78,7 @@ def test_change_status_stream(config, customers): ) assert len(result) == 4 assert stream.get_query.call_count == 1 - stream.get_query.assert_called_with({"customer_id": customer_id}) + stream.get_query.assert_called_with({"customer_id": customer_id, "login_customer_id": "none"}) def test_child_incremental_events_read(config, customers): @@ -89,7 +89,7 @@ def test_child_incremental_events_read(config, customers): It shouldn't read records on 2021-01-01, 2021-01-02 """ customer_id = next(iter(customers)).id - parent_stream_slice = {"customer_id": customer_id, "resource_type": "CAMPAIGN_CRITERION"} + parent_stream_slice = {"customer_id": customer_id, "resource_type": "CAMPAIGN_CRITERION", "login_customer_id": "none"} stream_state = {"change_status": {customer_id: {"change_status.last_change_date_time": "2023-08-16 13:20:01.003295"}}} google_api = MockGoogleAds(credentials=config["credentials"]) @@ -121,6 +121,7 @@ def test_child_incremental_events_read(config, customers): "3": "2023-06-13 12:36:03.772447", "4": "2023-06-13 12:36:04.772447", }, + "login_customer_id": "none", } ] @@ -221,7 +222,7 @@ class MockGoogleAdsLimit(GoogleAds): def parse_single_result(self, schema, result): return result - def send_request(self, query: str, customer_id: str): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): self.count += 1 if self.count == 1: return mock_response_1() @@ -255,7 +256,12 @@ def test_query_limit_hit(config, customers): This test simulates a scenario where the limit is hit and slice start_date is updated with latest record cursor """ customer_id = next(iter(customers)).id - stream_slice = {"customer_id": customer_id, "start_date": "2023-06-13 11:35:04.772447", "end_date": "2023-06-13 13:36:04.772447"} + stream_slice = { + "customer_id": customer_id, + "start_date": "2023-06-13 11:35:04.772447", + "end_date": "2023-06-13 13:36:04.772447", + "login_customer_id": 
"none", + } google_api = MockGoogleAdsLimit(credentials=config["credentials"]) stream_config = dict( @@ -275,16 +281,37 @@ def test_query_limit_hit(config, customers): assert stream.get_query.call_count == 3 get_query_calls = [ - call({"customer_id": "123", "start_date": "2023-06-13 11:35:04.772447", "end_date": "2023-06-13 13:36:04.772447"}), - call({"customer_id": "123", "start_date": "2023-06-13 12:36:02.772447", "end_date": "2023-06-13 13:36:04.772447"}), - call({"customer_id": "123", "start_date": "2023-06-13 12:36:04.772447", "end_date": "2023-06-13 13:36:04.772447"}), + call( + { + "customer_id": "123", + "start_date": "2023-06-13 11:35:04.772447", + "end_date": "2023-06-13 13:36:04.772447", + "login_customer_id": "none", + } + ), + call( + { + "customer_id": "123", + "start_date": "2023-06-13 12:36:02.772447", + "end_date": "2023-06-13 13:36:04.772447", + "login_customer_id": "none", + } + ), + call( + { + "customer_id": "123", + "start_date": "2023-06-13 12:36:04.772447", + "end_date": "2023-06-13 13:36:04.772447", + "login_customer_id": "none", + } + ), ] get_query_mock.assert_has_calls(get_query_calls) class MockGoogleAdsLimitException(MockGoogleAdsLimit): - def send_request(self, query: str, customer_id: str): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): self.count += 1 if self.count == 1: return mock_response_1() @@ -302,7 +329,12 @@ def test_query_limit_hit_exception(config, customers): then error will be raised """ customer_id = next(iter(customers)).id - stream_slice = {"customer_id": customer_id, "start_date": "2023-06-13 11:35:04.772447", "end_date": "2023-06-13 13:36:04.772447"} + stream_slice = { + "customer_id": customer_id, + "start_date": "2023-06-13 11:35:04.772447", + "end_date": "2023-06-13 13:36:04.772447", + "login_customer_id": "none", + } google_api = MockGoogleAdsLimitException(credentials=config["credentials"]) stream_config = dict( @@ -342,6 +374,7 @@ def 
test_change_status_get_query(mocker, config, customers): "start_date": "2023-01-01 00:00:00.000000", "end_date": "2023-09-19 00:00:00.000000", "resource_type": "SOME_RESOURCE_TYPE", + "login_customer_id": "none", } # Call the get_query method with the stream_slice @@ -402,6 +435,7 @@ def test_incremental_events_stream_get_query(mocker, config, customers): "customers/1234567890/adGroupCriteria/111111111111~4": "2023-09-18 08:56:59.165599", "customers/1234567890/adGroupCriteria/111111111111~5": "2023-09-18 08:56:59.165599", }, + "login_customer_id": "none", } # Call the get_query method with the stream_slice @@ -431,6 +465,7 @@ def test_read_records_with_slice_splitting(mocker, config): "record_changed_time_map": {i: f"time_{i}" for i in range(15000)}, "customer_id": "sample_customer_id", "deleted_ids": set(), + "login_customer_id": "none", } # Create a mock instance of the CampaignCriterion stream @@ -455,12 +490,14 @@ def test_read_records_with_slice_splitting(mocker, config): "record_changed_time_map": {i: f"time_{i}" for i in range(10000)}, "customer_id": "sample_customer_id", "deleted_ids": set(), + "login_customer_id": "none", } expected_second_slice = { "updated_ids": set(range(10000, 15000)), "record_changed_time_map": {i: f"time_{i}" for i in range(10000, 15000)}, "customer_id": "sample_customer_id", "deleted_ids": set(), + "login_customer_id": "none", } # Verify the arguments passed to the parent's read_records method for both calls diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py index 0edfdea1213c..5ba6194cb9fa 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py @@ -8,7 +8,7 @@ def test_time_zone(): - mock_account_info = [[{"customer.id": "8765"}]] + mock_account_info = [{"customer.id": "8765"}] customers = 
CustomerModel.from_accounts(mock_account_info) for customer in customers: assert customer.time_zone == "local" @@ -16,7 +16,7 @@ def test_time_zone(): @pytest.mark.parametrize("is_manager_account", (True, False)) def test_manager_account(is_manager_account): - mock_account_info = [[{"customer.manager": is_manager_account, "customer.id": "8765"}]] + mock_account_info = [{"customer.manager": is_manager_account, "customer.id": "8765"}] customers = CustomerModel.from_accounts(mock_account_info) for customer in customers: assert customer.is_manager_account is is_manager_account diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py index f39fa1e3f95d..3ae611df6ab4 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py @@ -14,6 +14,7 @@ from pendulum import today from source_google_ads.custom_query_stream import IncrementalCustomQuery from source_google_ads.google_ads import GoogleAds +from source_google_ads.models import CustomerModel from source_google_ads.source import SourceGoogleAds from source_google_ads.streams import AdGroupAdLegacy, chunk_date_range from source_google_ads.utils import GAQL @@ -22,10 +23,10 @@ @pytest.fixture -def mock_account_info(mocker): +def mock_get_customers(mocker): mocker.patch( - "source_google_ads.source.SourceGoogleAds.get_account_info", - Mock(return_value=[[{"customer.manager": False, "customer.time_zone": "Europe/Berlin", "customer.id": "8765"}]]), + "source_google_ads.source.SourceGoogleAds.get_customers", + Mock(return_value=[CustomerModel(is_manager_account=False, time_zone="Europe/Berlin", id="8765")]), ) @@ -113,7 +114,7 @@ def test_chunk_date_range(): ] == slices -def test_streams_count(config, mock_account_info): +def test_streams_count(config, mock_get_customers): source = SourceGoogleAds() streams = 
source.streams(config) expected_streams_number = 30 @@ -121,7 +122,7 @@ def test_streams_count(config, mock_account_info): assert len(streams) == expected_streams_number -def test_read_missing_stream(config, mock_account_info): +def test_read_missing_stream(config, mock_get_customers): source = SourceGoogleAds() catalog = ConfiguredAirbyteCatalog( @@ -437,8 +438,8 @@ def test_stream_slices(config, customers): ) slices = list(stream.stream_slices()) assert slices == [ - {"start_date": "2020-12-18", "end_date": "2021-01-01", "customer_id": "123"}, - {"start_date": "2021-01-02", "end_date": "2021-01-16", "customer_id": "123"}, - {"start_date": "2021-01-17", "end_date": "2021-01-31", "customer_id": "123"}, - {"start_date": "2021-02-01", "end_date": "2021-02-10", "customer_id": "123"}, + {"start_date": "2020-12-18", "end_date": "2021-01-01", "customer_id": "123", "login_customer_id": None}, + {"start_date": "2021-01-02", "end_date": "2021-01-16", "customer_id": "123", "login_customer_id": None}, + {"start_date": "2021-01-17", "end_date": "2021-01-31", "customer_id": "123", "login_customer_id": None}, + {"start_date": "2021-02-01", "end_date": "2021-02-10", "customer_id": "123", "login_customer_id": None}, ] diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py index 67015ec041df..c1cf3ea29ce4 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py @@ -51,7 +51,7 @@ class MockGoogleAds(GoogleAds): def parse_single_result(self, schema, result): return result - def send_request(self, query: str, customer_id: str): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): self.count += 1 if self.count == 1: return mock_response_1() @@ -67,7 +67,7 @@ def test_page_token_expired_retry_succeeds(config, customers): It 
shouldn't read records on 2021-01-01, 2021-01-02 """ customer_id = next(iter(customers)).id - stream_slice = {"customer_id": customer_id, "start_date": "2021-01-01", "end_date": "2021-01-15"} + stream_slice = {"customer_id": customer_id, "start_date": "2021-01-01", "end_date": "2021-01-15", "login_customer_id": customer_id} google_api = MockGoogleAds(credentials=config["credentials"]) incremental_stream_config = dict( @@ -84,7 +84,9 @@ def test_page_token_expired_retry_succeeds(config, customers): result = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice)) assert len(result) == 9 assert stream.get_query.call_count == 2 - stream.get_query.assert_called_with({"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-15"}) + stream.get_query.assert_called_with( + {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-15", "login_customer_id": customer_id} + ) def mock_response_fails_1(): @@ -110,7 +112,7 @@ def mock_response_fails_2(): class MockGoogleAdsFails(MockGoogleAds): - def send_request(self, query: str, customer_id: str): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): self.count += 1 if self.count == 1: return mock_response_fails_1() @@ -124,7 +126,7 @@ def test_page_token_expired_retry_fails(config, customers): because Google Ads API doesn't allow filter by datetime. """ customer_id = next(iter(customers)).id - stream_slice = {"customer_id": customer_id, "start_date": "2021-01-01", "end_date": "2021-01-15"} + stream_slice = {"customer_id": customer_id, "start_date": "2021-01-01", "end_date": "2021-01-15", "login_customer_id": customer_id} google_api = MockGoogleAdsFails(credentials=config["credentials"]) incremental_stream_config = dict( @@ -145,7 +147,9 @@ def test_page_token_expired_retry_fails(config, customers): "Please contact the Airbyte team with the link of your connection for assistance." 
) - stream.get_query.assert_called_with({"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-15"}) + stream.get_query.assert_called_with( + {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-15", "login_customer_id": customer_id} + ) assert stream.get_query.call_count == 2 @@ -161,7 +165,7 @@ def mock_response_fails_one_date(): class MockGoogleAdsFailsOneDate(MockGoogleAds): - def send_request(self, query: str, customer_id: str): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): return mock_response_fails_one_date() @@ -172,7 +176,7 @@ def test_page_token_expired_it_should_fail_date_range_1_day(config, customers): Minimum date range is 1 day. """ customer_id = next(iter(customers)).id - stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"} + stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04", "login_customer_id": customer_id} google_api = MockGoogleAdsFailsOneDate(credentials=config["credentials"]) incremental_stream_config = dict( @@ -192,17 +196,21 @@ def test_page_token_expired_it_should_fail_date_range_1_day(config, customers): "Page token has expired during processing response. " "Please contact the Airbyte team with the link of your connection for assistance." 
) - stream.get_query.assert_called_with({"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"}) + stream.get_query.assert_called_with( + {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04", "login_customer_id": customer_id} + ) assert stream.get_query.call_count == 1 @pytest.mark.parametrize("error_cls", (ResourceExhausted, TooManyRequests, InternalServerError, DataLoss)) def test_retry_transient_errors(mocker, config, customers, error_cls): + customer_id = next(iter(customers)).id + mocker.patch("time.sleep") credentials = config["credentials"] credentials.update(use_proto_plus=True) api = GoogleAds(credentials=credentials) - mocked_search = mocker.patch.object(api.ga_service, "search", side_effect=error_cls("Error message")) + mocked_search = mocker.patch.object(api.ga_services["none"], "search", side_effect=error_cls("Error message")) incremental_stream_config = dict( api=api, conversion_window_days=config["conversion_window_days"], @@ -211,8 +219,7 @@ def test_retry_transient_errors(mocker, config, customers, error_cls): customers=customers, ) stream = ClickView(**incremental_stream_config) - customer_id = next(iter(customers)).id - stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"} + stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04", "login_customer_id": "none"} records = [] with pytest.raises(error_cls) as exception: records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice)) From b90ae1ed0c65e9c0ef50eff74b621de51fc64c73 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 21 Dec 2023 15:18:57 +0200 Subject: [PATCH 2/8] Delete login_customer_id property --- .../source_google_ads/models.py | 6 +- .../source_google_ads/source.py | 69 +++++++++---------- .../source_google_ads/spec.json | 35 ++++++---- .../source_google_ads/streams.py | 16 
+++-- .../unit_tests/test_models.py | 4 +- docs/integrations/sources/google-ads.md | 30 ++++---- 6 files changed, 86 insertions(+), 74 deletions(-) diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py index cf9048a63b0f..975336398f82 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py @@ -18,20 +18,18 @@ class CustomerModel: login_customer_id: str = None @classmethod - def from_accounts(cls, accounts: Iterable[Mapping[str, Any]], table_name: str = "customer") -> Iterable["CustomerModel"]: + def from_accounts(cls, accounts: Iterable[Mapping[str, Any]], table_name: str = "customer_client") -> Iterable["CustomerModel"]: data_objects = [] for account in accounts: time_zone_name = account.get(f"{table_name}.time_zone") tz = Timezone(time_zone_name) if time_zone_name else "local" - login_customer_id = account.get("login_customer_id") - data_objects.append( cls( id=str(account[f"{table_name}.id"]), time_zone=tz, is_manager_account=bool(account.get(f"{table_name}.manager")), - login_customer_id=login_customer_id, + login_customer_id=account.get("login_customer_id"), ) ) return data_objects diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py index 01c94a0699b5..02835088d284 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py @@ -68,15 +68,9 @@ def _validate_and_transform(config: Mapping[str, Any]): ) raise AirbyteTracedException(message=message, failure_type=FailureType.config_error) - if "login_customer_id" in config and not config["login_customer_id"].strip(): - config.pop("login_customer_id") - - 
if config.get("login_customer_id") and not config.get("credentials"): - message = ( - f"The `login_customer_id` property should be used only with `customer_id`. " - f"Please delete it to sync all connected customers. Or add `customer_id` to sync only specific customers." - ) - raise AirbyteTracedException(message=message, failure_type=FailureType.config_error) + if "customer_id" in config: + config["customer_ids"] = config["customer_id"].split(",") + config.pop("customer_id") return config @@ -86,10 +80,6 @@ def get_credentials(config: Mapping[str, Any]) -> MutableMapping[str, Any]: # use_proto_plus is set to True, because setting to False returned wrong value types, which breaks the backward compatibility. # For more info read the related PR's description: https://github.com/airbytehq/airbyte/pull/9996 credentials.update(use_proto_plus=True) - - # https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid - if config.get("login_customer_id"): - credentials["login_customer_id"] = config["login_customer_id"] return credentials @staticmethod @@ -111,38 +101,45 @@ def get_incremental_stream_config(google_api: GoogleAds, config: Mapping[str, An ) return incremental_stream_config - def get_all_accounts(self, google_api: GoogleAds, customers: List[CustomerModel]) -> List[str]: - customer_clients_stream = CustomerClient(google_api, customers=customers) + def get_all_accounts(self, google_api: GoogleAds, customers: List[CustomerModel], customer_status_filter: List[str]) -> List[str]: + customer_clients_stream = CustomerClient(api=google_api, customers=customers, customer_status_filter=customer_status_filter) for slice in customer_clients_stream.stream_slices(): - n = 0 for record in customer_clients_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice): - n += 1 - if n == 50: - break yield record - def _get_all_accessible_accounts(self, google_api: GoogleAds, config: Mapping[str, Any]) -> Iterable[Iterable[Mapping[str, Any]]]: + def 
_get_all_connected_accounts( + self, google_api: GoogleAds, customer_status_filter: List[str] + ) -> Iterable[Iterable[Mapping[str, Any]]]: customer_ids = [customer_id for customer_id in google_api.get_accessible_accounts()] dummy_customers = [CustomerModel(id=_id, login_customer_id=_id) for _id in customer_ids] - yield from self.get_all_accounts(google_api, dummy_customers) + yield from self.get_all_accounts(google_api, dummy_customers, customer_status_filter) - def _get_accounts_by_id(self, google_api, config): - customer_ids = config["customer_id"].split(",") + def get_customers(self, google_api: GoogleAds, config: Mapping[str, Any]) -> List[CustomerModel]: + customer_status_filter = config.get("customer_status_filter", []) + accounts = self._get_all_connected_accounts(google_api, customer_status_filter) + customers = CustomerModel.from_accounts(accounts) - dummy_customers = [CustomerModel(id=_id) for _id in customer_ids] - accounts_stream = ServiceAccounts(google_api, customers=dummy_customers) - for slice_ in accounts_stream.stream_slices(): - yield from accounts_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_) - - def get_customers(self, google_api, config): - if config.get("customer_id"): - accounts = self._get_accounts_by_id(google_api, config) - accounts = [a for a in accounts] - return CustomerModel.from_accounts(accounts) - else: - accounts = self._get_all_accessible_accounts(google_api, config) - return CustomerModel.from_accounts(accounts, "customer_client") + # filter duplicates as one customer can be accessible from multiple connected accounts + unique_customers = [] + seen_ids = set() + for customer in customers: + if customer.id in seen_ids: + continue + seen_ids.add(customer.id) + unique_customers.append(customer) + customers = unique_customers + customers_dict = {customer.id: customer for customer in customers} + + # filter only selected accounts + if config.get("customer_ids"): + customers = [] + for customer_id in
config["customer_ids"]: + if customer_id not in customers_dict: + logging.warning(f"Customer with id {customer_id} is not accessible. Skipping it.") + else: + customers.append(customers_dict[customer_id]) + return customers @staticmethod def is_metrics_in_custom_query(query: GAQL) -> bool: diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json b/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json index 5267472ecfeb..78b88f50964f 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json @@ -64,6 +64,24 @@ "examples": ["6783948572,5839201945"], "order": 1 }, + "customer_status_filter" : { + "title" : "Customer Statuses Filter", + "description" : "A list of customer statuses to filter on. For detailed info about what each status mean refer to Google Ads documentation.", + "default" : [], + "order": 2, + "type" : "array", + "items" : { + "title" : "CustomerStatus", + "description" : "An enumeration.", + "enum" : [ + "UNKNOWN", + "ENABLED", + "CANCELED", + "SUSPENDED", + "CLOSED" + ] + } + }, "start_date": { "type": "string", "title": "Start Date", @@ -71,7 +89,7 @@ "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$", "pattern_descriptor": "YYYY-MM-DD", "examples": ["2017-01-25"], - "order": 2, + "order": 3, "format": "date" }, "end_date": { @@ -81,14 +99,14 @@ "pattern": "^$|^[0-9]{4}-[0-9]{2}-[0-9]{2}$", "pattern_descriptor": "YYYY-MM-DD", "examples": ["2017-01-30"], - "order": 6, + "order": 4, "format": "date" }, "custom_queries_array": { "type": "array", "title": "Custom GAQL Queries", "description": "", - "order": 3, + "order": 5, "items": { "type": "object", "required": ["query", "table_name"], @@ -110,15 +128,6 @@ } } }, - "login_customer_id": { - "type": "string", - "title": "Login Customer ID for Managed Accounts", - "description": "If your access to the customer account is through a manager 
account, this field is required, and must be set to the 10-digit customer ID of the manager account. For more information about this field, refer to Google's documentation.", - "pattern_descriptor": ": 10 digits, with no dashes.", - "pattern": "^([0-9]{10})?$", - "examples": ["7349206847"], - "order": 4 - }, "conversion_window_days": { "title": "Conversion Window", "type": "integer", @@ -127,7 +136,7 @@ "maximum": 1095, "default": 14, "examples": [14], - "order": 5 + "order": 6 } } }, diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py index cf1e1a81eb46..1b5decb0547b 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py @@ -238,11 +238,18 @@ class CustomerClient(GoogleAdsStream): primary_key = ["customer_client.id"] + def __init__(self, customer_status_filter: List[str], **kwargs): + self.customer_status_filter = customer_status_filter + super().__init__(**kwargs) + def get_query(self, stream_slice: Mapping[str, Any] = None) -> str: fields = GoogleAds.get_fields_from_schema(self.get_json_schema()) table_name = get_resource_name(self.name) - active_customers_condition = ["customer_client.status = 'ENABLED'"] + active_customers_condition = [] + if self.customer_status_filter: + customer_status_filter = ", ".join([f"'{status}'" for status in self.customer_status_filter]) + active_customers_condition = [f"customer_client.status in ({customer_status_filter})"] query = GoogleAds.convert_schema_into_query(fields=fields, table_name=table_name, conditions=active_customers_condition) return query @@ -273,11 +280,10 @@ def parse_response(self, response: SearchPager, stream_slice: Optional[Mapping[s records = [record for record in super().parse_response(response)] # read_records get all customers connected to customer_id from 
stream_slice - # if the result is more than one cusotmer, it's a manager, otherwise it is client account for which we don't need login_customer_id - is_manager = len(records) > 1 + # if the result is more than one customer, it's a manager, otherwise it is a client account for which we don't need login_customer_id + root_is_manager = len(records) > 1 for record in records: - if is_manager: - record["login_customer_id"] = stream_slice["login_customer_id"] + record["login_customer_id"] = stream_slice["login_customer_id"] if root_is_manager else "none" yield record diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py index 5ba6194cb9fa..4c8970dcf211 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py @@ -8,7 +8,7 @@ def test_time_zone(): - mock_account_info = [{"customer.id": "8765"}] + mock_account_info = [{"customer_client.id": "8765"}] customers = CustomerModel.from_accounts(mock_account_info) for customer in customers: assert customer.time_zone == "local" @@ -16,7 +16,7 @@ def test_time_zone(): @pytest.mark.parametrize("is_manager_account", (True, False)) def test_manager_account(is_manager_account): - mock_account_info = [{"customer.manager": is_manager_account, "customer.id": "8765"}] + mock_account_info = [{"customer_client.manager": is_manager_account, "customer_client.id": "8765"}] customers = CustomerModel.from_accounts(mock_account_info) for customer in customers: assert customer.is_manager_account is is_manager_account diff --git a/docs/integrations/sources/google-ads.md b/docs/integrations/sources/google-ads.md index a56f046a29fd..86075c7a401a 100644 --- a/docs/integrations/sources/google-ads.md +++ b/docs/integrations/sources/google-ads.md @@ -62,13 +62,14 @@ To set up Google Ads as a source in Airbyte Cloud: 3.
Find and select **Google Ads** from the list of available sources. 4. Enter a **Source name** of your choosing. 5. Click **Sign in with Google** to authenticate your Google Ads account. In the pop-up, select the appropriate Google account and click **Continue** to proceed. -6. Enter a comma-separated list of the **Customer ID(s)** for your account. These IDs are 10-digit numbers that uniquely identify your account. To find your Customer ID, please follow [Google's instructions](https://support.google.com/google-ads/answer/1704344). -7. (Optional) Enter a **Start Date** using the provided datepicker, or by programmatically entering the date in YYYY-MM-DD format. The data added on and after this date will be replicated. (Default start date is 2 years ago) -8. (Optional) You can use the **Custom GAQL Queries** field to enter a custom query using Google Ads Query Language. Click **Add** and enter your query, as well as the desired name of the table for this data in the destination. Multiple queries can be provided. For more information on formulating these queries, refer to our [guide below](#custom-query-understanding-google-ads-query-language). -9. (Required for Manager accounts) If accessing your account through a Google Ads Manager account, you must enter the [**Customer ID**](https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid) of the Manager account. -10. (Optional) Enter a **Conversion Window**. This is the number of days after an ad interaction during which a conversion is recorded in Google Ads. For more information on this topic, refer to the [Google Ads Help Center](https://support.google.com/google-ads/answer/3123169?hl=en). This field defaults to 14 days. -11. (Optional) Enter an **End Date** in YYYY-MM-DD format. Any data added after this date will not be replicated. Leaving this field blank will replicate all data from the start date onward. -12. Click **Set up source** and wait for the tests to complete. +6. 
(Optional) Enter a comma-separated list of the **Customer ID(s)** for your account. These IDs are 10-digit numbers that uniquely identify your account. To find your Customer ID, please follow [Google's instructions](https://support.google.com/google-ads/answer/1704344). Leaving this field blank will replicate data from all connected accounts. +7. (Optional) Enter customer statuses to filter customers. Leaving this field blank will replicate data from all accounts. Check [Google Ads documentation](https://developers.google.com/google-ads/api/reference/rpc/v15/CustomerStatusEnum.CustomerStatus) for more info. +8. (Optional) Enter a **Start Date** using the provided datepicker, or by programmatically entering the date in YYYY-MM-DD format. The data added on and after this date will be replicated. (Default start date is 2 years ago) +9. (Optional) You can use the **Custom GAQL Queries** field to enter a custom query using Google Ads Query Language. Click **Add** and enter your query, as well as the desired name of the table for this data in the destination. Multiple queries can be provided. For more information on formulating these queries, refer to our [guide below](#custom-query-understanding-google-ads-query-language). +10. (Required for Manager accounts) If accessing your account through a Google Ads Manager account, you must enter the [**Customer ID**](https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid) of the Manager account. +11. (Optional) Enter a **Conversion Window**. This is the number of days after an ad interaction during which a conversion is recorded in Google Ads. For more information on this topic, refer to the [Google Ads Help Center](https://support.google.com/google-ads/answer/3123169?hl=en). This field defaults to 14 days. +12. (Optional) Enter an **End Date** in YYYY-MM-DD format. Any data added after this date will not be replicated. Leaving this field blank will replicate all data from the start date onward. +13. 
Click **Set up source** and wait for the tests to complete. @@ -83,13 +84,14 @@ To set up Google Ads as a source in Airbyte Open Source: 4. Enter a **Source name** of your choosing. 5. Enter the **Developer Token** you obtained from Google. 6. To authenticate your Google account, enter your Google application's **Client ID**, **Client Secret**, **Refresh Token**, and optionally, the **Access Token**. -7. Enter a comma-separated list of the **Customer ID(s)** for your account. These IDs are 10-digit numbers that uniquely identify your account. To find your Customer ID, please follow [Google's instructions](https://support.google.com/google-ads/answer/1704344). -8. (Optional) Enter a **Start Date** using the provided datepicker, or by programmatically entering the date in YYYY-MM-DD format. The data added on and after this date will be replicated. (Default start date is 2 years ago) -9. (Optional) You can use the **Custom GAQL Queries** field to enter a custom query using Google Ads Query Language. Click **Add** and enter your query, as well as the desired name of the table for this data in the destination. Multiple queries can be provided. For more information on formulating these queries, refer to our [guide below](#custom-query-understanding-google-ads-query-language). -10. (Required for Manager accounts) If accessing your account through a Google Ads Manager account, you must enter the [**Customer ID**](https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid) of the Manager account. -11. (Optional) Enter a **Conversion Window**. This is the number of days after an ad interaction during which a conversion is recorded in Google Ads. For more information on this topic, see the section on [Conversion Windows](#note-on-conversion-windows) below, or refer to the [Google Ads Help Center](https://support.google.com/google-ads/answer/3123169?hl=en). This field defaults to 14 days. -12. (Optional) Enter an **End Date** in YYYY-MM-DD format. 
Any data added after this date will not be replicated. Leaving this field blank will replicate all data from the start date onward. -13. Click **Set up source** and wait for the tests to complete. +7. (Optional) Enter a comma-separated list of the **Customer ID(s)** for your account. These IDs are 10-digit numbers that uniquely identify your account. To find your Customer ID, please follow [Google's instructions](https://support.google.com/google-ads/answer/1704344). Leaving this field blank will replicate data from all connected accounts. +8. (Optional) Enter customer statuses to filter customers. Leaving this field blank will replicate data from all accounts. Check [Google Ads documentation](https://developers.google.com/google-ads/api/reference/rpc/v15/CustomerStatusEnum.CustomerStatus) for more info. +9. (Optional) Enter a **Start Date** using the provided datepicker, or by programmatically entering the date in YYYY-MM-DD format. The data added on and after this date will be replicated. (Default start date is 2 years ago) +10. (Optional) You can use the **Custom GAQL Queries** field to enter a custom query using Google Ads Query Language. Click **Add** and enter your query, as well as the desired name of the table for this data in the destination. Multiple queries can be provided. For more information on formulating these queries, refer to our [guide below](#custom-query-understanding-google-ads-query-language). +11. (Required for Manager accounts) If accessing your account through a Google Ads Manager account, you must enter the [**Customer ID**](https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid) of the Manager account. +12. (Optional) Enter a **Conversion Window**. This is the number of days after an ad interaction during which a conversion is recorded in Google Ads. 
For more information on this topic, see the section on [Conversion Windows](#note-on-conversion-windows) below, or refer to the [Google Ads Help Center](https://support.google.com/google-ads/answer/3123169?hl=en). This field defaults to 14 days. +13. (Optional) Enter an **End Date** in YYYY-MM-DD format. Any data added after this date will not be replicated. Leaving this field blank will replicate all data from the start date onward. +14. Click **Set up source** and wait for the tests to complete. From d5b4b25bb5d10385cd9ce6176fb6ee4f5dffc531 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 22 Dec 2023 14:19:42 +0200 Subject: [PATCH 3/8] Update version --- .../source_google_ads/google_ads.py | 2 +- .../source_google_ads/spec.json | 24 +-- .../source_google_ads/streams.py | 1 + .../source-google-ads/unit_tests/common.py | 3 + .../source-google-ads/unit_tests/conftest.py | 2 + .../unit_tests/test_errors.py | 13 ++ docs/integrations/sources/google-ads.md | 195 +++++++++--------- 7 files changed, 127 insertions(+), 113 deletions(-) diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py index 04aebf548277..7230de6a470f 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py @@ -80,7 +80,7 @@ def send_request(self, query: str, customer_id: str, login_customer_id: str = "n search_request.query = query search_request.page_size = self.DEFAULT_PAGE_SIZE search_request.customer_id = customer_id - return [self.ga_service(login_customer_id).search(search_request, timeout=300)] + return [self.ga_service(login_customer_id).search(search_request)] def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]: """ diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json 
b/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json index 78b88f50964f..2b84f6bc1beb 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/spec.json @@ -64,22 +64,16 @@ "examples": ["6783948572,5839201945"], "order": 1 }, - "customer_status_filter" : { - "title" : "Customer Statuses Filter", - "description" : "A list of customer statuses to filter on. For detailed info about what each status mean refer to Google Ads documentation.", - "default" : [], + "customer_status_filter": { + "title": "Customer Statuses Filter", + "description": "A list of customer statuses to filter on. For detailed info about what each status means, refer to Google Ads documentation.", + "default": [], "order": 2, - "type" : "array", - "items" : { - "title" : "CustomerStatus", - "description" : "An enumeration.", - "enum" : [ - "UNKNOWN", - "ENABLED", - "CANCELED", - "SUSPENDED", - "CLOSED" - ] + "type": "array", + "items": { + "title": "CustomerStatus", + "description": "An enumeration.", + "enum": ["UNKNOWN", "ENABLED", "CANCELED", "SUSPENDED", "CLOSED"] } }, "start_date": { diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py index 1b5decb0547b..2d8f04fc28c6 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py @@ -236,6 +236,7 @@ class CustomerClient(GoogleAdsStream): Customer Client stream: https://developers.google.com/google-ads/api/fields/v15/customer_client """ + CATCH_CUSTOMER_NOT_ENABLED_ERROR = False primary_key = ["customer_client.id"] def __init__(self, customer_status_filter: List[str], **kwargs): diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/common.py
b/airbyte-integrations/connectors/source-google-ads/unit_tests/common.py index 31b7a0871d67..b2bff404d6e2 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/common.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/common.py @@ -47,6 +47,9 @@ def load_from_dict(config, version=None): def send_request(self, query, customer_id, login_customer_id="none"): yield from () + def get_accessible_accounts(self): + yield from ["fake_customer_id", "fake_customer_id_2"] + class MockGoogleAdsFieldService: _instance = None diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/conftest.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/conftest.py index 845780e9f383..859c7b31d81f 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/conftest.py @@ -3,6 +3,8 @@ # +from unittest.mock import Mock + import pytest from source_google_ads.models import CustomerModel diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py index 20281e17849d..b82388b89268 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py @@ -16,6 +16,15 @@ from .common import MockGoogleAdsClient, mock_google_ads_request_failure + +@pytest.fixture +def mock_get_customers(mocker): + mocker.patch( + "source_google_ads.source.SourceGoogleAds.get_customers", + Mock(return_value=[CustomerModel(is_manager_account=False, time_zone="Europe/Berlin", id="123")]), + ) + + params = [ ( ["USER_PERMISSION_DENIED"], @@ -52,6 +61,10 @@ @pytest.mark.parametrize(("exception", "error_message"), params) def test_expected_errors(mocker, config, exception, error_message): mock_google_ads_request_failure(mocker, exception) + mocker.patch( + 
"source_google_ads.google_ads.GoogleAds.get_accessible_accounts", + Mock(return_value=["123", "12345"]), + ) source = SourceGoogleAds() with pytest.raises(AirbyteTracedException) as exception: status_ok, error = source.check_connection(AirbyteLogger(), config) diff --git a/docs/integrations/sources/google-ads.md b/docs/integrations/sources/google-ads.md index 86075c7a401a..4f1b239d9ff1 100644 --- a/docs/integrations/sources/google-ads.md +++ b/docs/integrations/sources/google-ads.md @@ -278,100 +278,101 @@ Due to a limitation in the Google Ads API which does not allow getting performan ## Changelog -| Version | Date | Pull Request | Subject | -|:---------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------| -| `3.1.0` | 2024-01-09 | [33603](https://github.com/airbytehq/airbyte/pull/33603) | Fix two issues in the custom queries: automatic addition of `segments.date` in the query; incorrect field type for `DATE` fields. | -| `3.0.2` | 2024-01-08 | [33494](https://github.com/airbytehq/airbyte/pull/33494) | Add handling for 401 error while parsing response. Add `metrics.cost_micros` field to Ad Group stream. 
| -| `3.0.1` | 2023-12-26 | [33769](https://github.com/airbytehq/airbyte/pull/33769) | Run a read function in a separate thread to enforce a time limit for its execution | -| `3.0.0` | 2023-12-07 | [33120](https://github.com/airbytehq/airbyte/pull/33120) | Upgrade API version to v15 | -| `2.0.4` | 2023-11-10 | [32414](https://github.com/airbytehq/airbyte/pull/32414) | Add backoff strategy for read_records method | -| `2.0.3` | 2023-11-02 | [32102](https://github.com/airbytehq/airbyte/pull/32102) | Fix incremental events streams | -| `2.0.2` | 2023-10-31 | [32001](https://github.com/airbytehq/airbyte/pull/32001) | Added handling (retry) for `InternalServerError` while reading the streams | -| `2.0.1` | 2023-10-27 | [31908](https://github.com/airbytehq/airbyte/pull/31908) | Base image migration: remove Dockerfile and use the python-connector-base image | -| `2.0.0` | 2023-10-04 | [31048](https://github.com/airbytehq/airbyte/pull/31048) | Fix schem default streams, change names of streams. 
| -| `1.0.0` | 2023-09-28 | [30705](https://github.com/airbytehq/airbyte/pull/30705) | Fix schemas for custom queries | -| `0.11.1` | 2023-09-26 | [30758](https://github.com/airbytehq/airbyte/pull/30758) | Exception should not be raises if a stream is not found | -| `0.11.0` | 2023-09-23 | [30704](https://github.com/airbytehq/airbyte/pull/30704) | Update error handling | -| `0.10.0` | 2023-09-19 | [30091](https://github.com/airbytehq/airbyte/pull/30091) | Fix schemas for correct primary and foreign keys | -| `0.9.0` | 2023-09-14 | [28970](https://github.com/airbytehq/airbyte/pull/28970) | Add incremental deletes for Campaign and Ad Group Criterion streams | -| `0.8.1` | 2023-09-13 | [30376](https://github.com/airbytehq/airbyte/pull/30376) | Revert pagination changes from 0.8.0 | -| `0.8.0` | 2023-09-01 | [30071](https://github.com/airbytehq/airbyte/pull/30071) | Delete start_date from required parameters and fix pagination | -| `0.7.4` | 2023-07-28 | [28832](https://github.com/airbytehq/airbyte/pull/28832) | Update field descriptions | -| `0.7.3` | 2023-07-24 | [28510](https://github.com/airbytehq/airbyte/pull/28510) | Set dates with client's timezone | -| `0.7.2` | 2023-07-20 | [28535](https://github.com/airbytehq/airbyte/pull/28535) | UI improvement: Make the query field in custom reports a multi-line string field | -| `0.7.1` | 2023-07-17 | [28365](https://github.com/airbytehq/airbyte/pull/28365) | 0.3.1 and 0.3.2 follow up: make today the end date, not yesterday | -| `0.7.0` | 2023-07-12 | [28246](https://github.com/airbytehq/airbyte/pull/28246) | Add new streams: labels, criterions, biddig strategies | -| `0.6.1` | 2023-07-12 | [28230](https://github.com/airbytehq/airbyte/pull/28230) | Reduce amount of logs produced by the connector while working with big amount of data | -| `0.6.0` | 2023-07-10 | [28078](https://github.com/airbytehq/airbyte/pull/28078) | Add new stream `Campaign Budget` | -| `0.5.0` | 2023-07-07 | 
[28042](https://github.com/airbytehq/airbyte/pull/28042) | Add metrics & segment to `Campaigns` stream | -| `0.4.3` | 2023-07-05 | [27959](https://github.com/airbytehq/airbyte/pull/27959) | Add `audience` and `user_interest` streams | -| `0.3.3` | 2023-07-03 | [27913](https://github.com/airbytehq/airbyte/pull/27913) | Improve Google Ads exception handling (wrong customer ID) | -| `0.3.2` | 2023-06-29 | [27835](https://github.com/airbytehq/airbyte/pull/27835) | Fix bug introduced in 0.3.1: update query template | -| `0.3.1` | 2023-06-26 | [27711](https://github.com/airbytehq/airbyte/pull/27711) | Refactor date slicing; make start date inclusive | -| `0.3.0` | 2023-06-26 | [27738](https://github.com/airbytehq/airbyte/pull/27738) | License Update: Elv2 | -| `0.2.24` | 2023-06-06 | [27608](https://github.com/airbytehq/airbyte/pull/27608) | Improve Google Ads exception handling | -| `0.2.23` | 2023-06-06 | [26905](https://github.com/airbytehq/airbyte/pull/26905) | Replace deprecated `authSpecification` in the connector specification with `advancedAuth` | -| `0.2.22` | 2023-06-02 | [26948](https://github.com/airbytehq/airbyte/pull/26948) | Refactor error messages; add `pattern_descriptor` for fields in spec | -| `0.2.21` | 2023-05-30 | [25314](https://github.com/airbytehq/airbyte/pull/25314) | Add full refresh custom table `asset_group_listing_group_filter` | -| `0.2.20` | 2023-05-30 | [25624](https://github.com/airbytehq/airbyte/pull/25624) | Add `asset` Resource to full refresh custom tables (GAQL Queries) | -| `0.2.19` | 2023-05-15 | [26209](https://github.com/airbytehq/airbyte/pull/26209) | Handle Token Refresh errors as `config_error` | -| `0.2.18` | 2023-05-15 | [25947](https://github.com/airbytehq/airbyte/pull/25947) | Improve GAQL parser error message if multiple resources provided | -| `0.2.17` | 2023-05-11 | [25987](https://github.com/airbytehq/airbyte/pull/25987) | Categorized Config Errors Accurately | -| `0.2.16` | 2023-05-10 | 
[25965](https://github.com/airbytehq/airbyte/pull/25965) | Fix Airbyte date-time data-types | -| `0.2.14` | 2023-03-21 | [24945](https://github.com/airbytehq/airbyte/pull/24945) | For custom google query fixed schema type for "data_type: ENUM" and "is_repeated: true" to array of strings | -| `0.2.13` | 2023-03-21 | [24338](https://github.com/airbytehq/airbyte/pull/24338) | Migrate to v13 | -| `0.2.12` | 2023-03-17 | [22985](https://github.com/airbytehq/airbyte/pull/22985) | Specified date formatting in specification | -| `0.2.11` | 2023-03-13 | [23999](https://github.com/airbytehq/airbyte/pull/23999) | Fix incremental sync for Campaigns stream | -| `0.2.10` | 2023-02-11 | [22703](https://github.com/airbytehq/airbyte/pull/22703) | Add support for custom full_refresh streams | -| `0.2.9` | 2023-01-23 | [21705](https://github.com/airbytehq/airbyte/pull/21705) | Fix multibyte issue; Bump google-ads package to 19.0.0 | -| `0.2.8` | 2023-01-18 | [21517](https://github.com/airbytehq/airbyte/pull/21517) | Write fewer logs | -| `0.2.7` | 2023-01-10 | [20755](https://github.com/airbytehq/airbyte/pull/20755) | Add more logs to debug stuck syncs | -| `0.2.6` | 2022-12-22 | [20855](https://github.com/airbytehq/airbyte/pull/20855) | Retry 429 and 5xx errors | -| `0.2.5` | 2022-11-22 | [19700](https://github.com/airbytehq/airbyte/pull/19700) | Fix schema for `campaigns` stream | -| `0.2.4` | 2022-11-09 | [19208](https://github.com/airbytehq/airbyte/pull/19208) | Add TypeTransofrmer to Campaings stream to force proper type casting | -| `0.2.3` | 2022-10-17 | [18069](https://github.com/airbytehq/airbyte/pull/18069) | Add `segments.hour`, `metrics.ctr`, `metrics.conversions` and `metrics.conversions_values` fields to `campaigns` report stream | -| `0.2.2` | 2022-10-21 | [17412](https://github.com/airbytehq/airbyte/pull/17412) | Release with CDK >= 0.2.2 | -| `0.2.1` | 2022-09-29 | [17412](https://github.com/airbytehq/airbyte/pull/17412) | Always use latest CDK version | -| `0.2.0` | 
2022-08-23 | [15858](https://github.com/airbytehq/airbyte/pull/15858) | Mark the `query` and `table_name` fields in `custom_queries` as required | -| `0.1.44` | 2022-07-27 | [15084](https://github.com/airbytehq/airbyte/pull/15084) | Fix data type `ad_group_criterion.topic.path` in `display_topics_performance_report` and shifted `campaigns` to non-managers streams | -| `0.1.43` | 2022-07-12 | [14614](https://github.com/airbytehq/airbyte/pull/14614) | Update API version to `v11`, update `google-ads` to 17.0.0 | -| `0.1.42` | 2022-06-08 | [13624](https://github.com/airbytehq/airbyte/pull/13624) | Update `google-ads` to 15.1.1, pin `protobuf==3.20.0` to work on MacOS M1 machines (AMD) | -| `0.1.41` | 2022-06-08 | [13618](https://github.com/airbytehq/airbyte/pull/13618) | Add missing dependency | -| `0.1.40` | 2022-06-02 | [13423](https://github.com/airbytehq/airbyte/pull/13423) | Fix the missing data [issue](https://github.com/airbytehq/airbyte/issues/12999) | -| `0.1.39` | 2022-05-18 | [12914](https://github.com/airbytehq/airbyte/pull/12914) | Fix GAQL query validation and log auth errors instead of failing the sync | -| `0.1.38` | 2022-05-12 | [12807](https://github.com/airbytehq/airbyte/pull/12807) | Documentation updates | -| `0.1.37` | 2022-05-06 | [12651](https://github.com/airbytehq/airbyte/pull/12651) | Improve integration and unit tests | -| `0.1.36` | 2022-04-19 | [12158](https://github.com/airbytehq/airbyte/pull/12158) | Fix `*_labels` streams data type | -| `0.1.35` | 2022-04-18 | [9310](https://github.com/airbytehq/airbyte/pull/9310) | Add new fields to reports | -| `0.1.34` | 2022-03-29 | [11602](https://github.com/airbytehq/airbyte/pull/11602) | Add budget amount to campaigns stream. | -| `0.1.33` | 2022-03-29 | [11513](https://github.com/airbytehq/airbyte/pull/11513) | When `end_date` is configured in the future, use today's date instead. 
| -| `0.1.32` | 2022-03-24 | [11371](https://github.com/airbytehq/airbyte/pull/11371) | Improve how connection check returns error messages | -| `0.1.31` | 2022-03-23 | [11301](https://github.com/airbytehq/airbyte/pull/11301) | Update docs and spec to clarify usage | -| `0.1.30` | 2022-03-23 | [11221](https://github.com/airbytehq/airbyte/pull/11221) | Add `*_labels` streams to fetch the label text rather than their IDs | -| `0.1.29` | 2022-03-22 | [10919](https://github.com/airbytehq/airbyte/pull/10919) | Fix user location report schema and add to acceptance tests | -| `0.1.28` | 2022-02-25 | [10372](https://github.com/airbytehq/airbyte/pull/10372) | Add network fields to click view stream | -| `0.1.27` | 2022-02-16 | [10315](https://github.com/airbytehq/airbyte/pull/10315) | Make `ad_group_ads` and other streams support incremental sync. | -| `0.1.26` | 2022-02-11 | [10150](https://github.com/airbytehq/airbyte/pull/10150) | Add support for multiple customer IDs. | -| `0.1.25` | 2022-02-04 | [9812](https://github.com/airbytehq/airbyte/pull/9812) | Handle `EXPIRED_PAGE_TOKEN` exception and retry with updated state. | -| `0.1.24` | 2022-02-04 | [9996](https://github.com/airbytehq/airbyte/pull/9996) | Use Google Ads API version V9. | -| `0.1.23` | 2022-01-25 | [8669](https://github.com/airbytehq/airbyte/pull/8669) | Add end date parameter in spec. | -| `0.1.22` | 2022-01-24 | [9608](https://github.com/airbytehq/airbyte/pull/9608) | Reduce stream slice date range. | -| `0.1.21` | 2021-12-28 | [9149](https://github.com/airbytehq/airbyte/pull/9149) | Update title and description | -| `0.1.20` | 2021-12-22 | [9071](https://github.com/airbytehq/airbyte/pull/9071) | Fix: Keyword schema enum | -| `0.1.19` | 2021-12-14 | [8431](https://github.com/airbytehq/airbyte/pull/8431) | Add new streams: Geographic and Keyword | -| `0.1.18` | 2021-12-09 | [8225](https://github.com/airbytehq/airbyte/pull/8225) | Include time_zone to sync. Remove streams for manager account. 
| -| `0.1.16` | 2021-11-22 | [8178](https://github.com/airbytehq/airbyte/pull/8178) | Clarify setup fields | -| `0.1.15` | 2021-10-07 | [6684](https://github.com/airbytehq/airbyte/pull/6684) | Add new stream `click_view` | -| `0.1.14` | 2021-10-01 | [6565](https://github.com/airbytehq/airbyte/pull/6565) | Fix OAuth Spec File | -| `0.1.13` | 2021-09-27 | [6458](https://github.com/airbytehq/airbyte/pull/6458) | Update OAuth Spec File | -| `0.1.11` | 2021-09-22 | [6373](https://github.com/airbytehq/airbyte/pull/6373) | Fix inconsistent segments.date field type across all streams | -| `0.1.10` | 2021-09-13 | [6022](https://github.com/airbytehq/airbyte/pull/6022) | Annotate Oauth2 flow initialization parameters in connector spec | -| `0.1.9` | 2021-09-07 | [5302](https://github.com/airbytehq/airbyte/pull/5302) | Add custom query stream support | -| `0.1.8` | 2021-08-03 | [5509](https://github.com/airbytehq/airbyte/pull/5509) | Allow additionalProperties in spec.json | -| `0.1.7` | 2021-08-03 | [5422](https://github.com/airbytehq/airbyte/pull/5422) | Correct query to not skip dates | -| `0.1.6` | 2021-08-03 | [5423](https://github.com/airbytehq/airbyte/pull/5423) | Added new stream UserLocationReport | -| `0.1.5` | 2021-08-03 | [5159](https://github.com/airbytehq/airbyte/pull/5159) | Add field `login_customer_id` to spec | -| `0.1.4` | 2021-07-28 | [4962](https://github.com/airbytehq/airbyte/pull/4962) | Support new Report streams | -| `0.1.3` | 2021-07-23 | [4788](https://github.com/airbytehq/airbyte/pull/4788) | Support main streams, fix bug with exception `DATE_RANGE_TOO_NARROW` for incremental streams | -| `0.1.2` | 2021-07-06 | [4539](https://github.com/airbytehq/airbyte/pull/4539) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support | -| `0.1.1` | 2021-06-23 | [4288](https://github.com/airbytehq/airbyte/pull/4288) | Fix `Bugfix: Correctly declare required parameters` | +| Version | Date | Pull Request | Subject | 
+|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------| +| `3.2.0` | 2024-01-09 | [33707](https://github.com/airbytehq/airbyte/pull/33707) | Add possibility to sync all connected accounts | +| `3.1.0` | 2024-01-09 | [33603](https://github.com/airbytehq/airbyte/pull/33603) | Fix two issues in the custom queries: automatic addition of `segments.date` in the query; incorrect field type for `DATE` fields. | +| `3.0.2` | 2024-01-08 | [33494](https://github.com/airbytehq/airbyte/pull/33494) | Add handling for 401 error while parsing response. Add `metrics.cost_micros` field to Ad Group stream. | +| `3.0.1` | 2023-12-26 | [33769](https://github.com/airbytehq/airbyte/pull/33769) | Run a read function in a separate thread to enforce a time limit for its execution | +| `3.0.0` | 2023-12-07 | [33120](https://github.com/airbytehq/airbyte/pull/33120) | Upgrade API version to v15 | +| `2.0.4` | 2023-11-10 | [32414](https://github.com/airbytehq/airbyte/pull/32414) | Add backoff strategy for read_records method | +| `2.0.3` | 2023-11-02 | [32102](https://github.com/airbytehq/airbyte/pull/32102) | Fix incremental events streams | +| `2.0.2` | 2023-10-31 | [32001](https://github.com/airbytehq/airbyte/pull/32001) | Added handling (retry) for `InternalServerError` while reading the streams | +| `2.0.1` | 2023-10-27 | [31908](https://github.com/airbytehq/airbyte/pull/31908) | Base image migration: remove Dockerfile and use the python-connector-base image | +| `2.0.0` | 2023-10-04 | [31048](https://github.com/airbytehq/airbyte/pull/31048) | Fix schem default streams, change names of streams. 
| +| `1.0.0` | 2023-09-28 | [30705](https://github.com/airbytehq/airbyte/pull/30705) | Fix schemas for custom queries | +| `0.11.1` | 2023-09-26 | [30758](https://github.com/airbytehq/airbyte/pull/30758) | Exception should not be raised if a stream is not found | +| `0.11.0` | 2023-09-23 | [30704](https://github.com/airbytehq/airbyte/pull/30704) | Update error handling | +| `0.10.0` | 2023-09-19 | [30091](https://github.com/airbytehq/airbyte/pull/30091) | Fix schemas for correct primary and foreign keys | +| `0.9.0` | 2023-09-14 | [28970](https://github.com/airbytehq/airbyte/pull/28970) | Add incremental deletes for Campaign and Ad Group Criterion streams | +| `0.8.1` | 2023-09-13 | [30376](https://github.com/airbytehq/airbyte/pull/30376) | Revert pagination changes from 0.8.0 | +| `0.8.0` | 2023-09-01 | [30071](https://github.com/airbytehq/airbyte/pull/30071) | Delete start_date from required parameters and fix pagination | +| `0.7.4` | 2023-07-28 | [28832](https://github.com/airbytehq/airbyte/pull/28832) | Update field descriptions | +| `0.7.3` | 2023-07-24 | [28510](https://github.com/airbytehq/airbyte/pull/28510) | Set dates with client's timezone | +| `0.7.2` | 2023-07-20 | [28535](https://github.com/airbytehq/airbyte/pull/28535) | UI improvement: Make the query field in custom reports a multi-line string field | +| `0.7.1` | 2023-07-17 | [28365](https://github.com/airbytehq/airbyte/pull/28365) | 0.3.1 and 0.3.2 follow up: make today the end date, not yesterday | +| `0.7.0` | 2023-07-12 | [28246](https://github.com/airbytehq/airbyte/pull/28246) | Add new streams: labels, criterions, bidding strategies | +| `0.6.1` | 2023-07-12 | [28230](https://github.com/airbytehq/airbyte/pull/28230) | Reduce amount of logs produced by the connector while working with big amount of data | +| `0.6.0` | 2023-07-10 | [28078](https://github.com/airbytehq/airbyte/pull/28078) | Add new stream `Campaign Budget` | +| `0.5.0` | 2023-07-07 | 
[28042](https://github.com/airbytehq/airbyte/pull/28042) | Add metrics & segment to `Campaigns` stream | +| `0.4.3` | 2023-07-05 | [27959](https://github.com/airbytehq/airbyte/pull/27959) | Add `audience` and `user_interest` streams | +| `0.3.3` | 2023-07-03 | [27913](https://github.com/airbytehq/airbyte/pull/27913) | Improve Google Ads exception handling (wrong customer ID) | +| `0.3.2` | 2023-06-29 | [27835](https://github.com/airbytehq/airbyte/pull/27835) | Fix bug introduced in 0.3.1: update query template | +| `0.3.1` | 2023-06-26 | [27711](https://github.com/airbytehq/airbyte/pull/27711) | Refactor date slicing; make start date inclusive | +| `0.3.0` | 2023-06-26 | [27738](https://github.com/airbytehq/airbyte/pull/27738) | License Update: Elv2 | +| `0.2.24` | 2023-06-06 | [27608](https://github.com/airbytehq/airbyte/pull/27608) | Improve Google Ads exception handling | +| `0.2.23` | 2023-06-06 | [26905](https://github.com/airbytehq/airbyte/pull/26905) | Replace deprecated `authSpecification` in the connector specification with `advancedAuth` | +| `0.2.22` | 2023-06-02 | [26948](https://github.com/airbytehq/airbyte/pull/26948) | Refactor error messages; add `pattern_descriptor` for fields in spec | +| `0.2.21` | 2023-05-30 | [25314](https://github.com/airbytehq/airbyte/pull/25314) | Add full refresh custom table `asset_group_listing_group_filter` | +| `0.2.20` | 2023-05-30 | [25624](https://github.com/airbytehq/airbyte/pull/25624) | Add `asset` Resource to full refresh custom tables (GAQL Queries) | +| `0.2.19` | 2023-05-15 | [26209](https://github.com/airbytehq/airbyte/pull/26209) | Handle Token Refresh errors as `config_error` | +| `0.2.18` | 2023-05-15 | [25947](https://github.com/airbytehq/airbyte/pull/25947) | Improve GAQL parser error message if multiple resources provided | +| `0.2.17` | 2023-05-11 | [25987](https://github.com/airbytehq/airbyte/pull/25987) | Categorized Config Errors Accurately | +| `0.2.16` | 2023-05-10 | 
[25965](https://github.com/airbytehq/airbyte/pull/25965) | Fix Airbyte date-time data-types | +| `0.2.14` | 2023-03-21 | [24945](https://github.com/airbytehq/airbyte/pull/24945) | For custom google query fixed schema type for "data_type: ENUM" and "is_repeated: true" to array of strings | +| `0.2.13` | 2023-03-21 | [24338](https://github.com/airbytehq/airbyte/pull/24338) | Migrate to v13 | +| `0.2.12` | 2023-03-17 | [22985](https://github.com/airbytehq/airbyte/pull/22985) | Specified date formatting in specification | +| `0.2.11` | 2023-03-13 | [23999](https://github.com/airbytehq/airbyte/pull/23999) | Fix incremental sync for Campaigns stream | +| `0.2.10` | 2023-02-11 | [22703](https://github.com/airbytehq/airbyte/pull/22703) | Add support for custom full_refresh streams | +| `0.2.9` | 2023-01-23 | [21705](https://github.com/airbytehq/airbyte/pull/21705) | Fix multibyte issue; Bump google-ads package to 19.0.0 | +| `0.2.8` | 2023-01-18 | [21517](https://github.com/airbytehq/airbyte/pull/21517) | Write fewer logs | +| `0.2.7` | 2023-01-10 | [20755](https://github.com/airbytehq/airbyte/pull/20755) | Add more logs to debug stuck syncs | +| `0.2.6` | 2022-12-22 | [20855](https://github.com/airbytehq/airbyte/pull/20855) | Retry 429 and 5xx errors | +| `0.2.5` | 2022-11-22 | [19700](https://github.com/airbytehq/airbyte/pull/19700) | Fix schema for `campaigns` stream | +| `0.2.4` | 2022-11-09 | [19208](https://github.com/airbytehq/airbyte/pull/19208) | Add TypeTransformer to Campaigns stream to force proper type casting | +| `0.2.3` | 2022-10-17 | [18069](https://github.com/airbytehq/airbyte/pull/18069) | Add `segments.hour`, `metrics.ctr`, `metrics.conversions` and `metrics.conversions_values` fields to `campaigns` report stream | +| `0.2.2` | 2022-10-21 | [17412](https://github.com/airbytehq/airbyte/pull/17412) | Release with CDK >= 0.2.2 | +| `0.2.1` | 2022-09-29 | [17412](https://github.com/airbytehq/airbyte/pull/17412) | Always use latest CDK version | +| `0.2.0` | 
2022-08-23 | [15858](https://github.com/airbytehq/airbyte/pull/15858) | Mark the `query` and `table_name` fields in `custom_queries` as required | +| `0.1.44` | 2022-07-27 | [15084](https://github.com/airbytehq/airbyte/pull/15084) | Fix data type `ad_group_criterion.topic.path` in `display_topics_performance_report` and shifted `campaigns` to non-managers streams | +| `0.1.43` | 2022-07-12 | [14614](https://github.com/airbytehq/airbyte/pull/14614) | Update API version to `v11`, update `google-ads` to 17.0.0 | +| `0.1.42` | 2022-06-08 | [13624](https://github.com/airbytehq/airbyte/pull/13624) | Update `google-ads` to 15.1.1, pin `protobuf==3.20.0` to work on MacOS M1 machines (AMD) | +| `0.1.41` | 2022-06-08 | [13618](https://github.com/airbytehq/airbyte/pull/13618) | Add missing dependency | +| `0.1.40` | 2022-06-02 | [13423](https://github.com/airbytehq/airbyte/pull/13423) | Fix the missing data [issue](https://github.com/airbytehq/airbyte/issues/12999) | +| `0.1.39` | 2022-05-18 | [12914](https://github.com/airbytehq/airbyte/pull/12914) | Fix GAQL query validation and log auth errors instead of failing the sync | +| `0.1.38` | 2022-05-12 | [12807](https://github.com/airbytehq/airbyte/pull/12807) | Documentation updates | +| `0.1.37` | 2022-05-06 | [12651](https://github.com/airbytehq/airbyte/pull/12651) | Improve integration and unit tests | +| `0.1.36` | 2022-04-19 | [12158](https://github.com/airbytehq/airbyte/pull/12158) | Fix `*_labels` streams data type | +| `0.1.35` | 2022-04-18 | [9310](https://github.com/airbytehq/airbyte/pull/9310) | Add new fields to reports | +| `0.1.34` | 2022-03-29 | [11602](https://github.com/airbytehq/airbyte/pull/11602) | Add budget amount to campaigns stream. | +| `0.1.33` | 2022-03-29 | [11513](https://github.com/airbytehq/airbyte/pull/11513) | When `end_date` is configured in the future, use today's date instead. 
| +| `0.1.32` | 2022-03-24 | [11371](https://github.com/airbytehq/airbyte/pull/11371) | Improve how connection check returns error messages | +| `0.1.31` | 2022-03-23 | [11301](https://github.com/airbytehq/airbyte/pull/11301) | Update docs and spec to clarify usage | +| `0.1.30` | 2022-03-23 | [11221](https://github.com/airbytehq/airbyte/pull/11221) | Add `*_labels` streams to fetch the label text rather than their IDs | +| `0.1.29` | 2022-03-22 | [10919](https://github.com/airbytehq/airbyte/pull/10919) | Fix user location report schema and add to acceptance tests | +| `0.1.28` | 2022-02-25 | [10372](https://github.com/airbytehq/airbyte/pull/10372) | Add network fields to click view stream | +| `0.1.27` | 2022-02-16 | [10315](https://github.com/airbytehq/airbyte/pull/10315) | Make `ad_group_ads` and other streams support incremental sync. | +| `0.1.26` | 2022-02-11 | [10150](https://github.com/airbytehq/airbyte/pull/10150) | Add support for multiple customer IDs. | +| `0.1.25` | 2022-02-04 | [9812](https://github.com/airbytehq/airbyte/pull/9812) | Handle `EXPIRED_PAGE_TOKEN` exception and retry with updated state. | +| `0.1.24` | 2022-02-04 | [9996](https://github.com/airbytehq/airbyte/pull/9996) | Use Google Ads API version V9. | +| `0.1.23` | 2022-01-25 | [8669](https://github.com/airbytehq/airbyte/pull/8669) | Add end date parameter in spec. | +| `0.1.22` | 2022-01-24 | [9608](https://github.com/airbytehq/airbyte/pull/9608) | Reduce stream slice date range. | +| `0.1.21` | 2021-12-28 | [9149](https://github.com/airbytehq/airbyte/pull/9149) | Update title and description | +| `0.1.20` | 2021-12-22 | [9071](https://github.com/airbytehq/airbyte/pull/9071) | Fix: Keyword schema enum | +| `0.1.19` | 2021-12-14 | [8431](https://github.com/airbytehq/airbyte/pull/8431) | Add new streams: Geographic and Keyword | +| `0.1.18` | 2021-12-09 | [8225](https://github.com/airbytehq/airbyte/pull/8225) | Include time_zone to sync. Remove streams for manager account. 
| +| `0.1.16` | 2021-11-22 | [8178](https://github.com/airbytehq/airbyte/pull/8178) | Clarify setup fields | +| `0.1.15` | 2021-10-07 | [6684](https://github.com/airbytehq/airbyte/pull/6684) | Add new stream `click_view` | +| `0.1.14` | 2021-10-01 | [6565](https://github.com/airbytehq/airbyte/pull/6565) | Fix OAuth Spec File | +| `0.1.13` | 2021-09-27 | [6458](https://github.com/airbytehq/airbyte/pull/6458) | Update OAuth Spec File | +| `0.1.11` | 2021-09-22 | [6373](https://github.com/airbytehq/airbyte/pull/6373) | Fix inconsistent segments.date field type across all streams | +| `0.1.10` | 2021-09-13 | [6022](https://github.com/airbytehq/airbyte/pull/6022) | Annotate Oauth2 flow initialization parameters in connector spec | +| `0.1.9` | 2021-09-07 | [5302](https://github.com/airbytehq/airbyte/pull/5302) | Add custom query stream support | +| `0.1.8` | 2021-08-03 | [5509](https://github.com/airbytehq/airbyte/pull/5509) | Allow additionalProperties in spec.json | +| `0.1.7` | 2021-08-03 | [5422](https://github.com/airbytehq/airbyte/pull/5422) | Correct query to not skip dates | +| `0.1.6` | 2021-08-03 | [5423](https://github.com/airbytehq/airbyte/pull/5423) | Added new stream UserLocationReport | +| `0.1.5` | 2021-08-03 | [5159](https://github.com/airbytehq/airbyte/pull/5159) | Add field `login_customer_id` to spec | +| `0.1.4` | 2021-07-28 | [4962](https://github.com/airbytehq/airbyte/pull/4962) | Support new Report streams | +| `0.1.3` | 2021-07-23 | [4788](https://github.com/airbytehq/airbyte/pull/4788) | Support main streams, fix bug with exception `DATE_RANGE_TOO_NARROW` for incremental streams | +| `0.1.2` | 2021-07-06 | [4539](https://github.com/airbytehq/airbyte/pull/4539) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support | +| `0.1.1` | 2021-06-23 | [4288](https://github.com/airbytehq/airbyte/pull/4288) | Fix `Bugfix: Correctly declare required parameters` | From 8d82028a7056b1ffe62dfcda26659d06eccaf6a8 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: 
Thu, 28 Dec 2023 15:30:55 +0200 Subject: [PATCH 4/8] Minor refactoring --- .../source_google_ads/google_ads.py | 18 +++++++++--------- .../source_google_ads/models.py | 8 ++++---- .../source_google_ads/streams.py | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py index 7230de6a470f..c34833154ce6 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py @@ -31,12 +31,12 @@ def __init__(self, credentials: MutableMapping[str, Any]): self.ga_services = {} self.credentials = credentials - self.clients["none"] = self.get_google_ads_client(credentials) - self.ga_services["none"] = self.clients["none"].get_service("GoogleAdsService") + self.clients["default"] = self.get_google_ads_client(credentials) + self.ga_services["default"] = self.clients["default"].get_service("GoogleAdsService") - self.customer_service = self.clients["none"].get_service("CustomerService") + self.customer_service = self.clients["default"].get_service("CustomerService") - def client(self, login_customer_id="none"): + def get_client(self, login_customer_id="default"): if login_customer_id in self.clients: return self.clients[login_customer_id] new_creds = self.credentials.copy() @@ -44,7 +44,7 @@ def client(self, login_customer_id="none"): self.clients[login_customer_id] = self.get_google_ads_client(new_creds) return self.clients[login_customer_id] - def ga_service(self, login_customer_id="none"): + def ga_service(self, login_customer_id="default"): if login_customer_id in self.ga_services: return self.ga_services[login_customer_id] self.ga_services[login_customer_id] = self.clients[login_customer_id].get_service("GoogleAdsService") @@ -74,8 +74,8 @@ def get_accessible_accounts(self): ), 
max_tries=5, ) - def send_request(self, query: str, customer_id: str, login_customer_id: str = "none") -> Iterator[SearchGoogleAdsResponse]: - client = self.client(login_customer_id) + def send_request(self, query: str, customer_id: str, login_customer_id: str = "default") -> Iterator[SearchGoogleAdsResponse]: + client = self.get_client(login_customer_id) search_request = client.get_type("SearchGoogleAdsRequest") search_request.query = query search_request.page_size = self.DEFAULT_PAGE_SIZE @@ -89,8 +89,8 @@ def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]: :return dict of fields type info. """ - ga_field_service = self.client().get_service("GoogleAdsFieldService") - request = self.client().get_type("SearchGoogleAdsFieldsRequest") + ga_field_service = self.get_client().get_service("GoogleAdsFieldService") + request = self.get_client().get_type("SearchGoogleAdsFieldsRequest") request.page_size = len(fields) fields_sql = ",".join([f"'{field}'" for field in fields]) request.query = f""" diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py index 975336398f82..bec8be475f93 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py @@ -18,17 +18,17 @@ class CustomerModel: login_customer_id: str = None @classmethod - def from_accounts(cls, accounts: Iterable[Mapping[str, Any]], table_name: str = "customer_client") -> Iterable["CustomerModel"]: + def from_accounts(cls, accounts: Iterable[Mapping[str, Any]]) -> Iterable["CustomerModel"]: data_objects = [] for account in accounts: - time_zone_name = account.get(f"{table_name}.time_zone") + time_zone_name = account.get("customer_client.time_zone") tz = Timezone(time_zone_name) if time_zone_name else "local" data_objects.append( cls( - id=str(account[f"{table_name}.id"]), + 
id=str(account["customer_client.id"]), time_zone=tz, - is_manager_account=bool(account.get(f"{table_name}.manager")), + is_manager_account=bool(account.get("customer_client.manager")), login_customer_id=account.get("login_customer_id"), ) ) diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py index 2d8f04fc28c6..d843771b82e2 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py @@ -284,7 +284,7 @@ def parse_response(self, response: SearchPager, stream_slice: Optional[Mapping[s # if the result is more than one customer, it's a manager, otherwise it is client account for which we don't need login_customer_id root_is_manager = len(records) > 1 for record in records: - record["login_customer_id"] = stream_slice["login_customer_id"] if root_is_manager else "none" + record["login_customer_id"] = stream_slice["login_customer_id"] if root_is_manager else "default" yield record From 15746f3da694fe57fae79b1e455a09507e244719 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 29 Dec 2023 03:07:36 +0200 Subject: [PATCH 5/8] Add test for get_customers method --- .../source_google_ads/models.py | 8 +- .../unit_tests/test_errors.py | 2 +- .../unit_tests/test_google_ads.py | 2 +- .../test_incremental_events_streams.py | 34 ++++---- .../unit_tests/test_models.py | 9 ++- .../unit_tests/test_source.py | 78 ++++++++++++++++++- .../unit_tests/test_streams.py | 4 +- 7 files changed, 109 insertions(+), 28 deletions(-) diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py index bec8be475f93..7da4ed7c2b9c 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py +++ 
b/airbyte-integrations/connectors/source-google-ads/source_google_ads/models.py @@ -4,16 +4,16 @@ from dataclasses import dataclass -from typing import Any, Iterable, Mapping, Union +from typing import Any, Iterable, Mapping -from pendulum import timezone +from pendulum import local_timezone, timezone from pendulum.tz.timezone import Timezone @dataclass class CustomerModel: id: str - time_zone: Union[timezone, str] = "local" + time_zone: timezone = local_timezone() is_manager_account: bool = False login_customer_id: str = None @@ -22,7 +22,7 @@ def from_accounts(cls, accounts: Iterable[Mapping[str, Any]]) -> Iterable["Custo data_objects = [] for account in accounts: time_zone_name = account.get("customer_client.time_zone") - tz = Timezone(time_zone_name) if time_zone_name else "local" + tz = Timezone(time_zone_name) if time_zone_name else local_timezone() data_objects.append( cls( diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py index b82388b89268..9bf943bb145d 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_errors.py @@ -88,7 +88,7 @@ def test_read_record_error_handling(mocker, config, customers, cls, raise_expect context = pytest.raises(AirbyteTracedException) if raise_expected else does_not_raise() with context as exception: - for _ in stream.read_records(sync_mode=Mock(), stream_slice={"customer_id": "1234567890", "login_customer_id": "none"}): + for _ in stream.read_records(sync_mode=Mock(), stream_slice={"customer_id": "1234567890", "login_customer_id": "default"}): pass if raise_expected: diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py index f541fb8e9bb1..3f66564846f4 100644 --- 
a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py @@ -169,7 +169,7 @@ def test_get_fields_metadata(mocker): response = google_ads_client.get_fields_metadata(fields) # Get the mock service to check the request query - mock_service = google_ads_client.client().get_service("GoogleAdsFieldService") + mock_service = google_ads_client.get_client().get_service("GoogleAdsFieldService") # Assert the constructed request query expected_query = """ diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py index 3357d8dddfb9..8ddf8bd80fba 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_incremental_events_streams.py @@ -54,7 +54,7 @@ class MockGoogleAds(GoogleAds): def parse_single_result(self, schema, result): return result - def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "default"): if query == "query_parent": return mock_response_parent() else: @@ -64,7 +64,7 @@ def send_request(self, query: str, customer_id: str, login_customer_id: str = "n def test_change_status_stream(config, customers): """ """ customer_id = next(iter(customers)).id - stream_slice = {"customer_id": customer_id, "login_customer_id": "none"} + stream_slice = {"customer_id": customer_id, "login_customer_id": "default"} google_api = MockGoogleAds(credentials=config["credentials"]) @@ -78,7 +78,7 @@ def test_change_status_stream(config, customers): ) assert len(result) == 4 assert stream.get_query.call_count == 1 - stream.get_query.assert_called_with({"customer_id": customer_id, "login_customer_id": 
"none"}) + stream.get_query.assert_called_with({"customer_id": customer_id, "login_customer_id": "default"}) def test_child_incremental_events_read(config, customers): @@ -89,7 +89,7 @@ def test_child_incremental_events_read(config, customers): It shouldn't read records on 2021-01-01, 2021-01-02 """ customer_id = next(iter(customers)).id - parent_stream_slice = {"customer_id": customer_id, "resource_type": "CAMPAIGN_CRITERION", "login_customer_id": "none"} + parent_stream_slice = {"customer_id": customer_id, "resource_type": "CAMPAIGN_CRITERION", "login_customer_id": "default"} stream_state = {"change_status": {customer_id: {"change_status.last_change_date_time": "2023-08-16 13:20:01.003295"}}} google_api = MockGoogleAds(credentials=config["credentials"]) @@ -121,7 +121,7 @@ def test_child_incremental_events_read(config, customers): "3": "2023-06-13 12:36:03.772447", "4": "2023-06-13 12:36:04.772447", }, - "login_customer_id": "none", + "login_customer_id": "default", } ] @@ -222,7 +222,7 @@ class MockGoogleAdsLimit(GoogleAds): def parse_single_result(self, schema, result): return result - def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "default"): self.count += 1 if self.count == 1: return mock_response_1() @@ -260,7 +260,7 @@ def test_query_limit_hit(config, customers): "customer_id": customer_id, "start_date": "2023-06-13 11:35:04.772447", "end_date": "2023-06-13 13:36:04.772447", - "login_customer_id": "none", + "login_customer_id": "default", } google_api = MockGoogleAdsLimit(credentials=config["credentials"]) @@ -286,7 +286,7 @@ def test_query_limit_hit(config, customers): "customer_id": "123", "start_date": "2023-06-13 11:35:04.772447", "end_date": "2023-06-13 13:36:04.772447", - "login_customer_id": "none", + "login_customer_id": "default", } ), call( @@ -294,7 +294,7 @@ def test_query_limit_hit(config, customers): "customer_id": "123", 
"start_date": "2023-06-13 12:36:02.772447", "end_date": "2023-06-13 13:36:04.772447", - "login_customer_id": "none", + "login_customer_id": "default", } ), call( @@ -302,7 +302,7 @@ def test_query_limit_hit(config, customers): "customer_id": "123", "start_date": "2023-06-13 12:36:04.772447", "end_date": "2023-06-13 13:36:04.772447", - "login_customer_id": "none", + "login_customer_id": "default", } ), ] @@ -311,7 +311,7 @@ def test_query_limit_hit(config, customers): class MockGoogleAdsLimitException(MockGoogleAdsLimit): - def send_request(self, query: str, customer_id: str, login_customer_id: str = "none"): + def send_request(self, query: str, customer_id: str, login_customer_id: str = "default"): self.count += 1 if self.count == 1: return mock_response_1() @@ -333,7 +333,7 @@ def test_query_limit_hit_exception(config, customers): "customer_id": customer_id, "start_date": "2023-06-13 11:35:04.772447", "end_date": "2023-06-13 13:36:04.772447", - "login_customer_id": "none", + "login_customer_id": "default", } google_api = MockGoogleAdsLimitException(credentials=config["credentials"]) @@ -374,7 +374,7 @@ def test_change_status_get_query(mocker, config, customers): "start_date": "2023-01-01 00:00:00.000000", "end_date": "2023-09-19 00:00:00.000000", "resource_type": "SOME_RESOURCE_TYPE", - "login_customer_id": "none", + "login_customer_id": "default", } # Call the get_query method with the stream_slice @@ -435,7 +435,7 @@ def test_incremental_events_stream_get_query(mocker, config, customers): "customers/1234567890/adGroupCriteria/111111111111~4": "2023-09-18 08:56:59.165599", "customers/1234567890/adGroupCriteria/111111111111~5": "2023-09-18 08:56:59.165599", }, - "login_customer_id": "none", + "login_customer_id": "default", } # Call the get_query method with the stream_slice @@ -465,7 +465,7 @@ def test_read_records_with_slice_splitting(mocker, config): "record_changed_time_map": {i: f"time_{i}" for i in range(15000)}, "customer_id": "sample_customer_id", 
"deleted_ids": set(), - "login_customer_id": "none", + "login_customer_id": "default", } # Create a mock instance of the CampaignCriterion stream @@ -490,14 +490,14 @@ def test_read_records_with_slice_splitting(mocker, config): "record_changed_time_map": {i: f"time_{i}" for i in range(10000)}, "customer_id": "sample_customer_id", "deleted_ids": set(), - "login_customer_id": "none", + "login_customer_id": "default", } expected_second_slice = { "updated_ids": set(range(10000, 15000)), "record_changed_time_map": {i: f"time_{i}" for i in range(10000, 15000)}, "customer_id": "sample_customer_id", "deleted_ids": set(), - "login_customer_id": "none", + "login_customer_id": "default", } # Verify the arguments passed to the parent's read_records method for both calls diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py index 4c8970dcf211..7606a76bc7bf 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_models.py @@ -3,15 +3,20 @@ # +from unittest.mock import Mock + import pytest +from pendulum.tz.timezone import Timezone from source_google_ads.models import CustomerModel -def test_time_zone(): +def test_time_zone(mocker): + mocker.patch("source_google_ads.models.local_timezone", Mock(return_value=Timezone("Europe/Riga"))) + mock_account_info = [{"customer_client.id": "8765"}] customers = CustomerModel.from_accounts(mock_account_info) for customer in customers: - assert customer.time_zone == "local" + assert customer.time_zone.name == Timezone("Europe/Riga").name @pytest.mark.parametrize("is_manager_account", (True, False)) diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py index 3ae611df6ab4..6394817edd99 100644 --- 
a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py @@ -5,7 +5,7 @@ import re from collections import namedtuple -from unittest.mock import Mock +from unittest.mock import Mock, call import pendulum import pytest @@ -443,3 +443,79 @@ def test_stream_slices(config, customers): {"start_date": "2021-01-17", "end_date": "2021-01-31", "customer_id": "123", "login_customer_id": None}, {"start_date": "2021-02-01", "end_date": "2021-02-10", "customer_id": "123", "login_customer_id": None}, ] + + +def mock_send_request(query: str, customer_id: str, login_customer_id: str = "default"): + print(query, customer_id, login_customer_id) + if customer_id == "123": + if "WHERE customer_client.status in ('active')" in query: + return [ + [ + {"customer_client.id": "123", "customer_client.status": "active"}, + ] + ] + else: + return [ + [ + {"customer_client.id": "123", "customer_client.status": "active"}, + {"customer_client.id": "456", "customer_client.status": "disabled"}, + ] + ] + else: + return [ + [ + {"customer_client.id": "789", "customer_client.status": "active"}, + ] + ] + + +@pytest.mark.parametrize( + "customer_status_filter, expected_ids, send_request_calls", + [ + ( + [], + ["123", "456", "789"], + [ + call( + "SELECT customer_client.client_customer, customer_client.level, customer_client.id, customer_client.manager, customer_client.time_zone, customer_client.status FROM customer_client", + customer_id="123", + ), + call( + "SELECT customer_client.client_customer, customer_client.level, customer_client.id, customer_client.manager, customer_client.time_zone, customer_client.status FROM customer_client", + customer_id="789", + ), + ], + ), # Empty filter, expect all customers + ( + ["active"], + ["123", "789"], + [ + call( + "SELECT customer_client.client_customer, customer_client.level, customer_client.id, customer_client.manager, customer_client.time_zone, 
customer_client.status FROM customer_client WHERE customer_client.status in ('active')", + customer_id="123", + ), + call( + "SELECT customer_client.client_customer, customer_client.level, customer_client.id, customer_client.manager, customer_client.time_zone, customer_client.status FROM customer_client WHERE customer_client.status in ('active')", + customer_id="789", + ), + ], + ), # Non-empty filter, expect filtered customers + ], +) +def test_get_customers(mocker, customer_status_filter, expected_ids, send_request_calls): + mock_google_api = Mock() + + mock_google_api.get_accessible_accounts.return_value = ["123", "789"] + mock_google_api.send_request.side_effect = mock_send_request + mock_google_api.parse_single_result.side_effect = lambda schema, result: result + + mock_config = {"customer_status_filter": customer_status_filter, "customer_ids": ["123", "456", "789"]} + + source = SourceGoogleAds() + + customers = source.get_customers(mock_google_api, mock_config) + + mock_google_api.send_request.assert_has_calls(send_request_calls) + + assert len(customers) == len(expected_ids) + assert {customer.id for customer in customers} == set(expected_ids) diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py index c1cf3ea29ce4..8bfaf149ebf4 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py @@ -210,7 +210,7 @@ def test_retry_transient_errors(mocker, config, customers, error_cls): credentials = config["credentials"] credentials.update(use_proto_plus=True) api = GoogleAds(credentials=credentials) - mocked_search = mocker.patch.object(api.ga_services["none"], "search", side_effect=error_cls("Error message")) + mocked_search = mocker.patch.object(api.ga_services["default"], "search", side_effect=error_cls("Error message")) 
incremental_stream_config = dict( api=api, conversion_window_days=config["conversion_window_days"], @@ -219,7 +219,7 @@ def test_retry_transient_errors(mocker, config, customers, error_cls): customers=customers, ) stream = ClickView(**incremental_stream_config) - stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04", "login_customer_id": "none"} + stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04", "login_customer_id": "default"} records = [] with pytest.raises(error_cls) as exception: records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice)) From 99ea4f6d79ebf9f61e1b236f459c84cff1eb6a0e Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 9 Jan 2024 20:33:21 +0200 Subject: [PATCH 6/8] Fix merge conflicts --- .../source-google-ads/source_google_ads/source.py | 1 + .../source-google-ads/unit_tests/test_streams.py | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py index 02835088d284..9378c58c4ed5 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py @@ -34,6 +34,7 @@ CampaignLabel, ClickView, Customer, + CustomerClient, CustomerLabel, DisplayKeywordView, GeographicView, diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py index 8bfaf149ebf4..3323a6811a23 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py @@ -282,7 +282,8 @@ def test_read_records_unauthenticated(mocker, 
customers, config): ) stream = CustomerLabel(**stream_config) with pytest.raises(AirbyteTracedException) as exc_info: - list(stream.read_records(SyncMode.full_refresh, {"customer_id": "customer_id"})) + list(stream.read_records(SyncMode.full_refresh, {"customer_id": "customer_id", "login_customer_id": "default"})) - assert exc_info.value.message == ("Authentication failed for the customer 'customer_id'. " - "Please try to Re-authenticate your credentials on set up Google Ads page.") + assert exc_info.value.message == ( + "Authentication failed for the customer 'customer_id'. " "Please try to Re-authenticate your credentials on set up Google Ads page." + ) From a204c4c8605b9ee0dc95272d568b2415156d830f Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 9 Jan 2024 21:04:24 +0200 Subject: [PATCH 7/8] Update version --- airbyte-integrations/connectors/source-google-ads/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-google-ads/metadata.yaml b/airbyte-integrations/connectors/source-google-ads/metadata.yaml index f77549067f96..50a62c4282e3 100644 --- a/airbyte-integrations/connectors/source-google-ads/metadata.yaml +++ b/airbyte-integrations/connectors/source-google-ads/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 253487c0-2246-43ba-a21f-5116b20a2c50 - dockerImageTag: 3.1.0 + dockerImageTag: 3.2.0 dockerRepository: airbyte/source-google-ads documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads githubIssueLabel: source-google-ads From b4f3d5252475188ce0038ab0f0a760bb275ad502 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 11 Jan 2024 21:05:49 +0200 Subject: [PATCH 8/8] Small fix for check command --- .../connectors/source-google-ads/source_google_ads/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py 
b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py index 9378c58c4ed5..2402cd18adbe 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py @@ -206,7 +206,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> query = IncrementalCustomQuery.insert_segments_date_expr(query, "1980-01-01", "1980-01-01") query = query.set_limit(1) - response = google_api.send_request(str(query), customer_id=customer.id) + response = google_api.send_request(str(query), customer_id=customer.id, login_customer_id=customer.login_customer_id) # iterate over the response otherwise exceptions will not be raised! for _ in response: pass