Skip to content

Commit

Permalink
Define a constant for the API version and fix pylint warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
prijendev committed Oct 11, 2024
1 parent ac9d892 commit 516a726
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
13 changes: 6 additions & 7 deletions tap_salesforce/salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from singer import metadata, metrics

from tap_salesforce.salesforce.bulk import Bulk
from tap_salesforce.salesforce.rest import Rest
from tap_salesforce.salesforce.rest import Rest, API_VERSION
from tap_salesforce.salesforce.exceptions import (
TapSalesforceException,
TapSalesforceQuotaExceededException)
Expand Down Expand Up @@ -206,9 +206,8 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches

return property_schema, mdata

#pylint: disable=too-many-positional-arguments
class Salesforce():
# pylint: disable=too-many-instance-attributes,too-many-arguments
# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-positional-arguments
def __init__(self,
refresh_token=None,
token=None,
Expand Down Expand Up @@ -243,7 +242,7 @@ def __init__(self,
self.rest_requests_attempted = 0
self.jobs_completed = 0
self.login_timer = None
self.data_url = "{}/services/data/v61.0/{}"
self.data_url = "{}/services/data/v{}.0/{}"
self.pk_chunking = False
self.lookback_window = lookback_window

Expand Down Expand Up @@ -284,7 +283,7 @@ def check_rest_quota_usage(self, headers):
self.quota_percent_per_run)
raise TapSalesforceQuotaExceededException(partial_message)

# pylint: disable=too-many-arguments
# pylint: disable=too-many-arguments,too-many-positional-arguments
@backoff.on_exception(backoff.expo,
(requests.exceptions.ConnectionError, requests.exceptions.Timeout),
max_tries=10,
Expand Down Expand Up @@ -369,11 +368,11 @@ def describe(self, sobject=None):
if sobject is None:
endpoint = "sobjects"
endpoint_tag = "sobjects"
url = self.data_url.format(self.instance_url, endpoint)
url = self.data_url.format(self.instance_url, API_VERSION, endpoint)
else:
endpoint = "sobjects/{}/describe".format(sobject)
endpoint_tag = sobject
url = self.data_url.format(self.instance_url, endpoint)
url = self.data_url.format(self.instance_url, API_VERSION, endpoint)

with metrics.http_request_timer("describe") as timer:
timer.tags['endpoint'] = endpoint_tag
Expand Down
19 changes: 10 additions & 9 deletions tap_salesforce/salesforce/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import xmltodict

from tap_salesforce.salesforce.rest import API_VERSION
from tap_salesforce.salesforce.exceptions import (
TapSalesforceException, TapSalesforceQuotaExceededException)

Expand Down Expand Up @@ -41,7 +42,7 @@ def find_parent(stream):

class Bulk():

bulk_url = "{}/services/async/61.0/{}"
bulk_url = "{}/services/async/{}.0/{}"

def __init__(self, sf):
# Set csv max reading size to the platform's max size available.
Expand Down Expand Up @@ -163,7 +164,7 @@ def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
return batch_status

def _create_job(self, catalog_entry, pk_chunking=False):
url = self.bulk_url.format(self.sf.instance_url, "job")
url = self.bulk_url.format(self.sf.instance_url, API_VERSION, "job")
body = {"operation": "queryAll", "object": catalog_entry['stream'], "contentType": "CSV"}

headers = self._get_bulk_headers()
Expand Down Expand Up @@ -194,7 +195,7 @@ def _create_job(self, catalog_entry, pk_chunking=False):
#pylint: disable=too-many-positional-arguments
def _add_batch(self, catalog_entry, job_id, start_date, end_date=None, order_by_clause=True):
endpoint = "job/{}/batch".format(job_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint)

body = self.sf._build_query_string(catalog_entry, start_date, end_date, order_by_clause=order_by_clause)

Expand Down Expand Up @@ -238,7 +239,7 @@ def _poll_on_batch_status(self, job_id, batch_id):
def job_exists(self, job_id):
try:
endpoint = "job/{}".format(job_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint)
headers = self._get_bulk_headers()

with metrics.http_request_timer("get_job"):
Expand All @@ -255,7 +256,7 @@ def job_exists(self, job_id):

def _get_batches(self, job_id):
endpoint = "job/{}/batch".format(job_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint)
headers = self._get_bulk_headers()

with metrics.http_request_timer("get_batches"):
Expand All @@ -269,7 +270,7 @@ def _get_batches(self, job_id):

def _get_batch(self, job_id, batch_id):
endpoint = "job/{}/batch/{}".format(job_id, batch_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint)
headers = self._get_bulk_headers()

with metrics.http_request_timer("get_batch"):
Expand All @@ -284,7 +285,7 @@ def get_batch_results(self, job_id, batch_id, catalog_entry):
CSV lines yielding each line as a record."""
headers = self._get_bulk_headers()
endpoint = "job/{}/batch/{}/result".format(job_id, batch_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint)

with metrics.http_request_timer("batch_result_list") as timer:
timer.tags['sobject'] = catalog_entry['stream']
Expand All @@ -299,7 +300,7 @@ def get_batch_results(self, job_id, batch_id, catalog_entry):

for result in batch_result_list['result']:
endpoint = "job/{}/batch/{}/result/{}".format(job_id, batch_id, result)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint)
headers['Content-Type'] = 'text/csv'

with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8") as csv_file:
Expand All @@ -322,7 +323,7 @@ def get_batch_results(self, job_id, batch_id, catalog_entry):

def _close_job(self, job_id):
endpoint = "job/{}".format(job_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)
url = self.bulk_url.format(self.sf.instance_url, API_VERSION, endpoint)
body = {"state": "Closed"}

with metrics.http_request_timer("close_job"):
Expand Down
4 changes: 2 additions & 2 deletions tap_salesforce/salesforce/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from tap_salesforce.salesforce.exceptions import TapSalesforceException

LOGGER = singer.get_logger()

API_VERSION = '61'
MAX_RETRIES = 4

class Rest():
Expand All @@ -28,7 +28,7 @@ def _query_recur(
end_date=None,
retries=MAX_RETRIES):
params = {"q": query}
url = "{}/services/data/v61.0/queryAll".format(self.sf.instance_url)
url = "{}/services/data/v{}.0/queryAll".format(self.sf.instance_url, API_VERSION)
headers = self.sf._get_standard_headers()

sync_start = singer_utils.now()
Expand Down

0 comments on commit 516a726

Please sign in to comment.