Commit

kcidb test result field info extracting script
octonawish-akcodes committed Jul 5, 2023
1 parent 97f0086 commit fdae07f
Showing 1 changed file with 130 additions and 0 deletions.
script.py (130 additions, 0 deletions)
@@ -0,0 +1,130 @@
"URL info caching script."

import csv
import json
import os
import sys
from urllib.parse import urlparse
import requests

OBJECT_SPECS = {
    'checkouts': {
        'patchset_files': [{'url': True}],
    },
    'builds': {
        'input_files': [{'url': True}],
        'output_files': [{'url': True}],
        'config_url': True,
        'log_url': True,
    },
    'tests': {
        'output_files': [{'url': True}],
        'log_url': True,
    },
}


def extract_fields(spec, data):
    """
    Extract values of fields from data according to a specification.

    Args:
        spec:   The specification of fields to extract the values from.
        data:   The data to extract the field values from.

    Returns:
        A list of tuples, where each tuple contains:
        * A tuple of dictionary keys leading to the field
        * The field value
    """
    tuples = []

    if isinstance(spec, dict):
        for key, value in spec.items():
            if key in data:
                for ext_tuple in extract_fields(value, data[key]):
                    tuples.append(((key,) + ext_tuple[0], ext_tuple[1]))
    elif isinstance(spec, list):
        assert len(spec) == 1
        # Apply the single item spec to each list element directly
        # (not to enumerate()'s (index, item) pairs, which would never
        # match a dictionary spec)
        for item in data:
            tuples += extract_fields(spec[0], item)
    elif spec is True:
        return [(tuple(), data)]

    return tuples
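# Illustrative example of what extract_fields() produces (the data below is
# made up): for spec {'output_files': [{'url': True}], 'log_url': True} and
# data {'output_files': [{'name': 'dmesg', 'url': 'https://host/dmesg'}],
# 'log_url': 'https://host/log'} the result is
# [(('output_files', 'url'), 'https://host/dmesg'),
#  (('log_url',), 'https://host/log')].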


def get_url_info(url):
    """Extract URL info and return the field values as a tuple."""

    status_code = 0
    content_type = ''
    size = ''
    extension = os.path.splitext(urlparse(url).path)[1]
    # Length of the URL in bytes (len() of the encoded URL, rather than
    # sys.getsizeof(), which would also count Python object overhead)
    url_length = len(url.encode('utf-8'))
    # Some CI systems send links to index pages, instead of actual files,
    # which is against the schema, and we don't really want to cache them.
    # That's why we ignore URLs ending with "/"
    if url.endswith('/'):
        return (status_code, content_type, size, extension, url_length)
    try:
        response = requests.head(url, timeout=10)
    except requests.exceptions.RequestException as error:
        print(f"Error while trying to get URL info for {url}: {error!r}")
    else:
        status_code = response.status_code

        if status_code == 200:
            content_type = response.headers.get('Content-Type')
            size = response.headers.get('Content-Length')

    return (status_code, content_type, size, extension, url_length)
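# Illustrative example (the URL and the header values are hypothetical, no
# particular server is assumed): a successful HEAD request for
# "https://example.com/build/config.gz" could return something like
#   (200, 'application/gzip', '8192', '.gz', 35)
# i.e. (status_code, content_type, size, extension, url_length).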


def main():
    """Extract URL info from JSON data and save it in a CSV file."""

    # Prepare CSV file
    # (the column order matches the tuple returned by get_url_info())
    csv_writer = csv.writer(sys.stdout)
    csv_writer.writerow([
        'Origin', 'Field Path', 'URL', 'Status Code',
        'Content Type', 'Size', 'File Extension', 'URL Length (Bytes)'
    ])

    url_path_set = set()  # Track unique field path and URL combinations

    # Read JSON file path from the command line
    json_file_path = sys.argv[1]

    # Read JSON file
    with open(json_file_path, 'r', encoding='utf-8') as json_file:
        data = json.load(json_file)

    # For each object type and spec
    for obj_type, obj_spec in OBJECT_SPECS.items():
        for obj in data.get(obj_type, []):
            origin = obj.get("origin", "")
            url_tuples = extract_fields(obj_spec, obj)

            # Iterate through the extracted (field path, URL) tuples
            for keys, url in url_tuples:
                path = '.'.join(str(key) for key in (obj_type,) + keys)

                # Skip field path and URL combinations we have already seen,
                # before making any HTTP request for them
                if (path, url) in url_path_set:
                    continue

                url_path_set.add((path, url))
                url_info = get_url_info(url)

                # Save URL info in CSV file
                csv_writer.writerow([
                    origin,
                    path, url, *url_info
                ])

    sys.stdout.flush()


if __name__ == '__main__':
    main()
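# Illustrative output (all values are made up): with the column order used
# above, a row for a build log could look like
#   redhat,builds.log_url,https://host/build.log,200,text/plain,1024,.log,22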
