Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.1.0 add additional methods #27

Merged
merged 24 commits into from
Sep 10, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Update client.py
  • Loading branch information
SeaBlooms committed Aug 28, 2024
commit 3247b9c9b8db0e59b557b245fcdd44ffa6a51d36
172 changes: 168 additions & 4 deletions jupiterone/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
CREATE_RELATIONSHIP,
DELETE_RELATIONSHIP,
CURSOR_QUERY_V1,
CREATE_INSTANCE,
ALL_PROPERTIES
)


Expand All @@ -41,6 +43,7 @@
# pylint: disable=too-many-instance-attributes

DEFAULT_URL = "https://graphql.us.jupiterone.io"
SYNC_API_URL = "https://api.us.jupiterone.io"

RETRY_OPTS = {
"wait_exponential_multiplier": 1000,
Expand All @@ -49,12 +52,13 @@
"retry_on_exception": retry_on_429,
}

def __init__(self, account: str = None, token: str = None, url: str = DEFAULT_URL):
def __init__(self, account: str = None, token: str = None, url: str = DEFAULT_URL, sync_url: str = SYNC_API_URL):
self.account = account
self.token = token
self.url = url
self.query_endpoint = self.url
self.rules_endpoint = self.url + "/rules/graphql"
self.sync_url = sync_url
self.headers = {
"Authorization": "Bearer {}".format(self.token),
"JupiterOne-Account": self.account,
Expand Down Expand Up @@ -98,7 +102,7 @@

# initiate requests session and implement retry logic of 5 request retries with 1 second between
s = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 502, 503, 504])
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount('https://', HTTPAdapter(max_retries=retries))

response = s.post(
Expand Down Expand Up @@ -127,8 +131,6 @@
)

elif response.status_code in [429, 503]:
print(response.status_code)
print(response.content)
raise JupiterOneApiRetryError("JupiterOne API rate limit exceeded.")

elif response.status_code in [504]:
Expand Down Expand Up @@ -233,6 +235,57 @@

return {"data": results}

def _execute_syncapi_request(self, endpoint: str, headers: Dict = None, payload: Dict = None) -> Dict:
Fixed Show fixed Hide fixed
"""Executes POST request to SyncAPI endpoints"""

# initiate requests session and implement retry logic of 5 request retries with 1 second between
s = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount('https://', HTTPAdapter(max_retries=retries))

response = s.post(
self.sync_url + endpoint, headers=self.headers, json=payload, timeout=60
)

# It is still unclear if all responses will have a status
# code of 200 or if 429 will eventually be used to
# indicate rate limits being hit. J1 devs are aware.
if response.status_code == 200:
if response._content:
content = json.loads(response._content)
if "errors" in content:
errors = content["errors"]
if len(errors) == 1:
if "429" in errors[0]["message"]:
raise JupiterOneApiRetryError(
"JupiterOne API rate limit exceeded"
)
raise JupiterOneApiError(content.get("errors"))
return response.json()

elif response.status_code == 401:
raise JupiterOneApiError(
"401: Unauthorized. Please supply a valid account id and API token."
)

elif response.status_code in [429, 503]:
raise JupiterOneApiRetryError("JupiterOne API rate limit exceeded.")

elif response.status_code in [504]:
raise JupiterOneApiRetryError("Gateway Timeout.")

elif response.status_code in [500]:
raise JupiterOneApiError("JupiterOne API internal server error.")

else:
content = response._content
if isinstance(content, (bytes, bytearray)):
content = content.decode("utf-8")
if "application/json" in response.headers.get("Content-Type", "text/plain"):
data = json.loads(content)
content = data.get("error", data.get("errors", content))
raise JupiterOneApiError("{}:{}".format(response.status_code, content))

def query_v1(self, query: str, **kwargs) -> Dict:
"""Performs a V1 graph query
args:
Expand Down Expand Up @@ -349,3 +402,114 @@

response = self._execute_query(DELETE_RELATIONSHIP, variables=variables)
return response["data"]["deleteRelationship"]

def create_integration_instance(self, instance_name: str = None, instance_description: str = None):
"""Creates a new Custom Integration Instance.

args:
instance_name (str): The "Account name" for integration instance
instance_description (str): The "Description" for integration instance
"""
variables = {
"instance": {
"name": instance_name,
"description": instance_description,
"integrationDefinitionId": "8013680b-311a-4c2e-b53b-c8735fd97a5c",
SeaBlooms marked this conversation as resolved.
Show resolved Hide resolved
"pollingInterval": "DISABLED",
"config": {
"@tag": {
"Production": False,
"AccountName": True
}
},
"pollingIntervalCronExpression": {},
"ingestionSourcesOverrides": []
}
}

response = self._execute_query(CREATE_INSTANCE, variables=variables)
return response['data']['createIntegrationInstance']

def fetch_all_entity_properties(self):
"""Fetch list of aggregated property keys from all entities in the graph.

"""

response = self._execute_query(query=ALL_PROPERTIES)

return_list = []

for i in response['data']['getAllAssetProperties']:

if i.startswith(('parameter.', 'tag.')) == False:

return_list.append(i)

return return_list

def fetch_all_entity_tags(self):
"""Fetch list of aggregated property keys from all entities in the graph.

"""

response = self._execute_query(query=ALL_PROPERTIES)

return_list = []

for i in response['data']['getAllAssetProperties']:

if i.startswith(('tag.')) == True:

return_list.append(i)

return return_list

def start_sync_job(self, instance_id: str = None):
"""Start a synchronization job.

args:
instance_id (str): The "integrationInstanceId" request param for synchronization job
"""
endpoint = "/persister/synchronization/jobs"

data = {
"source": "integration-managed",
"integrationInstanceId": instance_id
}

response = self._execute_syncapi_request(endpoint=endpoint, headers=self.headers, payload=data)
Fixed Show fixed Hide fixed

return response

def upload_entities_batch_json(self, instance_job_id: str = None, entities_list: list = None):
SeaBlooms marked this conversation as resolved.
Show resolved Hide resolved
"""Upload batch of entities.

args:
instance_job_id (str): The "Job ID" for the Custom Integration integration job
entities_list (list): List of Dictionaries containing entities data to upload
"""
endpoint = f"/persister/synchronization/jobs/{instance_job_id}/entities"

data = {
"entities": entities_list
}

json_headers = self.headers.update({'Content-Type': 'application/json'})

response = self._execute_syncapi_request(endpoint=endpoint, headers=json_headers, payload=data)

return response

def finalize_sync_job(self, instance_job_id: str = None):
"""Start a synchronization job.

args:
instance_job_id (str): The "Job ID" for the Custom Integration integration job
"""
endpoint = f"/persister/synchronization/jobs/{instance_job_id}/finalize"

data = {}

response = self._execute_syncapi_request(endpoint=endpoint, headers=self.headers, payload=data)

return response
Loading