From 491b5490fc84cc21809b529a480cbe99ee345187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quirin=20Hardy=20Zie=C3=9Fler?= Date: Thu, 19 Oct 2023 00:49:42 +0200 Subject: [PATCH] Flake8 (#15) * flake8 * flake8 * flake8 --- .flake8 | 2 ++ apicomponents/bom.py | 26 ++++++++--------- apicomponents/calculator.py | 8 +++--- apicomponents/configproperty.py | 39 +++++++++++++------------- apicomponents/cwe.py | 20 ++++++------- apicomponents/finding.py | 8 +++--- apicomponents/ldap.py | 17 +++++------ apicomponents/license.py | 22 +++++++-------- apicomponents/licensegroup.py | 33 +++++++++++----------- apicomponents/metrics.py | 42 ++++++++++++++-------------- apicomponents/permission.py | 45 +++++++++++++++--------------- apicomponents/policy.py | 38 +++++++++++++------------ apicomponents/project.py | 36 ++++++++++++------------ apicomponents/projectProperty.py | 6 ++-- apicomponents/repository.py | 37 ++++++++++++------------ apicomponents/search.py | 20 ++++++------- apicomponents/service.py | 40 +++++++++++++------------- apicomponents/team.py | 19 +++++++------ apicomponents/user.py | 9 +++--- apicomponents/violation.py | 12 ++++---- apicomponents/violationAnalysis.py | 17 +++++------ 21 files changed, 254 insertions(+), 242 deletions(-) diff --git a/.flake8 b/.flake8 index 0eee6b0..8bb0316 100644 --- a/.flake8 +++ b/.flake8 @@ -8,6 +8,8 @@ ignore = F821 # Suppress - line too long (> 79 characters) E501 + # Suppress - Function is too complex + C901 exclude = diff --git a/apicomponents/bom.py b/apicomponents/bom.py index 08c583c..fd02b98 100644 --- a/apicomponents/bom.py +++ b/apicomponents/bom.py @@ -17,7 +17,7 @@ def get_bom_token(self, uuid): return (f"Unauthorized, {response.status_code}") else: return response.status_code - + def get_bom_project(self, uuid, format="json"): """Returns dependency metadata for a project in CycloneDX format @@ -27,7 +27,7 @@ def get_bom_project(self, uuid, format="json"): Returns: xml or json: returns dependency metadata for a project in CycloneDX format in xml or json """ - response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/project/{uuid}",params={"format":format}) + response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/project/{uuid}", params={"format": format}) if response.status_code == 200: return response.json() elif response.status_code == 401: @@ -38,7 +38,7 @@ def get_bom_project(self, uuid, format="json"): return (f"Project not found, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - + def get_bom_component(self, uuid, format="json"): """Returns dependency metadata for a component in CycloneDX format @@ -48,7 +48,7 @@ def get_bom_component(self, uuid, format="json"): Returns: xml or json: returns dependency metadata for a component in CycloneDX format in xml or json """ - response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/component/{uuid}",params={"format":format}) + response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/component/{uuid}", params={"format": format}) if response.status_code == 200: return response.json() elif response.status_code == 401: @@ -59,9 +59,9 @@ def get_bom_component(self, uuid, format="json"): return (f"Component not found, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - + def post_bom(self, project, projectName, projectVersion, body, autoCreate=True): - #TODO: refactor for formdata + # TODO: refactor for formdata """Upload a supported bill of material format document. Expects CycloneDX along and a valid project UUID. If a UUID is not specified then the projectName and projectVersion must be specified. Optionally, if autoCreate is specified and ‘true’ and the project does not exist, the project will be created. In this scenario, the principal making the request will additionally need the PORTFOLIO_MANAGEMENT or PROJECT_CREATION_UPLOAD permission. Args: @@ -73,11 +73,11 @@ def post_bom(self, project, projectName, projectVersion, body, autoCreate=True): Returns: response status code """ - data=dict() + data = dict() data["project"] = project data["projectName"] = projectName data["projectVersion"] = projectVersion - data["body"] =body + data["body"] = body data["autoCreate"] = autoCreate response = self.session.post(self.apicall + "/v1/bom", files=body) if response.status_code == 200: @@ -90,7 +90,7 @@ def post_bom(self, project, projectName, projectVersion, body, autoCreate=True): return (f"Project not found, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - + def put_bom(self, project, body): """Upload a supported bill of material format document. Expects CycloneDX along and a valid project UUID. If a UUID is not specified then the projectName and projectVersion must be specified. Optionally, if autoCreate is specified and ‘true’ and the project does not exist, the project will be created. In this scenario, the principal making the request will additionally need the PORTFOLIO_MANAGEMENT or PROJECT_CREATION_UPLOAD permission. @@ -102,9 +102,9 @@ def put_bom(self, project, body): Returns: response status code """ - data=dict() - data["project"]=project - data["bom"]=base64.b64encode(json.dumps(body).encode("utf-8")).decode("utf-8") + data = dict() + data["project"] = project + data["bom"] = base64.b64encode(json.dumps(body).encode("utf-8")).decode("utf-8") print(json.dumps(data)) response = self.session.put( self.apicall + "/v1/bom", data=json.dumps(data)) @@ -117,4 +117,4 @@ def put_bom(self, project, body): elif response.status_code == 404: return (f"Project not found, {response.status_code}") else: - return ((response.content).decode("utf-8"), response.status_code) \ No newline at end of file + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/calculator.py b/apicomponents/calculator.py index 53c8247..3318f7c 100644 --- a/apicomponents/calculator.py +++ b/apicomponents/calculator.py @@ -1,16 +1,16 @@ class Calculator(object): - - def get_calculator(self,cvss): + + def get_calculator(self, cvss): """ Returns the CVSS base score, impact sub-score and exploitability sub-score Args: cvss (string): A valid CVSSv2 or CVSSv3 vector. Example "CVSS:3.0/AV:L/AC:L/PR:L/UI:N/S:U/C:L/I:N/A:H" """ - response = self.session.get(self.apicall + f"/v1/calculator/cvss",params={"vector":cvss}) + response = self.session.get(self.apicall + "/v1/calculator/cvss", params={"vector": cvss}) if response.status_code == 200: return response.json() elif response.status_code == 401: return (f"Unauthorized, {response.status_code}") else: - return ((response.content).decode("UTF-8"),response.status_code) \ No newline at end of file + return ((response.content).decode("UTF-8"), response.status_code) diff --git a/apicomponents/configproperty.py b/apicomponents/configproperty.py index 5b09c9f..b4debbc 100644 --- a/apicomponents/configproperty.py +++ b/apicomponents/configproperty.py @@ -1,7 +1,8 @@ import json + class ConfigProperty(object): - + def get_configProperty(self, pageSize=100): """ Returns a list of all ConfigProperties for the specified groupName @@ -9,14 +10,14 @@ def get_configProperty(self, pageSize=100): Returns: list: list of all configProperty in json """ - config_list=list() + config_list = list() pageNumber = 1 - response = self.session.get(self.apicall + f"/v1/configProperty",params={"pageSize":pageSize,"pageNumber":pageNumber}) - for config in range(0,len(response.json())): - config_list.append(response.json()[config]-1) - while len(response.json())==pageSize: + response = self.session.get(self.apicall + "/v1/configProperty", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for config in range(0, len(response.json())): + config_list.append(response.json()[config] - 1) + while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + f"/v1/configProperty", params={ + response = self.session.get(self.apicall + "/v1/configProperty", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for config in range(0, len(response.json())): config_list.append(response.json()[config] - 1) @@ -26,8 +27,8 @@ def get_configProperty(self, pageSize=100): return (f"Unauthorized, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - - def post_configProperty(self,body): + + def post_configProperty(self, body): """Update a config property Args: @@ -42,7 +43,7 @@ def post_configProperty(self,body): Returns: JSON: Json object which was sent successful """ - response = self.session.post(self.apicall + f"/v1/configProperty",data=body) + response = self.session.post(self.apicall + "/v1/configProperty", data=body) if response.status_code == 200: return response.json() elif response.status_code == 401: @@ -51,8 +52,8 @@ def post_configProperty(self,body): return (f"The config property could not be found, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - - def post_configPropertyAggregate(self, groupName = None, propertyName = None, propertyValue=None, propertyType=None, description = None): + + def post_configPropertyAggregate(self, groupName=None, propertyName=None, propertyValue=None, propertyType=None, description=None): """Update a config property Args: @@ -69,18 +70,18 @@ def post_configPropertyAggregate(self, groupName = None, propertyName = None, pr """ data = { } - if groupName != None: + if groupName is not None: data['groupName'] = groupName - if propertyName != None: + if propertyName is not None: data['propertyName'] = propertyName - if propertyValue != None: + if propertyValue is not None: data['propertyValue'] = propertyValue - if propertyType != None: + if propertyType is not None: data['propertyType'] = propertyType - if description != None: + if description is not None: data['description'] = description response = self.session.post( - self.apicall + f"/v1/configProperty/aggregate", data=json.dumps([data])) + self.apicall + "/v1/configProperty/aggregate", data=json.dumps([data])) if response.status_code == 200: return response.json() elif response.status_code == 401: @@ -88,4 +89,4 @@ def post_configPropertyAggregate(self, groupName = None, propertyName = None, pr elif response.status_code == 404: return (f"One or more config properties could not be found, {response.status_code}") else: - return ((response.content).decode("utf-8"), response.status_code) \ No newline at end of file + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/cwe.py b/apicomponents/cwe.py index 38c689c..6d44507 100644 --- a/apicomponents/cwe.py +++ b/apicomponents/cwe.py @@ -7,16 +7,16 @@ def get_cwe(self, pageSize=100): pageSize (int, optional): size of the page. Defaults to 100. Returns: - JSON: json object + JSON: json object """ - cwe_list= list() + cwe_list = list() pageNumber = 1 - response = self.session.get(self.apicall + f"/v1/cwe",params={"pageSize":pageSize, "pageNumber": pageNumber}) - for cwe in range(0,len(response.json())): - cwe_list.append(response.json()[cwe-1]) - while len(response.json())== pageSize: + response = self.session.get(self.apicall + "/v1/cwe", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for cwe in range(0, len(response.json())): + cwe_list.append(response.json()[cwe - 1]) + while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + f"/v1/cwe", params={"pageSize": pageSize, "pageNumber": pageNumber}) + response = self.session.get(self.apicall + "/v1/cwe", params={"pageSize": pageSize, "pageNumber": pageNumber}) for cwe in range(0, len(response.json())): cwe_list.append(response.json()[cwe - 1]) if response.status_code == 200: @@ -25,7 +25,7 @@ def get_cwe(self, pageSize=100): return (f"Unauthorized, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - + def get_cweById(self, cweId): """ Returns a specific CWE @@ -42,8 +42,8 @@ def get_cweById(self, cweId): if response.status_code == 200: return response.json() elif response.status_code == 401: - return (f"Unauthorized ", response.status_code) + return ("Unauthorized ", response.status_code) elif response.status_code == 404: return (f"The CWE could not be found, {response.status_code}") else: - return ((response.content).decode("utf-8"), response.status_code) \ No newline at end of file + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/finding.py b/apicomponents/finding.py index aa2adc1..c52279c 100644 --- a/apicomponents/finding.py +++ b/apicomponents/finding.py @@ -1,7 +1,7 @@ class Finding(object): def get_project_finding(self, uuid, suppressed=False, pageSize=100): - """ + """ Returns a list of all findings for a specific project. uuid:The UUID of the project. suppressed: optionally includes suppressed vulnerabilities(boolean) @@ -11,12 +11,12 @@ def get_project_finding(self, uuid, suppressed=False, pageSize=100): response = self.session.get(self.apicall + f"/v1/finding/project/{uuid}?suppressed={suppressed}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for finding in range(0, len(response.json())): - finding_list.append(response.json()[finding-1]) + finding_list.append(response.json()[finding - 1]) while len(response.json()) == pageSize: response = self.session.get(self.apicall + f"/v1/finding/project/{uuid}?suppressed={suppressed}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for finding in range(0, len(response.json())): - finding_list.append(response.json()[finding-1]) + finding_list.append(response.json()[finding - 1]) if response.status_code == 200: return finding_list elif response.status_code == 401: @@ -27,7 +27,7 @@ def get_project_finding(self, uuid, suppressed=False, pageSize=100): return (f"Project not found, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - + def export_findings(self, uuid): """ Returns the findings for the specified project as FPF diff --git a/apicomponents/ldap.py b/apicomponents/ldap.py index e4627ff..8bcc6db 100644 --- a/apicomponents/ldap.py +++ b/apicomponents/ldap.py @@ -1,7 +1,8 @@ import json + class LDAP(object): - + def list_ldapgroups(self, pageSize=100): """ This API performs a pass-thru query to the configured LDAP server. Search criteria results are cached using default Alpine CacheManager policy. @@ -11,17 +12,17 @@ def list_ldapgroups(self, pageSize=100): pageNumber = 1 response = self.session.get(self.apicall + "/v1/ldap/groups", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for ldap in range(0, len(response.json())): - ldaplist.append(response.json()[ldap-1]) + ldaplist.append(response.json()[ldap - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + "/v1/ldap/groups", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for ldap in range(0, len(response.json())): - ldaplist.append(response.json()[ldap-1]) + ldaplist.append(response.json()[ldap - 1]) if response.status_code == 200: return ldaplist else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def get_ldapteam(self, uuid): """Returns the DNs of all groups mapped to the specified team @@ -37,19 +38,19 @@ def get_ldapteam(self, uuid): return (f"The UUID of the team could not be found , {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def create_ldap(self, team, dn): """Adds a mapping Args: team (string): The UUID of the team - dn (string): DNs + dn (string): DNs """ - data={ + data = { "team": team, "dn": dn } - response = self.session.put(self.apicall + "/v1/ldap/mapping",data=json.dumps(data)) + response = self.session.put(self.apicall + "/v1/ldap/mapping", data=json.dumps(data)) if response.status_code == 200: return response.json() elif response.status_code == 401: diff --git a/apicomponents/license.py b/apicomponents/license.py index 13d5a5e..b02ef28 100644 --- a/apicomponents/license.py +++ b/apicomponents/license.py @@ -5,19 +5,19 @@ def get_list_license(self, pageSize=100): license_list = list() pageNumber = 1 response = self.session.get( - self.apicall + f"/v1/license", params={"pageSize": pageSize, "pageNumber": pageNumber}) + self.apicall + "/v1/license", params={"pageSize": pageSize, "pageNumber": pageNumber}) for lice in range(0, len(response.json())): - license_list.append(response.json()[lice-1]) + license_list.append(response.json()[lice - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( - self.apicall + f"/v1/license", params={"pageSize": pageSize, "pageNumber": pageNumber}) + self.apicall + "/v1/license", params={"pageSize": pageSize, "pageNumber": pageNumber}) for lice in range(0, len(response.json())): - license_list.append(response.json()[lice-1]) + license_list.append(response.json()[lice - 1]) if response.status_code == 200: return license_list else: - return (f"Unauthorized ", response.status_code) + return ("Unauthorized ", response.status_code) def get_license(self, licenseId): """ @@ -28,9 +28,9 @@ def get_license(self, licenseId): if response.status_code == 200: return response.json() elif response.status_code == 401: - return (f"Unauthorized ", response.status_code) + return ("Unauthorized ", response.status_code) elif response.status_code == 404: - return(f"The license could not be found ", response.status_code) + return ("The license could not be found", response.status_code) else: return response.status_code @@ -39,15 +39,15 @@ def get_license_concise(self, pageSize=100): license_list = list() pageNumber = 1 response = self.session.get( - self.apicall + f"/v1/license/concise", params={"pageSize": pageSize, "pageNumber": pageNumber}) + self.apicall + "/v1/license/concise", params={"pageSize": pageSize, "pageNumber": pageNumber}) for lice in range(0, len(response.json())): - license_list.append(response.json()[lice-1]) + license_list.append(response.json()[lice - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( - self.apicall + f"/v1/license/concise", params={"pageSize": pageSize, "pageNumber": pageNumber}) + self.apicall + "/v1/license/concise", params={"pageSize": pageSize, "pageNumber": pageNumber}) for lice in range(0, len(response.json())): - license_list.append(response.json()[lice-1]) + license_list.append(response.json()[lice - 1]) if response.status_code == 200: return license_list elif response.status_code == 401: diff --git a/apicomponents/licensegroup.py b/apicomponents/licensegroup.py index 5d1c703..669d5c2 100644 --- a/apicomponents/licensegroup.py +++ b/apicomponents/licensegroup.py @@ -1,20 +1,21 @@ import json + class LicenseGroup(object): - + def list_licensegroups(self, pageSize=100): grouplist = list() pageNumber = 1 response = self.session.get( self.apicall + "/v1/licenseGroup", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for group in range(0, len(response.json())): - grouplist.append(response.json()[group-1]) + grouplist.append(response.json()[group - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( self.apicall + "/v1/licenseGroup", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for group in range(0, len(response.json())): - grouplist.append(response.json()[group-1]) + grouplist.append(response.json()[group - 1]) if response.status_code == 200: return grouplist elif response.status_code == 401: @@ -44,8 +45,8 @@ def delete_licensegroup(self, uuid): else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - def remove_license_from_licensegroup(self, licensegroup,license): - + def remove_license_from_licensegroup(self, licensegroup, license): + response = self.session.delete(self.apicall + f"/v1/licenseGroup/{licensegroup}/license/{license}") if response.status_code == 200: return "Successful operation" @@ -67,16 +68,14 @@ def add_license_to_group(self, licensegroup, license): else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - def create_licensegroup(self, name,licenses=None,riskWeight=0): - data={'name':name, - "riskWeight": riskWeight - } - if licenses : - if isinstance(license,list): - data['licenses']= licenses + def create_licensegroup(self, name, licenses=None, riskWeight=0): + data = {'name': name, "riskWeight": riskWeight} + if licenses: + if isinstance(license, list): + data['licenses'] = licenses else: return "Error! Licenses should be a list" - response = self.session.put(self.apicall + "/v1/licenseGroup",data=json.dumps(data)) + response = self.session.put(self.apicall + "/v1/licenseGroup", data=json.dumps(data)) if response.status_code == 201: return response.json() elif response.status_code == 401: @@ -84,10 +83,10 @@ def create_licensegroup(self, name,licenses=None,riskWeight=0): else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - def update_licensegroup(self,uuid, name=None,licenses=None,riskWeight=None): - data={"uuid": uuid} + def update_licensegroup(self, uuid, name=None, licenses=None, riskWeight=None): + data = {"uuid": uuid} if name: - data["name"]=name + data["name"] = name if licenses: if isinstance(license, list): data['licenses'] = licenses @@ -95,7 +94,7 @@ def update_licensegroup(self,uuid, name=None,licenses=None,riskWeight=None): return "Error! Licenses should be a list" if riskWeight: data['risk_weight'] = riskWeight - response = self.session.post(self.apicall +"/v1/licenseGroup", data=json.dumps(data)) + response = self.session.post(self.apicall + "/v1/licenseGroup", data=json.dumps(data)) if response.status_code == 200: return response.json() elif response.status_code == 401: diff --git a/apicomponents/metrics.py b/apicomponents/metrics.py index f8ebcb4..ef91675 100644 --- a/apicomponents/metrics.py +++ b/apicomponents/metrics.py @@ -6,23 +6,23 @@ def get_all_metrics(self, pageSize=100): """ metrics_list = list() pageNumber = 1 - response = self.session.get(self.apicall + f"/v1/metrics/vulnerability", params={ + response = self.session.get(self.apicall + "/v1/metrics/vulnerability", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for metric in range(0, len(response.json())): - metrics_list.append(response.json()[metric-1]) + metrics_list.append(response.json()[metric - 1]) while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + f"/v1/metrics/vulnerability", params={ + response = self.session.get(self.apicall + "/v1/metrics/vulnerability", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for metric in range(0, len(response.json())): - metrics_list.append(response.json()[metric-1]) + metrics_list.append(response.json()[metric - 1]) if response.status_code == 200: return metrics_list else: return (f"Unauthorized, {response.status_code}") def get_metrics_portolio_bydate(self, date): - """ + """ Returns historical metrics for the entire portfolio from a specific date. date: The start date to retrieve metric. Date format must be YYYYMMDD """ @@ -34,7 +34,7 @@ def get_metrics_portolio_bydate(self, date): return (f"Unauthorized, {response.status_code}") def get_metrics_project_bydate(self, uuid, date): - """ + """ Returns historical metrics for a specific project from a specific date date: The start date to retrieve metric. Date format must be YYYYMMDD. uuid: The UUID of the project to retrieve metrics for. @@ -57,14 +57,14 @@ def get_current_metrics_portfolio(self): Returns current metrics for entire portfolio """ response = self.session.get( - self.apicall + f"/v1/metrics/portfolio/current") + self.apicall + "/v1/metrics/portfolio/current") if response.status_code == 200: return response.json() else: return (f"Unauthorized , {response.status_code}") def get_metrics_dayNumber(self, days): - """ + """ Returns X days of historical metrics for the entire portfolio(int32) days: The number of days back to retrieve metrics for. """ @@ -76,18 +76,18 @@ def get_metrics_dayNumber(self, days): return (f"Unauthorized , {response.status_code}") def get_metrics_refresh_portfolio(self): - """ + """ Requests a refresh of the portfolio metrics """ response = self.session.get( - self.apicall + f"/v1/metrics/portfolio/refresh") + self.apicall + "/v1/metrics/portfolio/refresh") if response.status_code == 200: return (f"successful operation , {response.status_code}") else: return (f"Unauthorized , {response.status_code}") def get_metrics_specific_project(self, uuid): - """ + """ returns current metrics for a specific project. uuid: The UUID of the project to retrieve metrics for """ @@ -99,7 +99,7 @@ def get_metrics_specific_project(self, uuid): return (f"Unauthorized , {response.status_code}") def get_metrics_specific_project_days(self, uuid, days): - """ + """ Returns X days of historical metrics for a specific project uuid: The UUID of the project to retrieve metrics for. days: The number of days back to retrieve metrics for. @@ -118,7 +118,7 @@ def get_metrics_specific_project_days(self, uuid, days): return (response.status_code) def get_metrics_refresh_project(self, uuid): - """ + """ requests a refresh of a specific project metrics. uuid: The UUID of the project to retrieve metrics for. """ @@ -136,7 +136,7 @@ def get_metrics_refresh_project(self, uuid): return (response.status_code) def get_current_metrics_component(self, uuid): - """ + """ Returns current metrics for a specific component uuid: The UUID of the component to retrieve metrics for. """ @@ -166,13 +166,13 @@ def get_metrics_component_bydate(self, uuid, date, pageSize=100): response = self.session.get(self.apicall + f"/v1/metrics/component/{uuid}/since/{date}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for metric in range(0, len(response.json())): - metrics_list.append(response.json()[metric-1]) + metrics_list.append(response.json()[metric - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + f"/v1/metrics/component/{uuid}/since/{date}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for metric in range(0, len(response.json())): - metrics_list.append(response.json()[metric-1]) + metrics_list.append(response.json()[metric - 1]) if response.status_code == 200: return metrics_list else: @@ -191,13 +191,13 @@ def get_metrics_component_bydays(self, uuid, days, pageSize=100): response = self.session.get(self.apicall + f"/v1/metrics/component/{uuid}/since/{days}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for metric in range(0, len(response.json())): - metrics_list.append(response.json()[metric-1]) + metrics_list.append(response.json()[metric - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + f"/v1/metrics/component/{uuid}/since/{days}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) for metric in range(0, len(response.json())): - metrics_list.append(response.json()[metric-1]) + metrics_list.append(response.json()[metric - 1]) if response.status_code == 200: return metrics_list else: @@ -215,12 +215,12 @@ def get_metrics_component_refresh(self, uuid): response = self.session.get( self.apicall + f"/v1/metrics/component/{uuid}/refresh") if response.status_code == 200: - return (f"successful operation , {response.status_code}") + return (f"successful operation, {response.status_code}") elif response.status_code == 401: - return (f"Unauthorized , {response.status_code}") + return (f"Unauthorized, {response.status_code}") elif response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") elif response.status_code == 404: return (f"Project not found, {response.status_code}") else: - return (response.status_code) \ No newline at end of file + return (response.status_code) diff --git a/apicomponents/permission.py b/apicomponents/permission.py index aff47a4..6bd22b1 100644 --- a/apicomponents/permission.py +++ b/apicomponents/permission.py @@ -8,12 +8,12 @@ def list_permissions(self, pageSize=100): pageNumber = 1 response = self.session.get(self.apicall + "/v1/permission", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for permission in range(0, len(response.json())): - permissionlist.append(response.json()[permission-1]) + permissionlist.append(response.json()[permission - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + "/v1/permission", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for permission in range(0, len(response.json())): - permissionlist.append(response.json()[permission-1]) + permissionlist.append(response.json()[permission - 1]) if response.status_code == 200: return permissionlist else: @@ -37,7 +37,7 @@ def add_userpermission(self, permission, username): return ("The user already has the specified permission assigned, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - + def delete_userpermission(self, permission, username): """Removes the permission to the specified username. @@ -56,7 +56,7 @@ def delete_userpermission(self, permission, username): return ("The user already has the specified permission assigned, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - + def add_teampermission(self, permission, uuid): """Adds the permission to the specified username. @@ -76,21 +76,22 @@ def add_teampermission(self, permission, uuid): else: return ((response.content).decode("utf-8"), response.status_code) - def delete_userpermission(self, permission, uuid): - """Removes the permission to the specified team. - - Args: - permission (string): A valid permission. - uuid (string): A valid team uuid. - """ - response = self.session.delete(self.apicall + f"/v1/permission/{permission}/team/{uuid}") - if response.status_code == 200: - return response.json() - elif response.status_code == 401: - return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: - return (f" The user could not be found, {response.status_code}") - elif response.status_code == 304: - return ("The user already has the specified permission assigned, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) +# duplicate +# def delete_userpermission(self, permission, uuid): +# """Removes the permission to the specified team. +# +# Args: +# permission (string): A valid permission. +# uuid (string): A valid team uuid. +# """ +# response = self.session.delete(self.apicall + f"/v1/permission/{permission}/team/{uuid}") +# if response.status_code == 200: +# return response.json() +# elif response.status_code == 401: +# return (f"Unauthorized, {response.status_code}") +# elif response.status_code == 404: +# return (f" The user could not be found, {response.status_code}") +# elif response.status_code == 304: +# return ("The user already has the specified permission assigned, {response.status_code}") +# else: +# return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/policy.py b/apicomponents/policy.py index 0bf8363..0eb75bd 100644 --- a/apicomponents/policy.py +++ b/apicomponents/policy.py @@ -1,4 +1,6 @@ import json + + class Policy(object): def get_policy(self, uuid): @@ -32,13 +34,13 @@ def list_policy(self, pageSize=100): response = self.session.get( self.apicall + "/v1/policy", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for policy in range(0, len(response.json())): - policylist.append(response.json()[policy-1]) + policylist.append(response.json()[policy - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( self.apicall + "/v1/policy", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for policy in range(0, len(response.json())): - policylist.append(response.json()[policy-1]) + policylist.append(response.json()[policy - 1]) if response.status_code == 200: return policylist elif response.status_code == 401: @@ -85,23 +87,23 @@ def create_policy(self, name, operator="ANY", violationState="INFO", policyCondi else: return "Error! The policyCondition should be a list" if projects: - if isinstance(projects, list): - data["projects"] = projects - else: - return "Error! The projects should be a list" + if isinstance(projects, list): + data["projects"] = projects + else: + return "Error! The projects should be a list" if globals: data["globals"] = globals response = self.session.put( - self.apicall + f"/v1/policy", data=json.dumps(data)) + self.apicall + "/v1/policy", data=json.dumps(data)) if response.status_code == 201: return response.json() elif response.status_code == 401: return (f"Unauthorized, {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - - def update_policy(self,uuid ,name=None, operator=None, violationState=None, policyCondition=None, projects=None, globals=None): - # TODO: create better comments explaining the args + + def update_policy(self, uuid, name=None, operator=None, violationState=None, policyCondition=None, projects=None, globals=None): + # TODO: create better comments explaining the args """ Create a policy Args: @@ -113,7 +115,7 @@ def update_policy(self,uuid ,name=None, operator=None, violationState=None, poli globals ([type], optional): [description]. Defaults to None. """ - data ={"uuid":uuid} + data = {"uuid": uuid} if name: data['name'] = name if violationState: @@ -125,14 +127,14 @@ def update_policy(self,uuid ,name=None, operator=None, violationState=None, poli data["policyCondition"] = policyCondition else: return "Error! The policyCondition should be a list" - if projects : + if projects: if isinstance(projects, list): - data["projects"] = projects + data["projects"] = projects else: return "Error! The projects should be a list" if globals: data["globals"] = globals - response = self.session.post(self.apicall + f"/v1/policy", data=json.dumps(data)) + response = self.session.post(self.apicall + "/v1/policy", data=json.dumps(data)) if response.status_code == 200: return ("Successful operation") elif response.status_code == 401: @@ -149,14 +151,14 @@ def add_policyToproject(self, policyUuid, projectUuid): """ response = self.session.post(self.apicall + f"/v1/policy/{policyUuid}/projects/{projectUuid}") if response.status_code == 200: - return (f"Successful operation") + return ("Successful operation") elif response.status_code == 401: return (f"Unauthorized, {response.status_code}") elif response.status_code == 304: return (f"The policy already has the specified project assigned, {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def delete_policyFromproject(self, policyUuid, projectUuid): """Removes a project from a policy. @@ -164,9 +166,9 @@ def delete_policyFromproject(self, policyUuid, projectUuid): policyUuid (string): The UUID of the policy projectUuid (string): The UUID of the project """ - response=self.session.delete(self.apicall + f"/v1/policy/{policyUuid}/projects/{projectUuid}") + response = self.session.delete(self.apicall + f"/v1/policy/{policyUuid}/projects/{projectUuid}") if response.status_code == 200: - return (f"Successful operation") + return ("Successful operation") elif response.status_code == 401: return (f"Unauthorized, {response.status_code}") elif response.status_code == 304: diff --git a/apicomponents/project.py b/apicomponents/project.py index 4715a62..7468ff8 100644 --- a/apicomponents/project.py +++ b/apicomponents/project.py @@ -5,43 +5,43 @@ def list_projects(self, pageSize=100): response = self.session.get( self.apicall + "/v1/project", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for project in range(0, len(response.json())): - projectlist.append(response.json()[project-1]) + projectlist.append(response.json()[project - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( self.apicall + "/v1/project", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for project in range(0, len(response.json())): - projectlist.append(response.json()[project-1]) + projectlist.append(response.json()[project - 1]) if response.status_code == 200: return projectlist else: - return (f"Unable to list projects", response.status_code) + return ("Unable to list projects", response.status_code) def get_project(self, uuid): response = self.session.get(self.apicall + f"/v1/project/{uuid}") if response.status_code == 200: return response.json() else: - return (f"Unable to find project", response.status_code) + return ("Unable to find project", response.status_code) def get_project_lookup(self, name, version=None): - if version == None: + if version is None: lookup = "name=" + name else: - lookup = "name=" + name + "&version="+version + lookup = "name=" + name + "&version=" + version response = self.session.get( self.apicall + f"/v1/project/lookup?{lookup}") if response.status_code == 200: return response.json() else: - return (f"Unable to find project", response.status_code) + return ("Unable to find project", response.status_code) def delete_project_uuid(self, uuid): response = self.session.delete(self.apicall + f"/v1/project/{uuid}") if response.status_code == 204: - return (f"Successfully deleted the project", response.status_code) + return ("Successfully deleted the project", response.status_code) else: - return (f"Unable to delete the project", response.status_code) + return ("Unable to delete the project", response.status_code) def create_project(self, name, classifier, version, active=True): # TODO add more options @@ -51,14 +51,14 @@ def create_project(self, name, classifier, version, active=True): "version": version, "active": active } - response = self.session.put(self.apicall + f"/v1/project", json=data) + response = self.session.put(self.apicall + "/v1/project", json=data) if response.status_code == 201: - print(f"Successfully created the project", response.status_code) + print("Successfully created the project", response.status_code) return response.json() elif response.status_code == 409: - return (f"Project with specified name already exists", response.status_code) + return ("Project with specified name already exists", response.status_code) else: - return (f"Unable to create the project", response.status_code) + return ("Unable to create the project", response.status_code) def update_project(self, uuid, name=None, classifier=None): # TODO add more options @@ -69,12 +69,12 @@ def update_project(self, uuid, name=None, classifier=None): data['name'] = name if classifier: data['classifier'] = classifier - response = self.session.post(self.apicall + f"/v1/project", json=data) + response = self.session.post(self.apicall + "/v1/project", json=data) if response.status_code == 200: - return (f"Successfully updated the project", response.status_code) + return ("Successfully updated the project", response.status_code) elif response.status_code == 404: - return (f"Project with specified uuid could not be found", response.status_code) + return ("Project with specified uuid could not be found", response.status_code) elif response.status_code == 409: - return (f"Project with specified name already exists", response.status_code) + return ("Project with specified name already exists", response.status_code) else: - return (f"Unable to update the project", response.status_code) + return ("Unable to update the project", response.status_code) diff --git a/apicomponents/projectProperty.py b/apicomponents/projectProperty.py index da84f05..128ca2b 100644 --- a/apicomponents/projectProperty.py +++ b/apicomponents/projectProperty.py @@ -25,7 +25,7 @@ def update_projectproperty(self, uuid, name=None, classifier=None): else: return (f"Unable to update the project, {response.status_code}") - def create_projectproperty(self, uuid, propertyValue, groupName="integrations", propertyName="defectdojo.engagementId", propertyType="STRING", description="DefectDojo integration"): + def create_projectproperty(self, uuid, propertyValue, groupName="integrations", propertyName="defectdojo.engagementId", propertyType="STRING", description="DefectDojo integration"): """ name, classifier and more args to be added Args: @@ -55,10 +55,10 @@ def create_projectproperty(self, uuid, propertyValue, groupName="integrations", return (f"Project with specified name already exists, {response.status_code}") else: return (f"Unable to create the project, {response.status_code}") - + def delete_projectproperty_uuid(self, uuid): response = self.session.delete(self.apicall + f"/v1/project/{uuid}/property") if response.status_code == 204: return (f"Successfully deleted the project, {response.status_code}") else: - return (f"Unable to delete the project, {response.status_code}") \ No newline at end of file + return (f"Unable to delete the project, {response.status_code}") diff --git a/apicomponents/repository.py b/apicomponents/repository.py index 91f461e..a96a1a1 100644 --- a/apicomponents/repository.py +++ b/apicomponents/repository.py @@ -1,7 +1,8 @@ import json + class Repository(object): - + def list_repository(self, pageSize=100): """Returns a list of all repositories @@ -13,19 +14,19 @@ def list_repository(self, pageSize=100): """ respositorylist = list() pageNumber = 1 - response = self.session.get(self.apicall + f"/v1/repository", params={'pageSize': pageSize, 'pageNumber': pageNumber}) + response = self.session.get(self.apicall + "/v1/repository", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for repository in range(0, len(response.json())): - respositorylist.append(response.json()[repository-1]) + respositorylist.append(response.json()[repository - 1]) while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + f"/v1/repository", params={'pageSize': pageSize, 'pageNumber': pageNumber}) + response = self.session.get(self.apicall + "/v1/repository", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for repository in range(0, len(response.json())): - respositorylist.append(response.json()[repository-1]) + respositorylist.append(response.json()[repository - 1]) if response.status_code == 200: return respositorylist else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def update_repository(self, uuid, identifier, type, url, resolutionOrder=0, enable=True, internal=True): """Update a specific repository @@ -58,12 +59,12 @@ def update_repository(self, uuid, identifier, type, url, resolutionOrder=0, enab "internal": internal, "identifier": identifier } - response = self.session.post(self.apicall + f"/v1/repository",data=json.dumps(data)) + response = self.session.post(self.apicall + "/v1/repository", data=json.dumps(data)) if response.status_code == 200: return ("Successful operation") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def create_repository(self, identifier, type, url, resolutionOrder=0, enable=True, internal=True): """ Create a new repository @@ -94,13 +95,13 @@ def create_repository(self, identifier, type, url, resolutionOrder=0, enable=Tru "internal": internal, "identifier": identifier } - response = self.session.put(self.apicall + f"/v1/repository", data=json.dumps(data)) + response = self.session.put(self.apicall + "/v1/repository", data=json.dumps(data)) if response.status_code == 201: return ("Successful operation") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - - def get_latest_repository(self,purl): + + def get_latest_repository(self, purl): """Attempts to resolve the latest version of the component available in the configured repositories Args: @@ -116,18 +117,18 @@ def get_latest_repository(self,purl): "lastCheck": "2021-12-02T16:50:56.704Z" } """ - response = self.session.get(self.apicall + f"/v1/repository/latest", params={'purl':purl}) - if response.status_code==200: + response = self.session.get(self.apicall + "/v1/repository/latest", params={'purl': purl}) + if response.status_code == 200: return response.json() else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - def get_repositoryByType(self,type, pageSize=100): + def get_repositoryByType(self, type, pageSize=100): """ Returns repositories that support the specific type Args: - type (string): The type of repositories to retrieve. eg MAVEN, NPM, GEM, PYPI, NUGET, HEX, COMPOSER, CARGO, GO_MODULES, UNSUPPORTED + type (string): The type of repositories to retrieve. eg MAVEN, NPM, GEM, PYPI, NUGET, HEX, COMPOSER, CARGO, GO_MODULES, UNSUPPORTED pageSize (int, optional): [description]. Defaults to 100. Returns: @@ -137,17 +138,17 @@ def get_repositoryByType(self,type, pageSize=100): pageNumber = 1 response = self.session.get(self.apicall + f"/v1/repository/{type}", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for repository in range(0, len(response.json())): - respositorylist.append(response.json()[repository-1]) + respositorylist.append(response.json()[repository - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + f"/v1/repository/{type}", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for repository in range(0, len(response.json())): - respositorylist.append(response.json()[repository-1]) + respositorylist.append(response.json()[repository - 1]) if response.status_code == 200: return respositorylist else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def delete_repository(self, uuid): """Deletes a repository diff --git a/apicomponents/search.py b/apicomponents/search.py index e7e35db..9bedc70 100644 --- a/apicomponents/search.py +++ b/apicomponents/search.py @@ -1,10 +1,10 @@ class Search(object): - def general_search(self,query=None): + def general_search(self, query=None): if query: - response= self.session.get(self.apicall + "/v1/search", params={'query':query}) + response = self.session.get(self.apicall + "/v1/search", params={'query': query}) else: - response= self.session.get(self.apicall + "/v1/search") + response = self.session.get(self.apicall + "/v1/search") if response.status_code == 200: return response.json() elif response.status_code == 401: @@ -12,18 +12,18 @@ def general_search(self,query=None): else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - def project_search(self,query=None): + def project_search(self, query=None): if query: - response= self.session.get(self.apicall + "/v1/search/project", params={'query':query}) + response = self.session.get(self.apicall + "/v1/search/project", params={'query': query}) else: - response= self.session.get(self.apicall + "/v1/search/project") + response = self.session.get(self.apicall + "/v1/search/project") if response.status_code == 200: return response.json() elif response.status_code == 401: return (f"Unauthorized, {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def component_search(self, query=None): if query: response = self.session.get(self.apicall + "/v1/search/component", params={'query': query}) @@ -35,7 +35,7 @@ def component_search(self, query=None): return (f"Unauthorized, {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def service_search(self, query=None): if query: response = self.session.get(self.apicall + "/v1/search/service", params={'query': query}) @@ -47,7 +47,7 @@ def service_search(self, query=None): return (f"Unauthorized, {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def license_search(self, query=None): if query: response = self.session.get(self.apicall + "/v1/search/license", params={'query': query}) @@ -59,7 +59,7 @@ def license_search(self, query=None): return (f"Unauthorized, {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def vulnerability_search(self, query=None): if query: response = self.session.get(self.apicall + "/v1/search/vulnerability", params={'query': query}) diff --git a/apicomponents/service.py b/apicomponents/service.py index 200c481..d076bbc 100644 --- a/apicomponents/service.py +++ b/apicomponents/service.py @@ -1,5 +1,6 @@ import json + class Service(object): def list_services(self, uuid, pageSize=100): @@ -15,17 +16,17 @@ def list_services(self, uuid, pageSize=100): servicelist = list() pageNumber = 1 response = self.session.get(self.apicall + f"/v1/service/project/{uuid}", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - try: + if response.status_code == 200: for service in range(0, len(response.json())): - servicelist.append(response.json()[service-1]) + servicelist.append(response.json()[service - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + f"/v1/service/project/{uuid}", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for service in range(0, len(response.json())): - servicelist.append(response.json()[service-1]) + servicelist.append(response.json()[service - 1]) if response.status_code == 200: return servicelist - except : + else: if response.status_code == 404: return (f"The project could not be found, {response.status_code}") elif response.status_code == 403: @@ -34,8 +35,8 @@ def list_services(self, uuid, pageSize=100): return (f"Unauthorized, {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - - def get_service(self,uuid): + + def get_service(self, uuid): """Returns a specific service. Args: @@ -55,16 +56,16 @@ def get_service(self,uuid): return (f"Access to the specified project is forbidden, {response.status_code}") else: return ((response.content).decode("utf-8"), response.status_code) - + def delete_service(self, uuid): """Deletes a service Args: - uuid (string): The UUID of the project. + uuid (string): The UUID of the project. """ response = self.session.delete(self.apicall + f"/v1/service/{uuid}") if response.status_code == 200: - return (f"Successful operation") + return ("Successful operation") elif response.status_code == 403: return (f"Access to the specified service is forbidden, {response.status_code}") elif response.status_code == 401: @@ -73,13 +74,13 @@ def delete_service(self, uuid): return (f"The UUID of the service could not be found, {response.status_code}") else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - + def create_service(self, uuid, providerName=None, providerURL=None, contactName=None, contactEmail=None, contactPhone=None): """ Model: check on the official page """ - data={} - contact={} + data = {} + contact = {} if providerName: data['provider'] = {"name": providerName} if providerURL: @@ -89,11 +90,11 @@ def create_service(self, uuid, providerName=None, providerURL=None, contactName= if contactEmail: contact['email'] = contactEmail if contactPhone: - contact['phone']= contactPhone + contact['phone'] = contactPhone if not bool(contact): - data['contact'] =[contact] - #TODO: add more option - response = self.session.put(self.apicall +f"/v1/service/project/{uuid}",data=json.dumps(data)) + data['contact'] = [contact] + # TODO: add more option + response = self.session.put(self.apicall + f"/v1/service/project/{uuid}", data=json.dumps(data)) if response.status_code == 201: return ("Successful operation") elif response.status_code == 401: @@ -103,10 +104,11 @@ def create_service(self, uuid, providerName=None, providerURL=None, contactName= else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") -def update_service(self,**args): - data={} + +def update_service(self, **args): + data = {} # TODO: improve the input parameters - #? this wont return anything for now. + # ? this wont return anything for now. response = self.session.post(self.apicall + "/v1/service", data=json.dumps(data)) if response.status_code == 200: return ("Successful operation") diff --git a/apicomponents/team.py b/apicomponents/team.py index ada7be6..bfe09ee 100644 --- a/apicomponents/team.py +++ b/apicomponents/team.py @@ -1,5 +1,6 @@ import json + class Team(object): def get_teamByUUID(self, uuid): @@ -50,7 +51,7 @@ def delete_team(self, name, uuid, key=[]): response = self.session.delete( self.apicall + "/v1/team", data=json.dumps(data)) if response.status_code == 204: - return (f"Successfully operation") + return ("Successfully operation") elif response.status_code == 401: return (f"Unauthorized, {response.status_code}") elif response.status_code == 404: @@ -125,17 +126,17 @@ def list_teams(self, pageSize=100): response = self.session.get( self.apicall + "/v1/team", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for team in range(0, len(response.json())): - teamlist.append(response.json()[team-1]) + teamlist.append(response.json()[team - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( self.apicall + "/v1/team", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for team in range(0, len(response.json())): - teamlist.append(response.json()[team-1]) + teamlist.append(response.json()[team - 1]) if response.status_code == 200: return teamlist else: - return (f"Unable to list teams", response.status_code) + return ("Unable to list teams", response.status_code) def get_uuid_from_team_name(self, teamname, pageSize=100): """Returns a list of all teams @@ -146,20 +147,20 @@ def get_uuid_from_team_name(self, teamname, pageSize=100): response = self.session.get( self.apicall + "/v1/team", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for team in range(0, len(response.json())): - teamlist.append(response.json()[team-1]) + teamlist.append(response.json()[team - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( self.apicall + "/v1/team", params={'pageSize': pageSize, 'pageNumber': pageNumber}) for team in range(0, len(response.json())): - teamlist.append(response.json()[team-1]) + teamlist.append(response.json()[team - 1]) for team in teamlist: - if team['name']==teamname: + if team['name'] == teamname: teamuid = team['uuid'] if response.status_code == 200: return teamuid else: - return (f"Unable to list team", response.status_code) + return ("Unable to list team", response.status_code) def delete_apikey(self, apikey): """Delete specified API key @@ -189,4 +190,4 @@ def update_apikey(self, apikey): elif response.status_code == 404: return (f"The API key could not be found, {response.status_code}") else: - return response.status_code \ No newline at end of file + return response.status_code diff --git a/apicomponents/user.py b/apicomponents/user.py index 41876c2..287efea 100644 --- a/apicomponents/user.py +++ b/apicomponents/user.py @@ -1,11 +1,12 @@ import json + class User(object): def get_user_oidc(self): """ """ - response = self.session.get(self.apicall + f"/v1/user/oidc") #Retuns a list of all OIDC users + response = self.session.get(self.apicall + "/v1/user/oidc") # Retuns a list of all OIDC users if response.status_code == 200: return response.json() elif response.status_code == 401: @@ -16,7 +17,7 @@ def get_user_oidc(self): def join_team(self, username, uuid): """ Adds a user to a team. - + Args: uuid (string): The uuid of the team. username (string): A valid username. @@ -32,6 +33,6 @@ def join_team(self, username, uuid): elif response.status_code == 401: return (f"Unauthorized, {response.status_code}") elif response.status_code == 404: - return (f"User or team could not be found, {response.status_code}") + return (f"User or team could not be found, {response.status_code}") -#TODO extend \ No newline at end of file +# TODO extend diff --git a/apicomponents/violation.py b/apicomponents/violation.py index 74f8a7d..8b21344 100644 --- a/apicomponents/violation.py +++ b/apicomponents/violation.py @@ -12,16 +12,16 @@ def list_violations(self, suppressed=False, pageSize=100): """ violationlist = list() pageNumber = 1 - response = self.session.get(self.apicall + f"/v1/violation", params={ - 'pageSize': pageSize, 'pageNumber': pageNumber,'suppressed': suppressed}) + response = self.session.get(self.apicall + "/v1/violation", params={ + 'pageSize': pageSize, 'pageNumber': pageNumber, 'suppressed': suppressed}) for violation in range(0, len(response.json())): - violationlist.append(response.json()[violation-1]) + violationlist.append(response.json()[violation - 1]) while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + f"/v1/violation", params={ + response = self.session.get(self.apicall + "/v1/violation", params={ 'pageSize': pageSize, 'pageNumber': pageNumber, 'suppressed': suppressed}) for violation in range(0, len(response.json())): - violationlist.append(response.json()[violation-1]) + violationlist.append(response.json()[violation - 1]) if response.status_code == 200: return violationlist else: @@ -42,7 +42,7 @@ def get_project_violation(self, uuid, suppressed=False): return response.json() else: return ((response.content).decode("utf-8"), response.status_code) - + def get_component_violation(self, uuid, suppressed=False): """Returns a list of all policy violations for a specific component diff --git a/apicomponents/violationAnalysis.py b/apicomponents/violationAnalysis.py index 75bedce..ae9b4c2 100644 --- a/apicomponents/violationAnalysis.py +++ b/apicomponents/violationAnalysis.py @@ -1,8 +1,9 @@ import json + class ViolationAnalysis(object): - def record_violation(self,component,policyViolation,suppressed=True): + def record_violation(self, component, policyViolation, suppressed=True): """Record a violation analysis decision Args: @@ -13,12 +14,12 @@ def record_violation(self,component,policyViolation,suppressed=True): Returns: dict: decision """ - data={ - "component":component, - "policyViolation":policyViolation, - "suppressed":suppressed + data = { + "component": component, + "policyViolation": policyViolation, + "suppressed": suppressed } - response = self.session.put(self.apicall + f"/v1/violation/analysis",data=json.dumps(data)) + response = self.session.put(self.apicall + "/v1/violation/analysis", data=json.dumps(data)) if response.status_code == 200: return response.json() elif response.status_code == 401: @@ -26,7 +27,7 @@ def record_violation(self,component,policyViolation,suppressed=True): else: return (f"{(response.content).decode('utf-8')}, {response.status_code}") - def get_violation_analysis(self,component,policyViolation): + def get_violation_analysis(self, component, policyViolation): """Retrieve a violation analysis trail Args: @@ -37,7 +38,7 @@ def get_violation_analysis(self,component,policyViolation): Returns: dict: analysis """ - response = self.session.get(self.apicall + f"/v1/violation/analysis",params={"component": component,"policyViolation":policyViolation}) + response = self.session.get(self.apicall + "/v1/violation/analysis", params={"component": component, "policyViolation": policyViolation}) if response.status_code == 200: return response.json() elif response.status_code == 401: