From 01635d18a4da0757a0d56f87d4058cb0599372f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 17 Dec 2024 14:12:22 -0600 Subject: [PATCH] refactor: Use a base balance stream interface --- tap_intacct/streams.py | 64 +++++++++++++++++++++++++++++++++--------- tap_intacct/tap.py | 8 +----- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/tap_intacct/streams.py b/tap_intacct/streams.py index b4797d1..5b435b7 100644 --- a/tap_intacct/streams.py +++ b/tap_intacct/streams.py @@ -93,11 +93,10 @@ class BaseIntacctStream(RESTStream[int], metaclass=abc.ABCMeta): #: The operation/entity is defined in the payload, not the path path = None - def __init__(self, *args: t.Any, intacct_obj_name: str | None = None, **kwargs: t.Any) -> None: + def __init__(self, *args: t.Any, **kwargs: t.Any) -> None: """Initialize stream.""" super().__init__(*args, **kwargs) self.session_id = self._get_session_id() - self.intacct_obj_name = intacct_obj_name self.datetime_fields = [ i for i, t in self.schema["properties"].items() if t.get("format", "") == "date-time" ] @@ -430,6 +429,11 @@ def get_request_data( ) -> dict: """Generate request data for a general Intacct stream.""" + @property + @abc.abstractmethod + def intacct_obj_name(self) -> str: + """Return the Intacct object name.""" + class IntacctStream(BaseIntacctStream): """Intacct stream class.""" @@ -437,14 +441,21 @@ class IntacctStream(BaseIntacctStream): def __init__( self, *args: t.Any, + intacct_obj_name: str, replication_key: str | None = None, **kwargs: t.Any, ) -> None: """Initialize stream.""" super().__init__(*args, **kwargs) self.primary_keys = KEY_PROPERTIES[self.name] + self._intacct_obj_name = intacct_obj_name self.replication_key = replication_key + @property + def intacct_obj_name(self) -> str: + """The Intacct object name.""" + return self._intacct_obj_name + def _format_date_for_intacct(self, datetime: datetime) -> str: """Intacct expects datetimes in a 'MM/DD/YY HH:MM:SS' string format. @@ -549,13 +560,32 @@ def partitions(self) -> list[dict] | None: ] -class TrialBalancesStream(BaseIntacctStream): - """Trial balances. +class _LegacyFunctionStream(BaseIntacctStream, metaclass=abc.ABCMeta): + """Base Intacct stream class for legacy functions.""" - https://developer.intacct.com/api/general-ledger/trial-balances/ - """ + def get_request_data( + self, + context: Context | None, + next_page_token: int | None, + ) -> dict: + """Generate request data for a "legacy" Intacct stream.""" + return { + self.function_name: self.get_function_arguments(context, next_page_token), + } + + @property + @abc.abstractmethod + def function_name(self) -> str: + """Return the function name.""" + + @abc.abstractmethod + def get_function_arguments(self, context: Context | None, next_page_token: int | None) -> dict: + """Return the function arguments.""" + + +class _BaseBalancesStream(_LegacyFunctionStream): + """Generic balances stream.""" - name = "trial_balances" primary_keys = ("glaccountno",) schema = th.PropertiesList( @@ -570,7 +600,7 @@ class TrialBalancesStream(BaseIntacctStream): th.Property("currency", th.StringType), ).to_dict() - def get_request_data( + def get_function_arguments( self, context: Context | None, # noqa: ARG002 next_page_token: int | None, # noqa: ARG002 @@ -595,10 +625,18 @@ def get_request_data( "month": end_date.month, "day": end_date.day, } - return { - "get_trialbalance": { - "startdate": start_date_obj, - "enddate": end_date_obj, - } + "startdate": start_date_obj, + "enddate": end_date_obj, } + + +class TrialBalancesStream(_BaseBalancesStream): + """Trial balances. + + https://developer.intacct.com/api/general-ledger/trial-balances/ + """ + + name = "trial_balances" + intacct_obj_name = "trialbalance" + function_name = "get_trialbalance" diff --git a/tap_intacct/tap.py b/tap_intacct/tap.py index 8ac338d..00c816e 100644 --- a/tap_intacct/tap.py +++ b/tap_intacct/tap.py @@ -100,13 +100,7 @@ def discover_streams(self) -> list[streams.IntacctStream]: # ) # discovered_streams.append(audit_stream) - discovered_streams.append( - streams.TrialBalancesStream( - tap=self, - name="trial_balances", - intacct_obj_name="trialbalance", - ) - ) + discovered_streams.append(streams.TrialBalancesStream(tap=self)) return discovered_streams