Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.1.0 add additional methods #27

Merged
merged 24 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 222 additions & 4 deletions jupiterone/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
CREATE_RELATIONSHIP,
DELETE_RELATIONSHIP,
CURSOR_QUERY_V1,
CREATE_INSTANCE,
ALL_PROPERTIES,
CREATE_SMARTCLASS,
CREATE_SMARTCLASS_QUERY,
RUN_SMARTCLASS_EVALUATION
)


Expand All @@ -41,6 +46,7 @@
# pylint: disable=too-many-instance-attributes

DEFAULT_URL = "https://graphql.us.jupiterone.io"
SYNC_API_URL = "https://api.us.jupiterone.io"

RETRY_OPTS = {
"wait_exponential_multiplier": 1000,
Expand All @@ -49,15 +55,17 @@
"retry_on_exception": retry_on_429,
}

def __init__(self, account: str = None, token: str = None, url: str = DEFAULT_URL):
def __init__(self, account: str = None, token: str = None, url: str = DEFAULT_URL, sync_url: str = SYNC_API_URL):
self.account = account
self.token = token
self.url = url
self.query_endpoint = self.url
self.rules_endpoint = self.url + "/rules/graphql"
self.sync_url = sync_url
self.headers = {
"Authorization": "Bearer {}".format(self.token),
"JupiterOne-Account": self.account,
"Content-Type": "application/json"
}

@property
Expand Down Expand Up @@ -98,7 +106,7 @@

# initiate requests session and implement retry logic of 5 request retries with 1 second between
s = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 502, 503, 504])
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount('https://', HTTPAdapter(max_retries=retries))

response = s.post(
Expand Down Expand Up @@ -127,8 +135,6 @@
)

elif response.status_code in [429, 503]:
print(response.status_code)
print(response.content)
raise JupiterOneApiRetryError("JupiterOne API rate limit exceeded.")

elif response.status_code in [504]:
Expand Down Expand Up @@ -233,6 +239,57 @@

return {"data": results}

def _execute_syncapi_request(self, endpoint: str, payload: Dict = None) -> Dict:
Dismissed Show dismissed Hide dismissed
"""Executes POST request to SyncAPI endpoints"""

# initiate requests session and implement retry logic of 5 request retries with 1 second between
s = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount('https://', HTTPAdapter(max_retries=retries))

response = s.post(
self.sync_url + endpoint, headers=self.headers, json=payload, timeout=60
)

# It is still unclear if all responses will have a status
# code of 200 or if 429 will eventually be used to
# indicate rate limits being hit. J1 devs are aware.
if response.status_code == 200:
if response._content:
content = json.loads(response._content)
if "errors" in content:
errors = content["errors"]
if len(errors) == 1:
if "429" in errors[0]["message"]:
raise JupiterOneApiRetryError(
"JupiterOne API rate limit exceeded"
)
raise JupiterOneApiError(content.get("errors"))
return response.json()

elif response.status_code == 401:
raise JupiterOneApiError(
"401: Unauthorized. Please supply a valid account id and API token."
)

elif response.status_code in [429, 503]:
raise JupiterOneApiRetryError("JupiterOne API rate limit exceeded.")

elif response.status_code in [504]:
raise JupiterOneApiRetryError("Gateway Timeout.")

elif response.status_code in [500]:
raise JupiterOneApiError("JupiterOne API internal server error.")

else:
content = response._content
if isinstance(content, (bytes, bytearray)):
content = content.decode("utf-8")
if "application/json" in response.headers.get("Content-Type", "text/plain"):
data = json.loads(content)
content = data.get("error", data.get("errors", content))
raise JupiterOneApiError("{}:{}".format(response.status_code, content))

def query_v1(self, query: str, **kwargs) -> Dict:
"""Performs a V1 graph query
args:
Expand Down Expand Up @@ -349,3 +406,164 @@

response = self._execute_query(DELETE_RELATIONSHIP, variables=variables)
return response["data"]["deleteRelationship"]

def create_integration_instance(self, instance_name: str = None, instance_description: str = None):
"""Creates a new Custom Integration Instance.

args:
instance_name (str): The "Account name" for integration instance
instance_description (str): The "Description" for integration instance
"""
variables = {
"instance": {
"name": instance_name,
"description": instance_description,
"integrationDefinitionId": "8013680b-311a-4c2e-b53b-c8735fd97a5c",
SeaBlooms marked this conversation as resolved.
Show resolved Hide resolved
"pollingInterval": "DISABLED",
"config": {
"@tag": {
"Production": False,
"AccountName": True
}
},
"pollingIntervalCronExpression": {},
"ingestionSourcesOverrides": []
}
}

response = self._execute_query(CREATE_INSTANCE, variables=variables)
return response['data']['createIntegrationInstance']

def fetch_all_entity_properties(self):
"""Fetch list of aggregated property keys from all entities in the graph.

"""

response = self._execute_query(query=ALL_PROPERTIES)

return_list = []

for i in response['data']['getAllAssetProperties']:

if i.startswith(('parameter.', 'tag.')) == False:

return_list.append(i)

return return_list

def fetch_all_entity_tags(self):
"""Fetch list of aggregated property keys from all entities in the graph.

"""

response = self._execute_query(query=ALL_PROPERTIES)

return_list = []

for i in response['data']['getAllAssetProperties']:

if i.startswith(('tag.')) == True:

return_list.append(i)

return return_list

def start_sync_job(self, instance_id: str = None):
"""Start a synchronization job.

args:
instance_id (str): The "integrationInstanceId" request param for synchronization job
"""
endpoint = "/persister/synchronization/jobs"

data = {
"source": "integration-managed",
"integrationInstanceId": instance_id
}

response = self._execute_syncapi_request(endpoint=endpoint, headers=self.headers, payload=data)
Fixed Show fixed Hide fixed

return response

def upload_entities_batch_json(self, instance_job_id: str = None, entities_list: list = None):
SeaBlooms marked this conversation as resolved.
Show resolved Hide resolved
"""Upload batch of entities.

args:
instance_job_id (str): The "Job ID" for the Custom Integration job
entities_list (list): List of Dictionaries containing entities data to upload
"""
endpoint = f"/persister/synchronization/jobs/{instance_job_id}/entities"

data = {
"entities": entities_list
}

response = self._execute_syncapi_request(endpoint=endpoint, payload=data)

return response

def finalize_sync_job(self, instance_job_id: str = None):
"""Start a synchronization job.

args:
instance_job_id (str): The "Job ID" for the Custom Integration job
"""
endpoint = f"/persister/synchronization/jobs/{instance_job_id}/finalize"

data = {}

response = self._execute_syncapi_request(endpoint=endpoint, payload=data)

return response

def create_smartclass(self, smartclass_name: str = None, smartclass_description: str = None):
"""Creates a new Smart Class within Assets.

args:
smartclass_name (str): The "Smart class name" for Smart Class to be created.
smartclass_description (str): The "Description" for Smart Class to be created.
"""

variables = {
"input": {
"tagName": smartclass_name,
"description": smartclass_description
}
}

response = self._execute_query(CREATE_SMARTCLASS, variables=variables)
return response['data']['createSmartClass']

def create_smartclass_query(self, smartclass_id: str = None, query: str = None, query_description: str = None):
"""Creates a new J1QL Query within a defined Smart Class.

args:
smartclass_id (str): The unique ID of the Smart Class the query is created within.
query (str): The J1QL for the query being created.
query_description (str): The description of the query being created.
"""

variables = {
"input": {
"smartClassId": smartclass_id,
"query": query,
"description": query_description
}
}

response = self._execute_query(CREATE_SMARTCLASS_QUERY, variables=variables)
return response['data']['createSmartClassQuery']

def trigger_smartclass_evaluation(self, smartclass_id: str = None):
"""Execute an on-demand Evaluation of a defined Smartclass.

args:
smartclass_id (str): The unique ID of the Smart Class the query is created within.
"""

variables = {
"smartClassId": smartclass_id
}

response = self._execute_query(RUN_SMARTCLASS_EVALUATION, variables=variables)
return response['data']['evaluateSmartClassRule']
54 changes: 54 additions & 0 deletions jupiterone/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,58 @@
}
}
}
"""

CREATE_INSTANCE = """
mutation CreateInstance($instance: CreateIntegrationInstanceInput!) {
createIntegrationInstance(instance: $instance) {
id
name
accountId
pollingInterval
integrationDefinitionId
description
config
}
}
"""

ALL_PROPERTIES = """
query getAllAssetProperties {
getAllAssetProperties
}
"""

CREATE_SMARTCLASS = """
mutation CreateSmartClass($input: CreateSmartClassInput!) {
createSmartClass(input: $input) {
id
accountId
tagName
description
ruleId
__typename
}
}
"""

CREATE_SMARTCLASS_QUERY = """
mutation CreateSmartClassQuery($input: CreateSmartClassQueryInput!) {
createSmartClassQuery(input: $input) {
id
smartClassId
description
query
__typename
}
}
"""

RUN_SMARTCLASS_EVALUATION = """
mutation EvaluateSmartClassRule($smartClassId: ID!) {
evaluateSmartClassRule(smartClassId: $smartClassId) {
ruleId
__typename
}
}
"""
Loading