Skip to content

Commit

Permalink
Source Azure Blob Storage: add unit tests (#37467)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <[email protected]>
  • Loading branch information
artem1205 authored Apr 28, 2024
1 parent ca9db8d commit abba3a7
Show file tree
Hide file tree
Showing 18 changed files with 793 additions and 486 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
from airbyte_cdk.test.entrypoint_wrapper import read
from airbyte_protocol.models import ConfiguredAirbyteCatalog
from source_azure_blob_storage import Config, SourceAzureBlobStorage, SourceAzureBlobStorageStreamReader
from source_azure_blob_storage import SourceAzureBlobStorage, SourceAzureBlobStorageSpec, SourceAzureBlobStorageStreamReader


@pytest.mark.parametrize(
Expand All @@ -23,7 +23,7 @@ def test_read_files(configured_catalog: ConfiguredAirbyteCatalog, config: Mappin
config = request.getfixturevalue(config)
source = SourceAzureBlobStorage(
SourceAzureBlobStorageStreamReader(),
spec_class=Config,
spec_class=SourceAzureBlobStorageSpec,
catalog=configured_catalog,
config=config,
state=None,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/sources/azure-blob-storage",
"connectionSpecification": {
"title": "Config",
"title": "SourceAzureBlobStorageSpec",
"description": "NOTE: When this Spec is changed, legacy_config_transformer.py must also be modified to uptake the changes\nbecause it is responsible for converting legacy Azure Blob Storage v0 configs into v1 configs using the File-Based CDK.",
"type": "object",
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: fdaaba68-4875-4ed9-8fcd-4ae1e0a25093
dockerImageTag: 0.4.0
dockerImageTag: 0.4.1
dockerRepository: airbyte/source-azure-blob-storage
documentationUrl: https://docs.airbyte.com/integrations/sources/azure-blob-storage
githubIssueLabel: source-azure-blob-storage
Expand Down
957 changes: 585 additions & 372 deletions airbyte-integrations/connectors/source-azure-blob-storage/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.4.0"
version = "0.4.1"
name = "source-azure-blob-storage"
description = "Source implementation for Azure Blob Storage."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -28,6 +28,7 @@ source-azure-blob-storage = "source_azure_blob_storage.run:run"

[tool.poetry.group.dev.dependencies]
docker = "^7.0.0"
freezegun = "^1.4.0"
pytest-mock = "^3.6.1"
requests-mock = "^1.9.3"
pandas = "2.2.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#


from .config import Config
from .source import SourceAzureBlobStorage
from .spec import SourceAzureBlobStorageSpec
from .stream_reader import SourceAzureBlobStorageStreamReader

__all__ = ["SourceAzureBlobStorage", "SourceAzureBlobStorageStreamReader", "Config"]
__all__ = ["SourceAzureBlobStorage", "SourceAzureBlobStorageStreamReader", "SourceAzureBlobStorageSpec"]
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


import logging
from abc import ABC, abstractmethod
from typing import Any, List, Mapping

from airbyte_cdk.config_observation import create_connector_config_control_message
Expand All @@ -13,22 +14,16 @@
logger = logging.getLogger("airbyte_logger")


class MigrateCredentials:
"""
This class stands for migrating the config azure_blob_storage_account_key inside object `credentials`
"""

class MigrateConfig(ABC):
@classmethod
@abstractmethod
def should_migrate(cls, config: Mapping[str, Any]) -> bool:
return "credentials" not in config
...

@classmethod
def set_azure_blob_storage_account_key(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
config["credentials"] = {
"auth_type": "storage_account_key",
"azure_blob_storage_account_key": config.pop("azure_blob_storage_account_key"),
}
return config
@abstractmethod
def migrate_config(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
...

@classmethod
def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]:
Expand All @@ -43,7 +38,7 @@ def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str,
Returns:
- Mapping[str, Any]: The updated configuration.
"""
migrated_config = cls.set_azure_blob_storage_account_key(config)
migrated_config = cls.migrate_config(config)
source.write_config(migrated_config, config_path)
return migrated_config

Expand Down Expand Up @@ -75,3 +70,50 @@ def migrate(cls, args: List[str], source: Source) -> None:
config = source.read_config(config_path)
if cls.should_migrate(config):
cls.emit_control_message(cls.modify_and_save(config_path, source, config))


class MigrateLegacyConfig(MigrateConfig):
    """
    Migration that rewrites a legacy-format Azure Blob Storage source config into the shape
    expected by the source built with the file-based CDK (a config that carries a `streams` list).
    """

    @classmethod
    def should_migrate(cls, config: Mapping[str, Any]) -> bool:
        """Return True when the config has no `streams` key, i.e. it is still in the legacy format."""
        already_has_streams = "streams" in config
        return not already_has_streams

    @classmethod
    def migrate_config(cls, legacy_config: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Build a new config from `legacy_config`.

        The endpoint and blobs prefix are optional (defaulting to None and "" respectively);
        account name, account key and container name are required and raise KeyError when absent.
        A single jsonl stream named after the container is generated.
        """
        blobs_prefix = legacy_config.get("azure_blob_storage_blobs_prefix", "")

        migrated = {"azure_blob_storage_endpoint": legacy_config.get("azure_blob_storage_endpoint", None)}
        # Required keys are copied over as-is, in the order they appear in the resulting config.
        for required_key in (
            "azure_blob_storage_account_name",
            "azure_blob_storage_account_key",
            "azure_blob_storage_container_name",
        ):
            migrated[required_key] = legacy_config[required_key]

        legacy_stream = {
            "name": legacy_config["azure_blob_storage_container_name"],
            "legacy_prefix": blobs_prefix,
            "validation_policy": "Emit Record",
            "format": {"filetype": "jsonl"},
        }
        migrated["streams"] = [legacy_stream]
        return migrated


class MigrateCredentials(MigrateConfig):
    """
    Migration that moves the top-level `azure_blob_storage_account_key` of a config
    inside a nested `credentials` object.
    """

    @classmethod
    def should_migrate(cls, config: Mapping[str, Any]) -> bool:
        """Return True when the config does not contain a `credentials` key yet."""
        return "credentials" not in config

    @classmethod
    def migrate_config(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return a migrated copy of `config` with the account key nested under `credentials`.

        Args:
            config: The configuration to migrate. It is not modified.

        Returns:
            A new dict holding every key of `config` except the top-level
            `azure_blob_storage_account_key`, plus a `credentials` object with
            `auth_type` set to "storage_account_key" and the account key moved into it.

        Raises:
            KeyError: If `azure_blob_storage_account_key` is missing from `config`.
        """
        # `config` is annotated as a read-only Mapping, so work on a shallow copy instead of
        # popping from / assigning into the caller's object.
        migrated_config = dict(config)
        account_key = migrated_config.pop("azure_blob_storage_account_key")
        migrated_config["credentials"] = {
            "auth_type": "storage_account_key",
            "azure_blob_storage_account_key": account_key,
        }
        return migrated_config

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type
from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
from source_azure_blob_storage import Config, SourceAzureBlobStorage, SourceAzureBlobStorageStreamReader
from source_azure_blob_storage.config_migrations import MigrateCredentials
from source_azure_blob_storage import SourceAzureBlobStorage, SourceAzureBlobStorageSpec, SourceAzureBlobStorageStreamReader
from source_azure_blob_storage.config_migrations import MigrateCredentials, MigrateLegacyConfig


def run():
Expand All @@ -21,12 +21,13 @@ def run():
try:
source = SourceAzureBlobStorage(
SourceAzureBlobStorageStreamReader(),
Config,
SourceAzureBlobStorageSpec,
SourceAzureBlobStorage.read_catalog(catalog_path) if catalog_path else None,
SourceAzureBlobStorage.read_config(config_path) if catalog_path else None,
SourceAzureBlobStorage.read_state(state_path) if catalog_path else None,
cursor_cls=DefaultFileBasedCursor,
)
MigrateLegacyConfig.migrate(sys.argv[1:], source)
MigrateCredentials.migrate(sys.argv[1:], source)
except Exception:
print(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,8 @@
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from airbyte_protocol.models import AdvancedAuth, ConnectorSpecification

from .legacy_config_transformer import LegacyConfigTransformer


class SourceAzureBlobStorage(FileBasedSource):
@classmethod
def read_config(cls, config_path: str) -> Mapping[str, Any]:
"""
Used to override the default read_config so that when the new file-based Azure Blob Storage connector processes a config
in the legacy format, it can be transformed into the new config. This happens in entrypoint before we
validate the config against the new spec.
"""
config = FileBasedSource.read_config(config_path)
if not cls._is_v1_config(config):
converted_config = LegacyConfigTransformer.convert(config)
emit_configuration_as_airbyte_control_message(converted_config)
return converted_config
return config

@staticmethod
def _is_v1_config(config: Mapping[str, Any]) -> bool:
return "streams" in config

def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
"""
Returns the specification describing what fields can be configured by a user when setting up a file-based source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Config(OneOfOptionConfig):
)


class Config(AbstractFileBasedSpec):
class SourceAzureBlobStorageSpec(AbstractFileBasedSpec):
"""
NOTE: When this Spec is changed, legacy_config_transformer.py must also be modified to uptake the changes
because it is responsible for converting legacy Azure Blob Storage v0 configs into v1 configs using the File-Based CDK.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from azure.storage.blob import BlobServiceClient, ContainerClient
from smart_open import open

from .config import Config
from .spec import SourceAzureBlobStorageSpec


class AzureOauth2Authenticator(Oauth2Authenticator):
Expand All @@ -35,11 +35,11 @@ def __init__(self, *args, **kwargs):
self._config = None

@property
def config(self) -> Config:
def config(self) -> SourceAzureBlobStorageSpec:
return self._config

@config.setter
def config(self, value: Config) -> None:
def config(self, value: SourceAzureBlobStorageSpec) -> None:
self._config = value

@property
Expand Down Expand Up @@ -83,8 +83,7 @@ def get_matching_files(
for prefix in prefixes:
for blob in self.azure_container_client.list_blobs(name_starts_with=prefix):
remote_file = RemoteFile(uri=blob.name, last_modified=blob.last_modified.astimezone(pytz.utc).replace(tzinfo=None))
if not globs or self.file_matches_globs(remote_file, globs):
yield remote_file
yield from self.filter_files_by_globs_and_start_date([remote_file], globs)

def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase:
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from azure.core.credentials import AccessToken
from source_azure_blob_storage.stream_reader import AzureOauth2Authenticator


def test_custom_authenticator(requests_mock):
    """The authenticator's get_token() should return an AccessToken built from the mocked refresh response."""
    refresh_endpoint = "https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token"

    oauth_authenticator = AzureOauth2Authenticator(
        token_refresh_endpoint=refresh_endpoint,
        client_id="client_id",
        client_secret="client_secret",
        refresh_token="refresh_token",
    )

    # Payload served by the mocked token endpoint.
    mocked_payload = {
        "token_type": "Bearer",
        "scope": "https://storage.azure.com/user_impersonation https://storage.azure.com/.default",
        "expires_in": 5144,
        "ext_expires_in": 5144,
        "access_token": "access_token",
        "refresh_token": "refresh_token",
    }
    requests_mock.post(refresh_endpoint, json=mocked_payload)

    token = oauth_authenticator.get_token()

    assert isinstance(token, AccessToken)
    assert token.token == "access_token"
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from typing import Any, Mapping

from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
from source_azure_blob_storage import Config, SourceAzureBlobStorage, SourceAzureBlobStorageStreamReader
from source_azure_blob_storage.config_migrations import MigrateCredentials
from source_azure_blob_storage import SourceAzureBlobStorage, SourceAzureBlobStorageSpec, SourceAzureBlobStorageStreamReader
from source_azure_blob_storage.config_migrations import MigrateCredentials, MigrateLegacyConfig


# HELPERS
Expand All @@ -15,13 +15,43 @@ def load_config(config_path: str) -> Mapping[str, Any]:
return json.load(config)


def test_mailchimp_config_migration():
def test_legacy_config_migration():
    """
    MigrateLegacyConfig should rewrite a legacy config file into the `streams`-based format.

    The migration saves its result back to the path it is given, so it is run against a
    temporary copy of the fixture. Running it on the fixture itself would overwrite the
    legacy config with the migrated one, and later runs would no longer exercise the migration.
    """
    # Imported locally so this test does not depend on module-level imports outside of it.
    import shutil
    import tempfile

    fixture_path = f"{os.path.dirname(__file__)}/test_configs/test_legacy_config.json"
    expected_config = {
        "azure_blob_storage_account_key": "secret/key==",
        "azure_blob_storage_account_name": "airbyteteststorage",
        "azure_blob_storage_container_name": "airbyte-source-azure-blob-storage-test",
        "azure_blob_storage_endpoint": "https://airbyteteststorage.blob.core.windows.net",
        "streams": [
            {
                "format": {"filetype": "jsonl"},
                "legacy_prefix": "subfolder/",
                "name": "airbyte-source-azure-blob-storage-test",
                "validation_policy": "Emit Record",
            }
        ],
    }

    with tempfile.TemporaryDirectory() as tmp_dir:
        config_path = os.path.join(tmp_dir, "test_legacy_config.json")
        shutil.copyfile(fixture_path, config_path)

        migration_instance = MigrateLegacyConfig
        source = SourceAzureBlobStorage(
            SourceAzureBlobStorageStreamReader(),
            spec_class=SourceAzureBlobStorageSpec,
            catalog={},
            config=load_config(config_path),
            state=None,
            cursor_cls=DefaultFileBasedCursor,
        )
        migration_instance.migrate(["check", "--config", config_path], source)
        # Read the result back before the temporary directory is removed.
        test_migrated_config = load_config(config_path)

    assert test_migrated_config == expected_config


def test_credentials_config_migration():
config_path = f"{os.path.dirname(__file__)}/test_configs/test_config_without_credentials.json"
initial_config = load_config(config_path)
migration_instance = MigrateCredentials
source = SourceAzureBlobStorage(
SourceAzureBlobStorageStreamReader(),
spec_class=Config,
spec_class=SourceAzureBlobStorageSpec,
catalog={},
config=load_config(config_path),
state=None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"azure_blob_storage_endpoint": "https://airbyteteststorage.blob.core.windows.net",
"azure_blob_storage_account_name": "airbyteteststorage",
"azure_blob_storage_account_key": "secret/key==",
"azure_blob_storage_container_name": "airbyte-source-azure-blob-storage-test",
"azure_blob_storage_blobs_prefix": "subfolder/",
"azure_blob_storage_schema_inference_limit": 500,
"format": "jsonl"
}
Loading

0 comments on commit abba3a7

Please sign in to comment.