From c6f964368f64b51636e9f7ce44d7e895b36ebbf3 Mon Sep 17 00:00:00 2001
From: Hannah Bast
Date: Sun, 24 Nov 2024 23:14:54 +0100
Subject: [PATCH] For the `MULTI_INPUT_JSON` value, also allow `JSONL` (#91)

So far, `MULTI_INPUT_JSON` had to be a JSON string. When producing the
string programmatically for a large number of directories or files, that
can be cumbersome because of the leading `[`, the trailing `]`, and the
separating commas.

To make this easier, the string may now also be in JSONL format (one JSON
object per line). It is then simply transformed into a JSON array in the
obvious way. As a proof of concept and as an example, the Qleverfile for
DBLP is adapted accordingly.
---
 pyproject.toml                         |   2 +-
 src/qlever/Qleverfiles/Qleverfile.dblp |   2 +-
 src/qlever/commands/index.py           | 171 +++++++++++++++----------
 3 files changed, 103 insertions(+), 72 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 962ce3d3..18991391 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
 [project]
 name = "qlever"
 description = "Script for using the QLever SPARQL engine."
-version = "0.5.11"
+version = "0.5.12"
 authors = [
     { name = "Hannah Bast", email = "bast@cs.uni-freiburg.de" }
 ]

diff --git a/src/qlever/Qleverfiles/Qleverfile.dblp b/src/qlever/Qleverfiles/Qleverfile.dblp
index 8a0e7f5d..d7e90937 100644
--- a/src/qlever/Qleverfiles/Qleverfile.dblp
+++ b/src/qlever/Qleverfiles/Qleverfile.dblp
@@ -17,7 +17,7 @@ FORMAT = ttl
 
 [index]
 INPUT_FILES = *.gz
-MULTI_INPUT_JSON = $$(ls *.gz | awk 'BEGIN { printf "[ " } NR > 1 { printf ", " } { printf "{\"cmd\": \"zcat " $$0 "\"}" } END { printf "]" }')
+MULTI_INPUT_JSON = $$(ls *.gz | xargs -I {} echo '{ "cmd": "zcat {}" }')
 SETTINGS_JSON = { "ascii-prefixes-only": false, "num-triples-per-batch": 5000000, "prefixes-external": [""] }
 
 [server]
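
Illustration, not part of the patch (the two file names are made up): with
input files dblp-1.gz and dblp-2.gz, the new xargs pipeline prints one JSON
object per line, i.e. the JSONL value

    { "cmd": "zcat dblp-1.gz" }
    { "cmd": "zcat dblp-2.gz" }

with no enclosing brackets and no separating commas. The index command below
wraps such a value into the JSON array
[ { "cmd": "zcat dblp-1.gz" }, { "cmd": "zcat dblp-2.gz" } ] before parsing it.
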
diff --git a/src/qlever/commands/index.py b/src/qlever/commands/index.py
index a2c5d6ab..cd4478e1 100644
--- a/src/qlever/commands/index.py
+++ b/src/qlever/commands/index.py
@@ -3,12 +3,12 @@
 import glob
 import json
 import shlex
+import re
 
 from qlever.command import QleverCommand
 from qlever.containerize import Containerize
 from qlever.log import log
-from qlever.util import (get_existing_index_files, get_total_file_size,
-                         run_command)
+from qlever.util import get_existing_index_files, get_total_file_size, run_command
 
 
 class IndexCommand(QleverCommand):
@@ -20,24 +20,36 @@ def __init__(self):
         pass
 
     def description(self) -> str:
-        return ("Build the index for a given RDF dataset")
+        return "Build the index for a given RDF dataset"
 
     def should_have_qleverfile(self) -> bool:
         return True
 
-    def relevant_qleverfile_arguments(self) -> dict[str: list[str]]:
-        return {"data": ["name", "format"],
-                "index": ["input_files", "cat_input_files", "multi_input_json",
-                          "parallel_parsing", "settings_json", "index_binary",
-                          "only_pso_and_pos_permutations", "use_patterns",
-                          "text_index", "stxxl_memory"],
-                "runtime": ["system", "image", "index_container"]}
+    def relevant_qleverfile_arguments(self) -> dict[str : list[str]]:
+        return {
+            "data": ["name", "format"],
+            "index": [
+                "input_files",
+                "cat_input_files",
+                "multi_input_json",
+                "parallel_parsing",
+                "settings_json",
+                "index_binary",
+                "only_pso_and_pos_permutations",
+                "use_patterns",
+                "text_index",
+                "stxxl_memory",
+            ],
+            "runtime": ["system", "image", "index_container"],
+        }
 
     def additional_arguments(self, subparser) -> None:
         subparser.add_argument(
-            "--overwrite-existing", action="store_true",
-            default=False,
-            help="Overwrite an existing index, think twice before using.")
+            "--overwrite-existing",
+            action="store_true",
+            default=False,
+            help="Overwrite an existing index, think twice before using.",
+        )
 
     # Exception for invalid JSON.
     class InvalidInputJson(Exception):
@@ -48,22 +60,29 @@ def __init__(self, error_message, additional_info):
 
     # Helper function to get command line options from JSON.
     def get_input_options_for_json(self, args) -> str:
-        # Parse the JSON.
+        # Parse the JSON. If `args.multi_input_json` look like JSONL, turn
+        # it into a JSON array.
         try:
+            jsonl_line_regex = re.compile(r"^\s*\{.*\}\s*$")
+            jsonl_lines = args.multi_input_json.split("\n")
+            if all(re.match(jsonl_line_regex, line) for line in jsonl_lines):
+                args.multi_input_json = "[" + ", ".join(jsonl_lines) + "]"
             input_specs = json.loads(args.multi_input_json)
         except Exception as e:
             raise self.InvalidInputJson(
-                f"Failed to parse `MULTI_INPUT_JSON` ({e})",
-                args.multi_input_json)
+                f"Failed to parse `MULTI_INPUT_JSON` as either JSON or JSONL ({e})",
+                args.multi_input_json,
+            )
         # Check that it is an array of length at least one.
         if not isinstance(input_specs, list):
             raise self.InvalidInputJson(
-                "`MULTI_INPUT_JSON` must be a JSON array",
-                args.multi_input_json)
+                "`MULTI_INPUT_JSON` must be a JSON array", args.multi_input_json
+            )
         if len(input_specs) == 0:
             raise self.InvalidInputJson(
-                "`MULTI_INPUT_JSON` must contain at least one element",
-                args.multi_input_json)
+                "`MULTI_INPUT_JSON` must contain at least one element",
+                args.multi_input_json,
+            )
         # For each of the maps, construct the corresponding command-line
         # options to the index binary.
         input_options = []
@@ -71,15 +90,15 @@ def get_input_options_for_json(self, args) -> str:
             # Check that `input_spec` is a dictionary.
             if not isinstance(input_spec, dict):
                 raise self.InvalidInputJson(
-                    f"Element {i} in `MULTI_INPUT_JSON` must be a JSON "
-                    "object",
-                    input_spec)
+                    f"Element {i} in `MULTI_INPUT_JSON` must be a JSON " "object",
+                    input_spec,
+                )
             # For each `input_spec`, we must have a command.
             if "cmd" not in input_spec:
                 raise self.InvalidInputJson(
-                    f"Element {i} in `MULTI_INPUT_JSON` must contain a "
-                    "key `cmd`",
-                    input_spec)
+                    f"Element {i} in `MULTI_INPUT_JSON` must contain a " "key `cmd`",
+                    input_spec,
+                )
             input_cmd = input_spec["cmd"]
             # The `format`, `graph`, and `parallel` keys are optional.
             input_format = input_spec.get("format", args.format)
@@ -89,17 +108,19 @@ def get_input_options_for_json(self, args) -> str:
             extra_keys = input_spec.keys() - {"cmd", "format", "graph", "parallel"}
             if extra_keys:
                 raise self.InvalidInputJson(
-                    f"Element {i} in `MULTI_INPUT_JSON` must only contain "
-                    "the keys `format`, `graph`, and `parallel`. Contains "
-                    "extra keys {extra_keys}.",
-                    input_spec)
+                    f"Element {i} in `MULTI_INPUT_JSON` must only contain "
+                    "the keys `format`, `graph`, and `parallel`. Contains "
+                    "extra keys {extra_keys}.",
+                    input_spec,
+                )
             # Add the command-line options for this input stream. We use
             # process substitution `<(...)` as a convenient way to handle
             # an input stream just like a file. This is not POSIX compliant,
             # but supported by various shells, including bash and zsh.
             input_options.append(
-                f"-f <({input_cmd}) -F {input_format} "
-                f"-g \"{input_graph}\" -p {input_parallel}")
+                f"-f <({input_cmd}) -F {input_format} "
+                f'-g "{input_graph}" -p {input_parallel}'
+            )
         # Return the concatenated command-line options.
         return " ".join(input_options)
 
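
Illustration, not part of the patch (the file and graph names are made up):
for an input spec such as
{ "cmd": "zcat dblp.gz", "format": "ttl", "graph": "dblp", "parallel": "true" },
the loop above appends roughly

    -f <(zcat dblp.gz) -F ttl -g "dblp" -p true

to the options passed to the index binary; the process substitution <(...)
lets the output of the decompression command be read as if it were a file.
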
@@ -108,11 +129,13 @@ def execute(self, args) -> bool:
         # basename of the index, and the settings file). There are two ways
         # to specify the input: via a single stream or via multiple streams.
         if args.cat_input_files and not args.multi_input_json:
-            index_cmd = (f"{args.cat_input_files} | {args.index_binary}"
-                         f" -i {args.name} -s {args.name}.settings.json"
-                         f" -F {args.format} -f -")
+            index_cmd = (
+                f"{args.cat_input_files} | {args.index_binary}"
+                f" -i {args.name} -s {args.name}.settings.json"
+                f" -F {args.format} -f -"
+            )
             if args.parallel_parsing:
-                index_cmd += (f" -p {args.parallel_parsing}")
+                index_cmd += f" -p {args.parallel_parsing}"
         elif args.multi_input_json and not args.cat_input_files:
             try:
                 input_options = self.get_input_options_for_json(args)
@@ -121,13 +144,17 @@ def execute(self, args) -> bool:
                 log.info("")
                 log.info(e.additional_info)
                 return False
-            index_cmd = (f"{args.index_binary}"
-                         f" -i {args.name} -s {args.name}.settings.json"
-                         f" {input_options}")
+            index_cmd = (
+                f"{args.index_binary}"
+                f" -i {args.name} -s {args.name}.settings.json"
+                f" {input_options}"
+            )
         else:
-            log.error("Specify exactly one of `CAT_INPUT_FILES` (for a "
-                      "single input stream) or `MULTI_INPUT_JSON` (for "
-                      "multiple input streams)")
+            log.error(
+                "Specify exactly one of `CAT_INPUT_FILES` (for a "
+                "single input stream) or `MULTI_INPUT_JSON` (for "
+                "multiple input streams)"
+            )
             log.info("")
             log.info("See `qlever index --help` for more information")
             return False
@@ -137,12 +164,11 @@ def execute(self, args) -> bool:
             index_cmd += " --only-pso-and-pos-permutations --no-patterns"
         if not args.use_patterns:
            index_cmd += " --no-patterns"
-        if args.text_index in \
-                ["from_text_records", "from_text_records_and_literals"]:
-            index_cmd += (f" -w {args.name}.wordsfile.tsv"
-                          f" -d {args.name}.docsfile.tsv")
-        if args.text_index in \
-                ["from_literals", "from_text_records_and_literals"]:
+        if args.text_index in ["from_text_records", "from_text_records_and_literals"]:
+            index_cmd += (
+                f" -w {args.name}.wordsfile.tsv" f" -d {args.name}.docsfile.tsv"
+            )
+        if args.text_index in ["from_literals", "from_text_records_and_literals"]:
             index_cmd += " --text-words-from-literals"
         if args.stxxl_memory:
             index_cmd += f" --stxxl-memory {args.stxxl_memory}"
@@ -150,24 +176,26 @@ def execute(self, args) -> bool:
 
         # If the total file size is larger than 10 GB, set ulimit (such that a
         # large number of open files is allowed).
-        total_file_size = get_total_file_size(
-            shlex.split(args.input_files))
+        total_file_size = get_total_file_size(shlex.split(args.input_files))
         if total_file_size > 1e10:
             index_cmd = f"ulimit -Sn 1048576; {index_cmd}"
 
         # Run the command in a container (if so desired).
         if args.system in Containerize.supported_systems():
             index_cmd = Containerize().containerize_command(
-                index_cmd,
-                args.system, "run --rm",
-                args.image,
-                args.index_container,
-                volumes=[("$(pwd)", "/index")],
-                working_directory="/index")
+                index_cmd,
+                args.system,
+                "run --rm",
+                args.image,
+                args.index_container,
+                volumes=[("$(pwd)", "/index")],
+                working_directory="/index",
+            )
 
         # Command for writing the settings JSON to a file.
-        settings_json_cmd = (f"echo {shlex.quote(args.settings_json)} "
-                             f"> {args.name}.settings.json")
+        settings_json_cmd = (
+            f"echo {shlex.quote(args.settings_json)} " f"> {args.name}.settings.json"
+        )
 
         # Show the command line.
         self.show(f"{settings_json_cmd}\n{index_cmd}", only_show=args.show)
@@ -179,9 +207,11 @@ def execute(self, args) -> bool:
         try:
             run_command(f"{args.index_binary} --help")
         except Exception as e:
-            log.error(f"Running \"{args.index_binary}\" failed, "
-                      f"set `--index-binary` to a different binary or "
-                      f"set `--system to a container system`")
+            log.error(
+                f'Running "{args.index_binary}" failed, '
+                f"set `--index-binary` to a different binary or "
+                f"set `--system to a container system`"
+            )
             log.info("")
             log.info(f"The error message was: {e}")
             return False
@@ -189,28 +219,29 @@ def execute(self, args) -> bool:
         # Check if all of the input files exist.
         for pattern in shlex.split(args.input_files):
             if len(glob.glob(pattern)) == 0:
-                log.error(f"No file matching \"{pattern}\" found")
+                log.error(f'No file matching "{pattern}" found')
                 log.info("")
-                log.info("Did you call `qlever get-data`? If you did, check "
-                         "GET_DATA_CMD and INPUT_FILES in the QLeverfile")
+                log.info(
+                    "Did you call `qlever get-data`? If you did, check "
+                    "GET_DATA_CMD and INPUT_FILES in the QLeverfile"
+                )
                 return False
 
         # Check if index files (name.index.*) already exist.
         existing_index_files = get_existing_index_files(args.name)
         if len(existing_index_files) > 0 and not args.overwrite_existing:
             log.error(
-                f"Index files for basename \"{args.name}\" found, if you "
-                f"want to overwrite them, use --overwrite-existing")
+                f'Index files for basename "{args.name}" found, if you '
+                f"want to overwrite them, use --overwrite-existing"
+            )
             log.info("")
             log.info(f"Index files found: {existing_index_files}")
             return False
 
         # Remove already existing container.
-        if args.system in Containerize.supported_systems() \
-                and args.overwrite_existing:
+        if args.system in Containerize.supported_systems() and args.overwrite_existing:
             if Containerize.is_running(args.system, args.index_container):
-                log.info("Another index process is running, trying to stop "
-                         "it ...")
+                log.info("Another index process is running, trying to stop " "it ...")
                 log.info("")
                 try:
                     run_command(f"{args.system} rm -f {args.index_container}")
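
Illustration, not part of the patch (file names are made up): the JSONL form is
what makes programmatic generation easy, because each input contributes exactly
one line and there are no brackets or separating commas to manage. A minimal
Python sketch of producing such a value, matching what the patched
get_input_options_for_json accepts:

    import json

    # One JSON object per line, no enclosing "[...]" and no separating commas.
    files = ["dblp-part-1.gz", "dblp-part-2.gz"]
    multi_input_json = "\n".join(json.dumps({"cmd": f"zcat {f}"}) for f in files)
    print(multi_input_json)
    # {"cmd": "zcat dblp-part-1.gz"}
    # {"cmd": "zcat dblp-part-2.gz"}
    # The patched index.py detects this shape line by line and rewrites it to
    # "[" + ", ".join(lines) + "]" before calling json.loads.
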