Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of NeptuneAnalyticsGraph #311

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions libs/aws/langchain_aws/graphs/neptune_graph.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,50 @@
import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union

logger = logging.getLogger(__name__)


def _format_triples(triples: List[dict]) -> List[str]:
    """Render label triples as path-pattern strings.

    Each entry of ``triples`` is read through its ``"~from"``, ``"~type"``
    and ``"~to"`` keys and turned into one string of the form
    ``(:`from`)-[:`type`]->(:`to`)``. The result keeps the input order.

    Raises:
        KeyError: if an entry lacks one of the three keys.
    """
    pattern = "(:`{a}`)-[:`{e}`]->(:`{b}`)"
    return [
        pattern.format(a=entry["~from"], e=entry["~type"], b=entry["~to"])
        for entry in triples
    ]


def _format_node_properties(n_labels: dict) -> List:
    """Summarise the properties of each node label.

    ``n_labels`` maps a label to a mapping that has a ``"properties"`` key;
    that value in turn maps each property name to a mapping with a
    ``"datatypes"`` sequence. Only the first datatype is reported.

    Returns:
        One dict per label, in iteration order, shaped as
        ``{"properties": [{"property": name, "type": datatype}, ...],
        "labels": label}``.
    """
    # Key order ("properties", then "labels") is kept on purpose: callers may
    # render these dicts as text, where insertion order is visible.
    return [
        {
            "properties": [
                {"property": prop_name, "type": prop_info["datatypes"][0]}
                for prop_name, prop_info in label_info["properties"].items()
            ],
            "labels": label,
        }
        for label, label_info in n_labels.items()
    ]


def _format_edge_properties(e_labels: dict) -> List:
    """Summarise the properties of each edge label.

    ``e_labels`` maps an edge label to a mapping that has a ``"properties"``
    key; that value in turn maps each property name to a mapping with a
    ``"datatypes"`` sequence. Only the first datatype is reported.

    Returns:
        One dict per label, in iteration order, shaped as
        ``{"type": label, "properties": [{"property": name,
        "type": datatype}, ...]}``.
    """
    # Key order ("type", then "properties") is kept on purpose: callers may
    # render these dicts as text, where insertion order is visible.
    return [
        {
            "type": label,
            "properties": [
                {"property": prop_name, "type": prop_info["datatypes"][0]}
                for prop_name, prop_info in label_info["properties"].items()
            ],
        }
        for label, label_info in e_labels.items()
    ]


class NeptuneQueryException(Exception):
"""Exception for the Neptune queries."""
Expand Down Expand Up @@ -171,9 +214,12 @@ def __init__(
client: Any = None,
credentials_profile_name: Optional[str] = None,
region_name: Optional[str] = None,
use_schema_algorithm: bool = True
) -> None:
"""Create a new Neptune Analytics graph wrapper instance."""

self.use_schema_algorithm = use_schema_algorithm

try:
if client is not None:
self.client = client
Expand Down Expand Up @@ -266,6 +312,38 @@ def _get_summary(self) -> Dict:
else:
return summary

def _refresh_schema(self) -> None:
    """Rebuild ``self.schema`` for the Neptune graph.

    When ``self.use_schema_algorithm`` is true, the schema is fetched with
    the ``neptune.graph.pg_schema()`` procedure through ``self.query`` and
    rendered into a text summary of node properties, relationship
    properties and relationships. If that path raises any ``Exception``,
    or if the flag is false, the work is delegated to the superclass
    ``_refresh_schema``.
    """
    if not self.use_schema_algorithm:
        super()._refresh_schema()
        return

    # The literal text is kept flush-left: indenting it would change the
    # string that is sent to the graph.
    schema_query = """
CALL neptune.graph.pg_schema()
YIELD schema
RETURN schema
"""
    try:
        # Only the first returned row's "schema" entry is used.
        pg_schema = self.query(schema_query)[0]["schema"]
        relationships = _format_triples(pg_schema["labelTriples"])
        node_props = _format_node_properties(pg_schema["nodeLabelDetails"])
        rel_props = _format_edge_properties(pg_schema["edgeLabelDetails"])
        self.schema = f"""
Node properties are the following:
{node_props}
Relationship properties are the following:
{rel_props}
The relationships are the following:
{relationships}
"""
    except Exception as err:
        # Any failure (query error, unexpected result shape) is treated as
        # "procedure unavailable" and handled by the superclass fallback.
        logger.info(
            "pg_schema algorithm is unsupported on this Neptune version. "
            "Falling back to manual graph schema creation."
        )
        logger.debug(err)
        super()._refresh_schema()


class NeptuneGraph(BaseNeptuneGraph):
"""Neptune wrapper for graph operations.
Expand Down
Loading