Skip to content

Commit

Permalink
Add ARN Role auth option
Browse files (browse the repository at this point in the history)
  • Loading branch information
tolik0 committed Dec 27, 2023
1 parent 333320e commit b8095e9
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/source-s3/source_s3/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
#


from os import getenv
from typing import Optional

from pydantic import BaseModel, Field

from .source_files_abstract.spec import SourceFilesAbstractSpec

# External ID read from the AWS_EXTERNAL_ID environment variable at import time.
# getenv returns None when the variable is not set.
AWS_EXTERNAL_ID = getenv("AWS_EXTERNAL_ID")


class SourceS3Spec(SourceFilesAbstractSpec, BaseModel):
class Config:
Expand Down Expand Up @@ -39,6 +42,15 @@ class Config:
always_show=True,
order=2,
)
# Optional ARN of an IAM role for the connector to use; defaults to None and is
# flagged as an Airbyte secret. The description embeds AWS_EXTERNAL_ID so the
# user knows which External ID to configure on the role.
# NOTE(review): AWS_EXTERNAL_ID is None when the environment variable is unset,
# in which case the description renders the text 'None' — confirm the variable
# is always set wherever this spec is generated.
role_arn: Optional[str] = Field(
title="AWS Role ARN",
default=None,
description="Specifies the Amazon Resource Name (ARN) of an IAM role that you want to use to perform operations "
f"requested using this profile. Set External ID as '{AWS_EXTERNAL_ID}'.",
always_show=True,
airbyte_secret=True,
order=6,
)
path_prefix: str = Field(
default="",
description="By providing a path-like prefix (e.g. myFolder/thisTable/) under which all the relevant files sit, "
Expand Down
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from os import getenv
from typing import Any, Dict, Optional

import dpath.util
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.utils import is_cloud_environment
from pydantic import AnyUrl, Field, ValidationError, root_validator

# External ID read from the AWS_EXTERNAL_ID environment variable at import time.
# getenv returns None when the variable is not set.
AWS_EXTERNAL_ID = getenv("AWS_EXTERNAL_ID")


class Config(AbstractFileBasedSpec):
"""
Expand All @@ -31,6 +34,15 @@ def documentation_url(cls) -> AnyUrl:
order=2,
)

# Optional ARN of an IAM role for the connector to use; defaults to None and is
# flagged as an Airbyte secret. The description embeds AWS_EXTERNAL_ID so the
# user knows which External ID to configure on the role.
# NOTE(review): AWS_EXTERNAL_ID is None when the environment variable is unset,
# in which case the description renders the text "None" — confirm the variable
# is always set wherever this spec is generated.
role_arn: Optional[str] = Field(
title="AWS Role ARN",
default=None,
description="Specifies the Amazon Resource Name (ARN) of an IAM role that you want to use to perform operations "
f"requested using this profile. Set External ID as {AWS_EXTERNAL_ID}.",
airbyte_secret=True,
order=6,
)

aws_secret_access_key: Optional[str] = Field(
title="AWS Secret Access Key",
default=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
#

import logging
import time
from datetime import datetime
from io import IOBase
from os import getenv
from typing import Iterable, List, Optional, Set

import boto3.session
Expand All @@ -16,10 +18,14 @@
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from botocore.client import BaseClient
from botocore.client import Config as ClientConfig
from botocore.credentials import RefreshableCredentials
from botocore.exceptions import ClientError
from botocore.session import get_session
from source_s3.v4.config import Config
from source_s3.v4.zip_reader import DecompressedStream, RemoteFileInsideArchive, ZipContentReader, ZipFileHandler

# External ID read from the AWS_EXTERNAL_ID environment variable at import time
# (None when unset); it is passed as ExternalId when assuming the configured role.
AWS_EXTERNAL_ID = getenv("AWS_EXTERNAL_ID")


class SourceS3StreamReader(AbstractFileBasedStreamReader):
def __init__(self):
Expand Down Expand Up @@ -52,12 +58,42 @@ def s3_client(self) -> BaseClient:
raise ValueError("Source config is missing; cannot create the S3 client.")
if self._s3_client is None:
client_kv_args = _get_s3_compatible_client_args(self.config) if self.config.endpoint else {}
self._s3_client = boto3.client(
"s3",
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
**client_kv_args,
)

if self.config.role_arn:

def refresh():
    """Assume the configured IAM role through STS and return its credentials.

    Used both for the initial credentials and as the refresh callback of
    ``RefreshableCredentials``.

    Returns:
        A dict with ``access_key``, ``secret_key``, ``token`` and
        ``expiry_time`` (ISO-8601 string) taken from the STS response.

    Raises:
        KeyError: if the STS response lacks the expected credential fields.
    """
    client = boto3.client("sts")
    assume_role_kwargs = {
        "RoleArn": self.config.role_arn,
        "RoleSessionName": "airbyte-source-s3",
    }
    # ExternalId must be a non-empty string; passing None (env var unset)
    # fails botocore parameter validation, so only send it when configured.
    if AWS_EXTERNAL_ID:
        assume_role_kwargs["ExternalId"] = AWS_EXTERNAL_ID
    role = client.assume_role(**assume_role_kwargs)
    # Index directly: a missing "Credentials" entry is an error either way,
    # and this reports the key that is actually absent.
    creds = role["Credentials"]
    return {
        "access_key": creds["AccessKeyId"],
        "secret_key": creds["SecretAccessKey"],
        "token": creds["SessionToken"],
        "expiry_time": creds["Expiration"].isoformat(),
    }

session_credentials = RefreshableCredentials.create_from_metadata(
metadata=refresh(),
refresh_using=refresh,
method="sts-assume-role",
)

session = get_session()
session._credentials = session_credentials
autorefresh_session = boto3.Session(botocore_session=session)
self._s3_client = autorefresh_session.client("s3", **client_kv_args)
else:
self._s3_client = boto3.client(
"s3",
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
**client_kv_args,
)

return self._s3_client

def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[RemoteFile]:
Expand Down

0 comments on commit b8095e9

Please sign in to comment.