Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.1.0 add additional methods #27

Merged
merged 24 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 242 additions & 4 deletions jupiterone/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
CREATE_RELATIONSHIP,
DELETE_RELATIONSHIP,
CURSOR_QUERY_V1,
CREATE_INSTANCE,
ALL_PROPERTIES,
CREATE_SMARTCLASS,
CREATE_SMARTCLASS_QUERY,
EVALUATE_SMARTCLASS,
GET_SMARTCLASS_DETAILS
)


Expand All @@ -41,6 +47,7 @@
# pylint: disable=too-many-instance-attributes

DEFAULT_URL = "https://graphql.us.jupiterone.io"
SYNC_API_URL = "https://api.us.jupiterone.io"

RETRY_OPTS = {
"wait_exponential_multiplier": 1000,
Expand All @@ -49,15 +56,17 @@
"retry_on_exception": retry_on_429,
}

def __init__(self, account: str = None, token: str = None, url: str = DEFAULT_URL):
def __init__(self, account: str = None, token: str = None, url: str = DEFAULT_URL, sync_url: str = SYNC_API_URL):
self.account = account
self.token = token
self.url = url
self.query_endpoint = self.url
self.rules_endpoint = self.url + "/rules/graphql"
self.sync_url = sync_url
self.headers = {
"Authorization": "Bearer {}".format(self.token),
"JupiterOne-Account": self.account,
"Content-Type": "application/json"
}

@property
Expand Down Expand Up @@ -98,7 +107,7 @@

# initiate requests session and implement retry logic of 5 request retries with 1 second between
s = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 502, 503, 504])
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount('https://', HTTPAdapter(max_retries=retries))

response = s.post(
Expand Down Expand Up @@ -127,8 +136,6 @@
)

elif response.status_code in [429, 503]:
print(response.status_code)
print(response.content)
raise JupiterOneApiRetryError("JupiterOne API rate limit exceeded.")

elif response.status_code in [504]:
Expand Down Expand Up @@ -233,6 +240,58 @@

return {"data": results}

def _execute_syncapi_request(self, endpoint: str, payload: Dict):
"""Executes POST request to SyncAPI endpoints"""

# initiate requests session and implement retry logic of 5 request retries with 1 second between
s = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount('https://', HTTPAdapter(max_retries=retries))

response = s.post(
self.sync_url + endpoint, headers=self.headers, json=payload, timeout=60
)

# It is still unclear if all responses will have a status
# code of 200 or if 429 will eventually be used to
# indicate rate limits being hit. J1 devs are aware.
if response.status_code == 200:
if response._content:
content = json.loads(response._content)
if "errors" in content:
errors = content["errors"]
if len(errors) == 1:
if "429" in errors[0]["message"]:
raise JupiterOneApiRetryError(
"JupiterOne API rate limit exceeded"
)
raise JupiterOneApiError(content.get("errors"))

return response.json()

elif response.status_code == 401:
raise JupiterOneApiError(
"401: Unauthorized. Please supply a valid account id and API token."
)

elif response.status_code in [429, 503]:
raise JupiterOneApiRetryError("JupiterOne API rate limit exceeded.")

elif response.status_code in [504]:
raise JupiterOneApiRetryError("Gateway Timeout.")

elif response.status_code in [500]:
raise JupiterOneApiError("JupiterOne API internal server error.")

else:
content = response._content
if isinstance(content, (bytes, bytearray)):
content = content.decode("utf-8")
if "application/json" in response.headers.get("Content-Type", "text/plain"):
data = json.loads(content)
content = data.get("error", data.get("errors", content))
raise JupiterOneApiError("{}:{}".format(response.status_code, content))

def query_v1(self, query: str, **kwargs) -> Dict:
"""Performs a V1 graph query
args:
Expand Down Expand Up @@ -349,3 +408,182 @@

response = self._execute_query(DELETE_RELATIONSHIP, variables=variables)
return response["data"]["deleteRelationship"]

def create_integration_instance(self, instance_name: str = None, instance_description: str = None):
"""Creates a new Custom Integration Instance.

args:
instance_name (str): The "Account name" for integration instance
instance_description (str): The "Description" for integration instance
"""
variables = {
"instance": {
"name": instance_name,
"description": instance_description,
"integrationDefinitionId": "8013680b-311a-4c2e-b53b-c8735fd97a5c",
"pollingInterval": "DISABLED",
"config": {
"@tag": {
"Production": False,
"AccountName": True
}
},
"pollingIntervalCronExpression": {},
"ingestionSourcesOverrides": []
}
}

response = self._execute_query(CREATE_INSTANCE, variables=variables)
return response['data']['createIntegrationInstance']

def fetch_all_entity_properties(self):
"""Fetch list of aggregated property keys from all entities in the graph.

"""

response = self._execute_query(query=ALL_PROPERTIES)

return_list = []

for i in response['data']['getAllAssetProperties']:

if i.startswith(('parameter.', 'tag.')) == False:

return_list.append(i)

return return_list

def fetch_all_entity_tags(self):
"""Fetch list of aggregated property keys from all entities in the graph.

"""

response = self._execute_query(query=ALL_PROPERTIES)

return_list = []

for i in response['data']['getAllAssetProperties']:

if i.startswith(('tag.')) == True:

return_list.append(i)

return return_list

def start_sync_job(self, instance_id: str = None):
"""Start a synchronization job.

args:
instance_id (str): The "integrationInstanceId" request param for synchronization job
"""
endpoint = "/persister/synchronization/jobs"

data = {
"source": "integration-managed",
"integrationInstanceId": instance_id
}

response = self._execute_syncapi_request(endpoint=endpoint, payload=data)

return response

def upload_entities_batch_json(self, instance_job_id: str = None, entities_list: list = None):
"""Upload batch of entities.

args:
instance_job_id (str): The "Job ID" for the Custom Integration job
entities_list (list): List of Dictionaries containing entities data to upload
"""
endpoint = f"/persister/synchronization/jobs/{instance_job_id}/entities"

data = {
"entities": entities_list
}

response = self._execute_syncapi_request(endpoint=endpoint, payload=data)

return response

def finalize_sync_job(self, instance_job_id: str = None):
"""Start a synchronization job.

args:
instance_job_id (str): The "Job ID" for the Custom Integration job
"""
endpoint = f"/persister/synchronization/jobs/{instance_job_id}/finalize"

data = {}

response = self._execute_syncapi_request(endpoint=endpoint, payload=data)

return response

def create_smartclass(self, smartclass_name: str = None, smartclass_description: str = None):
"""Creates a new Smart Class within Assets.

args:
smartclass_name (str): The "Smart class name" for Smart Class to be created.
smartclass_description (str): The "Description" for Smart Class to be created.
"""

variables = {
"input": {
"tagName": smartclass_name,
"description": smartclass_description
}
}

response = self._execute_query(CREATE_SMARTCLASS, variables=variables)

return response['data']['createSmartClass']

def create_smartclass_query(self, smartclass_id: str = None, query: str = None, query_description: str = None):
"""Creates a new J1QL Query within a defined Smart Class.

args:
smartclass_id (str): The unique ID of the Smart Class the query is created within.
query (str): The J1QL for the query being created.
query_description (str): The description of the query being created.
"""

variables = {
"input": {
"smartClassId": smartclass_id,
"query": query,
"description": query_description
}
}

response = self._execute_query(CREATE_SMARTCLASS_QUERY, variables=variables)

return response['data']['createSmartClassQuery']

def evaluate_smartclass(self, smartclass_id: str = None):
"""Execute an on-demand Evaluation of a defined Smartclass.

args:
smartclass_id (str): The unique ID of the Smart Class to trigger the evaluation for.
"""

variables = {
"smartClassId": smartclass_id
}

response = self._execute_query(EVALUATE_SMARTCLASS, variables=variables)

return response['data']['evaluateSmartClassRule']

def get_smartclass_details(self, smartclass_id: str = None):
"""Fetch config details from defined Smart Class.

args:
smartclass_id (str): The unique ID of the Smart Class to fetch details from.
"""

variables = {
"id": smartclass_id
}

response = self._execute_query(GET_SMARTCLASS_DETAILS, variables=variables)

return response['data']['smartClass']
89 changes: 88 additions & 1 deletion jupiterone/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,91 @@
}
}
}
"""
"""

CREATE_INSTANCE = """
mutation CreateInstance($instance: CreateIntegrationInstanceInput!) {
createIntegrationInstance(instance: $instance) {
id
name
accountId
pollingInterval
integrationDefinitionId
description
config
}
}
"""

ALL_PROPERTIES = """
query getAllAssetProperties {
getAllAssetProperties
}
"""

CREATE_SMARTCLASS = """
mutation CreateSmartClass($input: CreateSmartClassInput!) {
createSmartClass(input: $input) {
id
accountId
tagName
description
ruleId
__typename
}
}
"""

CREATE_SMARTCLASS_QUERY = """
mutation CreateSmartClassQuery($input: CreateSmartClassQueryInput!) {
createSmartClassQuery(input: $input) {
id
smartClassId
description
query
__typename
}
}
"""

EVALUATE_SMARTCLASS = """
mutation EvaluateSmartClassRule($smartClassId: ID!) {
evaluateSmartClassRule(smartClassId: $smartClassId) {
ruleId
__typename
}
}
"""

GET_SMARTCLASS_DETAILS = """
query GetSmartClass($id: ID!) {
smartClass(id: $id) {
id
accountId
tagName
description
ruleId
queries {
id
smartClassId
description
query
__typename
}
tags {
id
smartClassId
name
type
value
__typename
}
rule {
lastEvaluationEndOn
evaluationStep
__typename
}
__typename
}
}
"""
Loading