Skip to content

Commit

Permalink
Revert S3 changes; these will be done in a separate PR
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Jan 22, 2024
1 parent f2c6da2 commit ff0cb0b
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@


class SourceS3(FileBasedSource):

@classmethod
def read_config(cls, config_path: str) -> Mapping[str, Any]:
def read_config(self, config_path: str) -> Mapping[str, Any]:
"""
Used to override the default read_config so that when the new file-based S3 connector processes a config
in the legacy format, it can be transformed into the new config. This happens in entrypoint before we
validate the config against the new spec.
"""
config = super().read_config(config_path)
if not SourceS3._is_v4_config(config):
if not self._is_v4_config(config):
parsed_legacy_config = SourceS3Spec(**config)
converted_config = LegacyConfigTransformer.convert(parsed_legacy_config)
emit_configuration_as_airbyte_control_message(converted_config)
Expand Down Expand Up @@ -68,8 +66,7 @@ def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
connectionSpecification=s4_spec,
)

@staticmethod
def _is_v4_config(config: Mapping[str, Any]) -> bool:
def _is_v4_config(self, config: Mapping[str, Any]) -> bool:
return "streams" in config

@staticmethod
Expand Down
11 changes: 1 addition & 10 deletions airbyte-integrations/connectors/source-s3/v4_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,8 @@

def get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceS3(
SourceS3StreamReader(),
Config,
SourceS3.read_catalog(catalog_path) if catalog_path else None,
SourceS3.read_config(config_path) if config_path else None,
SourceS3.read_state(state_path) if state_path else None,
cursor_cls=Cursor,
)
return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
except Exception:
print(
AirbyteMessage(
Expand Down

0 comments on commit ff0cb0b

Please sign in to comment.