From b8095e9e48d1183d04dbbce5f21b591df42d39bd Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Wed, 27 Dec 2023 22:45:06 +0200 Subject: [PATCH] Add ARN Role auth option --- .../connectors/source-s3/source_s3/source.py | 12 +++++ .../source-s3/source_s3/v4/config.py | 12 +++++ .../source-s3/source_s3/v4/stream_reader.py | 48 ++++++++++++++++--- 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source.py index 8621fe4bbb5a..99ac3710b337 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source.py @@ -3,12 +3,15 @@ # +from os import getenv from typing import Optional from pydantic import BaseModel, Field from .source_files_abstract.spec import SourceFilesAbstractSpec +AWS_EXTERNAL_ID = getenv("AWS_EXTERNAL_ID") + class SourceS3Spec(SourceFilesAbstractSpec, BaseModel): class Config: @@ -39,6 +42,15 @@ class Config: always_show=True, order=2, ) + role_arn: Optional[str] = Field( + title="AWS Role ARN", + default=None, + description="Specifies the Amazon Resource Name (ARN) of an IAM role that you want to use to perform operations " + f"requested using this profile. Set External ID as '{AWS_EXTERNAL_ID}'.", + always_show=True, + airbyte_secret=True, + order=6, + ) path_prefix: str = Field( default="", description="By providing a path-like prefix (e.g. myFolder/thisTable/) under which all the relevant files sit, " diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/config.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/config.py index 6275c0954388..40c526f3f016 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/v4/config.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/config.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from os import getenv from typing import Any, Dict, Optional import dpath.util @@ -9,6 +10,8 @@ from airbyte_cdk.utils import is_cloud_environment from pydantic import AnyUrl, Field, ValidationError, root_validator +AWS_EXTERNAL_ID = getenv("AWS_EXTERNAL_ID") + class Config(AbstractFileBasedSpec): """ @@ -31,6 +34,15 @@ def documentation_url(cls) -> AnyUrl: order=2, ) + role_arn: Optional[str] = Field( + title="AWS Role ARN", + default=None, + description="Specifies the Amazon Resource Name (ARN) of an IAM role that you want to use to perform operations " + f"requested using this profile. Set External ID as {AWS_EXTERNAL_ID}.", + airbyte_secret=True, + order=6, + ) + aws_secret_access_key: Optional[str] = Field( title="AWS Secret Access Key", default=None, diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py index d8bfbd5b16bc..60f9a49de3ba 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py @@ -3,8 +3,10 @@ # import logging +import time from datetime import datetime from io import IOBase +from os import getenv from typing import Iterable, List, Optional, Set import boto3.session @@ -16,10 +18,14 @@ from airbyte_cdk.sources.file_based.remote_file import RemoteFile from botocore.client import BaseClient from botocore.client import Config as ClientConfig +from botocore.credentials import RefreshableCredentials from botocore.exceptions import ClientError +from botocore.session import get_session from source_s3.v4.config import Config from source_s3.v4.zip_reader import DecompressedStream, RemoteFileInsideArchive, ZipContentReader, ZipFileHandler +AWS_EXTERNAL_ID = getenv("AWS_EXTERNAL_ID") + class SourceS3StreamReader(AbstractFileBasedStreamReader): def __init__(self): @@ -52,12 +58,42 @@ def s3_client(self) -> BaseClient: raise ValueError("Source config is missing; cannot create the S3 client.") if self._s3_client is None: client_kv_args = _get_s3_compatible_client_args(self.config) if self.config.endpoint else {} - self._s3_client = boto3.client( - "s3", - aws_access_key_id=self.config.aws_access_key_id, - aws_secret_access_key=self.config.aws_secret_access_key, - **client_kv_args, - ) + + if self.config.role_arn: + + def refresh(): + client = boto3.client("sts") + role = client.assume_role( + RoleArn=self.config.role_arn, + RoleSessionName="airbyte-source-s3", + ExternalId=AWS_EXTERNAL_ID, + ) + creds = role.get("Credentials", {}) + return { + "access_key": creds["AccessKeyId"], + "secret_key": creds["SecretAccessKey"], + "token": creds["SessionToken"], + "expiry_time": creds["Expiration"].isoformat(), + } + + session_credentials = RefreshableCredentials.create_from_metadata( + metadata=refresh(), + refresh_using=refresh, + method="sts-assume-role", + ) + + session = get_session() + session._credentials = session_credentials + autorefresh_session = boto3.Session(botocore_session=session) + self._s3_client = autorefresh_session.client("s3", **client_kv_args) + else: + self._s3_client = boto3.client( + "s3", + aws_access_key_id=self.config.aws_access_key_id, + aws_secret_access_key=self.config.aws_secret_access_key, + **client_kv_args, + ) + return self._s3_client def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[RemoteFile]: