From 83269d0a8dba5f38398cfdc602d568e0c5c729d9 Mon Sep 17 00:00:00 2001 From: Michael Chin Date: Wed, 18 Dec 2024 20:54:31 -0800 Subject: [PATCH 1/2] Improve performance of NeptuneAnalyticsGraph --- .../aws/langchain_aws/graphs/neptune_graph.py | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/libs/aws/langchain_aws/graphs/neptune_graph.py b/libs/aws/langchain_aws/graphs/neptune_graph.py index f7566ecf..5d4ff928 100644 --- a/libs/aws/langchain_aws/graphs/neptune_graph.py +++ b/libs/aws/langchain_aws/graphs/neptune_graph.py @@ -1,7 +1,50 @@ import json +import logging from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Tuple, Union +logger = logging.getLogger(__name__) + + +def _format_triples(triples: List[dict]) -> List[str]: + triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)" + triple_schema = [] + for t in triples: + triple = triple_template.format( + a=t["~from"], e=t["~type"], b=t["~to"] + ) + triple_schema.append(triple) + + return triple_schema + + +def _format_node_properties(n_labels: dict) -> List: + node_properties = [] + + for label, props_item in n_labels.items(): + props = props_item["properties"] + np = { + "properties": [{"property": k, "type": v["datatypes"][0]} for k, v in props.items()], + "labels": label, + } + node_properties.append(np) + + return node_properties + + +def _format_edge_properties(e_labels: dict) -> List: + edge_properties = [] + + for label, props_item in e_labels.items(): + props = props_item["properties"] + np = { + "type": label, + "properties": [{"property": k, "type": v["datatypes"][0]} for k, v in props.items()], + } + edge_properties.append(np) + + return edge_properties + class NeptuneQueryException(Exception): """Exception for the Neptune queries.""" @@ -171,9 +214,12 @@ def __init__( client: Any = None, credentials_profile_name: Optional[str] = None, region_name: Optional[str] = None, + use_schema_algorithm: bool = True ) -> None: """Create a new Neptune Analytics graph wrapper instance.""" + self.use_schema_algorithm = use_schema_algorithm + try: if client is not None: self.client = client @@ -266,6 +312,38 @@ def _get_summary(self) -> Dict: else: return summary + def _refresh_schema(self) -> None: + """ + Refreshes the Neptune graph schema information. + """ + pg_schema_query = """ + CALL neptune.graph.pg_schema() + YIELD schema + RETURN schema + """ + if self.use_schema_algorithm: + try: + data = self.query(pg_schema_query) + raw_schema = data[0]["schema"] + triple_schema = _format_triples(raw_schema["labelTriples"]) + node_properties = _format_node_properties(raw_schema["nodeLabelDetails"]) + edge_properties = _format_edge_properties(raw_schema["edgeLabelDetails"]) + self.schema = f""" + Node properties are the following: + {node_properties} + Relationship properties are the following: + {edge_properties} + The relationships are the following: + {triple_schema} + """ + except Exception as e: + logger.info("pg_schema algorithm is unsupported on this Neptune version. " + "Falling back to manual graph schema creation.") + logger.debug(e) + super()._refresh_schema() + else: + super()._refresh_schema() + class NeptuneGraph(BaseNeptuneGraph): """Neptune wrapper for graph operations. From b974823b124e7a60ac1c1ef17f3d399119c4eb5b Mon Sep 17 00:00:00 2001 From: Michael Chin Date: Thu, 19 Dec 2024 15:13:46 -0800 Subject: [PATCH 2/2] Remove use_schema_algorithm param and paths using old schema method --- .../aws/langchain_aws/graphs/neptune_graph.py | 44 +++++++------------ 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/libs/aws/langchain_aws/graphs/neptune_graph.py b/libs/aws/langchain_aws/graphs/neptune_graph.py index 5d4ff928..16623879 100644 --- a/libs/aws/langchain_aws/graphs/neptune_graph.py +++ b/libs/aws/langchain_aws/graphs/neptune_graph.py @@ -1,10 +1,7 @@ import json -import logging from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Tuple, Union -logger = logging.getLogger(__name__) - def _format_triples(triples: List[dict]) -> List[str]: triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)" @@ -213,13 +210,10 @@ def __init__( graph_identifier: str, client: Any = None, credentials_profile_name: Optional[str] = None, - region_name: Optional[str] = None, - use_schema_algorithm: bool = True + region_name: Optional[str] = None ) -> None: """Create a new Neptune Analytics graph wrapper instance.""" - self.use_schema_algorithm = use_schema_algorithm - try: if client is not None: self.client = client @@ -321,29 +315,21 @@ def _refresh_schema(self) -> None: YIELD schema RETURN schema """ - if self.use_schema_algorithm: - try: - data = self.query(pg_schema_query) - raw_schema = data[0]["schema"] - triple_schema = _format_triples(raw_schema["labelTriples"]) - node_properties = _format_node_properties(raw_schema["nodeLabelDetails"]) - edge_properties = _format_edge_properties(raw_schema["edgeLabelDetails"]) - self.schema = f""" - Node properties are the following: - {node_properties} - Relationship properties are the following: - {edge_properties} - The relationships are the following: - {triple_schema} - """ - except Exception as e: - logger.info("pg_schema algorithm is unsupported on this Neptune version. " - "Falling back to manual graph schema creation.") - logger.debug(e) - super()._refresh_schema() - else: - super()._refresh_schema() + data = self.query(pg_schema_query) + raw_schema = data[0]["schema"] + triple_schema = _format_triples(raw_schema["labelTriples"]) + node_properties = _format_node_properties(raw_schema["nodeLabelDetails"]) + edge_properties = _format_edge_properties(raw_schema["edgeLabelDetails"]) + + self.schema = f""" + Node properties are the following: + {node_properties} + Relationship properties are the following: + {edge_properties} + The relationships are the following: + {triple_schema} + """ class NeptuneGraph(BaseNeptuneGraph): """Neptune wrapper for graph operations.