Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source Amazon Seller Partner: add stream name to report options list #38657

Merged
merged 5 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@
"items": {
"type": "object",
"title": "Report Options",
"required": ["stream_name", "options_list"],
"required": ["report_name", "stream_name", "options_list"],
"properties": {
"stream_name": {
"title": "Stream Name",
"report_name": {
"title": "Report Name",
"type": "string",
"order": 0,
"enum": [
Expand Down Expand Up @@ -165,6 +165,7 @@
"GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE",
"GET_XML_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",
"GET_XML_BROWSE_TREE_DATA",
"GET_VENDOR_REAL_TIME_INVENTORY_REPORT",
"GET_BRAND_ANALYTICS_MARKET_BASKET_REPORT",
"GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT",
"GET_BRAND_ANALYTICS_REPEAT_PURCHASE_REPORT",
Expand All @@ -175,10 +176,16 @@
"GET_VENDOR_TRAFFIC_REPORT"
]
},
"stream_name": {
"title": "Stream Name",
"type": "string",
"order": 1
},
"options_list": {
"title": "List of options",
"description": "List of options",
"type": "array",
"order": 2,
"items": {
"type": "object",
"required": ["option_name", "option_value"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
dockerImageTag: 4.2.4
dockerImageTag: 4.3.0
dockerRepository: airbyte/source-amazon-seller-partner
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-seller-partner
githubIssueLabel: source-amazon-seller-partner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.2.4"
version = "4.3.0"
name = "source-amazon-seller-partner"
description = "Source implementation for Amazon Seller Partner."
authors = ["Airbyte <[email protected]>"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import abc
import json
import logging
from typing import Any, List, Mapping
Expand All @@ -16,38 +15,23 @@
logger = logging.getLogger("airbyte_logger")


class MigrateAccountType:
"""
This class stands for migrating the config at runtime,
while providing the backward compatibility when falling back to the previous source version.

Specifically, starting from `2.0.1`, the `account_type` property becomes required.
For those connector configs that do not contain this key, the default value of `Seller` will be used.
Reverse operation is not needed as this field is ignored in previous versions of the connector.
"""

class Migration:
message_repository: MessageRepository = InMemoryMessageRepository()
migration_key: str = "account_type"

@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether config requires migration.
Returns:
> True, if the transformation is necessary
> False, otherwise.
"""
return cls.migration_key not in config
@abc.abstractmethod
def _transform(cls, config: Mapping[str, Any]):
pass

@classmethod
def _populate_with_default_value(cls, config: Mapping[str, Any], source: SourceAmazonSellerPartner = None) -> Mapping[str, Any]:
config[cls.migration_key] = "Seller"
return config
@abc.abstractmethod
def _should_migrate(cls, config: Mapping[str, Any]):
pass

@classmethod
def _modify_and_save(cls, config_path: str, source: SourceAmazonSellerPartner, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls._populate_with_default_value(config, source)
migrated_config = cls._transform(config)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
Expand Down Expand Up @@ -78,7 +62,35 @@ def migrate(cls, args: List[str], source: SourceAmazonSellerPartner) -> None:
cls._emit_control_message(cls._modify_and_save(config_path, source, config))


class MigrateReportOptions:
class MigrateAccountType(Migration):
"""
This class stands for migrating the config at runtime,
while providing the backward compatibility when falling back to the previous source version.

Specifically, starting from `2.0.1`, the `account_type` property becomes required.
For those connector configs that do not contain this key, the default value of `Seller` will be used.
Reverse operation is not needed as this field is ignored in previous versions of the connector.
"""

migration_key: str = "account_type"

@classmethod
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method determines whether config requires migration.
Returns:
> True, if the transformation is necessary
> False, otherwise.
"""
return cls.migration_key not in config

@classmethod
def _transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
config[cls.migration_key] = "Seller"
return config


class MigrateReportOptions(Migration):
"""
This class stands for migrating the config at runtime,
while providing the backward compatibility when falling back to the previous source version.
Expand All @@ -88,7 +100,6 @@ class MigrateReportOptions:
Reverse operation is not needed as this field is ignored in previous versions of the connector.
"""

message_repository: MessageRepository = InMemoryMessageRepository()
migration_key: str = "report_options_list"

@classmethod
Expand All @@ -102,7 +113,7 @@ def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
return cls.migration_key not in config and (config.get("report_options") or config.get("advanced_stream_options"))

@classmethod
def _transform_report_options(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
def _transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
try:
report_options = json.loads(config.get("report_options", "{}") or "{}")
except json.JSONDecodeError:
Expand All @@ -116,35 +127,42 @@ def _transform_report_options(cls, config: Mapping[str, Any]) -> Mapping[str, An
config[cls.migration_key] = report_options_list
return config

@classmethod
def _modify_and_save(cls, config_path: str, source: SourceAmazonSellerPartner, config: Mapping[str, Any]) -> Mapping[str, Any]:
# modify the config
migrated_config = cls._transform_report_options(config)
# save the config
source.write_config(migrated_config, config_path)
# return modified config
return migrated_config

@classmethod
def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
# add the Airbyte Control Message to message repo
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
# emit the Airbyte Control Message from message queue to stdout
for message in cls.message_repository.consume_queue():
print(message.json(exclude_unset=True))
class MigrateStreamNameOption(Migration):
"""
This class stands for migrating the config at runtime,
while providing the backward compatibility when falling back to the previous source version.

Specifically, starting from `4.3.0`, the `report_options_list` property holds a list of objects,
each of which now is required to have a `stream_name` property.
For those connector configs that do not contain this key, the default value of <report_name> will be used.
Reverse operation is not needed as this field is ignored in previous versions of the connector.
"""

@classmethod
def migrate(cls, args: List[str], source: SourceAmazonSellerPartner) -> None:
def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
"""
This method checks the input args, should the config be migrated,
transform if necessary and emit the CONTROL message.
This method determines whether config requires migration.
Returns:
> True, if the transformation is necessary
> False, otherwise.
"""
# get config path
config_path = AirbyteEntrypoint(source).extract_config(args)
# proceed only if `--config` arg is provided
if config_path:
# read the existing config
config = source.read_config(config_path)
# migration check
if cls._should_migrate(config):
cls._emit_control_message(cls._modify_and_save(config_path, source, config))
if "report_options_list" not in config:
return False

options_list = config["report_options_list"]
for options in options_list:
if "report_name" not in options:
return True
return False

@classmethod
def _transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
report_options_list = []
for report_options in config["report_options_list"]:
if "report_name" not in report_options:
report_options["report_name"] = report_options["stream_name"]
report_options_list.append(report_options)

config["report_options_list"] = report_options_list
return config
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

from airbyte_cdk.entrypoint import launch
from source_amazon_seller_partner import SourceAmazonSellerPartner
from source_amazon_seller_partner.config_migrations import MigrateAccountType, MigrateReportOptions
from source_amazon_seller_partner.config_migrations import MigrateAccountType, MigrateReportOptions, MigrateStreamNameOption


def run():
source = SourceAmazonSellerPartner()
MigrateAccountType.migrate(sys.argv[1:], source)
MigrateReportOptions.migrate(sys.argv[1:], source)
MigrateStreamNameOption.migrate(sys.argv[1:], source)
launch(source, sys.argv[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
OrderReportDataShipping,
Orders,
RapidRetailAnalyticsInventoryReport,
ReportsAmazonSPStream,
RestockInventoryReports,
SellerAnalyticsSalesAndTrafficReports,
SellerFeedbackReports,
Expand Down Expand Up @@ -205,7 +206,15 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream_list += brand_analytics_reports

for stream in stream_list:
streams.append(stream(**stream_kwargs, report_options=self.get_stream_report_options_list(stream.name, config)))
if not issubclass(stream, ReportsAmazonSPStream):
streams.append(stream(**stream_kwargs))
continue
report_kwargs = list(self.get_stream_report_kwargs(stream.report_name, config))
if not report_kwargs:
report_kwargs.append((stream.report_name, {}))
for name, options in report_kwargs:
kwargs = {"stream_name": name, "report_options": options, **stream_kwargs}
streams.append(stream(**kwargs))
return streams

def spec(self, logger: Logger) -> ConnectorSpecification:
Expand All @@ -221,7 +230,7 @@ def spec(self, logger: Logger) -> ConnectorSpecification:
"GET_VENDOR_NET_PURE_PRODUCT_MARGIN_REPORT",
"GET_VENDOR_TRAFFIC_REPORT",
]
spec.connectionSpecification["properties"]["report_options_list"]["items"]["properties"]["stream_name"]["enum"].extend(
spec.connectionSpecification["properties"]["report_options_list"]["items"]["properties"]["report_name"]["enum"].extend(
oss_only_streams
)

Expand All @@ -238,19 +247,21 @@ def validate_replication_dates(config: Mapping[str, Any]) -> None:

@staticmethod
def validate_stream_report_options(config: Mapping[str, Any]) -> None:
if len([x.get("stream_name") for x in config.get("report_options_list", [])]) != len(
set(x.get("stream_name") for x in config.get("report_options_list", []))
):
options_list = config.get("report_options_list", [])
stream_names = [x.get("stream_name") for x in options_list]
if len(stream_names) != len(set(stream_names)):
raise AmazonConfigException(message="Stream name should be unique among all Report options list")
for stream_report_option in config.get("report_options_list", []):
if len([x.get("option_name") for x in stream_report_option.get("options_list")]) != len(
set(x.get("option_name") for x in stream_report_option.get("options_list"))
):

for report_option in options_list:
option_names = [x.get("option_name") for x in report_option.get("options_list")]
if len(option_names) != len(set(option_names)):
raise AmazonConfigException(
message=f"Option names should be unique for `{stream_report_option.get('stream_name')}` report options"
message=f"Option names should be unique for `{report_option.get('stream_name')}` report options"
)

@staticmethod
def get_stream_report_options_list(report_name: str, config: Mapping[str, Any]) -> Optional[List[Mapping[str, Any]]]:
if any(x for x in config.get("report_options_list", []) if x.get("stream_name") == report_name):
return [x.get("options_list") for x in config.get("report_options_list") if x.get("stream_name") == report_name][0]
def get_stream_report_kwargs(report_name: str, config: Mapping[str, Any]) -> List[Tuple[str, Optional[List[Mapping[str, Any]]]]]:
options_list = config.get("report_options_list", [])
for x in options_list:
if x.get("report_name") == report_name:
yield x.get("stream_name"), x.get("options_list")
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@
"items": {
"type": "object",
"title": "Report Options",
"required": ["stream_name", "options_list"],
"required": ["report_name", "stream_name", "options_list"],
"properties": {
"stream_name": {
"title": "Stream Name",
"report_name": {
"title": "Report Name",
"type": "string",
"order": 0,
"enum": [
Expand Down Expand Up @@ -164,13 +164,20 @@
"GET_STRANDED_INVENTORY_UI_DATA",
"GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE",
"GET_XML_ALL_ORDERS_DATA_BY_ORDER_DATE_GENERAL",
"GET_XML_BROWSE_TREE_DATA"
"GET_XML_BROWSE_TREE_DATA",
"GET_VENDOR_REAL_TIME_INVENTORY_REPORT"
]
},
"stream_name": {
"title": "Stream Name",
"type": "string",
"order": 1
},
"options_list": {
"title": "List of options",
"description": "List of options",
"type": "array",
"order": 2,
"items": {
"type": "object",
"required": ["option_name", "option_value"],
Expand Down
Loading
Loading