Skip to content

Commit

Permalink
Source GitHub: support self hosted instances (#30647)
Browse files Browse the repository at this point in the history
Co-authored-by: artem1205 <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
  • Loading branch information
3 people authored Sep 22, 2023
1 parent c5afc6c commit 32c08d7
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 70 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.1.1
LABEL io.airbyte.version=1.2.0
LABEL io.airbyte.name=airbyte/source-github
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-github/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
data:
allowedHosts:
hosts:
- api.github.com
- "${api_url}"
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 1.1.1
dockerImageTag: 1.2.0
maxSecondsBetweenMessages: 5400
dockerRepository: airbyte/source-github
githubIssueLabel: source-github
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Dict, List, Mapping, Set, Tuple
from os import getenv
from typing import Any, Dict, List, Mapping, MutableMapping, Set, Tuple
from urllib.parse import urlparse

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import FailureType, SyncMode
Expand Down Expand Up @@ -92,7 +94,7 @@ def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleToke
unchecked_repos.add(org_repos)

if unchecked_orgs:
stream = Repositories(authenticator=authenticator, organizations=unchecked_orgs)
stream = Repositories(authenticator=authenticator, organizations=unchecked_orgs, api_url=config.get("api_url"))
for record in read_full_refresh(stream):
repositories.add(record["full_name"])
organizations.add(record["organization"])
Expand All @@ -102,6 +104,7 @@ def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleToke
stream = RepositoryStats(
authenticator=authenticator,
repositories=unchecked_repos,
api_url=config.get("api_url"),
# This parameter is deprecated and in future will be used sane default, page_size: 10
page_size_for_large_streams=config.get("page_size_for_large_streams", constants.DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM),
)
Expand Down Expand Up @@ -139,6 +142,25 @@ def _get_authenticator(self, config: Mapping[str, Any]):
)
return MultipleTokenAuthenticator(tokens=tokens, auth_method="token")

def _ensure_default_values(self, config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """
    Fill in the default GitHub API URL and validate the configured one.

    A missing or empty `api_url` falls back to the public GitHub API, matching the connector
    specification, which tells users to leave the field empty to use GitHub.
    The mapping is updated in place and the same object is returned.

    :param config: connector configuration
    :return: the same mapping, holding a validated `api_url`
    :raises AirbyteTracedException: (config error) when the URL has no http/https scheme,
        uses plain http where that is not allowed, or has no host part
    """
    # `setdefault` alone would keep an explicit empty string or None, which can never be a valid URL.
    if not config.get("api_url"):
        config["api_url"] = "https://api.github.com"
    api_url_parsed = urlparse(config["api_url"])

    # Exact match on purpose: a prefix test would also accept unrelated schemes that merely start with "http".
    if api_url_parsed.scheme not in ("http", "https"):
        message = "Please enter a full URL starting with http..."
    elif api_url_parsed.scheme == "http" and not self._is_http_allowed():
        message = "HTTP connection is insecure and is not allowed in this environment. Please use `https` instead."
    elif not api_url_parsed.netloc:
        # e.g. "https:/host" parses with a scheme but an empty network location.
        message = "Please provide a correct URL"
    else:
        return config

    raise AirbyteTracedException(message=message, failure_type=FailureType.config_error)

# Tells whether a plain-HTTP API URL may be used: it is allowed unless the
# DEPLOYMENT_MODE environment variable equals "CLOUD" (compared case-insensitively).
# An unset variable is read as an empty string, so HTTP is allowed in that case.
@staticmethod
def _is_http_allowed() -> bool:
return getenv("DEPLOYMENT_MODE", "").upper() != "CLOUD"

@staticmethod
def _get_branches_data(selected_branches: str, full_refresh_args: Dict[str, Any] = None) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
selected_branches = set(filter(None, selected_branches.split(" ")))
Expand Down Expand Up @@ -194,6 +216,7 @@ def user_friendly_error_message(self, message: str) -> str:
return user_message

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
config = self._ensure_default_values(config)
try:
authenticator = self._get_authenticator(config)
_, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
Expand All @@ -211,6 +234,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = self._get_authenticator(config)
config = self._ensure_default_values(config)
try:
organizations, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
except Exception as e:
Expand Down Expand Up @@ -238,11 +262,17 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
page_size = config.get("page_size_for_large_streams", constants.DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM)
access_token_type, _ = self.get_access_token(config)

organization_args = {"authenticator": authenticator, "organizations": organizations, "access_token_type": access_token_type}
organization_args = {
"authenticator": authenticator,
"organizations": organizations,
"api_url": config.get("api_url"),
"access_token_type": access_token_type,
}
organization_args_with_start_date = {**organization_args, "start_date": config["start_date"]}

repository_args = {
"authenticator": authenticator,
"api_url": config.get("api_url"),
"repositories": repositories,
"page_size_for_large_streams": page_size,
"access_token_type": access_token_type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,28 @@
"pattern": "^([\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))\\s+)*[\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))$",
"pattern_descriptor": "org/repo org/another-repo org/*"
},
"api_url": {
"type": "string",
"examples": ["https://github.com", "https://github.company.org"],
"title": "API URL",
"default": "https://api.github.com/",
"description": "Please enter your basic URL from self-hosted GitHub instance or leave it empty to use GitHub.",
"order": 3
},
"branch": {
"type": "string",
"title": "Branch",
"examples": ["airbytehq/airbyte/master airbytehq/airbyte/my-branch"],
"description": "Space-delimited list of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled.",
"order": 3,
"order": 4,
"pattern_descriptor": "org/repo/branch1 org/repo/branch2"
},
"requests_per_hour": {
"type": "integer",
"title": "Max requests per hour",
"description": "The GitHub API allows for a maximum of 5000 requests per hour (15000 for Github Enterprise). You can specify a lower value to limit your use of the API quota.",
"minimum": 1,
"order": 4
"order": 5
}
}
},
Expand Down
123 changes: 64 additions & 59 deletions airbyte-integrations/connectors/source-github/source_github/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
from .utils import getter


class GithubStream(HttpStream, ABC):
url_base = "https://api.github.com/"
class GithubStreamABC(HttpStream, ABC):

primary_key = "id"

Expand All @@ -30,30 +29,20 @@ class GithubStream(HttpStream, ABC):

stream_base_params = {}

def __init__(self, repositories: List[str], page_size_for_large_streams: int, access_token_type: str = "", **kwargs):
def __init__(self, api_url: str = "https://api.github.com", access_token_type: str = "", **kwargs):
super().__init__(**kwargs)
self.repositories = repositories
self.access_token_type = access_token_type

# GitHub pagination could be from 1 to 100.
# This parameter is deprecated and in future will be used sane default, page_size: 10
self.page_size = page_size_for_large_streams if self.large_stream else constants.DEFAULT_PAGE_SIZE
self.access_token_type = access_token_type
self.api_url = api_url

MAX_RETRIES = 3
adapter = requests.adapters.HTTPAdapter(max_retries=MAX_RETRIES)
self._session.mount("https://", adapter)
@property
def url_base(self) -> str:
    """
    Base URL for the stream's requests, always ending with "/" when a URL is configured.

    The stream paths are relative (`repos/<repository>/...`). Under URL-reference resolution,
    as performed by `urllib.parse.urljoin`, a base without a trailing slash has its last path
    segment replaced by the relative path, so `https://host/api/v3` would lose `v3`.
    NOTE(review): assumes the HTTP layer joins `url_base` and `path` with urljoin-like semantics - confirm.
    """
    base = self.api_url
    # Falsy values are passed through untouched so this property never raises on its own.
    if base and not base.endswith("/"):
        base = f"{base}/"
    return base

# No availability strategy is configured for these streams: the property always returns None.
@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/{self.name}"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for repository in self.repositories:
yield {"repository": repository}

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
links = response.links
if "next" in links:
Expand All @@ -62,13 +51,32 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
page = dict(parse.parse_qsl(parsed_link.query)).get("page")
return {"page": page}

def check_graphql_rate_limited(self, response_json) -> bool:
errors = response_json.get("errors")
if errors:
for error in errors:
if error.get("type") == "RATE_LIMITED":
return True
return False
# Assemble the query parameters for a request: `per_page` taken from `self.page_size`,
# the contents of `next_page_token` when one is given, and `self.stream_base_params`.
# Keys written by a later `update` replace earlier keys of the same name.
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:

params = {"per_page": self.page_size}

if next_page_token:
params.update(next_page_token)

params.update(self.stream_base_params)

return params

# Headers sent with every request: a single fixed `User-Agent` value.
def request_headers(self, **kwargs) -> Mapping[str, Any]:
# Without sending `User-Agent` header we will be getting `403 Client Error: Forbidden for url` error.
return {"User-Agent": "PostmanRuntime/7.28.0"}

# Yield one record per element of the JSON response body, each passed through
# `self.transform` together with the current stream slice.
# `stream_state` and `next_page_token` are accepted but not used here.
def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
for record in response.json(): # GitHub puts records in an array.
yield self.transform(record=record, stream_slice=stream_slice)

def should_retry(self, response: requests.Response) -> bool:
if super().should_retry(response):
Expand Down Expand Up @@ -119,16 +127,6 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
if reset_time:
return max(float(reset_time) - time.time(), min_backoff_time)

def get_error_display_message(self, exception: BaseException) -> Optional[str]:
if (
isinstance(exception, DefaultBackoffException)
and exception.response.status_code == requests.codes.BAD_GATEWAY
and self.large_stream
and self.page_size > 1
):
return f'Please try to decrease the "Page size for large streams" below {self.page_size}. The stream "{self.name}" is a large stream, such streams can fail with 502 for high "page_size" values.'
return super().get_error_display_message(exception)

def read_records(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
# get out the stream_slice parts for later use.
organisation = stream_slice.get("organization", "")
Expand Down Expand Up @@ -188,34 +186,41 @@ def read_records(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iter
self.logger.error(f"Undefined error while reading records: {e.response.text}")
raise e

self.logger.warn(error_msg)
self.logger.warning(error_msg)

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:

params = {"per_page": self.page_size}

if next_page_token:
params.update(next_page_token)
class GithubStream(GithubStreamABC):
# Store the repositories this stream reads and choose its page size.
# All remaining keyword arguments are forwarded to the parent constructor.
def __init__(self, repositories: List[str], page_size_for_large_streams: int, **kwargs):
super().__init__(**kwargs)
self.repositories = repositories
# GitHub pagination could be from 1 to 100.
# This parameter is deprecated; in the future a sane default (page_size: 10) will be used instead.
# Streams flagged `large_stream` use the configured size, all others use constants.DEFAULT_PAGE_SIZE.
self.page_size = page_size_for_large_streams if self.large_stream else constants.DEFAULT_PAGE_SIZE

params.update(self.stream_base_params)
# Request path of the form `repos/<repository>/<stream name>`; the repository comes from the stream slice.
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/{self.name}"

return params
# Produce one slice per entry of `self.repositories`, each of the form {"repository": <entry>}.
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for repository in self.repositories:
yield {"repository": repository}

def request_headers(self, **kwargs) -> Mapping[str, Any]:
# Without sending `User-Agent` header we will be getting `403 Client Error: Forbidden for url` error.
return {"User-Agent": "PostmanRuntime/7.28.0"}
def check_graphql_rate_limited(self, response_json) -> bool:
    """
    Tell whether a decoded GraphQL response body reports a rate-limit error.

    :param response_json: mapping decoded from the response body
    :return: True when any entry under "errors" has type "RATE_LIMITED", otherwise False
    """
    # `or ()` covers both a missing "errors" key and an empty or null value.
    return any(error.get("type") == "RATE_LIMITED" for error in response_json.get("errors") or ())

def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
for record in response.json(): # GitHub puts records in an array.
yield self.transform(record=record, stream_slice=stream_slice)
# Produce a user-facing hint for one specific failure: a DefaultBackoffException whose response
# status is BAD_GATEWAY, raised on a stream flagged `large_stream` with a page size above 1.
# The hint suggests lowering the page size. Every other case is delegated to the parent implementation.
def get_error_display_message(self, exception: BaseException) -> Optional[str]:
if (
isinstance(exception, DefaultBackoffException)
and exception.response.status_code == requests.codes.BAD_GATEWAY
and self.large_stream
and self.page_size > 1
):
return f'Please try to decrease the "Page size for large streams" below {self.page_size}. The stream "{self.name}" is a large stream, such streams can fail with 502 for high "page_size" values.'
return super().get_error_display_message(exception)

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
record["repository"] = stream_slice["repository"]
Expand Down Expand Up @@ -368,7 +373,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"repos/{stream_slice['repository']}/labels"


class Organizations(GithubStream):
class Organizations(GithubStreamABC):
"""
API docs: https://docs.github.com/en/rest/reference/orgs#get-an-organization
"""
Expand All @@ -377,7 +382,7 @@ class Organizations(GithubStream):
page_size = 100

def __init__(self, organizations: List[str], access_token_type: str = "", **kwargs):
super(GithubStream, self).__init__(**kwargs)
super().__init__(**kwargs)
self.organizations = organizations
self.access_token_type = access_token_type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#

import datetime
import logging
import os
import time
from unittest.mock import MagicMock

Expand All @@ -25,6 +27,25 @@ def check_source(repo_line: str) -> AirbyteConnectionStatus:
return source.check(logger_mock, config)


@pytest.mark.parametrize(
    "api_url, deployment_env, expected_message",
    (
        ("github.my.company.org", "CLOUD", "Please enter a full URL starting with http..."),
        ("http://github.my.company.org", "CLOUD", "HTTP connection is insecure and is not allowed in this environment. Please use `https` instead."),
        ("http:/github.my.company.org", "NOT_CLOUD", "Please provide a correct URL"),
        ("https:/github.my.company.org", "CLOUD", "Please provide a correct URL"),
    ),
)
def test_connection_fail_due_to_config_error(monkeypatch, api_url, deployment_env, expected_message):
    """An invalid `api_url` must make the connection check raise a config error carrying the expected message."""
    # `monkeypatch` restores DEPLOYMENT_MODE after each case; assigning to `os.environ` directly
    # would leave the last value in place for every test that runs afterwards.
    monkeypatch.setenv("DEPLOYMENT_MODE", deployment_env)
    source = SourceGithub()
    config = {"access_token": "test_token", "repository": "airbyte/test", "api_url": api_url}

    with pytest.raises(AirbyteTracedException) as e:
        source.check_connection(logging.getLogger(), config)
    # Kept outside the `with` block: a statement placed after the raising call inside it would never run.
    assert e.value.message == expected_message


@responses.activate
def test_check_connection_repos_only():
responses.add("GET", "https://api.github.com/repos/airbytehq/airbyte", json={"full_name": "airbytehq/airbyte"})
Expand Down Expand Up @@ -108,9 +129,6 @@ def test_get_branches_data():

@responses.activate
def test_get_org_repositories():

source = SourceGithub()

responses.add(
"GET",
"https://api.github.com/repos/airbytehq/integration-test",
Expand All @@ -127,6 +145,8 @@ def test_get_org_repositories():
)

config = {"repository": "airbytehq/integration-test docker/*"}
source = SourceGithub()
config = source._ensure_default_values(config)
organisations, repositories = source._get_org_repositories(config, authenticator=None)

assert set(repositories) == {"airbytehq/integration-test", "docker/docker-py", "docker/compose"}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/github.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.2.0 | 2023-09-22 | [30647](https://github.com/airbytehq/airbyte/pull/30647) | Add support for self-hosted GitHub instances |
| 1.1.1 | 2023-09-21 | [30654](https://github.com/airbytehq/airbyte/pull/30654) | Rewrite source connection error messages |
| 1.1.0 | 2023-08-03 | [30615](https://github.com/airbytehq/airbyte/pull/30615) | Add new stream `Contributor Activity` |
| 1.0.4 | 2023-08-03 | [29031](https://github.com/airbytehq/airbyte/pull/29031) | Reverted `advancedAuth` spec changes |
Expand Down

0 comments on commit 32c08d7

Please sign in to comment.