Skip to content

Commit

Permalink
add custom general ledger stream using module partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed Nov 8, 2024
1 parent 02bd0c7 commit 909c431
Showing 1 changed file with 86 additions and 8 deletions.
94 changes: 86 additions & 8 deletions tap_intacct/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime, timezone

import xmltodict
from singer_sdk import typing as th # JSON schema typing helpers
from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002
from singer_sdk.streams import RESTStream

Expand Down Expand Up @@ -141,6 +142,20 @@ def prepare_request(
data=request_data,
)

def _get_query_filter(
self,
rep_key: str,
context: Context | None,
) -> dict:
return {
"greaterthanorequalto": {
"field": rep_key,
"value": self._format_date_for_intacct(
self.get_starting_timestamp(context)
),
}
}

def prepare_request_payload(
self,
context: Context | None, # noqa: ARG002
Expand All @@ -161,20 +176,13 @@ def prepare_request_payload(
raise Exception("TODO hanlde audit streams")

rep_key = REP_KEYS.get(self.name, GET_BY_DATE_FIELD)
query_filter = {
"greaterthanorequalto": {
"field": rep_key,
"value": self._format_date_for_intacct(
self.get_starting_timestamp(context)
),
}
}
orderby = {
"order": {
"field": rep_key,
"ascending": {},
}
}
query_filter = self._get_query_filter(rep_key, context)
data = {
"query": {
"object": self.intacct_obj_name,
Expand Down Expand Up @@ -357,3 +365,73 @@ def post_process(
if row[field] is not None:
row[field] = self._parse_to_datetime(row[field])
return row


class GeneralLedgerDetailsStream(IntacctStream):
""" "General Ledger Details" stream."""

def __init__(
self,
*args,
**kwargs,
):
# Add MODULEKEY to discovered schema so it can be manually added in post_process
kwargs["schema"]["properties"]["MODULEKEY"] = th.StringType
super().__init__(*args, **kwargs)

def _get_query_filter(
self,
rep_key: str,
context: Context | None,
) -> dict:
return {
"and": {
**super()._get_query_filter(rep_key, context),
"equalto": {
"field": "MODULEKEY",
"value": context["key"],
},
}
}

@property
def partitions(self) -> list[dict] | None:
"""Get stream partitions.
Developers may override this property to provide a default partitions list.
By default, this method returns a list of any partitions which are already
defined in state, otherwise None.
Returns:
A list of partition key dicts (if applicable), otherwise `None`.
"""
return [
{"key": "2.GL", "name": "General Ledger"},
{"key": "3.AP", "name": "Accounts Payable"},
{"key": "4.AR", "name": "Accounts Receivable"},
{"key": "6.EE", "name": "Employee Expenses"},
{"key": "7.INV", "name": "Inventory Control"},
{"key": "8.SO", "name": "Order Entry"},
{"key": "9.PO", "name": "Purchasing"},
{"key": "11.CM", "name": "Cash Management"},
{"key": "48.PROJACCT", "name": "Project and Resource Management"},
{"key": "55.CONTRACT", "name": "Contracts and Revenue Management"},
]

def post_process(
self,
row: dict,
context: Context | None = None, # noqa: ARG002
) -> dict | None:
"""As needed, append or transform raw data to match expected structure.
Args:
row: An individual record from the stream.
context: The stream context.
Returns:
The updated record dictionary, or ``None`` to skip the record.
"""
row["MODULEKEY"] = context["key"]
return row

0 comments on commit 909c431

Please sign in to comment.