Skip to content

Commit

Permalink
Source GitHub: migrate repo and branches to array in spec (#31056)
Browse files Browse the repository at this point in the history
Co-authored-by: artem1205 <[email protected]>
  • Loading branch information
artem1205 and artem1205 authored Oct 4, 2023
1 parent b819b19 commit 747f152
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 57 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.4.5
LABEL io.airbyte.version=1.4.6
LABEL io.airbyte.name=airbyte/source-github
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/source-github/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

from airbyte_cdk.entrypoint import launch
from source_github import SourceGithub
from source_github.config_migrations import MigrateBranch, MigrateRepository

if __name__ == "__main__":
source = SourceGithub()
MigrateRepository.migrate(sys.argv[1:], source)
MigrateBranch.migrate(sys.argv[1:], source)
launch(source, sys.argv[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 1.4.5
dockerImageTag: 1.4.6
maxSecondsBetweenMessages: 5400
dockerRepository: airbyte/source-github
githubIssueLabel: source-github
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import abc
import logging
from abc import ABC
from typing import Any, List, Mapping

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository

from .source import SourceGithub

logger = logging.getLogger("airbyte_logger")


class MigrateStringToArray(ABC):
"""
This class stands for migrating the config at runtime,
while providing the backward compatibility when falling back to the previous source version.
Specifically, starting from `1.4.6`, the `repository` and `branch` properties should be like :
> List(["<repository_1>", "<repository_2>", ..., "<repository_n>"])
instead of, in `1.4.5`:
> JSON STR: "repository_1 repository_2"
"""

message_repository: MessageRepository = InMemoryMessageRepository()

@property
@abc.abstractmethod
def migrate_from_key(self) -> str:
...

@property
@abc.abstractmethod
def migrate_to_key(self) -> str:
...

@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether config require migration.
Returns:
> True, if the transformation is necessary
> False, otherwise.
"""
if cls.migrate_from_key in config and cls.migrate_to_key not in config:
return True
return False

@classmethod
def _transform_to_array(cls, config: Mapping[str, Any], source: SourceGithub = None) -> Mapping[str, Any]:
# assign old values to new property that will be used within the new version
config[cls.migrate_to_key] = config[cls.migrate_to_key] if cls.migrate_to_key in config else []
data = set(filter(None, config.get(cls.migrate_from_key).split(" ")))
config[cls.migrate_to_key] = list(data | set(config[cls.migrate_to_key]))
return config

@classmethod
def _modify_and_save(cls, config_path: str, source: SourceGithub, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls._transform_to_array(config, source)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config

@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository._message_queue:
print(message.json(exclude_unset=True))

@classmethod
def migrate(cls, args: List[str], source: SourceGithub) -> None:
"""
This method checks the input args, should the config be migrated,
transform if necessary and emit the CONTROL message.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(
cls._modify_and_save(config_path, source, config),
)


class MigrateRepository(MigrateStringToArray):

migrate_from_key: str = "repository"
migrate_to_key: str = "repositories"


class MigrateBranch(MigrateStringToArray):

migrate_from_key: str = "branch"
migrate_to_key: str = "branches"
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from os import getenv
from typing import Any, Dict, List, Mapping, MutableMapping, Set, Tuple
from typing import Any, Dict, List, Mapping, MutableMapping, Tuple
from urllib.parse import urlparse

from airbyte_cdk import AirbyteLogger
Expand Down Expand Up @@ -61,27 +61,15 @@


class SourceGithub(AbstractSource):
@staticmethod
def _get_and_prepare_repositories_config(config: Mapping[str, Any]) -> Set[str]:
"""
_get_and_prepare_repositories_config gets set of repositories names from config and removes simple errors that user could provide
Args:
config: Dict representing connector's config
Returns:
set of provided repositories
"""
config_repositories = set(filter(None, config["repository"].split(" ")))
return config_repositories

@staticmethod
def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator) -> Tuple[List[str], List[str]]:
"""
Parse config.repository and produce two lists: organizations, repositories.
Parse config/repositories and produce two lists: organizations, repositories.
Args:
config (dict): Dict representing connector's config
authenticator(MultipleTokenAuthenticator): authenticator object
"""
config_repositories = SourceGithub._get_and_prepare_repositories_config(config)
config_repositories = set(config.get("repositories"))

repositories = set()
organizations = set()
Expand Down Expand Up @@ -144,6 +132,12 @@ def _get_authenticator(self, config: Mapping[str, Any]):
)
return MultipleTokenAuthenticator(tokens=tokens, auth_method="token")

def _validate_and_transform_config(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
config = self._ensure_default_values(config)
config = self._validate_repositories(config)
config = self._validate_branches(config)
return config

def _ensure_default_values(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
config.setdefault("api_url", "https://api.github.com")
api_url_parsed = urlparse(config["api_url"])
Expand All @@ -159,13 +153,31 @@ def _ensure_default_values(self, config: MutableMapping[str, Any]) -> MutableMap

raise AirbyteTracedException(message=message, failure_type=FailureType.config_error)

def _validate_repositories(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
if config.get("repositories"):
pass
elif config.get("repository"):
config["repositories"] = set(filter(None, config["repository"].split(" ")))

return config

def _validate_branches(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
if config.get("branches"):
pass
elif config.get("branch"):
config["branches"] = set(filter(None, config["branch"].split(" ")))

return config

@staticmethod
def _is_http_allowed() -> bool:
return getenv("DEPLOYMENT_MODE", "").upper() != "CLOUD"

@staticmethod
def _get_branches_data(selected_branches: str, full_refresh_args: Dict[str, Any] = None) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
selected_branches = set(filter(None, selected_branches.split(" ")))
def _get_branches_data(
selected_branches: List, full_refresh_args: Dict[str, Any] = None
) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
selected_branches = set(selected_branches)

# Get the default branch for each repository
default_branches = {}
Expand Down Expand Up @@ -218,7 +230,7 @@ def user_friendly_error_message(self, message: str) -> str:
return user_message

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
config = self._ensure_default_values(config)
config = self._validate_and_transform_config(config)
try:
authenticator = self._get_authenticator(config)
_, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
Expand All @@ -236,7 +248,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = self._get_authenticator(config)
config = self._ensure_default_values(config)
config = self._validate_and_transform_config(config)
try:
organizations, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
except Exception as e:
Expand Down Expand Up @@ -283,7 +295,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
}
repository_args_with_start_date = {**repository_args, "start_date": start_date}

default_branches, branches_to_pull = self._get_branches_data(config.get("branch", ""), repository_args)
default_branches, branches_to_pull = self._get_branches_data(config.get("branch", []), repository_args)
pull_requests_stream = PullRequests(**repository_args_with_start_date)
projects_stream = Projects(**repository_args_with_start_date)
project_columns_stream = ProjectColumns(projects_stream, **repository_args_with_start_date)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GitHub Source Spec",
"type": "object",
"required": ["repository"],
"required": ["credentials", "repositories"],
"additionalProperties": true,
"properties": {
"credentials": {
Expand Down Expand Up @@ -72,17 +72,35 @@
"airbytehq/airbyte"
],
"title": "GitHub Repositories",
"description": "Space-delimited list of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/airbyte airbytehq/another-repo` for multiple repositories.",
"order": 1,
"description": "(DEPRCATED) Space-delimited list of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/airbyte airbytehq/another-repo` for multiple repositories.",
"airbyte_hidden": true,
"pattern": "^([\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))\\s+)*[\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))$",
"pattern_descriptor": "org/repo org/another-repo org/*"
},
"repositories": {
"type": "array",
"items": {
"type": "string",
"pattern": "^([\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))\\s+)*[\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))$"
},
"minItems": 1,
"examples": [
"airbytehq/airbyte airbytehq/another-repo",
"airbytehq/*",
"airbytehq/airbyte"
],
"title": "GitHub Repositories",
"description": "List of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/airbyte airbytehq/another-repo` for multiple repositories.",
"order": 1,
"pattern_descriptor": "org/repo org/another-repo org/*"
},
"start_date": {
"type": "string",
"title": "Start date",
"description": "The date from which you'd like to replicate data from GitHub in the format YYYY-MM-DDT00:00:00Z. If the date is not set, all data will be replicated. For the streams which support this configuration, only data generated on or after the start date will be replicated. This field doesn't apply to all streams, see the <a href=\"https://docs.airbyte.com/integrations/sources/github\">docs</a> for more info",
"examples": ["2021-03-01T00:00:00Z"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"pattern_descriptor": "YYYY-MM-DDTHH:mm:ssZ",
"order": 2,
"format": "date-time"
},
Expand All @@ -98,7 +116,18 @@
"type": "string",
"title": "Branch",
"examples": ["airbytehq/airbyte/master airbytehq/airbyte/my-branch"],
"description": "Space-delimited list of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled.",
"description": "(DEPRCATED) Space-delimited list of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled.",
"airbyte_hidden": true,
"pattern_descriptor": "org/repo/branch1 org/repo/branch2"
},
"branches": {
"type": "array",
"items": {
"type": "string"
},
"title": "Branches",
"examples": ["airbytehq/airbyte/master airbytehq/airbyte/my-branch"],
"description": "List of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled.",
"order": 4,
"pattern_descriptor": "org/repo/branch1 org/repo/branch2"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"credentials": {
"personal_access_token": "personal_access_token"
},
"repository": "airbytehq/airbyte airbytehq/airbyte-platform",
"start_date": "2000-01-01T00:00:00Z",
"branch": "airbytehq/airbyte/master airbytehq/airbyte-platform/main"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import json
import os
from typing import Any, Mapping

from airbyte_cdk.models import OrchestratorType, Type
from airbyte_cdk.sources import Source
from source_github.config_migrations import MigrateBranch, MigrateRepository
from source_github.source import SourceGithub

# BASE ARGS
CMD = "check"
TEST_CONFIG_PATH = f"{os.path.dirname(__file__)}/test_config.json"
NEW_TEST_CONFIG_PATH = f"{os.path.dirname(__file__)}/test_new_config.json"
SOURCE_INPUT_ARGS = [CMD, "--config", TEST_CONFIG_PATH]
SOURCE: Source = SourceGithub()


# HELPERS
def load_config(config_path: str = TEST_CONFIG_PATH) -> Mapping[str, Any]:
with open(config_path, "r") as config:
return json.load(config)


def revert_migration(config_path: str = TEST_CONFIG_PATH) -> None:
with open(config_path, "r") as test_config:
config = json.load(test_config)
config.pop("repositories")
with open(config_path, "w") as updated_config:
config = json.dumps(config)
updated_config.write(config)


def test_migrate_config():
migration_instance = MigrateRepository
# migrate the test_config
migration_instance.migrate(SOURCE_INPUT_ARGS, SOURCE)
# load the updated config
test_migrated_config = load_config()
# check migrated property
assert "repositories" in test_migrated_config
assert isinstance(test_migrated_config["repositories"], list)
# check the old property is in place
assert "repository" in test_migrated_config
assert isinstance(test_migrated_config["repository"], str)
# test CONTROL MESSAGE was emitted
control_msg = migration_instance.message_repository._message_queue[0]
assert control_msg.type == Type.CONTROL
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
# new repositories is of type(list)
assert isinstance(control_msg.control.connectorConfig.config["repositories"], list)
# check the migrated values
revert_migration()


def test_config_is_reverted():
# check the test_config state, it has to be the same as before tests
test_config = load_config()
# check the config no longer has the migrated property
assert "repositories" not in test_config
assert "branches" not in test_config
# check the old property is still there
assert "repository" in test_config
assert "branch" in test_config
assert isinstance(test_config["repository"], str)
assert isinstance(test_config["branch"], str)


def test_should_not_migrate_new_config():
new_config = load_config(NEW_TEST_CONFIG_PATH)
for instance in MigrateBranch, MigrateRepository:
assert not instance._should_migrate(new_config)
Loading

0 comments on commit 747f152

Please sign in to comment.