Skip to content

Commit

Permalink
feat(coordinator): Add cypher endpoint info when `list_service_status…
Browse files Browse the repository at this point in the history
…` for groot (#4382)

- Add cypher_endpoint for GrootClient.
- Rename `gremlin_interface` to `groot_endpoints`.
  • Loading branch information
zhanglei1949 authored Jan 2, 2025
1 parent 0c3d19e commit c020046
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 19 deletions.
2 changes: 2 additions & 0 deletions charts/graphscope-store/templates/portal/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ spec:
value: {{ .Values.frontend.service.servicePort | quote }}
- name: GROOT_GREMLIN_PORT
value: {{ .Values.frontend.service.gremlinPort | quote }}
- name: GROOT_CYPHER_PORT
value: {{ .Values.frontend.service.cypherPort | quote }}
- name: INSTANCE_NAME
value: {{ .Release.Name | quote }}
- name: NAMESPACE
Expand Down
2 changes: 2 additions & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ frontend:

gremlinPort: 12312

cypherPort: 7687

## Internal port for communication between components.
##
port: 55555
Expand Down
1 change: 1 addition & 0 deletions coordinator/gscoordinator/flex/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def str_to_bool(s):
# groot
GROOT_GRPC_PORT = os.environ.get("GROOT_GRPC_PORT", 55556)
GROOT_GREMLIN_PORT = os.environ.get("GROOT_GREMLIN_PORT", 12312)
GROOT_CYPHER_PORT = os.environ.get("GROOT_CYPHER_PORT", 7687)
GROOT_USERNAME = os.environ.get("GROOT_USERNAME", "")
GROOT_PASSWORD = os.environ.get("GROOT_PASSWORD", "")
try:
Expand Down
27 changes: 20 additions & 7 deletions coordinator/gscoordinator/flex/core/insight/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@

from gscoordinator.flex.core.config import CLUSTER_TYPE
from gscoordinator.flex.core.config import CREATION_TIME
from gscoordinator.flex.core.config import GROOT_CYPHER_PORT
from gscoordinator.flex.core.config import GROOT_GREMLIN_PORT
from gscoordinator.flex.core.config import GROOT_GRPC_PORT
from gscoordinator.flex.core.config import GROOT_PASSWORD
from gscoordinator.flex.core.config import GROOT_USERNAME
from gscoordinator.flex.core.config import INSTANCE_NAME
from gscoordinator.flex.core.config import NAMESPACE
from gscoordinator.flex.core.insight.utils import test_cypher_endpoint
from gscoordinator.flex.core.scheduler import schedule
from gscoordinator.flex.core.utils import data_type_to_groot
from gscoordinator.flex.core.utils import get_internal_ip
Expand All @@ -46,7 +48,7 @@
class GrootGraph(object):
"""Graph class for GraphScope store"""

def __init__(self, name, creation_time, gremlin_endpoint, grpc_endpoint):
def __init__(self, name, creation_time, gremlin_endpoint, grpc_endpoint, cypher_endpoint = None):
self._id = "1"
self._name = name

Expand All @@ -60,12 +62,14 @@ def __init__(self, name, creation_time, gremlin_endpoint, grpc_endpoint):
)
self._g = self._conn.g()
self._schema = self._g.schema().to_dict()
self._gremlin_interface = {
self._endpoints = {
"gremlin_endpoint": gremlin_endpoint,
"grpc_endpoint": grpc_endpoint,
"username": GROOT_USERNAME,
"password": GROOT_PASSWORD,
}
self._endpoints["cypher_endpoint"] = cypher_endpoint

# kubernetes
if CLUSTER_TYPE == "KUBERNETES":
self._api_client = resolve_api_client()
Expand Down Expand Up @@ -97,22 +101,27 @@ def _fetch_endpoints_impl(self):
grpc_endpoint, gremlin_endpoint, GROOT_USERNAME, GROOT_PASSWORD
)
g = conn.g()
cypher_endpoint = test_cypher_endpoint(pod.status.pod_ip, GROOT_CYPHER_PORT)

except Exception as e:
logger.warn(f"Failed to fetch frontend endpoints: {str(e)}")
else:
if (
gremlin_endpoint != self._gremlin_interface["gremlin_endpoint"]
or grpc_endpoint != self._gremlin_interface["grpc_endpoint"]
gremlin_endpoint != self._endpoints["gremlin_endpoint"]
or grpc_endpoint != self._endpoints["grpc_endpoint"]
or cypher_endpoint != self._endpoints.get("cypher_endpoint")
):
self._conn = conn
self._g = g
self._schema = self._g.schema().to_dict()
self._gremlin_interface = {
self._endpoints = {
"gremlin_endpoint": gremlin_endpoint,
"grpc_endpoint": grpc_endpoint,
"username": GROOT_USERNAME,
"password": GROOT_PASSWORD,
}
if cypher_endpoint:
self._endpoints["cypher_endpoint"] = cypher_endpoint
logger.info(f"Update frontend endpoints: {str(endpoints)}")

def __del__(self):
Expand All @@ -131,8 +140,8 @@ def name(self):
return self._name

@property
def gremlin_interface(self):
return self._gremlin_interface
def groot_endpoints(self):
return self._endpoints

@property
def schema(self):
Expand Down Expand Up @@ -284,12 +293,16 @@ def get_groot_graph_from_local():
client.close()
break
time.sleep(5)
# test whether cypher endpoint is ready
cypher_endpoint = test_cypher_endpoint(host, GROOT_CYPHER_PORT)

# groot graph
return GrootGraph(
name=INSTANCE_NAME,
creation_time=CREATION_TIME,
gremlin_endpoint=gremlin_endpoint,
grpc_endpoint=grpc_endpoint,
cypher_endpoint=cypher_endpoint,
)


Expand Down
19 changes: 11 additions & 8 deletions coordinator/gscoordinator/flex/core/insight/groot.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,21 @@ def check_graph_exists(self, graph_id: str):
raise RuntimeError(f"Graph {graph_id} not exist.")

def list_service_status(self) -> List[dict]:
gremlin_interface = self._graph.gremlin_interface
return [
groot_endpoints = self._graph.groot_endpoints
res = [
{
"graph_id": self._graph.id,
"status": "Running",
"start_time": CREATION_TIME,
"sdk_endpoints": {
"gremlin": gremlin_interface["gremlin_endpoint"],
"grpc": gremlin_interface["grpc_endpoint"],
"gremlin": groot_endpoints["gremlin_endpoint"],
"grpc": groot_endpoints["grpc_endpoint"],
},
}
]
if "cypher_endpoint" in groot_endpoints and groot_endpoints["cypher_endpoint"]:
res[0]["sdk_endpoints"]["cypher"] = groot_endpoints["cypher_endpoint"]
return res

def create_graph(self, graph: dict) -> dict:
raise RuntimeError("Create graph is not supported yet.")
Expand Down Expand Up @@ -284,12 +287,12 @@ def get_storage_usage(self) -> dict:

def gremlin_service_available(self) -> bool:
try:
gremlin_interface = self._graph.gremlin_interface
groot_endpoints = self._graph.groot_endpoints
client = Client(
gremlin_interface["gremlin_endpoint"],
groot_endpoints["gremlin_endpoint"],
"g",
username=gremlin_interface["username"],
password=gremlin_interface["password"],
username=groot_endpoints["username"],
password=groot_endpoints["password"],
)
client.submit(
"g.with('evaluationTimeout', 5000).V().limit(1)"
Expand Down
40 changes: 36 additions & 4 deletions coordinator/gscoordinator/flex/core/insight/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@

import hashlib
import json
import logging

from urllib3.exceptions import ProtocolError

from gscoordinator.flex.core.config import BASEID
from gscoordinator.version import __version__

logger = logging.getLogger("graphscope")


def convert_to_configini(graph, ds_manager, config):
# for bulk loader to connect to groot
gremlin_interface = graph.gremlin_interface
groot_endpoints = graph.groot_endpoints
# column mapping config
column_mapping_config = {}
# project
Expand Down Expand Up @@ -112,12 +117,12 @@ def convert_to_configini(graph, ds_manager, config):
# custom_config
custom_config = {
"separatr": "\\\\|", # fixed
"graphEndpoint": gremlin_interface["grpc_endpoint"],
"graphEndpoint": groot_endpoints["grpc_endpoint"],
"project": project,
"outputTable": output_table,
"columnMappingConfig": json.dumps(column_mapping_config),
"authUsername": gremlin_interface["username"],
"authPassword": gremlin_interface["password"],
"authUsername": groot_endpoints["username"],
"authPassword": groot_endpoints["password"],
"dataSinkType": "volume",
"odpsVolumeProject": project,
# "-" is not allowed
Expand All @@ -136,3 +141,30 @@ def convert_to_configini(graph, ds_manager, config):
"customConfig": custom_config,
}
return configini

def test_cypher_endpoint(host : str, port : int):
"""
Test if the cypher endpoint is available, if not return None, otherwise return the cypher endpoint
Note that we send http request to check if the cypher endpoint is available, not submitting a cypher query,
the reason is that the cypher query may raise exceptions in case of other errors.
"""
cypher_endpoint = f"neo4j://{host}:{port}"
try:
import requests
response = requests.get(f"http://{host}:{port}")
response.raise_for_status()
except (requests.exceptions.ConnectionError) as e:
if (e.args != None and len(e.args) > 0):
# Sending http request to cypher endpoint should fail with ProtocolError
if isinstance(e.args[0], ProtocolError):
logger.debug("Cypher endpoint is available: {cypher_endpoint}")
else:
cypher_endpoint = None
logger.debug(f"Cypher endpoint is not available: {str(e)}")
except Exception as e:
logger.debug(f"Cypher endpoint is not available: {str(e)}")
cypher_endpoint = None
return cypher_endpoint
else:
logger.error("Should not reach here")
return cypher_endpoint

0 comments on commit c020046

Please sign in to comment.