✨ Source Google Analytics (GAv4): change start date to optional; add suggested streams and update error… (#30417)
lazebnyi authored Sep 15, 2023
1 parent cad23ff commit a984b75
Showing 8 changed files with 184 additions and 142 deletions.
Dockerfile
@@ -28,5 +28,5 @@ COPY source_google_analytics_data_api ./source_google_analytics_data_api
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.3.1
LABEL io.airbyte.version=1.4.0
LABEL io.airbyte.name=airbyte/source-google-analytics-data-api
metadata.yaml
@@ -7,7 +7,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
dockerImageTag: 1.3.1
dockerImageTag: 1.4.0
dockerRepository: airbyte/source-google-analytics-data-api
githubIssueLabel: source-google-analytics-data-api
icon: google-analytics.svg
@@ -19,6 +19,17 @@ data:
oss:
enabled: true
releaseStage: generally_available
suggestedStreams:
streams:
- website_overview
- daily_active_users
- traffic_sources
- pages
- weekly_active_users
- devices
- locations
- four_weekly_active_users
- sessions
documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-data-api
tags:
- language:python
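
As a side note, a minimal sketch of how the suggestedStreams block above could be read back (PyYAML and the relative file path are assumptions, not part of this commit):

# Minimal sketch: load the connector metadata and read the suggestedStreams
# block added in this commit. PyYAML and the local "metadata.yaml" path are assumed.
import yaml

with open("metadata.yaml") as f:
    metadata = yaml.safe_load(f)

streams = metadata["data"]["suggestedStreams"]["streams"]
print(streams)  # ['website_overview', 'daily_active_users', 'traffic_sources', ...]
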
source.py
@@ -13,6 +13,7 @@

import dpath
import jsonschema
import pendulum
import requests
from airbyte_cdk.models import FailureType, SyncMode
from airbyte_cdk.sources import AbstractSource
@@ -52,7 +53,19 @@ def __init__(self):
def __get__(self, instance, owner):
if not self._metadata:
stream = GoogleAnalyticsDataApiMetadataStream(config=instance.config, authenticator=instance.config["authenticator"])
metadata = next(stream.read_records(sync_mode=SyncMode.full_refresh), None)

try:
metadata = next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
except HTTPError as e:
if e.response.status_code == HTTPStatus.UNAUTHORIZED:
internal_message = "Unauthorized error reached."
message = "Can not get metadata with unauthorized credentials. Try to re-authenticate in source settings."

unauthorized_error = AirbyteTracedException(
message=message, internal_message=internal_message, failure_type=FailureType.config_error
)
raise unauthorized_error

if not metadata:
raise Exception("failed to get metadata, over quota, try later")
self._metadata = {
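
For context, a minimal standalone sketch of the 401-handling pattern introduced in this hunk: an unauthorized response from the metadata request is surfaced as a user-facing configuration error. Plain requests is used here, and a RuntimeError stands in for the connector's AirbyteTracedException with failure_type=FailureType.config_error.

# Standalone sketch (not the connector's code) of the error-translation pattern
# added above: an HTTP 401 becomes a clear, user-facing configuration error
# instead of an unhandled exception.
from http import HTTPStatus

import requests
from requests.exceptions import HTTPError


def fetch_metadata(url: str) -> dict:
    response = requests.get(url)
    try:
        response.raise_for_status()
    except HTTPError as e:
        if e.response.status_code == HTTPStatus.UNAUTHORIZED:
            # In the connector this is an AirbyteTracedException (config_error);
            # a plain RuntimeError stands in for it in this sketch.
            raise RuntimeError(
                "Can not get metadata with unauthorized credentials. "
                "Try to re-authenticate in source settings."
            ) from e
        raise
    return response.json()
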
@@ -156,7 +169,10 @@ def get_json_schema(self) -> Mapping[str, Any]:

schema["properties"].update(
{
d: {"type": get_dimensions_type(d), "description": self.metadata["dimensions"].get(d, {}).get("description", d)}
d.replace(":", "_"): {
"type": get_dimensions_type(d),
"description": self.metadata["dimensions"].get(d, {}).get("description", d),
}
for d in self.config["dimensions"]
}
)
@@ -171,7 +187,7 @@ def get_json_schema(self) -> Mapping[str, Any]:

schema["properties"].update(
{
m: {
m.replace(":", "_"): {
"type": ["null", get_metrics_type(self.metadata["metrics"].get(m, {}).get("type"))],
"description": self.metadata["metrics"].get(m, {}).get("description", m),
}
@@ -213,9 +229,9 @@ def parse_response(
) -> Iterable[Mapping]:
r = response.json()

dimensions = [h.get("name") for h in r.get("dimensionHeaders", [{}])]
metrics = [h.get("name") for h in r.get("metricHeaders", [{}])]
metrics_type_map = {h.get("name"): h.get("type") for h in r.get("metricHeaders", [{}])}
dimensions = [h.get("name").replace(":", "_") for h in r.get("dimensionHeaders", [{}])]
metrics = [h.get("name").replace(":", "_") for h in r.get("metricHeaders", [{}])]
metrics_type_map = {h.get("name").replace(":", "_"): h.get("type") for h in r.get("metricHeaders", [{}])}

for row in r.get("rows", []):
record = {
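
The same colon-to-underscore normalization is applied to the schema property names above and to the response headers here, so emitted records line up with the declared schema. A minimal sketch of the idea (the "customEvent:page_scroll" dimension name is an assumed example):

# Minimal sketch of the normalization used in get_json_schema and parse_response:
# GA4 field names may contain ":" (e.g. custom dimensions such as
# "customEvent:page_scroll" -- an assumed example), which is replaced with "_"
# in both the schema and the parsed records.
def normalize_field_name(name: str) -> str:
    return name.replace(":", "_")


headers = ["date", "customEvent:page_scroll"]
values = ["20230915", "42"]

record = {normalize_field_name(h): v for h, v in zip(headers, values)}
assert record == {"date": "20230915", "customEvent_page_scroll": "42"}
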
@@ -252,7 +268,6 @@ def request_body_json(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:

payload = {
"metrics": [{"name": m} for m in self.config["metrics"]],
"dimensions": [{"name": d} for d in self.config["dimensions"]],
@@ -268,7 +283,6 @@ def stream_slices(
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:

today: datetime.date = datetime.date.today()

start_date = stream_state and stream_state.get(self.cursor_field)
@@ -358,6 +372,21 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp


class SourceGoogleAnalyticsDataApi(AbstractSource):
@property
def default_date_ranges_start_date(self) -> str:
# set default date ranges start date to 2 years ago
return pendulum.now(tz="UTC").subtract(years=2).format("YYYY-MM-DD")

def _validate_and_transform_start_date(self, start_date: str) -> datetime.date:
start_date = self.default_date_ranges_start_date if not start_date else start_date

try:
start_date = utils.string_to_date(start_date)
except ValueError as e:
raise ConfigurationError(str(e))

return start_date

def _validate_and_transform(self, config: Mapping[str, Any], report_names: Set[str]):
if "custom_reports" in config:
if isinstance(config["custom_reports"], str):
@@ -396,10 +425,7 @@ def _validate_and_transform(self, config: Mapping[str, Any], report_names: Set[str]):
except ValueError:
raise ConfigurationError("credentials.credentials_json is not valid JSON")

try:
config["date_ranges_start_date"] = utils.string_to_date(config["date_ranges_start_date"])
except ValueError as e:
raise ConfigurationError(str(e))
config["date_ranges_start_date"] = self._validate_and_transform_start_date(config.get("date_ranges_start_date"))

if not config.get("window_in_days"):
source_spec = self.spec(logging.getLogger("airbyte"))
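
In effect, date_ranges_start_date is now optional: when it is absent, the connector falls back to a start date two years before the current UTC date, otherwise the provided value is validated as before. A minimal standalone sketch of that behavior (pendulum is imported in the diff above; the helper names here are illustrative, not the connector's):

# Minimal sketch of the optional start date: an omitted date_ranges_start_date
# falls back to "two years ago" in UTC, while an explicit value is parsed.
import datetime
from typing import Optional

import pendulum


def default_date_ranges_start_date() -> str:
    return pendulum.now(tz="UTC").subtract(years=2).format("YYYY-MM-DD")


def resolve_start_date(start_date: Optional[str]) -> datetime.date:
    start_date = start_date or default_date_ranges_start_date()
    return datetime.datetime.strptime(start_date, "%Y-%m-%d").date()


print(resolve_start_date(None))          # two years before today
print(resolve_start_date("2023-01-01"))  # explicit value wins
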
spec.json
@@ -4,7 +4,7 @@
"$schema": "https://json-schema.org/draft-07/schema#",
"title": "Google Analytics (Data API) Spec",
"type": "object",
"required": ["property_ids", "date_ranges_start_date"],
"required": ["property_ids"],
"additionalProperties": true,
"properties": {
"credentials": {
unit test fixtures (new file)
@@ -0,0 +1,83 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import datetime
import json
from copy import deepcopy

import pytest

# json credentials with fake private key
json_credentials = """
{
"type": "service_account",
"project_id": "unittest-project-id",
"private_key_id": "9qf98e52oda52g5ne23al6evnf13649c2u077162c",
"private_key": "-----BEGIN PRIVATE KEY-----\\nMIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEA3slcXL+dA36ESmOi\\n1xBhZmp5Hn0WkaHDtW4naba3plva0ibloBNWhFhjQOh7Ff01PVjhT4D5jgqXBIgc\\nz9Gv3QIDAQABAkEArlhYPoD5SB2/O1PjwHgiMPrL1C9B9S/pr1cH4vPJnpY3VKE3\\n5hvdil14YwRrcbmIxMkK2iRLi9lM4mJmdWPy4QIhAPsRFXZSGx0TZsDxD9V0ZJmZ\\n0AuDCj/NF1xB5KPLmp7pAiEA4yoFox6w7ql/a1pUVaLt0NJkDfE+22pxYGNQaiXU\\nuNUCIQCsFLaIJZiN4jlgbxlyLVeya9lLuqIwvqqPQl6q4ad12QIgS9gG48xmdHig\\n8z3IdIMedZ8ZCtKmEun6Cp1+BsK0wDUCIF0nHfSuU+eTQ2qAON2SHIrJf8UeFO7N\\nzdTN1IwwQqjI\\n-----END PRIVATE KEY-----\\n",
"client_email": "google-analytics-access@unittest-project-id.iam.gserviceaccount.com",
"client_id": "213243192021686092537",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/google-analytics-access%40unittest-project-id.iam.gserviceaccount.com"
}
"""


@pytest.fixture
def one_year_ago():
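    # Note: despite the fixture name, this evaluates to yesterday's date (now - 1 day).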
return datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d")


@pytest.fixture
def config(one_year_ago):
return {
"property_id": "108176369",
"property_ids": ["108176369"],
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"date_ranges_start_date": one_year_ago,
"dimensions": ["date", "deviceCategory", "operatingSystem", "browser"],
"metrics": [
"totalUsers",
"newUsers",
"sessions",
"sessionsPerUser",
"averageSessionDuration",
"screenPageViews",
"screenPageViewsPerSession",
"bounceRate",
],
"custom_reports": json.dumps([{
"name": "report1",
"dimensions": ["date", "browser"],
"metrics": ["totalUsers", "sessions", "screenPageViews"],
}]),
}


@pytest.fixture
def config_without_date_range():
return {
"property_id": "108176369",
"property_ids": ["108176369"],
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"custom_reports": [],
}


@pytest.fixture
def patch_base_class(one_year_ago, config_without_date_range):
return {"config": config_without_date_range}


@pytest.fixture
def config_gen(config):
def inner(**kwargs):
new_config = deepcopy(config)
# WARNING, no support deep dictionaries
new_config.update(kwargs)
return {k: v for k, v in new_config.items() if v is not ...}

return inner
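
A brief usage note on config_gen: keyword values equal to the Ellipsis sentinel (...) are dropped from the generated config, while any other value overrides the default. A hypothetical test illustrating both (the test name and override values are invented):

# Hypothetical usage of the config_gen fixture above: "..." removes a key,
# any other value overrides it.
def test_config_gen_behaviour(config_gen):
    without_reports = config_gen(custom_reports=...)
    assert "custom_reports" not in without_reports

    overridden = config_gen(property_ids=["000000000"])
    assert overridden["property_ids"] == ["000000000"]
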
unit tests
@@ -2,9 +2,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
import json
from copy import deepcopy
from unittest.mock import MagicMock

import pytest
@@ -13,57 +10,6 @@
from source_google_analytics_data_api import SourceGoogleAnalyticsDataApi
from source_google_analytics_data_api.utils import NO_DIMENSIONS, NO_METRICS, NO_NAME, WRONG_JSON_SYNTAX

json_credentials = """
{
"type": "service_account",
"project_id": "unittest-project-id",
"private_key_id": "9qf98e52oda52g5ne23al6evnf13649c2u077162c",
"private_key": "-----BEGIN PRIVATE KEY-----\\nMIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEA3slcXL+dA36ESmOi\\n1xBhZmp5Hn0WkaHDtW4naba3plva0ibloBNWhFhjQOh7Ff01PVjhT4D5jgqXBIgc\\nz9Gv3QIDAQABAkEArlhYPoD5SB2/O1PjwHgiMPrL1C9B9S/pr1cH4vPJnpY3VKE3\\n5hvdil14YwRrcbmIxMkK2iRLi9lM4mJmdWPy4QIhAPsRFXZSGx0TZsDxD9V0ZJmZ\\n0AuDCj/NF1xB5KPLmp7pAiEA4yoFox6w7ql/a1pUVaLt0NJkDfE+22pxYGNQaiXU\\nuNUCIQCsFLaIJZiN4jlgbxlyLVeya9lLuqIwvqqPQl6q4ad12QIgS9gG48xmdHig\\n8z3IdIMedZ8ZCtKmEun6Cp1+BsK0wDUCIF0nHfSuU+eTQ2qAON2SHIrJf8UeFO7N\\nzdTN1IwwQqjI\\n-----END PRIVATE KEY-----\\n",
"client_email": "google-analytics-access@unittest-project-id.iam.gserviceaccount.com",
"client_id": "213243192021686092537",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/google-analytics-access%40unittest-project-id.iam.gserviceaccount.com"
}
"""


@pytest.fixture
def patch_base_class():
return {
"config": {
"property_ids": ["108176369"],
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d"),
}
}


@pytest.fixture
def config():
return {
"property_ids": ["108176369"],
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d"),
"custom_reports": json.dumps([{
"name": "report1",
"dimensions": ["date", "country"],
"metrics": ["totalUsers", "screenPageViews"]
}]),
}


@pytest.fixture
def config_gen(config):
def inner(**kwargs):
new_config = deepcopy(config)
# WARNING, no support deep dictionaries
new_config.update(kwargs)
return {k: v for k, v in new_config.items() if v is not ...}

return inner


@pytest.mark.parametrize(
"config_values, is_successful, message",
(Diffs for the remaining changed files are not shown.)
