Skip to content

Commit

Permalink
keep track of refresh timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
jarno-knaw committed Aug 1, 2024
1 parent 9969e72 commit a2f8098
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 30 deletions.
19 changes: 15 additions & 4 deletions entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
import shutil
import time
from pathlib import Path
from typing import IO

from src.exceptions import InvalidConfigurationException
from src.graphdb import get_loaded_vocabs, setup_graphdb
from src.graphdb import get_loaded_vocabs, set_timestamp, setup_graphdb, update_timestamp
from src.vocabularies import get_file_from_config, get_graph, load_vocab_yaml, load_vocabulary


def append_file(source, dest):
def append_file(source: IO, dest: str):
"""
Append source to dest file.
:param source: A file pointer to a source file.
Expand Down Expand Up @@ -61,11 +62,21 @@ def append_file(source, dest):
graph = get_graph(config)
print(f"Graph: {graph}")

always_load = vocab_config['config'].get('alwaysRefresh', False)
should_reload = False
if graph not in loaded_vocabs:
should_reload = True
elif vocab_config['config'].get('refresh', False):
interval = vocab_config['config'].get('refreshInterval', 0)
diff = (time.time() - loaded_vocabs[graph]) / 3600
should_reload = diff > interval

if always_load or graph not in loaded_vocabs:
if should_reload:
print(f"Loading vocabulary {vocab}")
load_vocabulary(vocab_config['source'], data, graph)
if graph in loaded_vocabs:
update_timestamp(graph, int(time.time()))
else:
set_timestamp(graph, int(time.time()))
print("... DONE")

# Doing this last makes sure the vocab isn't added to the config when there's a problem
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
PyYAML~=6.0.1
requests~=2.31.0

SPARQLWrapper~=2.0.0
89 changes: 72 additions & 17 deletions src/graphdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
This file contains functions for interacting with GraphDB
"""
import os
from typing import TextIO

import requests

from SPARQLWrapper import SPARQLWrapper, JSON, POST, DIGEST

admin_password = os.environ.get("ADMIN_PASSWORD", '')
endpoint = os.environ.get("SPARQL_ENDPOINT", '')


def setup_graphdb():
def setup_graphdb() -> None:
"""
Setup graphdb, if it isn't set up yet.
:return:
Expand All @@ -33,28 +37,79 @@ def setup_graphdb():
print(f"EXISTS GRAPHDB [{endpoint}]]")


def get_loaded_vocabs() -> dict[str, int]:
    """
    Fetch the graphs that have a recorded modification timestamp in GraphDB.

    Runs a SPARQL SELECT against ``endpoint`` for every ``dcterms:modified`` triple that
    is not also present inside a named graph, and maps each subject to its timestamp.
    :return: A dict of graph IRI -> timestamp, each timestamp converted with ``int``.
    """
    select_query = """
SELECT ?graph ?timestamp
WHERE {
?graph <http://purl.org/dc/terms/modified> ?timestamp .
FILTER NOT EXISTS {
GRAPH ?g {?graph <http://purl.org/dc/terms/modified> ?timestamp .}
}
}
ORDER BY ?timestamp
"""
    client = SPARQLWrapper(endpoint)
    client.setReturnFormat(JSON)
    client.setQuery(select_query)
    bindings = client.queryAndConvert()['results']['bindings']
    # A later binding for the same graph overwrites an earlier one.
    return {row['graph']['value']: int(row['timestamp']['value']) for row in bindings}


def get_type(extension):
def set_timestamp(graph_name: str, timestamp: int) -> None:
    """
    Record the first modification timestamp of a newly loaded graph.

    Sends a SPARQL ``INSERT DATA`` update to ``{endpoint}/statements``, authenticated as
    the ``admin`` user, adding a ``dcterms:modified`` triple for the graph.
    The formatted query is printed before it is sent.
    :param graph_name: IRI of the graph; placed between angle brackets in the query.
    :param timestamp: Value written as the object of the triple.
    :return:
    """
    insert_template = """INSERT DATA {{
<{graph}> <http://purl.org/dc/terms/modified> {timestamp} .
}}"""
    update_query = insert_template.format(graph=graph_name, timestamp=timestamp)

    client = SPARQLWrapper(f"{endpoint}/statements")
    client.setHTTPAuth(DIGEST)
    client.setCredentials("admin", admin_password)
    client.setMethod(POST)
    print(update_query)
    client.setQuery(update_query)
    client.query()


def update_timestamp(graph_name: str, timestamp: int) -> None:
    """
    Replace the stored modification timestamp of a graph that already has one.

    Sends a SPARQL ``DELETE``/``INSERT``/``WHERE`` update to ``{endpoint}/statements``,
    authenticated as the ``admin`` user. The ``WHERE`` clause matches the existing
    ``dcterms:modified`` triple of the graph, which is deleted and replaced by one
    carrying the new value.
    :param graph_name: IRI of the graph; placed between angle brackets in the query.
    :param timestamp: New value written as the object of the triple.
    :return:
    """
    replace_template = """
DELETE {{
<{graph}> <http://purl.org/dc/terms/modified> ?timestamp .
}}
INSERT {{
<{graph}> <http://purl.org/dc/terms/modified> {timestamp} .
}}
WHERE {{
<{graph}> <http://purl.org/dc/terms/modified> ?timestamp .
}}
"""
    update_query = replace_template.format(graph=graph_name, timestamp=timestamp)

    client = SPARQLWrapper(f"{endpoint}/statements")
    client.setHTTPAuth(DIGEST)
    client.setCredentials("admin", admin_password)
    client.setMethod(POST)
    client.setQuery(update_query)
    client.query()


def get_type(extension: str) -> str:
"""
Get the http mimetype based on the extension of a file.
:param extension:
Expand All @@ -68,7 +123,7 @@ def get_type(extension):
return "text/turtle"


def add_vocabulary(graph, graph_name, extension):
def add_vocabulary(graph: TextIO, graph_name: str, extension: str) -> None:
"""
Add a vocabulary to GraphDB
:param graph: File
Expand Down
20 changes: 11 additions & 9 deletions src/vocabularies.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import re
import urllib.request
import urllib.parse
from pathlib import Path
from typing import IO, TextIO

import yaml

from src.exceptions import InvalidConfigurationException, UnknownAuthenticationTypeException
from src.graphdb import add_vocabulary


def get_file_from_config(config_data, data_dir):
def get_file_from_config(config_data: dict, data_dir: str) -> TextIO:
"""
Get the config file from yaml data.
:param config_data: The configuration, a dict with information about the file.
Expand Down Expand Up @@ -52,7 +55,7 @@ def get_file_from_config(config_data, data_dir):
raise InvalidConfigurationException("Type must be file")


def load_vocabulary(source_data, data_dir, graph_name):
def load_vocabulary(source_data: dict, data_dir: str, graph_name: str) -> None:
"""
Load a vocabulary using the source data from the yaml.
:param source_data:
Expand All @@ -64,25 +67,24 @@ def load_vocabulary(source_data, data_dir, graph_name):
add_vocabulary(vocab_file, graph_name, get_vocab_format(source_data))


def get_graph(fp: IO) -> str:
    """
    Get the sparql graph from the given vocab

    Scans the config line by line and returns the value of the first line that
    mentions ``sparqlGraph``: the second space-separated token of the stripped line,
    with surrounding angle brackets removed.
    :param fp: The vocabulary config, a file pointer (text or binary mode)
    :return: The graph IRI, or an empty string if no line mentions ``sparqlGraph``
    """
    for line in fp:
        # Binary-mode files yield bytes and text-mode files yield str. Only bytes
        # have .decode(), so check the type instead of relying on an exception:
        # calling .decode() on a str raises AttributeError.
        if isinstance(line, bytes):
            # Undecodable bytes become U+FFFD so the scan can continue
            line = line.decode('utf-8', errors='replace')
        if "sparqlGraph" in line:
            return line.strip().split(" ")[1].strip("<>")
    return ""


def load_vocab_yaml(file_location):
def load_vocab_yaml(file_location: Path) -> dict:
"""
Open a yaml config file and return a dict with its contents
:param file_location:
Expand All @@ -92,7 +94,7 @@ def load_vocab_yaml(file_location):
return yaml.safe_load(fp)


def get_vocab_format(source_data):
def get_vocab_format(source_data: dict) -> str:
"""
Return the vocab format of the given data source. It is either based on the file extension,
or on an override in the yaml file.
Expand All @@ -101,4 +103,4 @@ def get_vocab_format(source_data):
"""
if 'format' in source_data:
return source_data['format']
return source_data['location'].split('.')[-1]
return source_data['location'].split('?')[0].split('.')[-1]

0 comments on commit a2f8098

Please sign in to comment.