From d28049d34be4971587fe37688a7e9169d79ad5d6 Mon Sep 17 00:00:00 2001 From: Anthony Mahanna Date: Fri, 19 Jan 2024 16:47:56 -0500 Subject: [PATCH] cleanup: progress/spinner bars via `rich` --- arango_rdf/main.py | 227 +++++++++++++++++++++++++------------------- arango_rdf/utils.py | 2 +- 2 files changed, 129 insertions(+), 100 deletions(-) diff --git a/arango_rdf/main.py b/arango_rdf/main.py index a950bd43..3f2c03e5 100644 --- a/arango_rdf/main.py +++ b/arango_rdf/main.py @@ -52,8 +52,8 @@ Tree, empty_func, get_bar_progress, - get_export_spinner_progress, get_import_spinner_progress, + get_spinner_progress, logger, ) @@ -603,6 +603,26 @@ def rdf_to_arangodb_by_rpt( if overwrite_graph: self.db.delete_graph(name, ignore_missing=True, drop_collections=True) + # NOTE: Graph Contextualization is an experimental work-in-progress + contextualize_statement_func = empty_func + if contextualize_graph: + contextualize_statement_func = self.__rpt_contextualize_statement + + self.__rdf_graph = self.__load_meta_ontology(self.__rdf_graph) + + with get_spinner_progress("(RDF → ADB): Graph Contextualization") as rp: + rp.add_task("graph contextualization") + + self.__explicit_type_map = self.__build_explicit_type_map() + self.__subclass_tree = self.__build_subclass_tree() + self.__predicate_scope = self.__build_predicate_scope() + self.__domain_range_map = self.__build_domain_range_map() + self.__type_map = self.__combine_type_map_and_dr_map() + + self.__reified_subject_predicate_map = {} + if flatten_reified_triples: + self.__flatten_reified_triples(contextualize_statement_func, is_pgt=False) + rdf_graph_size = len(rdf_graph) batch_size = batch_size or rdf_graph_size @@ -616,25 +636,10 @@ def rdf_to_arangodb_by_rpt( else rdf_graph.triples ) - bar_progress = get_bar_progress("(RDF → ADB): RPT", "#97C423") + bar_progress = get_bar_progress("(RDF → ADB): RPT", "#BF23C4") bar_progress_task = bar_progress.add_task(name, total=rdf_graph_size) spinner_progress = get_import_spinner_progress(" ") - # NOTE: Graph Contextualization is an experimental work-in-progress - contextualize_statement_func = empty_func - if contextualize_graph: - contextualize_statement_func = self.__rpt_contextualize_statement - self.__rdf_graph = self.__load_meta_ontology(self.__rdf_graph) - self.__explicit_type_map = self.__build_explicit_type_map() - self.__subclass_tree = self.__build_subclass_tree() - self.__predicate_scope = self.__build_predicate_scope() - self.__domain_range_map = self.__build_domain_range_map() - self.__type_map = self.__combine_type_map_and_dr_map() - - self.__reified_subject_predicate_map = {} - if flatten_reified_triples: - self.__flatten_reified_triples(contextualize_statement_func, is_pgt=False) - with Live(Group(bar_progress, spinner_progress)): for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1): bar_progress.advance(bar_progress_task) @@ -886,23 +891,6 @@ def rdf_to_arangodb_by_pgt( # Builds the ArangoDB Edge Definitions of the (soon to be) ArangoDB Graph self.__e_col_map = defaultdict(lambda: defaultdict(set)) - rdf_graph_size = len(rdf_graph) - batch_size = batch_size or rdf_graph_size - - s: RDFTerm # Subject - p: URIRef # Predicate - o: RDFTerm # Object - - statements = ( - self.__rdf_graph.quads - if isinstance(self.__rdf_graph, RDFConjunctiveGraph) - else self.__rdf_graph.triples - ) - - bar_progress = get_bar_progress("(RDF → ADB): PGT", "#08479E") - bar_progress_task = bar_progress.add_task(name, total=rdf_graph_size) - spinner_progress = get_import_spinner_progress(" ") - self.__pgt_remove_blacklisted_statements() # NOTE: Graph Contextualization is an experimental work-in-progress @@ -932,11 +920,28 @@ def rdf_to_arangodb_by_pgt( if flatten_reified_triples: self.__flatten_reified_triples(contextualize_statement_func, is_pgt=True) + s: RDFTerm # Subject + p: URIRef # Predicate + o: RDFTerm # Object + + statements = ( + self.__rdf_graph.quads + if isinstance(self.__rdf_graph, RDFConjunctiveGraph) + else self.__rdf_graph.triples + ) + # TODO: # self.__pgt_parse_rdf_lists() self.__pgt_parse_literal_statements(statements, contextualize_statement_func) + rdf_graph_size = len(rdf_graph) + batch_size = batch_size or rdf_graph_size + + bar_progress = get_bar_progress("(RDF → ADB): PGT", "#08479E") + bar_progress_task = bar_progress.add_task(name, total=rdf_graph_size) + spinner_progress = get_import_spinner_progress(" ") + with Live(Group(bar_progress, spinner_progress)): for i, (s, p, o, *sg) in enumerate(statements((None, None, None)), 1): bar_progress.advance(bar_progress_task) @@ -962,7 +967,7 @@ def rdf_to_arangodb_by_pgt( self.__insert_adb_docs(spinner_progress, use_async, **adb_import_kwargs) - bar_progress = get_bar_progress("(RDF → ADB): PGT List Processing ", "#EF7D00") + bar_progress = get_bar_progress("(RDF → ADB): PGT [RDF Lists]", "#EF7D00") with Live(Group(bar_progress, spinner_progress)): self.__pgt_process_rdf_lists(bar_progress) self.__insert_adb_docs(spinner_progress, use_async) @@ -1049,38 +1054,48 @@ def write_adb_col_statements( self.__rdf_graph = rdf_graph self.__cntrl.rdf_graph = rdf_graph - # 1. RDF.type statements - self.__explicit_type_map = self.__build_explicit_type_map( - self.__add_adb_col_statement - ) + with get_spinner_progress("(RDF → ADB): Write Col Statements") as rp: + rp.add_task("adb:collection") - # 2. RDF.subClassOf Statements - self.__subclass_tree = self.__build_subclass_tree(self.__add_adb_col_statement) + # 1. RDF.type statements + self.__explicit_type_map = self.__build_explicit_type_map( + self.__add_adb_col_statement + ) - # 3. Domain & Range Statements - self.__predicate_scope = self.__build_predicate_scope( - self.__add_adb_col_statement - ) - self.__domain_range_map = self.__build_domain_range_map() + # 2. RDF.subClassOf Statements + self.__subclass_tree = self.__build_subclass_tree( + self.__add_adb_col_statement + ) - # 4. (Optional) Create the type map for Graph Contextualization - if contextualize_graph: - self.__type_map = self.__combine_type_map_and_dr_map() - - # 5. Finalize **adb_col_statements** - for rdf_map in [self.__explicit_type_map, self.__domain_range_map]: - for rdf_resource, class_set in rdf_map.items(): - has_mapping = (rdf_resource, None, None) in self.__adb_col_statements - if has_mapping or len(class_set) == 0: - continue # pragma: no cover # (false negative) - - adb_col = self.rdf_id_to_adb_label( - self.__cntrl.identify_best_class( - rdf_resource, class_set, self.__subclass_tree + # 3. Domain & Range Statements + self.__predicate_scope = self.__build_predicate_scope( + self.__add_adb_col_statement + ) + + self.__domain_range_map = self.__build_domain_range_map() + + # 4. (Optional) Create the type map for Graph Contextualization + if contextualize_graph: + self.__type_map = self.__combine_type_map_and_dr_map() + + # 5. Finalize **adb_col_statements** + for rdf_map in [self.__explicit_type_map, self.__domain_range_map]: + for rdf_resource, class_set in rdf_map.items(): + has_mapping = ( + rdf_resource, + None, + None, + ) in self.__adb_col_statements + if has_mapping or len(class_set) == 0: + continue # pragma: no cover # (false negative) + + adb_col = self.rdf_id_to_adb_label( + self.__cntrl.identify_best_class( + rdf_resource, class_set, self.__subclass_tree + ) ) - ) - self.__add_adb_col_statement(rdf_resource, adb_col) + self.__add_adb_col_statement(rdf_resource, adb_col) self.__adb_col_statements.remove( (self.adb_col_uri, self.adb_col_uri, Literal("Property")) @@ -1204,7 +1219,7 @@ def __fetch_adb_docs( col_size: int = self.__db.collection(col).count() - with get_export_spinner_progress(f"ADB Export: '{col}' ({col_size})") as p: + with get_spinner_progress(f"(ADB → RDF): Export '{col}' ({col_size})") as p: p.add_task(col) cursor: Cursor = self.__db.aql.execute( @@ -1892,15 +1907,6 @@ def __pgt_parse_literal_statements( is disabled. :type pgt_contextualize_statement_func: Callable[..., None] """ - # query = """ - # SELECT ?subject ?predicate - # WHERE { - # ?subject ?predicate ?object . - # FILTER isLiteral(?object) - # } - # GROUP BY ?subject ?predicate - # """ - # TODO: Revisit FILTER clauses # We rely on the FILTER clauses to make sure no literal # statements belonging to RDF Lists are processed, @@ -1920,22 +1926,31 @@ def __pgt_parse_literal_statements( GROUP BY ?subject ?predicate """ - for s, p in self.__rdf_graph.query(query): - s_meta = self.__pgt_get_term_metadata(s) - self.__pgt_process_rdf_term(s_meta) + data = self.__rdf_graph.query(query) + + m = "(RDF → ADB): PGT [RDF Literals]" + bar_progress = get_bar_progress(m, "#EF7D00") + bar_progress_task = bar_progress.add_task(m, total=len(data) - 1) - p_meta = self.__pgt_get_term_metadata(p) - self.__pgt_process_rdf_term(p_meta) + with Live(Group(bar_progress)): + for s, p in self.__rdf_graph.query(query): + bar_progress.advance(bar_progress_task) - for _, _, o, *sg in statements((s, p, None)): - sg_str = self.__get_subgraph_str(sg) + s_meta = self.__pgt_get_term_metadata(s) + self.__pgt_process_rdf_term(s_meta) - o_meta = self.__pgt_get_term_metadata(o) - self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str) + p_meta = self.__pgt_get_term_metadata(p) + self.__pgt_process_rdf_term(p_meta) - pgt_contextualize_statement_func(s_meta, p_meta, o_meta, sg_str) + for _, _, o, *sg in statements((s, p, None)): + sg_str = self.__get_subgraph_str(sg) - self.__rdf_graph.remove((s, p, o)) + o_meta = self.__pgt_get_term_metadata(o) + self.__pgt_process_object(s_meta, p_meta, o_meta, sg_str) + + pgt_contextualize_statement_func(s_meta, p_meta, o_meta, sg_str) + + self.__rdf_graph.remove((s, p, o)) def __pgt_process_subject_predicate_object( self, @@ -2574,10 +2589,6 @@ def __flatten_reified_triples( data = self.__rdf_graph.query(query) - self.__reified_subject_predicate_map = { - reified_subject: p for reified_subject, _, p, *_ in data - } - reified_subject: RDFTerm s: RDFTerm p: URIRef @@ -2589,19 +2600,37 @@ def __flatten_reified_triples( else self.__rpt_process_subject_predicate_object ) - for reified_subject, s, p, o, *sg in data: - reified_triple_key = self.rdf_id_to_adb_key( - str(reified_subject), reified_subject - ) + self.__reified_subject_predicate_map = {} - process_subject_predicate_object( - s, p, o, sg, reified_triple_key, contextualize_statement_func - ) + m = "(RDF → ADB): Flatten Reified Triples [Prep]" + bar_progress = get_bar_progress(m, "#FFFFFF") + bar_progress_task = bar_progress.add_task(m, total=len(data)) + + with Live(Group(bar_progress)): + for reified_subject, _, p, *_ in data: + bar_progress.advance(bar_progress_task) + self.__reified_subject_predicate_map[reified_subject] = p + + m = "(RDF → ADB): Flatten Reified Triples" + bar_progress = get_bar_progress(m, "#FFFFFF") + bar_progress_task = bar_progress.add_task(m, total=len(data)) + + with Live(Group(bar_progress)): + for reified_subject, s, p, o, *sg in data: + bar_progress.advance(bar_progress_task) + + reified_triple_key = self.rdf_id_to_adb_key( + str(reified_subject), reified_subject + ) + + process_subject_predicate_object( + s, p, o, sg, reified_triple_key, contextualize_statement_func + ) - self.__rdf_graph.remove((reified_subject, RDF.type, RDF.Statement)) - self.__rdf_graph.remove((reified_subject, RDF.subject, s)) - self.__rdf_graph.remove((reified_subject, RDF.predicate, p)) - self.__rdf_graph.remove((reified_subject, RDF.object, o)) + self.__rdf_graph.remove((reified_subject, RDF.type, RDF.Statement)) + self.__rdf_graph.remove((reified_subject, RDF.subject, s)) + self.__rdf_graph.remove((reified_subject, RDF.predicate, p)) + self.__rdf_graph.remove((reified_subject, RDF.object, o)) def __get_subgraph_str(self, possible_sg: Optional[List[Any]]) -> str: """RDF -> ArangoDB: Extract the sub-graph URIRef string of a quad (if any). @@ -3009,7 +3038,7 @@ def __insert_adb_docs( for col in adb_cols: doc_list = self.__adb_docs[col].values() - action = f"ADB Import: '{col}' ({len(doc_list)})" + action = f"(RDF → ADB): Import '{col}' ({len(doc_list)})" spinner_progress_task = spinner_progress.add_task("", action=action) if not self.db.has_collection(col): diff --git a/arango_rdf/utils.py b/arango_rdf/utils.py index e117c15b..324516ae 100644 --- a/arango_rdf/utils.py +++ b/arango_rdf/utils.py @@ -25,7 +25,7 @@ def empty_func(*args: Any, **kwargs: Any) -> None: pass -def get_export_spinner_progress(text: str) -> Progress: +def get_spinner_progress(text: str) -> Progress: return Progress( TextColumn(text), SpinnerColumn("aesthetic", "#5BC0DE"),