diff --git a/entrypoint.py b/entrypoint.py index 1311731..288616c 100755 --- a/entrypoint.py +++ b/entrypoint.py @@ -9,13 +9,14 @@ import shutil import time from pathlib import Path +from typing import IO from src.exceptions import InvalidConfigurationException -from src.graphdb import get_loaded_vocabs, setup_graphdb +from src.graphdb import get_loaded_vocabs, set_timestamp, setup_graphdb, update_timestamp from src.vocabularies import get_file_from_config, get_graph, load_vocab_yaml, load_vocabulary -def append_file(source, dest): +def append_file(source: IO, dest: str): """ Append source to dest file. :param source: A file pointer to a source file. @@ -61,11 +62,21 @@ def append_file(source, dest): graph = get_graph(config) print(f"Graph: {graph}") - always_load = vocab_config['config'].get('alwaysRefresh', False) + should_reload = False + if graph not in loaded_vocabs: + should_reload = True + elif vocab_config['config'].get('refresh', False): + interval = vocab_config['config'].get('refreshInterval', 0) + diff = (time.time() - loaded_vocabs[graph]) / 3600 + should_reload = diff > interval - if always_load or graph not in loaded_vocabs: + if should_reload: print(f"Loading vocabulary {vocab}") load_vocabulary(vocab_config['source'], data, graph) + if graph in loaded_vocabs: + update_timestamp(graph, int(time.time())) + else: + set_timestamp(graph, int(time.time())) print("... DONE") # Doing this last makes sure the vocab isn't added to the config when there's a problem diff --git a/requirements.txt b/requirements.txt index 3c4c149..c41d163 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ PyYAML~=6.0.1 requests~=2.31.0 + +SPARQLWrapper~=2.0.0 diff --git a/src/graphdb.py b/src/graphdb.py index 6a0207d..270b849 100644 --- a/src/graphdb.py +++ b/src/graphdb.py @@ -2,13 +2,17 @@ This file contains functions for interacting with GraphDB """ import os +from typing import TextIO + import requests +from SPARQLWrapper import SPARQLWrapper, JSON, POST, DIGEST + admin_password = os.environ.get("ADMIN_PASSWORD", '') endpoint = os.environ.get("SPARQL_ENDPOINT", '') -def setup_graphdb(): +def setup_graphdb() -> None: """ Setup graphdb, if it isn't set up yet. :return: @@ -33,28 +37,79 @@ def setup_graphdb(): print(f"EXISTS GRAPHDB [{endpoint}]]") -def get_loaded_vocabs(): +def get_loaded_vocabs() -> dict[str, int]: """ Get all loaded vocabularies from GraphDB :return: """ - graphs_response = requests.get( - f"{endpoint}/rdf-graphs", - headers={"Accept": "application/json"}, - timeout=60 - ) - tmp = [] - if graphs_response.status_code == 200: - body = graphs_response.json() - tmp = [] - for binding in body["results"]["bindings"]: - tmp.append(binding["contextID"]["value"]) - print("Loaded vocabs:") - print(tmp) + sparql = SPARQLWrapper(endpoint) + sparql.setReturnFormat(JSON) + q = """ + SELECT ?graph ?timestamp + WHERE { + ?graph ?timestamp . + FILTER NOT EXISTS { + GRAPH ?g {?graph ?timestamp .} + } + } + ORDER BY ?timestamp + """ + sparql.setQuery(q) + result = sparql.queryAndConvert() + result = result['results']['bindings'] + tmp = {} + for line in result: + tmp[line['graph']['value']] = int(line['timestamp']['value']) return tmp -def get_type(extension): +def set_timestamp(graph_name: str, timestamp: int) -> None: + """ + Set a timestamp for a new graph. + :param graph_name: + :param timestamp: + :return: + """ + sparql = SPARQLWrapper(f"{endpoint}/statements") + sparql.setHTTPAuth(DIGEST) + sparql.setCredentials("admin", admin_password) + sparql.setMethod(POST) + q = """INSERT DATA {{ + <{graph}> {timestamp} . + }}""" + q_formatted = q.format(graph=graph_name, timestamp=timestamp) + print(q_formatted) + sparql.setQuery(q_formatted) + sparql.query() + + +def update_timestamp(graph_name: str, timestamp: int) -> None: + """ + Set a timestamp for an existing graph. + :param graph_name: + :param timestamp: + :return: + """ + sparql = SPARQLWrapper(f"{endpoint}/statements") + sparql.setHTTPAuth(DIGEST) + sparql.setCredentials("admin", admin_password) + sparql.setMethod(POST) + q = """ + DELETE {{ + <{graph}> ?timestamp . + }} + INSERT {{ + <{graph}> {timestamp} . + }} + WHERE {{ + <{graph}> ?timestamp . + }} + """ + sparql.setQuery(q.format(graph=graph_name, timestamp=timestamp)) + sparql.query() + + +def get_type(extension: str) -> str: """ Get the http mimetype based on the extension of a file. :param extension: @@ -68,7 +123,7 @@ def get_type(extension): return "text/turtle" -def add_vocabulary(graph, graph_name, extension): +def add_vocabulary(graph: TextIO, graph_name: str, extension: str) -> None: """ Add a vocabulary to GraphDB :param graph: File diff --git a/src/vocabularies.py b/src/vocabularies.py index bb7f5ba..68de659 100644 --- a/src/vocabularies.py +++ b/src/vocabularies.py @@ -5,13 +5,16 @@ import re import urllib.request import urllib.parse +from pathlib import Path +from typing import IO, TextIO + import yaml from src.exceptions import InvalidConfigurationException, UnknownAuthenticationTypeException from src.graphdb import add_vocabulary -def get_file_from_config(config_data, data_dir): +def get_file_from_config(config_data: dict, data_dir: str) -> TextIO: """ Get the config file from yaml data. :param config_data: The configuration, a dict with information about the file. @@ -52,7 +55,7 @@ def get_file_from_config(config_data, data_dir): raise InvalidConfigurationException("Type must be file") -def load_vocabulary(source_data, data_dir, graph_name): +def load_vocabulary(source_data: dict, data_dir: str, graph_name: str) -> None: """ Load a vocabulary using the source data from the yaml. :param source_data: @@ -64,17 +67,16 @@ def load_vocabulary(source_data, data_dir, graph_name): add_vocabulary(vocab_file, graph_name, get_vocab_format(source_data)) -def get_graph(fp): +def get_graph(fp: IO) -> str: """ Get the sparql graph from the given vocab :param fp: The vocabulary config, a file pointer :return: """ for line in fp: - # If line is a bytes-like object, we need to decode it try: - line = line.decode() - except (UnicodeDecodeError, AttributeError): + line = line.decode('utf-8') + except UnicodeDecodeError: # Already decoded pass if re.search("sparqlGraph", line): @@ -82,7 +84,7 @@ def get_graph(fp): return "" -def load_vocab_yaml(file_location): +def load_vocab_yaml(file_location: Path) -> dict: """ Open a yaml config file and return a dict with its contents :param file_location: @@ -92,7 +94,7 @@ def load_vocab_yaml(file_location): return yaml.safe_load(fp) -def get_vocab_format(source_data): +def get_vocab_format(source_data: dict) -> str: """ Return the vocab format of the given data source. It is either based on the file extension, or on an override in the yaml file. @@ -101,4 +103,4 @@ def get_vocab_format(source_data): """ if 'format' in source_data: return source_data['format'] - return source_data['location'].split('.')[-1] + return source_data['location'].split('?')[0].split('.')[-1]