Skip to content

Commit

Permalink
Flake8 (#15)
Browse files Browse the repository at this point in the history
* flake8

* flake8

* flake8
  • Loading branch information
quirinziessler authored Oct 18, 2023
1 parent d6cf3e4 commit 491b549
Show file tree
Hide file tree
Showing 21 changed files with 254 additions and 242 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ ignore =
F821
# Suppress - line too long (> 79 characters)
E501
# Suppress - Function is too complex
C901


exclude =
Expand Down
26 changes: 13 additions & 13 deletions apicomponents/bom.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def get_bom_token(self, uuid):
return (f"Unauthorized, {response.status_code}")
else:
return response.status_code

def get_bom_project(self, uuid, format="json"):
"""Returns dependency metadata for a project in CycloneDX format
Expand All @@ -27,7 +27,7 @@ def get_bom_project(self, uuid, format="json"):
Returns:
xml or json: returns dependency metadata for a project in CycloneDX format in xml or json """
response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/project/{uuid}",params={"format":format})
response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/project/{uuid}", params={"format": format})
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
Expand All @@ -38,7 +38,7 @@ def get_bom_project(self, uuid, format="json"):
return (f"Project not found, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)

def get_bom_component(self, uuid, format="json"):
"""Returns dependency metadata for a component in CycloneDX format
Expand All @@ -48,7 +48,7 @@ def get_bom_component(self, uuid, format="json"):
Returns:
xml or json: returns dependency metadata for a component in CycloneDX format in xml or json """
response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/component/{uuid}",params={"format":format})
response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/component/{uuid}", params={"format": format})
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
Expand All @@ -59,9 +59,9 @@ def get_bom_component(self, uuid, format="json"):
return (f"Component not found, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)

def post_bom(self, project, projectName, projectVersion, body, autoCreate=True):
#TODO: refactor for formdata
# TODO: refactor for formdata
"""Upload a supported bill of material format document. Expects CycloneDX along and a valid project UUID. If a UUID is not specified then the projectName and projectVersion must be specified. Optionally, if autoCreate is specified and ‘true’ and the project does not exist, the project will be created. In this scenario, the principal making the request will additionally need the PORTFOLIO_MANAGEMENT or PROJECT_CREATION_UPLOAD permission.
Args:
Expand All @@ -73,11 +73,11 @@ def post_bom(self, project, projectName, projectVersion, body, autoCreate=True):
Returns:
response status code """
data=dict()
data = dict()
data["project"] = project
data["projectName"] = projectName
data["projectVersion"] = projectVersion
data["body"] =body
data["body"] = body
data["autoCreate"] = autoCreate
response = self.session.post(self.apicall + "/v1/bom", files=body)
if response.status_code == 200:
Expand All @@ -90,7 +90,7 @@ def post_bom(self, project, projectName, projectVersion, body, autoCreate=True):
return (f"Project not found, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)

def put_bom(self, project, body):
"""Upload a supported bill of material format document. Expects CycloneDX along and a valid project UUID. If a UUID is not specified then the projectName and projectVersion must be specified. Optionally, if autoCreate is specified and ‘true’ and the project does not exist, the project will be created. In this scenario, the principal making the request will additionally need the PORTFOLIO_MANAGEMENT or PROJECT_CREATION_UPLOAD permission.
Expand All @@ -102,9 +102,9 @@ def put_bom(self, project, body):
Returns:
response status code
"""
data=dict()
data["project"]=project
data["bom"]=base64.b64encode(json.dumps(body).encode("utf-8")).decode("utf-8")
data = dict()
data["project"] = project
data["bom"] = base64.b64encode(json.dumps(body).encode("utf-8")).decode("utf-8")
print(json.dumps(data))
response = self.session.put(
self.apicall + "/v1/bom", data=json.dumps(data))
Expand All @@ -117,4 +117,4 @@ def put_bom(self, project, body):
elif response.status_code == 404:
return (f"Project not found, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)
return ((response.content).decode("utf-8"), response.status_code)
8 changes: 4 additions & 4 deletions apicomponents/calculator.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
class Calculator(object):
def get_calculator(self,cvss):

def get_calculator(self, cvss):
"""
Returns the CVSS base score, impact sub-score and exploitability sub-score
Args:
cvss (string): A valid CVSSv2 or CVSSv3 vector. Example "CVSS:3.0/AV:L/AC:L/PR:L/UI:N/S:U/C:L/I:N/A:H"
"""
response = self.session.get(self.apicall + f"/v1/calculator/cvss",params={"vector":cvss})
response = self.session.get(self.apicall + "/v1/calculator/cvss", params={"vector": cvss})
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
return (f"Unauthorized, {response.status_code}")
else:
return ((response.content).decode("UTF-8"),response.status_code)
return ((response.content).decode("UTF-8"), response.status_code)
39 changes: 20 additions & 19 deletions apicomponents/configproperty.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import json


class ConfigProperty(object):

def get_configProperty(self, pageSize=100):
"""
Returns a list of all ConfigProperties for the specified groupName
Returns:
list: list of all configProperty in json
"""
config_list=list()
config_list = list()
pageNumber = 1
response = self.session.get(self.apicall + f"/v1/configProperty",params={"pageSize":pageSize,"pageNumber":pageNumber})
for config in range(0,len(response.json())):
config_list.append(response.json()[config]-1)
while len(response.json())==pageSize:
response = self.session.get(self.apicall + "/v1/configProperty", params={"pageSize": pageSize, "pageNumber": pageNumber})
for config in range(0, len(response.json())):
config_list.append(response.json()[config] - 1)
while len(response.json()) == pageSize:
pageNumber += 1
response = self.session.get(self.apicall + f"/v1/configProperty", params={
response = self.session.get(self.apicall + "/v1/configProperty", params={
"pageSize": pageSize, "pageNumber": pageNumber})
for config in range(0, len(response.json())):
config_list.append(response.json()[config] - 1)
Expand All @@ -26,8 +27,8 @@ def get_configProperty(self, pageSize=100):
return (f"Unauthorized, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)
def post_configProperty(self,body):

def post_configProperty(self, body):
"""Update a config property
Args:
Expand All @@ -42,7 +43,7 @@ def post_configProperty(self,body):
Returns:
JSON: Json object which was sent successful
"""
response = self.session.post(self.apicall + f"/v1/configProperty",data=body)
response = self.session.post(self.apicall + "/v1/configProperty", data=body)
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
Expand All @@ -51,8 +52,8 @@ def post_configProperty(self,body):
return (f"The config property could not be found, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)
def post_configPropertyAggregate(self, groupName = None, propertyName = None, propertyValue=None, propertyType=None, description = None):

def post_configPropertyAggregate(self, groupName=None, propertyName=None, propertyValue=None, propertyType=None, description=None):
"""Update a config property
Args:
Expand All @@ -69,23 +70,23 @@ def post_configPropertyAggregate(self, groupName = None, propertyName = None, pr
"""
data = {
}
if groupName != None:
if groupName is not None:
data['groupName'] = groupName
if propertyName != None:
if propertyName is not None:
data['propertyName'] = propertyName
if propertyValue != None:
if propertyValue is not None:
data['propertyValue'] = propertyValue
if propertyType != None:
if propertyType is not None:
data['propertyType'] = propertyType
if description != None:
if description is not None:
data['description'] = description
response = self.session.post(
self.apicall + f"/v1/configProperty/aggregate", data=json.dumps([data]))
self.apicall + "/v1/configProperty/aggregate", data=json.dumps([data]))
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
return (f"Unauthorized, {response.status_code}")
elif response.status_code == 404:
return (f"One or more config properties could not be found, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)
return ((response.content).decode("utf-8"), response.status_code)
20 changes: 10 additions & 10 deletions apicomponents/cwe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ def get_cwe(self, pageSize=100):
pageSize (int, optional): size of the page. Defaults to 100.
Returns:
JSON: json object
JSON: json object
"""
cwe_list= list()
cwe_list = list()
pageNumber = 1
response = self.session.get(self.apicall + f"/v1/cwe",params={"pageSize":pageSize, "pageNumber": pageNumber})
for cwe in range(0,len(response.json())):
cwe_list.append(response.json()[cwe-1])
while len(response.json())== pageSize:
response = self.session.get(self.apicall + "/v1/cwe", params={"pageSize": pageSize, "pageNumber": pageNumber})
for cwe in range(0, len(response.json())):
cwe_list.append(response.json()[cwe - 1])
while len(response.json()) == pageSize:
pageNumber += 1
response = self.session.get(self.apicall + f"/v1/cwe", params={"pageSize": pageSize, "pageNumber": pageNumber})
response = self.session.get(self.apicall + "/v1/cwe", params={"pageSize": pageSize, "pageNumber": pageNumber})
for cwe in range(0, len(response.json())):
cwe_list.append(response.json()[cwe - 1])
if response.status_code == 200:
Expand All @@ -25,7 +25,7 @@ def get_cwe(self, pageSize=100):
return (f"Unauthorized, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)

def get_cweById(self, cweId):
""" Returns a specific CWE
Expand All @@ -42,8 +42,8 @@ def get_cweById(self, cweId):
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
return (f"Unauthorized ", response.status_code)
return ("Unauthorized ", response.status_code)
elif response.status_code == 404:
return (f"The CWE could not be found, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)
return ((response.content).decode("utf-8"), response.status_code)
8 changes: 4 additions & 4 deletions apicomponents/finding.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class Finding(object):

def get_project_finding(self, uuid, suppressed=False, pageSize=100):
"""
"""
Returns a list of all findings for a specific project.
uuid:The UUID of the project.
suppressed: optionally includes suppressed vulnerabilities(boolean)
Expand All @@ -11,12 +11,12 @@ def get_project_finding(self, uuid, suppressed=False, pageSize=100):
response = self.session.get(self.apicall + f"/v1/finding/project/{uuid}?suppressed={suppressed}", params={
"pageSize": pageSize, "pageNumber": pageNumber})
for finding in range(0, len(response.json())):
finding_list.append(response.json()[finding-1])
finding_list.append(response.json()[finding - 1])
while len(response.json()) == pageSize:
response = self.session.get(self.apicall + f"/v1/finding/project/{uuid}?suppressed={suppressed}", params={
"pageSize": pageSize, "pageNumber": pageNumber})
for finding in range(0, len(response.json())):
finding_list.append(response.json()[finding-1])
finding_list.append(response.json()[finding - 1])
if response.status_code == 200:
return finding_list
elif response.status_code == 401:
Expand All @@ -27,7 +27,7 @@ def get_project_finding(self, uuid, suppressed=False, pageSize=100):
return (f"Project not found, {response.status_code}")
else:
return ((response.content).decode("utf-8"), response.status_code)

def export_findings(self, uuid):
"""
Returns the findings for the specified project as FPF
Expand Down
17 changes: 9 additions & 8 deletions apicomponents/ldap.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json


class LDAP(object):

def list_ldapgroups(self, pageSize=100):
"""
This API performs a pass-thru query to the configured LDAP server. Search criteria results are cached using default Alpine CacheManager policy.
Expand All @@ -11,17 +12,17 @@ def list_ldapgroups(self, pageSize=100):
pageNumber = 1
response = self.session.get(self.apicall + "/v1/ldap/groups", params={'pageSize': pageSize, 'pageNumber': pageNumber})
for ldap in range(0, len(response.json())):
ldaplist.append(response.json()[ldap-1])
ldaplist.append(response.json()[ldap - 1])
while len(response.json()) == pageSize:
pageNumber += 1
response = self.session.get(self.apicall + "/v1/ldap/groups", params={'pageSize': pageSize, 'pageNumber': pageNumber})
for ldap in range(0, len(response.json())):
ldaplist.append(response.json()[ldap-1])
ldaplist.append(response.json()[ldap - 1])
if response.status_code == 200:
return ldaplist
else:
return (f"{(response.content).decode('utf-8')}, {response.status_code}")

def get_ldapteam(self, uuid):
"""Returns the DNs of all groups mapped to the specified team
Expand All @@ -37,19 +38,19 @@ def get_ldapteam(self, uuid):
return (f"The UUID of the team could not be found , {response.status_code}")
else:
return (f"{(response.content).decode('utf-8')}, {response.status_code}")

def create_ldap(self, team, dn):
"""Adds a mapping
Args:
team (string): The UUID of the team
dn (string): DNs
dn (string): DNs
"""
data={
data = {
"team": team,
"dn": dn
}
response = self.session.put(self.apicall + "/v1/ldap/mapping",data=json.dumps(data))
response = self.session.put(self.apicall + "/v1/ldap/mapping", data=json.dumps(data))
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
Expand Down
Loading

0 comments on commit 491b549

Please sign in to comment.