Skip to content

Commit

Permalink
For the MULTI_INPUT_JSON value, also allow JSONL (#91)
Browse files Browse the repository at this point in the history
So far, `MULTI_INPUT_JSON` had to be a JSON string. When producing the string programmatically for a large number of directories or files, that can be cumbersome because of the leading `[` and trailing `]` and the separating commas.

To make this easier, the string may now also be in JSONL format (one JSON object per line). It will then simply be transformed to a JSON array in the obvious way.

As a proof of concept and as an example, the Qleverfile for DBLP is adapted accordingly.
  • Loading branch information
hannahbast authored Nov 24, 2024
1 parent 0e74e90 commit c6f9643
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 72 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "qlever"
description = "Script for using the QLever SPARQL engine."
version = "0.5.11"
version = "0.5.12"
authors = [
{ name = "Hannah Bast", email = "[email protected]" }
]
Expand Down
2 changes: 1 addition & 1 deletion src/qlever/Qleverfiles/Qleverfile.dblp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ FORMAT = ttl

[index]
INPUT_FILES = *.gz
MULTI_INPUT_JSON = $$(ls *.gz | awk 'BEGIN { printf "[ " } NR > 1 { printf ", " } { printf "{\"cmd\": \"zcat " $$0 "\"}" } END { printf "]" }')
MULTI_INPUT_JSON = $$(ls *.gz | xargs -I {} echo '{ "cmd": "zcat {}" }')
SETTINGS_JSON = { "ascii-prefixes-only": false, "num-triples-per-batch": 5000000, "prefixes-external": [""] }

[server]
Expand Down
171 changes: 101 additions & 70 deletions src/qlever/commands/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import glob
import json
import shlex
import re

from qlever.command import QleverCommand
from qlever.containerize import Containerize
from qlever.log import log
from qlever.util import (get_existing_index_files, get_total_file_size,
run_command)
from qlever.util import get_existing_index_files, get_total_file_size, run_command


class IndexCommand(QleverCommand):
Expand All @@ -20,24 +20,36 @@ def __init__(self):
pass

def description(self) -> str:
return ("Build the index for a given RDF dataset")
return "Build the index for a given RDF dataset"

def should_have_qleverfile(self) -> bool:
    """Return True: the `index` command needs the settings from a Qleverfile."""
    return True

def relevant_qleverfile_arguments(self) -> dict[str: list[str]]:
return {"data": ["name", "format"],
"index": ["input_files", "cat_input_files", "multi_input_json",
"parallel_parsing", "settings_json", "index_binary",
"only_pso_and_pos_permutations", "use_patterns",
"text_index", "stxxl_memory"],
"runtime": ["system", "image", "index_container"]}
def relevant_qleverfile_arguments(self) -> dict[str : list[str]]:
return {
"data": ["name", "format"],
"index": [
"input_files",
"cat_input_files",
"multi_input_json",
"parallel_parsing",
"settings_json",
"index_binary",
"only_pso_and_pos_permutations",
"use_patterns",
"text_index",
"stxxl_memory",
],
"runtime": ["system", "image", "index_container"],
}

def additional_arguments(self, subparser) -> None:
subparser.add_argument(
"--overwrite-existing", action="store_true",
default=False,
help="Overwrite an existing index, think twice before using.")
"--overwrite-existing",
action="store_true",
default=False,
help="Overwrite an existing index, think twice before using.",
)

# Exception for invalid JSON.
class InvalidInputJson(Exception):
Expand All @@ -48,38 +60,45 @@ def __init__(self, error_message, additional_info):

# Helper function to get command line options from JSON.
def get_input_options_for_json(self, args) -> str:
# Parse the JSON.
# Parse the JSON. If `args.multi_input_json` looks like JSONL, turn
# it into a JSON array.
try:
jsonl_line_regex = re.compile(r"^\s*\{.*\}\s*$")
jsonl_lines = args.multi_input_json.split("\n")
if all(re.match(jsonl_line_regex, line) for line in jsonl_lines):
args.multi_input_json = "[" + ", ".join(jsonl_lines) + "]"
input_specs = json.loads(args.multi_input_json)
except Exception as e:
raise self.InvalidInputJson(
f"Failed to parse `MULTI_INPUT_JSON` ({e})",
args.multi_input_json)
f"Failed to parse `MULTI_INPUT_JSON` as either JSON or JSONL ({e})",
args.multi_input_json,
)
# Check that it is an array of length at least one.
if not isinstance(input_specs, list):
raise self.InvalidInputJson(
"`MULTI_INPUT_JSON` must be a JSON array",
args.multi_input_json)
"`MULTI_INPUT_JSON` must be a JSON array", args.multi_input_json
)
if len(input_specs) == 0:
raise self.InvalidInputJson(
"`MULTI_INPUT_JSON` must contain at least one element",
args.multi_input_json)
"`MULTI_INPUT_JSON` must contain at least one element",
args.multi_input_json,
)
# For each of the maps, construct the corresponding command-line
# options to the index binary.
input_options = []
for i, input_spec in enumerate(input_specs):
# Check that `input_spec` is a dictionary.
if not isinstance(input_spec, dict):
raise self.InvalidInputJson(
f"Element {i} in `MULTI_INPUT_JSON` must be a JSON "
"object",
input_spec)
f"Element {i} in `MULTI_INPUT_JSON` must be a JSON " "object",
input_spec,
)
# For each `input_spec`, we must have a command.
if "cmd" not in input_spec:
raise self.InvalidInputJson(
f"Element {i} in `MULTI_INPUT_JSON` must contain a "
"key `cmd`",
input_spec)
f"Element {i} in `MULTI_INPUT_JSON` must contain a " "key `cmd`",
input_spec,
)
input_cmd = input_spec["cmd"]
# The `format`, `graph`, and `parallel` keys are optional.
input_format = input_spec.get("format", args.format)
Expand All @@ -89,17 +108,19 @@ def get_input_options_for_json(self, args) -> str:
extra_keys = input_spec.keys() - {"cmd", "format", "graph", "parallel"}
if extra_keys:
raise self.InvalidInputJson(
f"Element {i} in `MULTI_INPUT_JSON` must only contain "
"the keys `format`, `graph`, and `parallel`. Contains "
"extra keys {extra_keys}.",
input_spec)
f"Element {i} in `MULTI_INPUT_JSON` must only contain "
"the keys `format`, `graph`, and `parallel`. Contains "
"extra keys {extra_keys}.",
input_spec,
)
# Add the command-line options for this input stream. We use
# process substitution `<(...)` as a convenient way to handle
# an input stream just like a file. This is not POSIX compliant,
# but supported by various shells, including bash and zsh.
input_options.append(
f"-f <({input_cmd}) -F {input_format} "
f"-g \"{input_graph}\" -p {input_parallel}")
f"-f <({input_cmd}) -F {input_format} "
f'-g "{input_graph}" -p {input_parallel}'
)
# Return the concatenated command-line options.
return " ".join(input_options)

Expand All @@ -108,11 +129,13 @@ def execute(self, args) -> bool:
# basename of the index, and the settings file). There are two ways
# to specify the input: via a single stream or via multiple streams.
if args.cat_input_files and not args.multi_input_json:
index_cmd = (f"{args.cat_input_files} | {args.index_binary}"
f" -i {args.name} -s {args.name}.settings.json"
f" -F {args.format} -f -")
index_cmd = (
f"{args.cat_input_files} | {args.index_binary}"
f" -i {args.name} -s {args.name}.settings.json"
f" -F {args.format} -f -"
)
if args.parallel_parsing:
index_cmd += (f" -p {args.parallel_parsing}")
index_cmd += f" -p {args.parallel_parsing}"
elif args.multi_input_json and not args.cat_input_files:
try:
input_options = self.get_input_options_for_json(args)
Expand All @@ -121,13 +144,17 @@ def execute(self, args) -> bool:
log.info("")
log.info(e.additional_info)
return False
index_cmd = (f"{args.index_binary}"
f" -i {args.name} -s {args.name}.settings.json"
f" {input_options}")
index_cmd = (
f"{args.index_binary}"
f" -i {args.name} -s {args.name}.settings.json"
f" {input_options}"
)
else:
log.error("Specify exactly one of `CAT_INPUT_FILES` (for a "
"single input stream) or `MULTI_INPUT_JSON` (for "
"multiple input streams)")
log.error(
"Specify exactly one of `CAT_INPUT_FILES` (for a "
"single input stream) or `MULTI_INPUT_JSON` (for "
"multiple input streams)"
)
log.info("")
log.info("See `qlever index --help` for more information")
return False
Expand All @@ -137,37 +164,38 @@ def execute(self, args) -> bool:
index_cmd += " --only-pso-and-pos-permutations --no-patterns"
if not args.use_patterns:
index_cmd += " --no-patterns"
if args.text_index in \
["from_text_records", "from_text_records_and_literals"]:
index_cmd += (f" -w {args.name}.wordsfile.tsv"
f" -d {args.name}.docsfile.tsv")
if args.text_index in \
["from_literals", "from_text_records_and_literals"]:
if args.text_index in ["from_text_records", "from_text_records_and_literals"]:
index_cmd += (
f" -w {args.name}.wordsfile.tsv" f" -d {args.name}.docsfile.tsv"
)
if args.text_index in ["from_literals", "from_text_records_and_literals"]:
index_cmd += " --text-words-from-literals"
if args.stxxl_memory:
index_cmd += f" --stxxl-memory {args.stxxl_memory}"
index_cmd += f" | tee {args.name}.index-log.txt"

# If the total file size is larger than 10 GB, set ulimit (such that a
# large number of open files is allowed).
total_file_size = get_total_file_size(
shlex.split(args.input_files))
total_file_size = get_total_file_size(shlex.split(args.input_files))
if total_file_size > 1e10:
index_cmd = f"ulimit -Sn 1048576; {index_cmd}"

# Run the command in a container (if so desired).
if args.system in Containerize.supported_systems():
index_cmd = Containerize().containerize_command(
index_cmd,
args.system, "run --rm",
args.image,
args.index_container,
volumes=[("$(pwd)", "/index")],
working_directory="/index")
index_cmd,
args.system,
"run --rm",
args.image,
args.index_container,
volumes=[("$(pwd)", "/index")],
working_directory="/index",
)

# Command for writing the settings JSON to a file.
settings_json_cmd = (f"echo {shlex.quote(args.settings_json)} "
f"> {args.name}.settings.json")
settings_json_cmd = (
f"echo {shlex.quote(args.settings_json)} " f"> {args.name}.settings.json"
)

# Show the command line.
self.show(f"{settings_json_cmd}\n{index_cmd}", only_show=args.show)
Expand All @@ -179,38 +207,41 @@ def execute(self, args) -> bool:
try:
run_command(f"{args.index_binary} --help")
except Exception as e:
log.error(f"Running \"{args.index_binary}\" failed, "
f"set `--index-binary` to a different binary or "
f"set `--system to a container system`")
log.error(
f'Running "{args.index_binary}" failed, '
f"set `--index-binary` to a different binary or "
f"set `--system to a container system`"
)
log.info("")
log.info(f"The error message was: {e}")
return False

# Check if all of the input files exist.
for pattern in shlex.split(args.input_files):
if len(glob.glob(pattern)) == 0:
log.error(f"No file matching \"{pattern}\" found")
log.error(f'No file matching "{pattern}" found')
log.info("")
log.info("Did you call `qlever get-data`? If you did, check "
"GET_DATA_CMD and INPUT_FILES in the QLeverfile")
log.info(
"Did you call `qlever get-data`? If you did, check "
"GET_DATA_CMD and INPUT_FILES in the QLeverfile"
)
return False

# Check if index files (name.index.*) already exist.
existing_index_files = get_existing_index_files(args.name)
if len(existing_index_files) > 0 and not args.overwrite_existing:
log.error(
f"Index files for basename \"{args.name}\" found, if you "
f"want to overwrite them, use --overwrite-existing")
f'Index files for basename "{args.name}" found, if you '
f"want to overwrite them, use --overwrite-existing"
)
log.info("")
log.info(f"Index files found: {existing_index_files}")
return False

# Remove already existing container.
if args.system in Containerize.supported_systems() \
and args.overwrite_existing:
if args.system in Containerize.supported_systems() and args.overwrite_existing:
if Containerize.is_running(args.system, args.index_container):
log.info("Another index process is running, trying to stop "
"it ...")
log.info("Another index process is running, trying to stop " "it ...")
log.info("")
try:
run_command(f"{args.system} rm -f {args.index_container}")
Expand Down

0 comments on commit c6f9643

Please sign in to comment.