Skip to content

Commit

Permalink
cleanup: progress/spinner bars via rich
Browse files Browse the repository at this point in the history
  • Loading branch information
aMahanna committed Jan 19, 2024
1 parent e510e12 commit d28049d
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 100 deletions.
227 changes: 128 additions & 99 deletions arango_rdf/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
Tree,
empty_func,
get_bar_progress,
get_export_spinner_progress,
get_import_spinner_progress,
get_spinner_progress,
logger,
)

Expand Down Expand Up @@ -603,6 +603,26 @@ def rdf_to_arangodb_by_rpt(
if overwrite_graph:
self.db.delete_graph(name, ignore_missing=True, drop_collections=True)

# NOTE: Graph Contextualization is an experimental work-in-progress
contextualize_statement_func = empty_func
if contextualize_graph:
contextualize_statement_func = self.__rpt_contextualize_statement

self.__rdf_graph = self.__load_meta_ontology(self.__rdf_graph)

with get_spinner_progress("(RDF → ADB): Graph Contextualization") as rp:
rp.add_task("graph contextualization")

self.__explicit_type_map = self.__build_explicit_type_map()
self.__subclass_tree = self.__build_subclass_tree()
self.__predicate_scope = self.__build_predicate_scope()
self.__domain_range_map = self.__build_domain_range_map()
self.__type_map = self.__combine_type_map_and_dr_map()

self.__reified_subject_predicate_map = {}
if flatten_reified_triples:
self.__flatten_reified_triples(contextualize_statement_func, is_pgt=False)

rdf_graph_size = len(rdf_graph)
batch_size = batch_size or rdf_graph_size

Expand All @@ -616,25 +636,10 @@ def rdf_to_arangodb_by_rpt(
else rdf_graph.triples
)

bar_progress = get_bar_progress("(RDF → ADB): RPT", "#97C423")
bar_progress = get_bar_progress("(RDF → ADB): RPT", "#BF23C4")
bar_progress_task = bar_progress.add_task(name, total=rdf_graph_size)
spinner_progress = get_import_spinner_progress(" ")

# NOTE: Graph Contextualization is an experimental work-in-progress
contextualize_statement_func = empty_func
if contextualize_graph:
contextualize_statement_func = self.__rpt_contextualize_statement
self.__rdf_graph = self.__load_meta_ontology(self.__rdf_graph)
self.__explicit_type_map = self.__build_explicit_type_map()
self.__subclass_tree = self.__build_subclass_tree()
self.__predicate_scope = self.__build_predicate_scope()
self.__domain_range_map = self.__build_domain_range_map()
self.__type_map = self.__combine_type_map_and_dr_map()

self.__reified_subject_predicate_map = {}
if flatten_reified_triples:
self.__flatten_reified_triples(contextualize_statement_func, is_pgt=False)

with Live(Group(bar_progress, spinner_progress)):
for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1):
bar_progress.advance(bar_progress_task)
Expand Down Expand Up @@ -886,23 +891,6 @@ def rdf_to_arangodb_by_pgt(
# Builds the ArangoDB Edge Definitions of the (soon to be) ArangoDB Graph
self.__e_col_map = defaultdict(lambda: defaultdict(set))

rdf_graph_size = len(rdf_graph)
batch_size = batch_size or rdf_graph_size

s: RDFTerm # Subject
p: URIRef # Predicate
o: RDFTerm # Object

statements = (
self.__rdf_graph.quads
if isinstance(self.__rdf_graph, RDFConjunctiveGraph)
else self.__rdf_graph.triples
)

bar_progress = get_bar_progress("(RDF → ADB): PGT", "#08479E")
bar_progress_task = bar_progress.add_task(name, total=rdf_graph_size)
spinner_progress = get_import_spinner_progress(" ")

self.__pgt_remove_blacklisted_statements()

# NOTE: Graph Contextualization is an experimental work-in-progress
Expand Down Expand Up @@ -932,11 +920,28 @@ def rdf_to_arangodb_by_pgt(
if flatten_reified_triples:
self.__flatten_reified_triples(contextualize_statement_func, is_pgt=True)

s: RDFTerm # Subject
p: URIRef # Predicate
o: RDFTerm # Object

statements = (
self.__rdf_graph.quads
if isinstance(self.__rdf_graph, RDFConjunctiveGraph)
else self.__rdf_graph.triples
)

# TODO:
# self.__pgt_parse_rdf_lists()

self.__pgt_parse_literal_statements(statements, contextualize_statement_func)

rdf_graph_size = len(rdf_graph)
batch_size = batch_size or rdf_graph_size

bar_progress = get_bar_progress("(RDF → ADB): PGT", "#08479E")
bar_progress_task = bar_progress.add_task(name, total=rdf_graph_size)
spinner_progress = get_import_spinner_progress(" ")

with Live(Group(bar_progress, spinner_progress)):
for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1):
bar_progress.advance(bar_progress_task)
Expand All @@ -962,7 +967,7 @@ def rdf_to_arangodb_by_pgt(

self.__insert_adb_docs(spinner_progress, use_async, **adb_import_kwargs)

bar_progress = get_bar_progress("(RDF → ADB): PGT List Processing ", "#EF7D00")
bar_progress = get_bar_progress("(RDF → ADB): PGT [RDF Lists]", "#EF7D00")
with Live(Group(bar_progress, spinner_progress)):
self.__pgt_process_rdf_lists(bar_progress)
self.__insert_adb_docs(spinner_progress, use_async)
Expand Down Expand Up @@ -1049,38 +1054,48 @@ def write_adb_col_statements(
self.__rdf_graph = rdf_graph
self.__cntrl.rdf_graph = rdf_graph

# 1. RDF.type statements
self.__explicit_type_map = self.__build_explicit_type_map(
self.__add_adb_col_statement
)
with get_spinner_progress("(RDF → ADB): Write Col Statements") as rp:
rp.add_task("adb:collection")

# 2. RDF.subClassOf Statements
self.__subclass_tree = self.__build_subclass_tree(self.__add_adb_col_statement)
# 1. RDF.type statements
self.__explicit_type_map = self.__build_explicit_type_map(
self.__add_adb_col_statement
)

# 3. Domain & Range Statements
self.__predicate_scope = self.__build_predicate_scope(
self.__add_adb_col_statement
)
self.__domain_range_map = self.__build_domain_range_map()
# 2. RDF.subClassOf Statements
self.__subclass_tree = self.__build_subclass_tree(
self.__add_adb_col_statement
)

# 4. (Optional) Create the type map for Graph Contextualization
if contextualize_graph:
self.__type_map = self.__combine_type_map_and_dr_map()

# 5. Finalize **adb_col_statements**
for rdf_map in [self.__explicit_type_map, self.__domain_range_map]:
for rdf_resource, class_set in rdf_map.items():
has_mapping = (rdf_resource, None, None) in self.__adb_col_statements
if has_mapping or len(class_set) == 0:
continue # pragma: no cover # (false negative)

adb_col = self.rdf_id_to_adb_label(
self.__cntrl.identify_best_class(
rdf_resource, class_set, self.__subclass_tree
# 3. Domain & Range Statements
self.__predicate_scope = self.__build_predicate_scope(
self.__add_adb_col_statement
)

self.__domain_range_map = self.__build_domain_range_map()

# 4. (Optional) Create the type map for Graph Contextualization
if contextualize_graph:
self.__type_map = self.__combine_type_map_and_dr_map()

# 5. Finalize **adb_col_statements**
for rdf_map in [self.__explicit_type_map, self.__domain_range_map]:
for rdf_resource, class_set in rdf_map.items():
has_mapping = (
rdf_resource,
None,
None,
) in self.__adb_col_statements
if has_mapping or len(class_set) == 0:
continue # pragma: no cover # (false negative)

adb_col = self.rdf_id_to_adb_label(
self.__cntrl.identify_best_class(
rdf_resource, class_set, self.__subclass_tree
)
)
)

self.__add_adb_col_statement(rdf_resource, adb_col)
self.__add_adb_col_statement(rdf_resource, adb_col)

self.__adb_col_statements.remove(
(self.adb_col_uri, self.adb_col_uri, Literal("Property"))
Expand Down Expand Up @@ -1204,7 +1219,7 @@ def __fetch_adb_docs(

col_size: int = self.__db.collection(col).count()

with get_export_spinner_progress(f"ADB Export: '{col}' ({col_size})") as p:
with get_spinner_progress(f"(ADB → RDF): Export '{col}' ({col_size})") as p:
p.add_task(col)

cursor: Cursor = self.__db.aql.execute(
Expand Down Expand Up @@ -1892,15 +1907,6 @@ def __pgt_parse_literal_statements(
is disabled.
:type pgt_contextualize_statement_func: Callable[..., None]
"""
# query = """
# SELECT ?subject ?predicate
# WHERE {
# ?subject ?predicate ?object .
# FILTER isLiteral(?object)
# }
# GROUP BY ?subject ?predicate
# """

# TODO: Revisit FILTER clauses
# We rely on the FILTER clauses to make sure no literal
# statements belonging to RDF Lists are processed,
Expand All @@ -1920,22 +1926,31 @@ def __pgt_parse_literal_statements(
GROUP BY ?subject ?predicate
"""

for s, p in self.__rdf_graph.query(query):
s_meta = self.__pgt_get_term_metadata(s)
self.__pgt_process_rdf_term(s_meta)
data = self.__rdf_graph.query(query)

m = "(RDF → ADB): PGT [RDF Literals]"
bar_progress = get_bar_progress(m, "#EF7D00")
bar_progress_task = bar_progress.add_task(m, total=len(data) - 1)

p_meta = self.__pgt_get_term_metadata(p)
self.__pgt_process_rdf_term(p_meta)
with Live(Group(bar_progress)):
for s, p in self.__rdf_graph.query(query):
bar_progress.advance(bar_progress_task)

for _, _, o, *sg in statements((s, p, None)):
sg_str = self.__get_subgraph_str(sg)
s_meta = self.__pgt_get_term_metadata(s)
self.__pgt_process_rdf_term(s_meta)

o_meta = self.__pgt_get_term_metadata(o)
self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str)
p_meta = self.__pgt_get_term_metadata(p)
self.__pgt_process_rdf_term(p_meta)

pgt_contextualize_statement_func(s_meta, p_meta, o_meta, sg_str)
for _, _, o, *sg in statements((s, p, None)):
sg_str = self.__get_subgraph_str(sg)

self.__rdf_graph.remove((s, p, o))
o_meta = self.__pgt_get_term_metadata(o)
self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str)

pgt_contextualize_statement_func(s_meta, p_meta, o_meta, sg_str)

self.__rdf_graph.remove((s, p, o))

def __pgt_process_subject_predicate_object(
self,
Expand Down Expand Up @@ -2574,10 +2589,6 @@ def __flatten_reified_triples(

data = self.__rdf_graph.query(query)

self.__reified_subject_predicate_map = {
reified_subject: p for reified_subject, _, p, *_ in data
}

reified_subject: RDFTerm
s: RDFTerm
p: URIRef
Expand All @@ -2589,19 +2600,37 @@ def __flatten_reified_triples(
else self.__rpt_process_subject_predicate_object
)

for reified_subject, s, p, o, *sg in data:
reified_triple_key = self.rdf_id_to_adb_key(
str(reified_subject), reified_subject
)
self.__reified_subject_predicate_map = {}

process_subject_predicate_object(
s, p, o, sg, reified_triple_key, contextualize_statement_func
)
m = "(RDF → ADB): Flatten Reified Triples [Prep]"
bar_progress = get_bar_progress(m, "#FFFFFF")
bar_progress_task = bar_progress.add_task(m, total=len(data))

with Live(Group(bar_progress)):
for reified_subject, _, p, *_ in data:
bar_progress.advance(bar_progress_task)
self.__reified_subject_predicate_map[reified_subject] = p

m = "(RDF → ADB): Flatten Reified Triples"
bar_progress = get_bar_progress(m, "#FFFFFF")
bar_progress_task = bar_progress.add_task(m, total=len(data))

with Live(Group(bar_progress)):
for reified_subject, s, p, o, *sg in data:
bar_progress.advance(bar_progress_task)

reified_triple_key = self.rdf_id_to_adb_key(
str(reified_subject), reified_subject
)

process_subject_predicate_object(
s, p, o, sg, reified_triple_key, contextualize_statement_func
)

self.__rdf_graph.remove((reified_subject, RDF.type, RDF.Statement))
self.__rdf_graph.remove((reified_subject, RDF.subject, s))
self.__rdf_graph.remove((reified_subject, RDF.predicate, p))
self.__rdf_graph.remove((reified_subject, RDF.object, o))
self.__rdf_graph.remove((reified_subject, RDF.type, RDF.Statement))
self.__rdf_graph.remove((reified_subject, RDF.subject, s))
self.__rdf_graph.remove((reified_subject, RDF.predicate, p))
self.__rdf_graph.remove((reified_subject, RDF.object, o))

def __get_subgraph_str(self, possible_sg: Optional[List[Any]]) -> str:
"""RDF -> ArangoDB: Extract the sub-graph URIRef string of a quad (if any).
Expand Down Expand Up @@ -3009,7 +3038,7 @@ def __insert_adb_docs(
for col in adb_cols:
doc_list = self.__adb_docs[col].values()

action = f"ADB Import: '{col}' ({len(doc_list)})"
action = f"(RDF → ADB): Import '{col}' ({len(doc_list)})"
spinner_progress_task = spinner_progress.add_task("", action=action)

if not self.db.has_collection(col):
Expand Down
2 changes: 1 addition & 1 deletion arango_rdf/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def empty_func(*args: Any, **kwargs: Any) -> None:
pass


def get_export_spinner_progress(text: str) -> Progress:
def get_spinner_progress(text: str) -> Progress:
return Progress(
TextColumn(text),
SpinnerColumn("aesthetic", "#5BC0DE"),
Expand Down

0 comments on commit d28049d

Please sign in to comment.