Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of NeptuneAnalyticsGraph #311

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion libs/aws/langchain_aws/graphs/neptune_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,46 @@
from typing import Any, Dict, List, Optional, Tuple, Union


def _format_triples(triples: List[dict]) -> List[str]:
triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)"
triple_schema = []
for t in triples:
triple = triple_template.format(
a=t["~from"], e=t["~type"], b=t["~to"]
)
triple_schema.append(triple)

return triple_schema


def _format_node_properties(n_labels: dict) -> List:
node_properties = []

for label, props_item in n_labels.items():
props = props_item["properties"]
np = {
"properties": [{"property": k, "type": v["datatypes"][0]} for k, v in props.items()],
"labels": label,
}
node_properties.append(np)

return node_properties


def _format_edge_properties(e_labels: dict) -> List:
edge_properties = []

for label, props_item in e_labels.items():
props = props_item["properties"]
np = {
"type": label,
"properties": [{"property": k, "type": v["datatypes"][0]} for k, v in props.items()],
}
edge_properties.append(np)

return edge_properties


class NeptuneQueryException(Exception):
"""Exception for the Neptune queries."""

Expand Down Expand Up @@ -170,7 +210,7 @@ def __init__(
graph_identifier: str,
client: Any = None,
credentials_profile_name: Optional[str] = None,
region_name: Optional[str] = None,
region_name: Optional[str] = None
) -> None:
"""Create a new Neptune Analytics graph wrapper instance."""

Expand Down Expand Up @@ -266,6 +306,30 @@ def _get_summary(self) -> Dict:
else:
return summary

def _refresh_schema(self) -> None:
"""
Refreshes the Neptune graph schema information.
"""
pg_schema_query = """
CALL neptune.graph.pg_schema()
YIELD schema
RETURN schema
"""

data = self.query(pg_schema_query)
raw_schema = data[0]["schema"]
triple_schema = _format_triples(raw_schema["labelTriples"])
node_properties = _format_node_properties(raw_schema["nodeLabelDetails"])
edge_properties = _format_edge_properties(raw_schema["edgeLabelDetails"])

self.schema = f"""
Node properties are the following:
{node_properties}
Relationship properties are the following:
{edge_properties}
The relationships are the following:
{triple_schema}
"""

class NeptuneGraph(BaseNeptuneGraph):
"""Neptune wrapper for graph operations.
Expand Down
Loading