diff --git a/script.py b/script.py
new file mode 100644
index 00000000..89f15dae
--- /dev/null
+++ b/script.py
@@ -0,0 +1,130 @@
+"URL info caching script."
+
+import csv
+import json
+import os
+import sys
+from urllib.parse import urlparse
+import requests
+
+OBJECT_SPECS = {
+    'checkouts': {
+        'patchset_files': [{'url': True}],
+    },
+    'builds': {
+        'input_files': [{'url': True}],
+        'output_files': [{'url': True}],
+        'config_url': True,
+        'log_url': True,
+    },
+    'tests': {
+        'output_files': [{'url': True}],
+        'log_url': True,
+    },
+}
+
+
+def extract_fields(spec, data):
+    """
+    Extract values of fields from data according to a specification.
+
+    Args:
+        spec:   The specification of fields to extract the values from.
+        data:   The data to extract the field values from.
+
+    Returns:
+        A list of tuples, where each tuple contains:
+        * A tuple of dictionary keys and list indices leading to the field
+        * The field value
+    """
+    tuples = []
+
+    if isinstance(spec, dict):
+        for key, value in spec.items():
+            if key in data:
+                for ext_tuple in extract_fields(value, data[key]):
+                    tuples.append(((key,) + ext_tuple[0], ext_tuple[1]))
+    elif isinstance(spec, list):
+        assert len(spec) == 1
+        for index, item in enumerate(data):
+            for ext_tuple in extract_fields(spec[0], item):
+                tuples.append(((index,) + ext_tuple[0], ext_tuple[1]))
+    elif spec is True:
+        return [(tuple(), data)]
+
+    return tuples
+
+
+def get_url_info(url):
+    """Retrieve information about a URL and return it as a tuple."""
+
+    status_code = 0
+    content_type = ''
+    size = ''
+    extension = os.path.splitext(urlparse(url).path)[1]
+    url_length = len(url.encode('utf-8'))
+    # Some CI systems send links to index pages, instead of actual files,
+    # which is against the schema, and we don't really want to cache them.
+    # That's why we ignore URLs ending with "/"
+    if url.endswith('/'):
+        return (status_code, content_type, size, extension, url_length)
+    try:
+        response = requests.head(url, timeout=10)
+    except requests.exceptions.RequestException as error:
+        # Report to stderr so the CSV output on stdout stays clean
+        print(f"Error while trying to get URL info for {url}: {error!r}",
+              file=sys.stderr)
+    else:
+        status_code = response.status_code
+        if status_code == 200:
+            content_type = response.headers.get('Content-Type')
+            size = response.headers.get('Content-Length')
+
+    return (status_code, content_type, size, extension, url_length)
+
+
+def main():
+    """Extract URL info from JSON data and output it as CSV."""
+
+    # Prepare the CSV output (column order matches get_url_info()'s tuple)
+    csv_writer = csv.writer(sys.stdout)
+    csv_writer.writerow([
+        'Origin', 'Field Path', 'URL', 'Status Code',
+        'Content Type', 'Size', 'File Extension', 'URL Length (Bytes)'
+    ])
+
+    url_path_set = set()  # Track unique field path and URL combinations
+
+    # Read the JSON file path from the command line
+    json_file_path = sys.argv[1]
+
+    # Read the JSON file
+    with open(json_file_path, 'r', encoding='utf-8') as json_file:
+        data = json.load(json_file)
+
+    # For each object type and spec
+    for obj_type, obj_spec in OBJECT_SPECS.items():
+        for obj in data.get(obj_type, []):
+            origin = obj.get("origin", "")
+            url_tuples = extract_fields(obj_spec, obj)
+
+            # Iterate through the URL tuples
+            for keys, url in url_tuples:
+                path = '.'.join(str(key) for key in (obj_type,) + keys)
+
+                # Skip already-seen field path and URL combinations
+                # before making any network requests
+                if (path, url) in url_path_set:
+                    continue
+                url_path_set.add((path, url))
+
+                url_info = get_url_info(url)
+
+                # Save the URL info in the CSV file
+                csv_writer.writerow([origin, path, url, *url_info])
+
+    sys.stdout.flush()
+
+
+if __name__ == '__main__':
+    main()
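
A quick illustration for reviewers: the sketch below shows how `extract_fields()` pairs key/index paths with URL values. The sample build object and its URLs are hypothetical, and the snippet assumes `script.py` is importable from the working directory.

```python
# Hypothetical sample; the field names follow OBJECT_SPECS above,
# but the object and its URLs are made up for illustration.
from script import OBJECT_SPECS, extract_fields

build = {
    "origin": "example_ci",
    "input_files": [{"url": "https://example.org/kernel.config"}],
    "log_url": "https://example.org/build.log",
}

# Prints a list pairing each key/index path with its URL value:
# [(('input_files', 0, 'url'), 'https://example.org/kernel.config'),
#  (('log_url',), 'https://example.org/build.log')]
print(extract_fields(OBJECT_SPECS["builds"], build))
```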
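
Usage note: the script takes the JSON file path as its only positional argument and writes the CSV to stdout, so a hypothetical invocation would be `python script.py report.json > url_info.csv` (file names are examples only). Request errors are reported on stderr, keeping the CSV stream clean.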