From 516a726bf77e84bcfd821c976c7c27a5f50541ce Mon Sep 17 00:00:00 2001 From: prijendev Date: Fri, 11 Oct 2024 09:56:27 +0530 Subject: [PATCH] Define constant for api version and pylint fix --- tap_salesforce/salesforce/__init__.py | 13 ++++++------- tap_salesforce/salesforce/bulk.py | 19 ++++++++++--------- tap_salesforce/salesforce/rest.py | 4 ++-- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index f144b53..7e3e7f9 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -10,7 +10,7 @@ from singer import metadata, metrics from tap_salesforce.salesforce.bulk import Bulk -from tap_salesforce.salesforce.rest import Rest +from tap_salesforce.salesforce.rest import Rest, API_VERSION from tap_salesforce.salesforce.exceptions import ( TapSalesforceException, TapSalesforceQuotaExceededException) @@ -206,9 +206,8 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches return property_schema, mdata -#pylint: disable=too-many-positional-arguments class Salesforce(): - # pylint: disable=too-many-instance-attributes,too-many-arguments + # pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-positional-arguments def __init__(self, refresh_token=None, token=None, @@ -243,7 +242,7 @@ def __init__(self, self.rest_requests_attempted = 0 self.jobs_completed = 0 self.login_timer = None - self.data_url = "{}/services/data/v61.0/{}" + self.data_url = "{}/services/data/v{}.0/{}" self.pk_chunking = False self.lookback_window = lookback_window @@ -284,7 +283,7 @@ def check_rest_quota_usage(self, headers): self.quota_percent_per_run) raise TapSalesforceQuotaExceededException(partial_message) - # pylint: disable=too-many-arguments + # pylint: disable=too-many-arguments,too-many-positional-arguments @backoff.on_exception(backoff.expo, (requests.exceptions.ConnectionError, requests.exceptions.Timeout), max_tries=10, @@ -369,11 +368,11 @@ def describe(self, sobject=None): if sobject is None: endpoint = "sobjects" endpoint_tag = "sobjects" - url = self.data_url.format(self.instance_url, endpoint) + url = self.data_url.format(self.instance_url, API_VERSION, endpoint) else: endpoint = "sobjects/{}/describe".format(sobject) endpoint_tag = sobject - url = self.data_url.format(self.instance_url, endpoint) + url = self.data_url.format(self.instance_url, API_VERSION, endpoint) with metrics.http_request_timer("describe") as timer: timer.tags['endpoint'] = endpoint_tag diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index bc3c68c..14420e7 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -12,6 +12,7 @@ import xmltodict +from tap_salesforce.salesforce.rest import API_VERSION from tap_salesforce.salesforce.exceptions import ( TapSalesforceException, TapSalesforceQuotaExceededException) @@ -41,7 +42,7 @@ def find_parent(stream): class Bulk(): - bulk_url = "{}/services/async/61.0/{}" + bulk_url = "{}/services/async/{}.0/{}" def __init__(self, sf): # Set csv max reading size to the platform's max size available. @@ -163,7 +164,7 @@ def _bulk_query_with_pk_chunking(self, catalog_entry, start_date): return batch_status def _create_job(self, catalog_entry, pk_chunking=False): - url = self.bulk_url.format(self.sf.instance_url, "job") + url = self.bulk_url.format(self.sf.instance_url, API_VERSION, "job") body = {"operation": "queryAll", "object": catalog_entry['stream'], "contentType": "CSV"} headers = self._get_bulk_headers() @@ -194,7 +195,7 @@ def _create_job(self, catalog_entry, pk_chunking=False): #pylint: disable=too-many-positional-arguments def _add_batch(self, catalog_entry, job_id, start_date, end_date=None, order_by_clause=True): endpoint = "job/{}/batch".format(job_id) - url = self.bulk_url.format(self.sf.instance_url, endpoint) + url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint) body = self.sf._build_query_string(catalog_entry, start_date, end_date, order_by_clause=order_by_clause) @@ -238,7 +239,7 @@ def _poll_on_batch_status(self, job_id, batch_id): def job_exists(self, job_id): try: endpoint = "job/{}".format(job_id) - url = self.bulk_url.format(self.sf.instance_url, endpoint) + url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint) headers = self._get_bulk_headers() with metrics.http_request_timer("get_job"): @@ -255,7 +256,7 @@ def job_exists(self, job_id): def _get_batches(self, job_id): endpoint = "job/{}/batch".format(job_id) - url = self.bulk_url.format(self.sf.instance_url, endpoint) + url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint) headers = self._get_bulk_headers() with metrics.http_request_timer("get_batches"): @@ -269,7 +270,7 @@ def _get_batches(self, job_id): def _get_batch(self, job_id, batch_id): endpoint = "job/{}/batch/{}".format(job_id, batch_id) - url = self.bulk_url.format(self.sf.instance_url, endpoint) + url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint) headers = self._get_bulk_headers() with metrics.http_request_timer("get_batch"): @@ -284,7 +285,7 @@ def get_batch_results(self, job_id, batch_id, catalog_entry): CSV lines yielding each line as a record.""" headers = self._get_bulk_headers() endpoint = "job/{}/batch/{}/result".format(job_id, batch_id) - url = self.bulk_url.format(self.sf.instance_url, endpoint) + url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint) with metrics.http_request_timer("batch_result_list") as timer: timer.tags['sobject'] = catalog_entry['stream'] @@ -299,7 +300,7 @@ def get_batch_results(self, job_id, batch_id, catalog_entry): for result in batch_result_list['result']: endpoint = "job/{}/batch/{}/result/{}".format(job_id, batch_id, result) - url = self.bulk_url.format(self.sf.instance_url, endpoint) + url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint) headers['Content-Type'] = 'text/csv' with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8") as csv_file: @@ -322,7 +323,7 @@ def get_batch_results(self, job_id, batch_id, catalog_entry): def _close_job(self, job_id): endpoint = "job/{}".format(job_id) - url = self.bulk_url.format(self.sf.instance_url, endpoint) + url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint) body = {"state": "Closed"} with metrics.http_request_timer("close_job"): diff --git a/tap_salesforce/salesforce/rest.py b/tap_salesforce/salesforce/rest.py index 8ddeddd..a8807fe 100644 --- a/tap_salesforce/salesforce/rest.py +++ b/tap_salesforce/salesforce/rest.py @@ -5,7 +5,7 @@ from tap_salesforce.salesforce.exceptions import TapSalesforceException LOGGER = singer.get_logger() - +API_VERSION = '61' MAX_RETRIES = 4 class Rest(): @@ -28,7 +28,7 @@ def _query_recur( end_date=None, retries=MAX_RETRIES): params = {"q": query} - url = "{}/services/data/v61.0/queryAll".format(self.sf.instance_url) + url = "{}/services/data/v{}.0/queryAll".format(self.sf.instance_url, API_VERSION) headers = self.sf._get_standard_headers() sync_start = singer_utils.now()