Skip to content

Commit

Permalink
FB OAuth switch
Browse files Browse the repository at this point in the history
  • Loading branch information
cristina.mariscal authored and cristina.mariscal committed May 16, 2024
1 parent b60755a commit dba3748
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,72 @@ def transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
config[stream_filter] = statuses
# return transformed config
return config


class MigrateSecretsPathInConnector:
"""
This class stands for migrating the config at runtime.
This migration is intended for backwards compatibility with the previous version, so existing secrets configurations gets migrated to new path.
Starting from `X.X.X`, the `client_id`, `client_secret` and `access_token` will be placed at `credentials` path.
"""

@classmethod
def should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether the config should be migrated to nest existing fields at credentials.
It is assumed if credentials does not exist on configuration, `client_id`, `client_secret` and `access_token` exists on root path.
Returns:
> True, if the migration is necessary
> False, otherwise.
"""
credentials = config.get("credentials", None)
return credentials is None or not all(key in credentials for key in ["client_id", "client_secret", "access_token"])

@classmethod
def migrate(cls, args: List[str], source: Source) -> None:
"""
This method checks the input args, should the config be migrated,
transform if neccessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls.should_migrate(config):
cls.emit_control_message(
cls.modify_and_save(config_path, source, config),
)

@classmethod
def transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
# transform the config
config["credentials"] = {}
if "client_id" in config:
config["credentials"]["client_id"] = config.pop("client_id")
if "client_secret" in config:
config["credentials"]["client_secret"] = config.pop("client_secret")
if "access_token" in config:
config["credentials"]["access_token"] = config.pop("access_token")
# return transformed config
return config

@classmethod
def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls.transform(config)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config

@classmethod
def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository._message_queue:
print(message.json(exclude_unset=True))
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

from airbyte_cdk.entrypoint import launch

from .config_migrations import MigrateAccountIdToArray, MigrateIncludeDeletedToStatusFilters
from .config_migrations import MigrateAccountIdToArray, MigrateIncludeDeletedToStatusFilters, MigrateSecretsPathInConnector
from .source import SourceFacebookMarketing


def run():
source = SourceFacebookMarketing()
MigrateAccountIdToArray.migrate(sys.argv[1:], source)
MigrateIncludeDeletedToStatusFilters.migrate(sys.argv[1:], source)
MigrateSecretsPathInConnector.migrate(sys.argv[1:], source)
launch(source, sys.argv[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
if config.start_date and config.end_date < config.start_date:
return False, "End date must be equal or after start date."

if config.credentials.auth_type == 'Client':
api = API(access_token=config.credentials.refresh_token, page_size=config.page_size)
if config.credentials.auth_type == 'Service':
if config.credentials.auth_type == "Client":
api = API(access_token=config.credentials.access_token, page_size=config.page_size)
if config.credentials.auth_type == "Service":
api = API(access_token=config.credentials.service_account_info, page_size=config.page_size)

for account_id in config.account_ids:
Expand Down Expand Up @@ -132,9 +132,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
config.start_date = validate_start_date(config.start_date)
config.end_date = validate_end_date(config.start_date, config.end_date)

if config.credentials.auth_type == 'Client':
api = API(access_token=config.credentials.refresh_token, page_size=config.page_size)
if config.credentials.auth_type == 'Service':
if config.credentials.auth_type == "Client":
api = API(access_token=config.credentials.access_token, page_size=config.page_size)
if config.credentials.auth_type == "Service":
api = API(access_token=config.credentials.service_account_info, page_size=config.page_size)

# if start_date not specified then set default start_date for report streams to 2 years ago
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional, Set, Literal, Union
from typing import List, Literal, Optional, Set, Union

from airbyte_cdk.sources.config import BaseConfig
from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
Expand Down Expand Up @@ -44,11 +44,6 @@ class Config(OneOfOptionConfig):
description="Client Secret for the Facebook Marketing API",
airbyte_secret=True,
)
refresh_token: str = Field(
title="Refresh Token",
description="Refresh Token for the Facebook Marketing API",
airbyte_secret=True,
)


class ServiceAccountCredentials(BaseModel):
Expand All @@ -60,12 +55,13 @@ class Config(OneOfOptionConfig):
service_account_info: str = Field(
title="Service Account Information",
description="The value of the generated access token. "
'From your App’s Dashboard, click on "Marketing API" then "Tools". '
'Select permissions <b>ads_management, ads_read, read_insights, business_management</b>. Then click on "Get token". '
'See the <a href="https://docs.airbyte.com/integrations/sources/facebook-marketing">docs</a> for more information.',
'From your App’s Dashboard, click on "Marketing API" then "Tools". '
'Select permissions <b>ads_management, ads_read, read_insights, business_management</b>. Then click on "Get token". '
'See the <a href="https://docs.airbyte.com/integrations/sources/facebook-marketing">docs</a> for more information.',
airbyte_secret=True,
)


class InsightConfig(BaseModel):
"""Config for custom insights"""

Expand Down Expand Up @@ -181,7 +177,7 @@ class Config:
title="Authentication",
description="Credentials for connecting to the Facebook Marketing API",
discriminator="auth_type",
type="object"
type="object",
)

start_date: Optional[datetime] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from airbyte_cdk.test.mock_http.request import HttpRequest

from .config import SERVICE_ACCOUNT_INFO, ACCOUNT_ID
from .config import ACCOUNT_ID, SERVICE_ACCOUNT_INFO


def get_account_request(account_id: Optional[str] = ACCOUNT_ID) -> RequestBuilder:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airbyte_protocol.models import AirbyteStateMessage, StreamDescriptor, SyncMode
from source_facebook_marketing.streams.async_job import Status

from .config import SERVICE_ACCOUNT_INFO, ACCOUNT_ID, DATE_FORMAT, END_DATE, NOW, START_DATE, ConfigBuilder
from .config import ACCOUNT_ID, DATE_FORMAT, END_DATE, NOW, SERVICE_ACCOUNT_INFO, START_DATE, ConfigBuilder
from .pagination import NEXT_PAGE_TOKEN, FacebookMarketingPaginationStrategy
from .request_builder import RequestBuilder, get_account_request
from .response_builder import build_response, error_reduce_amount_of_data_response, get_account_response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_protocol.models import AirbyteStateMessage, SyncMode

from .config import SERVICE_ACCOUNT_INFO, ACCOUNT_ID, NOW, ConfigBuilder
from .config import ACCOUNT_ID, NOW, SERVICE_ACCOUNT_INFO, ConfigBuilder
from .pagination import NEXT_PAGE_TOKEN, FacebookMarketingPaginationStrategy
from .request_builder import RequestBuilder, get_account_request
from .response_builder import error_reduce_amount_of_data_response, get_account_response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
import pytest
from airbyte_cdk.models import OrchestratorType, Type
from airbyte_cdk.sources import Source
from source_facebook_marketing.config_migrations import MigrateAccountIdToArray, MigrateIncludeDeletedToStatusFilters
from source_facebook_marketing.config_migrations import (
MigrateAccountIdToArray,
MigrateIncludeDeletedToStatusFilters,
MigrateSecretsPathInConnector,
)
from source_facebook_marketing.source import SourceFacebookMarketing

# BASE ARGS
Expand All @@ -19,6 +23,7 @@
_EXCLUDE_DELETE_CONFIGS_PATH = "test_migrations/include_deleted_to_status_filters/include_deleted_false"
_INCLUDE_DELETE_CONFIGS_PATH = "test_migrations/include_deleted_to_status_filters/include_deleted_true"
_ACCOUNT_ID_TO_ARRAY_CONFIGS_PATH = "test_migrations/account_id_to_array"
_SECRETS_TO_CREDENTIALS_CONFIGS_PATH = "test_migrations/secrets_to_credentials"


def load_config(config_path: str) -> Mapping[str, Any]:
Expand Down Expand Up @@ -162,3 +167,71 @@ def test_should_not_migrate_upgraded_config(self):
new_config = load_config(self.UPGRADED_TEST_CONFIG_PATH)
migration_instance = MigrateIncludeDeletedToStatusFilters()
assert not migration_instance.should_migrate(new_config)

class TestMigrateSecretsPathInConnector:
OLD_TEST_CONFIG_PATH_ACCESS_TOKEN = _config_path(f"{_SECRETS_TO_CREDENTIALS_CONFIGS_PATH}/test_old_access_token_config.json")
NEW_TEST_CONFIG_PATH_ACCESS_TOKEN = _config_path(f"{_SECRETS_TO_CREDENTIALS_CONFIGS_PATH}/test_new_access_token_config.json")
OLD_TEST_CONFIG_PATH_ACCESS_TOKEN = _config_path(f"{_SECRETS_TO_CREDENTIALS_CONFIGS_PATH}/test_old_client_config.json")
NEW_TEST_CONFIG_PATH_ACCESS_TOKEN = _config_path(f"{_SECRETS_TO_CREDENTIALS_CONFIGS_PATH}/test_new_client_config.json")

@staticmethod
def revert_migration(config_path: str = OLD_TEST_CONFIG_PATH_ACCESS_TOKEN) -> None:
with open(config_path, "r") as test_config:
config = json.load(test_config)
credentials = config.pop("credentials")
with open(config_path, "w") as updated_config:
config = json.dumps({**config, **credentials})
updated_config.write(config)

def test_migrate_access_token_config(self):
migration_instance = MigrateSecretsPathInConnector()
original_config = load_config(self.OLD_TEST_CONFIG_PATH_ACCESS_TOKEN)
# migrate the test_config
migration_instance.migrate([CMD, "--config", self.OLD_TEST_CONFIG_PATH_ACCESS_TOKEN], SOURCE)
# load the updated config
test_migrated_config = load_config(self.OLD_TEST_CONFIG_PATH_ACCESS_TOKEN)
# check migrated property
assert "credentials" in test_migrated_config
assert isinstance(test_migrated_config["credentials"], dict)
credentials = test_migrated_config["credentials"]
assert "access_token" in credentials
# check the migration should be skipped, once already done
assert not migration_instance.should_migrate(test_migrated_config)
# load the old custom reports VS migrated
assert original_config["access_token"] == credentials["access_token"]
# test CONTROL MESSAGE was emitted
control_msg = migration_instance.message_repository._message_queue[0]
assert control_msg.type == Type.CONTROL
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
# revert the test_config to the starting point
self.revert_migration()

def test_migrate_client_config(self):
migration_instance = MigrateSecretsPathInConnector()
original_config = load_config(self.OLD_TEST_CONFIG_PATH_CLIENT)
# migrate the test_config
migration_instance.migrate([CMD, "--config", self.OLD_TEST_CONFIG_PATH_CLIENT], SOURCE)
# load the updated config
test_migrated_config = load_config(self.OLD_TEST_CONFIG_PATH_CLIENT)
# check migrated property
assert "credentials" in test_migrated_config
assert isinstance(test_migrated_config["credentials"], dict)
credentials = test_migrated_config["credentials"]
assert "client_id" in credentials
assert "client_secret" in credentials
# check the migration should be skipped, once already done
assert not migration_instance.should_migrate(test_migrated_config)
# load the old custom reports VS migrated
assert original_config["client_id"] == credentials["client_id"]
assert original_config["client_secret"] == credentials["client_secret"]
# test CONTROL MESSAGE was emitted
control_msg = migration_instance.message_repository._message_queue[0]
assert control_msg.type == Type.CONTROL
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
# revert the test_config to the starting point
self.revert_migration()

def test_should_not_migrate_new_config(self):
new_config = load_config(self.NEW_TEST_CONFIG_PATH_CLIENT)
migration_instance = MigrateSecretsPathInConnector()
assert not migration_instance.should_migrate(new_config)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"start_date": "2021-02-08T00:00:00Z",
"end_date": "2021-02-15T00:00:00Z",
"custom_insights": [
{
"name": "custom_insight_stream",
"fields": ["account_name", "clicks", "cpc", "account_id", "ad_id"],
"breakdowns": ["gender"],
"action_breakdowns": []
}
],
"account_ids": ["01234567890"],
"credentials": {
"access_token": "access_token"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"start_date": "2021-02-08T00:00:00Z",
"end_date": "2021-02-15T00:00:00Z",
"custom_insights": [
{
"name": "custom_insight_stream",
"fields": ["account_name", "clicks", "cpc", "account_id", "ad_id"],
"breakdowns": ["gender"],
"action_breakdowns": []
}
],
"account_ids": ["01234567890"],
"credentials": {
"access_token": "access_token"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"start_date": "2021-02-08T00:00:00Z",
"end_date": "2021-02-15T00:00:00Z",
"custom_insights": [
{
"name": "custom_insight_stream",
"fields": ["account_name", "clicks", "cpc", "account_id", "ad_id"],
"breakdowns": ["gender"],
"action_breakdowns": []
}
],
"account_id": "01234567890",
"access_token": "access_token"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"start_date": "2021-02-08T00:00:00Z",
"end_date": "2021-02-15T00:00:00Z",
"custom_insights": [
{
"name": "custom_insight_stream",
"fields": ["account_name", "clicks", "cpc", "account_id", "ad_id"],
"breakdowns": ["gender"],
"action_breakdowns": []
}
],
"account_id": "01234567890",
"access_token": "access_token"
}

0 comments on commit dba3748

Please sign in to comment.