From 02bd0c70f50e77e5dc86a69f89cf047db2d268a0 Mon Sep 17 00:00:00 2001 From: Pat Nadolny Date: Mon, 4 Nov 2024 15:29:07 -0500 Subject: [PATCH] refactor to use native SDK methods --- tap_intacct/const.py | 55 +++--- tap_intacct/exceptions.py | 24 ++- tap_intacct/sage.py | 344 ++++++++++++++++++++------------------ tap_intacct/streams.py | 290 +++++++++++++++++++++++++------- tap_intacct/tap.py | 7 +- 5 files changed, 465 insertions(+), 255 deletions(-) diff --git a/tap_intacct/const.py b/tap_intacct/const.py index d5b6d2a..d12ec05 100644 --- a/tap_intacct/const.py +++ b/tap_intacct/const.py @@ -1,36 +1,36 @@ REQUIRED_CONFIG_KEYS = [ - 'company_id', - 'sender_id', - 'sender_password', - 'user_id', - 'user_password', + "company_id", + "sender_id", + "sender_password", + "user_id", + "user_password", ] KEY_PROPERTIES = { - 'accounts_payable_bills': ["RECORDNO"], - 'accounts_payable_payments': ["RECORDNO"], - 'accounts_payable_vendors': ["VENDORID"], + "accounts_payable_bills": ["RECORDNO"], + "accounts_payable_payments": ["RECORDNO"], + "accounts_payable_vendors": ["VENDORID"], "accounts_payable_bank_accounts": ["RECORDNO"], "checking_accounts": ["RECORDNO"], "savings_accounts": ["RECORDNO"], "card_accounts": ["RECORDNO"], "classes": ["RECORDNO"], "tasks": ["RECORDNO"], - 'general_ledger_accounts': ['RECORDNO'], - 'general_ledger_details': ["RECORDNO"], - 'general_ledger_journal_entries': ["RECORDNO"], - 'general_ledger_journal_entry_lines': ["RECORDNO"], - 'projects': ["RECORDNO"], - 'invoices': ["RECORDNO"], - 'adjustments': ["RECORDNO"], - 'customers': ["RECORDNO"], - 'deposits': ["RECORDNO"], - 'items': ["RECORDNO"], - 'invoice_items': ["RECORDNO"], - 'adjustment_items': ["RECORDNO"], - 'departments': ["DEPARTMENTID"], - 'audit_history': ["ID"], - 'locations': ["RECORDNO"], + "general_ledger_accounts": ["RECORDNO"], + "general_ledger_details": ["RECORDNO"], + "general_ledger_journal_entries": ["RECORDNO"], + "general_ledger_journal_entry_lines": ["RECORDNO"], + "projects": ["RECORDNO"], + "invoices": ["RECORDNO"], + "adjustments": ["RECORDNO"], + "customers": ["RECORDNO"], + "deposits": ["RECORDNO"], + "items": ["RECORDNO"], + "invoice_items": ["RECORDNO"], + "adjustment_items": ["RECORDNO"], + "departments": ["DEPARTMENTID"], + "audit_history": ["ID"], + "locations": ["RECORDNO"], } # List of available objects with their internal object-reference/endpoint name. @@ -61,14 +61,11 @@ "locations": "LOCATION" } -REP_KEYS = { - "audit_history" : "ACCESSTIME" -} - -IGNORE_FIELDS =["PASSWORD"] +REP_KEYS = {"audit_history": "ACCESSTIME"} +IGNORE_FIELDS = ["PASSWORD"] GET_BY_DATE_FIELD = "WHENMODIFIED" -DEFAULT_API_URL = 'https://api.intacct.com/ia/xml/xmlgw.phtml' +DEFAULT_API_URL = "https://api.intacct.com/ia/xml/xmlgw.phtml" diff --git a/tap_intacct/exceptions.py b/tap_intacct/exceptions.py index e024b4a..c289370 100644 --- a/tap_intacct/exceptions.py +++ b/tap_intacct/exceptions.py @@ -1,6 +1,4 @@ -""" -Sage Intacct SDK Exceptions -""" +"""Sage Intacct SDK Exceptions.""" class SageIntacctSDKError(Exception): @@ -50,3 +48,23 @@ class InvalidRequest(SageIntacctSDKError): class AuthFailure(SageIntacctSDKError): """The rest SageIntacctSDK errors, 500 error.""" + + +class InvalidXmlResponse(Exception): + pass + + +class BadGatewayError(Exception): + pass + + +class OfflineServiceError(Exception): + pass + + +class RateLimitError(Exception): + pass + + +class PleaseTryAgainLaterError(Exception): + pass diff --git a/tap_intacct/sage.py b/tap_intacct/sage.py index c6745e3..9aadd90 100644 --- a/tap_intacct/sage.py +++ b/tap_intacct/sage.py @@ -1,52 +1,58 @@ """ API Base class with util functions """ -import backoff + import datetime as dt import json +import logging import re import uuid -from typing import Dict, List, Union +from http.client import RemoteDisconnected +from typing import Union from urllib.parse import unquote +import backoff import requests import xmltodict -from xml.parsers.expat import ExpatError - -from calendar import monthrange - -import logging from tap_intacct.exceptions import ( + AuthFailure, ExpiredTokenError, InternalServerError, + InvalidRequest, InvalidTokenError, NoPrivilegeError, NotFoundItemError, SageIntacctSDKError, WrongParamsError, - InvalidRequest, - AuthFailure ) -from http.client import RemoteDisconnected class PleaseTryAgainLaterError(Exception): pass + from .const import GET_BY_DATE_FIELD, INTACCT_OBJECTS, KEY_PROPERTIES, REP_KEYS logger = logging.getLogger(__name__) + class InvalidXmlResponse(Exception): pass + + class BadGatewayError(Exception): pass + + class OfflineServiceError(Exception): pass + + class RateLimitError(Exception): pass + def _format_date_for_intacct(datetime: dt.datetime) -> str: """ Intacct expects datetimes in a 'MM/DD/YY HH:MM:SS' string format. @@ -56,9 +62,11 @@ def _format_date_for_intacct(datetime: dt.datetime) -> str: Returns: 'MM/DD/YY HH:MM:SS' formatted string. """ - return datetime.strftime('%m/%d/%Y %H:%M:%S') + return datetime.strftime("%m/%d/%Y %H:%M:%S") + + +IGNORE_FIELDS = ["PASSWORD"] -IGNORE_FIELDS =["PASSWORD"] class SageIntacctSDK: """The base class for all API classes.""" @@ -71,7 +79,7 @@ def __init__( sender_password: str, user_id: str, user_password: str, - headers: Dict, + headers: dict, ): self.__api_url = api_url self.__company_id = company_id @@ -103,27 +111,27 @@ def _set_session_id(self, user_id: str, company_id: str, user_password: str): timestamp = dt.datetime.now() dict_body = { - 'request': { - 'control': { - 'senderid': self.__sender_id, - 'password': self.__sender_password, - 'controlid': timestamp, - 'uniqueid': False, - 'dtdversion': 3.0, - 'includewhitespace': False, + "request": { + "control": { + "senderid": self.__sender_id, + "password": self.__sender_password, + "controlid": timestamp, + "uniqueid": False, + "dtdversion": 3.0, + "includewhitespace": False, }, - 'operation': { - 'authentication': { - 'login': { - 'userid': user_id, - 'companyid': company_id, - 'password': user_password, + "operation": { + "authentication": { + "login": { + "userid": user_id, + "companyid": company_id, + "password": user_password, } }, - 'content': { - 'function': { - '@controlid': str(uuid.uuid4()), - 'getAPISession': None, + "content": { + "function": { + "@controlid": str(uuid.uuid4()), + "getAPISession": None, } }, }, @@ -132,13 +140,13 @@ def _set_session_id(self, user_id: str, company_id: str, user_password: str): response = self._post_request(dict_body, self.__api_url) - if response['authentication']['status'] == 'success': - session_details = response['result']['data']['api'] - self.__api_url = session_details['endpoint'] - self.__session_id = session_details['sessionid'] + if response["authentication"]["status"] == "success": + session_details = response["result"]["data"]["api"] + self.__api_url = session_details["endpoint"] + self.__session_id = session_details["sessionid"] else: - raise SageIntacctSDKError('Error: {0}'.format(response['errormessage'])) + raise SageIntacctSDKError("Error: {0}".format(response["errormessage"])) @backoff.on_exception( backoff.expo, @@ -156,7 +164,7 @@ def _set_session_id(self, user_id: str, company_id: str, user_password: str): factor=3, ) # @singer.utils.ratelimit(10, 1) - def _post_request(self, dict_body: dict, api_url: str) -> Dict: + def _post_request(self, dict_body: dict, api_url: str) -> dict: """ Create a HTTP post request. @@ -168,7 +176,7 @@ def _post_request(self, dict_body: dict, api_url: str) -> Dict: A response from the request (dict). """ - api_headers = {'content-type': 'application/xml'} + api_headers = {"content-type": "application/xml"} api_headers.update(self.__headers) body = xmltodict.unparse(dict_body) logger.info(f"request to {api_url} with body {body}") @@ -198,34 +206,41 @@ def _post_request(self, dict_body: dict, api_url: str) -> Dict: ) if response.status_code == 200: - if parsed_response['response']['control']['status'] == 'success': - api_response = parsed_response['response']['operation'] + if parsed_response["response"]["control"]["status"] == "success": + api_response = parsed_response["response"]["operation"] - if parsed_response['response']['control']['status'] == 'failure': + if parsed_response["response"]["control"]["status"] == "failure": exception_msg = self.decode_support_id( - parsed_response['response']['errormessage'] + parsed_response["response"]["errormessage"] ) raise WrongParamsError( - 'Some of the parameters are wrong', exception_msg + "Some of the parameters are wrong", exception_msg ) - if api_response['authentication']['status'] == 'failure': + if api_response["authentication"]["status"] == "failure": raise InvalidTokenError( - 'Invalid token / Incorrect credentials', - api_response['errormessage'], + "Invalid token / Incorrect credentials", + api_response["errormessage"], ) - if api_response['result']['status'] == 'success': + if api_response["result"]["status"] == "success": return api_response - + logger.error(f"Intacct error response: {api_response}") - error = api_response.get('result', {}).get('errormessage', {}).get('error', {}) - desc_2 = error.get("description2") if isinstance(error, dict) else error[0].get("description2") if isinstance(error, list) and error else "" + error = ( + api_response.get("result", {}).get("errormessage", {}).get("error", {}) + ) + desc_2 = ( + error.get("description2") + if isinstance(error, dict) + else error[0].get("description2") + if isinstance(error, list) and error + else "" + ) if ( - api_response['result']['status'] == 'failure' + api_response["result"]["status"] == "failure" and error - and "There was an error processing the request" - in desc_2 + and "There was an error processing the request" in desc_2 and dict_body["request"]["operation"]["content"]["function"]["query"][ "object" ] @@ -233,39 +248,49 @@ def _post_request(self, dict_body: dict, api_url: str) -> Dict: ): return {"result": "skip_and_paginate"} - exception_msg = parsed_response.get("response", {}).get("errormessage", {}).get("error", {}) + exception_msg = ( + parsed_response.get("response", {}).get("errormessage", {}).get("error", {}) + ) correction = exception_msg.get("correction", {}) - + if response.status_code == 400: if exception_msg.get("errorno") == "GW-0011": - raise AuthFailure(f'One or more authentication values are incorrect. Response:{parsed_response}') - raise InvalidRequest("Invalid request", parsed_response) + raise AuthFailure( + f"One or more authentication values are incorrect. Response:{parsed_response}" + ) + raise InvalidRequest("Invalid request", parsed_response) if response.status_code == 401: raise InvalidTokenError( - f'Invalid token / Incorrect credentials. Response: {parsed_response}' + f"Invalid token / Incorrect credentials. Response: {parsed_response}" ) if response.status_code == 403: raise NoPrivilegeError( - f'Forbidden, the user has insufficient privilege. Response: {parsed_response}' + f"Forbidden, the user has insufficient privilege. Response: {parsed_response}" ) if response.status_code == 404: - raise NotFoundItemError(f'Not found item with ID. Response: {parsed_response}') + raise NotFoundItemError( + f"Not found item with ID. Response: {parsed_response}" + ) if response.status_code == 498: - raise ExpiredTokenError(f'Expired token, try to refresh it. Response: {parsed_response}') + raise ExpiredTokenError( + f"Expired token, try to refresh it. Response: {parsed_response}" + ) if response.status_code == 500: - raise InternalServerError(f'Internal server error. Response: {parsed_response}') + raise InternalServerError( + f"Internal server error. Response: {parsed_response}" + ) - if correction and 'Please Try Again Later' in correction: + if correction and "Please Try Again Later" in correction: raise PleaseTryAgainLaterError(parsed_response) - raise SageIntacctSDKError('Error: {0}'.format(parsed_response)) + raise SageIntacctSDKError("Error: {0}".format(parsed_response)) - def support_id_msg(self, errormessages) -> Union[List, Dict]: + def support_id_msg(self, errormessages) -> Union[list, dict]: """ Finds whether the error messages is list / dict and assign type and error assignment. @@ -276,16 +301,16 @@ def support_id_msg(self, errormessages) -> Union[List, Dict]: Error message assignment and type. """ error = {} - if isinstance(errormessages['error'], list): - error['error'] = errormessages['error'][0] - error['type'] = 'list' - elif isinstance(errormessages['error'], dict): - error['error'] = errormessages['error'] - error['type'] = 'dict' + if isinstance(errormessages["error"], list): + error["error"] = errormessages["error"][0] + error["type"] = "list" + elif isinstance(errormessages["error"], dict): + error["error"] = errormessages["error"] + error["type"] = "dict" return error - def decode_support_id(self, errormessages: Union[List, Dict]) -> Union[List, Dict]: + def decode_support_id(self, errormessages: Union[list, dict]) -> Union[list, dict]: """ Decodes Support ID. @@ -296,23 +321,23 @@ def decode_support_id(self, errormessages: Union[List, Dict]) -> Union[List, Dic Same error message with decoded Support ID. """ support_id_msg = self.support_id_msg(errormessages) - data_type = support_id_msg['type'] - error = support_id_msg['error'] - if error and error['description2']: - message = error['description2'] - support_id = re.search('Support ID: (.*)]', message) + data_type = support_id_msg["type"] + error = support_id_msg["error"] + if error and error["description2"]: + message = error["description2"] + support_id = re.search("Support ID: (.*)]", message) if support_id and support_id.group(1): decoded_support_id = unquote(support_id.group(1)) message = message.replace(support_id.group(1), decoded_support_id) - if data_type == 'list': - errormessages['error'][0]['description2'] = message if message else None - elif data_type == 'dict': - errormessages['error']['description2'] = message if message else None + if data_type == "list": + errormessages["error"][0]["description2"] = message if message else None + elif data_type == "dict": + errormessages["error"]["description2"] = message if message else None return errormessages - def format_and_send_request(self, data: Dict) -> Union[List, Dict]: + def format_and_send_request(self, data: dict) -> Union[list, dict]: """ Format data accordingly to convert them to xml. @@ -324,46 +349,45 @@ def format_and_send_request(self, data: Dict) -> Union[List, Dict]: """ key = next(iter(data)) - object_type = data[key]['object'] + object_type = data[key]["object"] timestamp = dt.datetime.now() dict_body = { - 'request': { - 'control': { - 'senderid': self.__sender_id, - 'password': self.__sender_password, - 'controlid': timestamp, - 'uniqueid': False, - 'dtdversion': 3.0, - 'includewhitespace': False, + "request": { + "control": { + "senderid": self.__sender_id, + "password": self.__sender_password, + "controlid": timestamp, + "uniqueid": False, + "dtdversion": 3.0, + "includewhitespace": False, }, - 'operation': { - 'authentication': {'sessionid': self.__session_id}, - 'content': { - 'function': {'@controlid': str(uuid.uuid4()), key: data[key]} + "operation": { + "authentication": {"sessionid": self.__session_id}, + "content": { + "function": {"@controlid": str(uuid.uuid4()), key: data[key]} }, }, } } # with singer.metrics.http_request_timer(endpoint=object_type): response = self._post_request(dict_body, self.__api_url) - return response['result'] - + return response["result"] def get_by_date( - self, *, object_type: str, fields: List[str], from_date: dt.datetime - ) -> List[Dict]: + self, *, object_type: str, fields: list[str], from_date: dt.datetime + ) -> list[dict]: """ Get multiple objects of a single type from Sage Intacct, filtered by GET_BY_DATE_FIELD (WHENMODIFIED) date. Returns: - List of Dict in object_type schema. + List of dict in object_type schema. """ # if stream is an audit_history stream filter by object type if object_type.startswith("audit_history"): filter_table = object_type.split("audit_history_")[-1] filter_table_value = INTACCT_OBJECTS[filter_table].lower() - object_type = "audit_history" + object_type = "audit_history" intacct_object_type = INTACCT_OBJECTS[object_type] pk = KEY_PROPERTIES[object_type][0] @@ -374,26 +398,26 @@ def get_by_date( # if it's an audit_history stream filter only created (C) and deleted (D) records if object_type == "audit_history": filter = { - "and":{ - 'greaterthanorequalto': { - 'field': rep_key, - 'value': _format_date_for_intacct(from_date), + "and": { + "greaterthanorequalto": { + "field": rep_key, + "value": _format_date_for_intacct(from_date), }, - "equalto":{ - 'field': "OBJECTTYPE", - 'value': filter_table_value, + "equalto": { + "field": "OBJECTTYPE", + "value": filter_table_value, + }, + "in": { + "field": "ACCESSMODE", + "value": ["C", "D"], }, - "in":{ - 'field': "ACCESSMODE", - 'value': ["C", "D"], - } } } else: filter = { - 'greaterthanorequalto': { - 'field': rep_key, - 'value': _format_date_for_intacct(from_date), + "greaterthanorequalto": { + "field": rep_key, + "value": _format_date_for_intacct(from_date), } } orderby = { @@ -404,39 +428,40 @@ def get_by_date( } get_count = { - 'query': { - 'object': intacct_object_type, - 'select': {'field': pk}, - 'filter': filter, - 'pagesize': '1', - 'options': {'showprivate': 'true'}, + "query": { + "object": intacct_object_type, + "select": {"field": pk}, + "filter": filter, + "pagesize": "1", + "options": {"showprivate": "true"}, } } response = self.format_and_send_request(get_count) - count = int(response['data']['@totalcount']) + count = int(response["data"]["@totalcount"]) pagesize = 1000 offset = 0 while offset < count: data = { - 'query': { - 'object': intacct_object_type, - 'select': {'field': fields}, - 'options': {'showprivate': 'true'}, - 'filter': filter, - 'pagesize': pagesize, - 'offset': offset, - 'orderby': orderby, + "query": { + "object": intacct_object_type, + "select": {"field": fields}, + "options": {"showprivate": "true"}, + "filter": filter, + "pagesize": pagesize, + "offset": offset, + "orderby": orderby, } } intacct_objects = self.format_and_send_request(data) - if intacct_objects == "skip_and_paginate" and object_type == "audit_history": + if ( + intacct_objects == "skip_and_paginate" + and object_type == "audit_history" + ): offset = offset + 99 continue - intacct_objects = intacct_objects['data'][ - intacct_object_type - ] + intacct_objects = intacct_objects["data"][intacct_object_type] # TODO: Can we use intacct_objects['data']["@numremaining"] for paginating? # It seems like it might be inconsistent. @@ -449,27 +474,21 @@ def get_by_date( offset = offset + pagesize - def get_fields_data_using_schema_name(self, object_type: str): """ Function to fetch fields data for a given object by taking the schema name through the API call.This function helps query via the api for any given schema name Returns: - List of Dict in object_type schema. + List of dict in object_type schema. """ intacct_object_type = INTACCT_OBJECTS[object_type] # First get the count of object that will be synchronized. - get_fields = { - 'lookup': { - 'object': intacct_object_type - } - } + get_fields = {"lookup": {"object": intacct_object_type}} response = self.format_and_send_request(get_fields) return response - def load_schema_from_api(self, stream: str): """ Function to load schema data via an api call for each INTACCT Object to get the fields list for each schema name @@ -482,33 +501,36 @@ def load_schema_from_api(self, stream: str): """ schema_dict = {} - schema_dict['type'] = 'object' - schema_dict['properties'] = {} + schema_dict["type"] = "object" + schema_dict["properties"] = {} required_list = ["RECORDNO", "WHENMODIFIED"] - fields_data_response = self.get_fields_data_using_schema_name(object_type=stream) - fields_data_list = fields_data_response['data']['Type']['Fields']['Field'] + fields_data_response = self.get_fields_data_using_schema_name( + object_type=stream + ) + fields_data_list = fields_data_response["data"]["Type"]["Fields"]["Field"] for rec in fields_data_list: - if rec['ID'] in IGNORE_FIELDS: + if rec["ID"] in IGNORE_FIELDS: continue - if rec['DATATYPE'] in ['PERCENT', 'DECIMAL']: - type_data_type = 'number' - elif rec['DATATYPE'] == 'BOOLEAN': - type_data_type = 'boolean' - elif rec['DATATYPE'] in ['DATE', 'TIMESTAMP']: - type_data_type = 'date-time' + if rec["DATATYPE"] in ["PERCENT", "DECIMAL"]: + type_data_type = "number" + elif rec["DATATYPE"] == "BOOLEAN": + type_data_type = "boolean" + elif rec["DATATYPE"] in ["DATE", "TIMESTAMP"]: + type_data_type = "date-time" else: - type_data_type = 'string' - if type_data_type in ['string', 'boolean', 'number']: - format_dict = {'type': ["null", type_data_type]} + type_data_type = "string" + if type_data_type in ["string", "boolean", "number"]: + format_dict = {"type": ["null", type_data_type]} else: - if type_data_type in ['date', 'date-time']: - format_dict = {'type': ["null", 'string'], 'format': type_data_type} + if type_data_type in ["date", "date-time"]: + format_dict = {"type": ["null", "string"], "format": type_data_type} - schema_dict['properties'][rec['ID']] = format_dict - schema_dict['required'] = required_list + schema_dict["properties"][rec["ID"]] = format_dict + schema_dict["required"] = required_list return schema_dict + def get_client( *, api_url: str, @@ -517,7 +539,7 @@ def get_client( sender_password: str, user_id: str, user_password: str, - headers: Dict, + headers: dict, ) -> SageIntacctSDK: """ Initializes and returns a SageIntacctSDK object. diff --git a/tap_intacct/streams.py b/tap_intacct/streams.py index 76c2bec..53c2ac7 100644 --- a/tap_intacct/streams.py +++ b/tap_intacct/streams.py @@ -2,15 +2,32 @@ from __future__ import annotations +import json import typing as t -from datetime import datetime -from importlib import resources +import uuid +from datetime import datetime, timezone -from singer_sdk.helpers.jsonpath import extract_jsonpath +import xmltodict from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002 from singer_sdk.streams import RESTStream -from tap_intacct.const import KEY_PROPERTIES +from tap_intacct.const import GET_BY_DATE_FIELD, KEY_PROPERTIES, REP_KEYS +from tap_intacct.exceptions import ( + AuthFailure, + BadGatewayError, + ExpiredTokenError, + InternalServerError, + InvalidRequest, + InvalidTokenError, + InvalidXmlResponse, + NoPrivilegeError, + NotFoundItemError, + OfflineServiceError, + PleaseTryAgainLaterError, + RateLimitError, + SageIntacctSDKError, + WrongParamsError, +) if t.TYPE_CHECKING: import requests @@ -21,10 +38,8 @@ class IntacctStream(RESTStream): """Intacct stream class.""" # Update this value if necessary or override `parse_response`. - records_jsonpath = "$[*]" - - # Update this value if necessary or override `get_new_paginator`. - next_page_token_jsonpath = "$.next_page" # noqa: S105 + rest_method = "POST" + path = None def __init__( self, @@ -39,29 +54,12 @@ def __init__( self.intacct_obj_name = intacct_obj_name self.replication_key = replication_key self.sage_client = sage_client - self.datetime_fields = [i for i, t in self.schema["properties"].items() if t.get("format") == "date-time"] - - # TODO refactor this out - def get_records( - self, - context: Context | None, # noqa: ARG002 - ) -> t.Iterable[dict]: - """Return a generator of record-type dictionary objects. - - The optional `context` argument is used to identify a specific slice of the - stream if partitioning is required for the stream. Most implementations do not - require partitioning and should ignore the `context` argument. - - Args: - context: Stream partition or context dictionary. - """ - for record in self.sage_client.get_by_date( - object_type=self.name, - fields=list(self.schema["properties"]), - from_date=self.get_starting_timestamp(context), - ): - yield self.post_process(record) - + self.session_id = sage_client._SageIntacctSDK__session_id + self.datetime_fields = [ + i + for i, t in self.schema["properties"].items() + if t.get("format") == "date-time" + ] @property def url_base(self) -> str: @@ -75,7 +73,7 @@ def http_headers(self) -> dict: Returns: A dictionary of HTTP headers. """ - headers = {} + headers = {"content-type": "application/xml"} if "user_agent" in self.config: headers["User-Agent"] = self.config.get("user_agent") # If not using an authenticator, you may also provide inline auth headers: @@ -97,27 +95,51 @@ def get_new_paginator(self) -> BaseAPIPaginator: """ return super().get_new_paginator() - def get_url_params( + def _format_date_for_intacct(self, datetime: datetime) -> str: + """Intacct expects datetimes in a 'MM/DD/YY HH:MM:SS' string format. + + Args: + datetime: The datetime to be converted. + + Returns: + 'MM/DD/YY HH:MM:SS' formatted string. + """ + return datetime.strftime("%m/%d/%Y %H:%M:%S") + + def prepare_request( self, - context: Context | None, # noqa: ARG002 - next_page_token: t.Any | None, # noqa: ANN401 - ) -> dict[str, t.Any]: - """Return a dictionary of values to be used in URL parameterization. + context: Context | None, + next_page_token: str | None, + ) -> requests.PreparedRequest: + """Prepare a request object for this stream. + + If partitioning is supported, the `context` object will contain the partition + definitions. Pagination information can be parsed from `next_page_token` if + `next_page_token` is not None. Args: - context: The stream context. - next_page_token: The next page index or value. + context: Stream partition or context dictionary. + next_page_token: Token, page number or any request argument to request the + next page of data. Returns: - A dictionary of URL query parameters. + Build a request with the stream's URL, path, query parameters, + HTTP headers and authenticator. """ - params: dict = {} - # if next_page_token: - # params["page"] = next_page_token - # if self.replication_key: - # params["sort"] = "asc" - # params["order_by"] = self.replication_key - return params + http_method = self.rest_method + url: str = self.get_url(context) + params: dict | str = self.get_url_params(context, next_page_token) + request_data = self.prepare_request_payload(context, next_page_token) + headers = self.http_headers + + return self.build_prepared_request( + method=http_method, + url=url, + params=params, + headers=headers, + # Note: Had to override this method to switch this to data instead of json + data=request_data, + ) def prepare_request_payload( self, @@ -135,8 +157,57 @@ def prepare_request_payload( Returns: A dictionary with the JSON body for a POST requests. """ - # TODO: Delete this method if no payload is required. (Most REST APIs.) - return None + if self.name == "audit_history": + raise Exception("TODO hanlde audit streams") + + rep_key = REP_KEYS.get(self.name, GET_BY_DATE_FIELD) + query_filter = { + "greaterthanorequalto": { + "field": rep_key, + "value": self._format_date_for_intacct( + self.get_starting_timestamp(context) + ), + } + } + orderby = { + "order": { + "field": rep_key, + "ascending": {}, + } + } + data = { + "query": { + "object": self.intacct_obj_name, + "select": {"field": list(self.schema["properties"])}, + "options": {"showprivate": "true"}, + "filter": query_filter, + "pagesize": 1000, + # TODO: need to paginate here + "offset": 0, + "orderby": orderby, + } + } + key = next(iter(data)) + timestamp = datetime.now(timezone.utc) + dict_body = { + "request": { + "control": { + "senderid": self.config["sender_id"], + "password": self.config["sender_password"], + "controlid": timestamp, + "uniqueid": False, + "dtdversion": 3.0, + "includewhitespace": False, + }, + "operation": { + "authentication": {"sessionid": self.session_id}, + "content": { + "function": {"@controlid": str(uuid.uuid4()), key: data[key]} + }, + }, + } + } + return xmltodict.unparse(dict_body) def parse_response(self, response: requests.Response) -> t.Iterable[dict]: """Parse the response and return an iterator of result records. @@ -147,20 +218,126 @@ def parse_response(self, response: requests.Response) -> t.Iterable[dict]: Yields: Each record from the source. """ - # TODO: Parse response body and return a set of records. - yield from extract_jsonpath(self.records_jsonpath, input=response.json()) - - def parse_datetime(self, date_str): + try: + parsed_xml = xmltodict.parse(response.text) + parsed_response = json.loads(json.dumps(parsed_xml)) + except: + if response.status_code == 502: + raise BadGatewayError( + f"Response status code: {response.status_code}, response: {response.text}" + ) + if response.status_code == 503: + raise OfflineServiceError( + f"Response status code: {response.status_code}, response: {response.text}" + ) + if response.status_code == 429: + raise RateLimitError( + f"Response status code: {response.status_code}, response: {response.text}" + ) + raise InvalidXmlResponse( + f"Response status code: {response.status_code}, response: {response.text}" + ) + + if response.status_code == 200: + if parsed_response["response"]["control"]["status"] == "success": + api_response = parsed_response["response"]["operation"] + + if parsed_response["response"]["control"]["status"] == "failure": + exception_msg = self.sage_client.decode_support_id( + parsed_response["response"]["errormessage"] + ) + raise WrongParamsError( + "Some of the parameters are wrong", exception_msg + ) + + if api_response["authentication"]["status"] == "failure": + raise InvalidTokenError( + "Invalid token / Incorrect credentials", + api_response["errormessage"], + ) + + if api_response["result"]["status"] == "success": + return api_response["result"]["data"][self.intacct_obj_name] + + self.logger.error(f"Intacct error response: {api_response}") + error = ( + api_response.get("result", {}).get("errormessage", {}).get("error", {}) + ) + desc_2 = ( + error.get("description2") + if isinstance(error, dict) + else error[0].get("description2") + if isinstance(error, list) and error + else "" + ) + # if ( + # api_response['result']['status'] == 'failure' + # and error + # and "There was an error processing the request" + # in desc_2 + # and dict_body["request"]["operation"]["content"]["function"]["query"][ + # "object" + # ] + # == "AUDITHISTORY" + # ): + # return {"result": "skip_and_paginate"} + + exception_msg = ( + parsed_response.get("response", {}).get("errormessage", {}).get("error", {}) + ) + correction = exception_msg.get("correction", {}) + + if response.status_code == 400: + if exception_msg.get("errorno") == "GW-0011": + raise AuthFailure( + f"One or more authentication values are incorrect. Response:{parsed_response}" + ) + raise InvalidRequest("Invalid request", parsed_response) + + if response.status_code == 401: + raise InvalidTokenError( + f"Invalid token / Incorrect credentials. Response: {parsed_response}" + ) + + if response.status_code == 403: + raise NoPrivilegeError( + f"Forbidden, the user has insufficient privilege. Response: {parsed_response}" + ) + + if response.status_code == 404: + raise NotFoundItemError( + f"Not found item with ID. Response: {parsed_response}" + ) + + if response.status_code == 498: + raise ExpiredTokenError( + f"Expired token, try to refresh it. Response: {parsed_response}" + ) + + if response.status_code == 500: + raise InternalServerError( + f"Internal server error. Response: {parsed_response}" + ) + + if correction and "Please Try Again Later" in correction: + raise PleaseTryAgainLaterError(parsed_response) + + raise SageIntacctSDKError("Error: {0}".format(parsed_response)) + + def _parse_to_datetime(self, date_str: str) -> datetime: # Try to parse with the full format first try: return datetime.strptime(date_str, "%m/%d/%Y %H:%M:%S") + # .replace(tzinfo=datetime.timezone.utc) except ValueError: # If it fails, try the date-only format try: return datetime.strptime(date_str, "%m/%d/%Y") - except ValueError: + # .replace(tzinfo=datetime.timezone.utc) + except ValueError as err: # Handle cases where the format is still incorrect - raise ValueError(f"Invalid date format: {date_str}") + msg = f"Invalid date format: {date_str}" + raise ValueError(msg) from err def post_process( self, @@ -176,8 +353,7 @@ def post_process( Returns: The updated record dictionary, or ``None`` to skip the record. """ - # TODO: Delete this method if not needed. for field in self.datetime_fields: if row[field] is not None: - row[field] = self.parse_datetime(row[field]) + row[field] = self._parse_to_datetime(row[field]) return row diff --git a/tap_intacct/tap.py b/tap_intacct/tap.py index 58956f1..6948e95 100644 --- a/tap_intacct/tap.py +++ b/tap_intacct/tap.py @@ -6,8 +6,9 @@ from singer_sdk import typing as th # JSON schema typing helpers from tap_intacct import streams -from tap_intacct.sage import get_client from tap_intacct.const import INTACCT_OBJECTS +from tap_intacct.sage import get_client + class TapIntacct(Tap): """Intacct tap class.""" @@ -55,10 +56,6 @@ class TapIntacct(Tap): ), ).to_dict() - def _get_stream_schema(self): - """Return the schema for the stream.""" - return streams.SageStream.schema - def discover_streams(self) -> list[streams.TableStream]: """Return a list of discovered streams.