
revise coordinator
Signed-off-by: Lei Wang <[email protected]>
doudoubobo committed Oct 22, 2024
1 parent 7533a49 commit a986bb6
Showing 6 changed files with 64 additions and 20 deletions.
4 changes: 4 additions & 0 deletions coordinator/flex/server/__main__.py
@@ -3,9 +3,13 @@
 import connexion
 
 from flex.server import encoder
+from datetime import datetime
 
 
 def main():
+    current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    with open("/tmp/cluster_create_time.txt", "w") as f:
+        f.write(current_timestamp)
     app = connexion.App(__name__, specification_dir='./openapi/')
     app.app.json_encoder = encoder.JSONEncoder
     app.add_api('openapi.yaml',
3 changes: 3 additions & 0 deletions coordinator/flex/server/controllers/data_source_controller.py
@@ -72,6 +72,9 @@ def get_datasource_by_id(graph_id): # noqa: E501
     except Exception as e:
         return "Failed to get data source: " + str(e), 500
 
+    if data_source_config is None:
+        return (SchemaMapping.from_dict({}), 200)
+
     data_source_config = json.loads(data_source_config.decode("utf-8"))
 
     return (SchemaMapping.from_dict(data_source_config), 200)
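The new None check matters because an etcd get for a missing key yields no value (the added guard implies it comes back as None), and the old code then called .decode() on it. A standalone sketch of the failure mode the guard avoids; this is illustrative, not GART code:

# Illustrative only: what happens without the guard when the key is absent.
value = None  # what the etcd get is assumed to return for a missing key

try:
    decoded = value.decode("utf-8")  # AttributeError: 'NoneType' object has no attribute 'decode'
except AttributeError:
    decoded = None

# The commit short-circuits before this point and returns an empty SchemaMapping instead.
print(decoded)  # None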
@@ -22,7 +22,7 @@ def get_deployment_info(): # noqa: E501
     """
     result_dict = {}
     result_dict["cluster_type"] = "KUBERNETES"
-    with open ("/tmp/graph_schema_create_time.txt", "r") as f:
+    with open ("/tmp/cluster_create_time.txt", "r") as f:
         result_dict["creation_time"] = f.read()
     result_dict["instance_name"] = "gart"
     result_dict["frontend"] = "Cypher/Gremlin"
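Together with the __main__.py change above, the coordinator now records its own start time in /tmp/cluster_create_time.txt and get_deployment_info reads that file back, instead of reusing the graph-schema timestamp. A minimal sketch of the round trip; the path and time format come from the diff, while the helper names are illustrative only:

# Sketch of the startup-timestamp round trip introduced by this commit.
from datetime import datetime

CLUSTER_CREATE_TIME_PATH = "/tmp/cluster_create_time.txt"

def record_cluster_create_time(path=CLUSTER_CREATE_TIME_PATH):
    # Written once when the coordinator process starts (see __main__.py).
    with open(path, "w") as f:
        f.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

def read_cluster_create_time(path=CLUSTER_CREATE_TIME_PATH):
    # Read back by get_deployment_info(); empty string if the file is missing.
    try:
        with open(path, "r") as f:
            return f.read()
    except OSError:
        return ""

if __name__ == "__main__":
    record_cluster_create_time()
    print(read_cluster_create_time())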
64 changes: 48 additions & 16 deletions coordinator/flex/server/controllers/graph_controller.py
@@ -64,12 +64,30 @@ def get_graph_schema():
                 rg_mapping_str = rg_mapping_str.decode("utf-8")
                 break
             try_times += 1
-            time.sleep(1)
+            time.sleep(0.2)
         except Exception as e:
             try_times += 1
-            time.sleep(1)
+            time.sleep(0.2)
 
     if try_times == try_max_times:
+        try_times = 0
+        original_graph_schema_key = etcd_prefix + "gart_graph_schema_json"
+        while try_times < try_max_times:
+            try:
+                original_graph_schema_str, _ = etcd_client.get(original_graph_schema_key)
+                if original_graph_schema_str is not None:
+                    original_graph_schema_str = original_graph_schema_str.decode("utf-8")
+                    break
+                try_times += 1
+                time.sleep(0.2)
+            except Exception as e:
+                try_times += 1
+                time.sleep(0.2)
+        if try_times == try_max_times:
+            return result_dict
+        result_dict["name"] = GRAPH_ID
+        result_dict["id"] = GRAPH_ID
+        result_dict["schema"] = json.loads(original_graph_schema_str)["schema"]
         return result_dict
 
     rg_mapping = yaml.load(rg_mapping_str, Loader=yaml.SafeLoader)
@@ -83,10 +101,10 @@ def get_graph_schema():
                 table_schema_str = table_schema_str.decode("utf-8")
                 break
             try_times += 1
-            time.sleep(1)
+            time.sleep(0.2)
         except Exception as e:
             try_times += 1
-            time.sleep(1)
+            time.sleep(0.2)
 
     if try_times == try_max_times:
         return result_dict
@@ -463,12 +481,19 @@ def get_graph_by_id(graph_id): # noqa: E501
     result_dict["id"] = graph_id
     result_dict["name"] = graph_id
 
-    with open("/tmp/graph_schema_create_time.txt", "r") as f:
-        result_dict["creation_time"] = f.read()
-    result_dict["schema_update_time"] = result_dict["creation_time"]
-
-    with open("/tmp/data_loading_job_created_time.txt", "r") as f:
-        result_dict["data_update_time"] = f.read()
+    try:
+        with open("/tmp/graph_schema_create_time.txt", "r") as f:
+            result_dict["creation_time"] = f.read()
+        result_dict["schema_update_time"] = result_dict["creation_time"]
+    except:
+        result_dict["creation_time"] = ""
+        result_dict["schema_update_time"] = ""
+
+    try:
+        with open("/tmp/data_loading_job_created_time.txt", "r") as f:
+            result_dict["data_update_time"] = f.read()
+    except:
+        result_dict["data_update_time"] = ""
 
     return (GetGraphResponse.from_dict(result_dict), 200)

@@ -553,12 +578,19 @@ def list_graphs(): # noqa: E501
     if not result_dict:
         return ([GetGraphResponse.from_dict({})], 200)
 
-    with open("/tmp/graph_schema_create_time.txt", "r") as f:
-        result_dict["creation_time"] = f.read()
-    result_dict["schema_update_time"] = result_dict["creation_time"]
-
-    with open("/tmp/data_loading_job_created_time.txt", "r") as f:
-        result_dict["data_update_time"] = f.read()
+    try:
+        with open("/tmp/graph_schema_create_time.txt", "r") as f:
+            result_dict["creation_time"] = f.read()
+        result_dict["schema_update_time"] = result_dict["creation_time"]
+    except:
+        result_dict["creation_time"] = ""
+        result_dict["schema_update_time"] = ""
+
+    try:
+        with open("/tmp/data_loading_job_created_time.txt", "r") as f:
+            result_dict["data_update_time"] = f.read()
+    except:
+        result_dict["data_update_time"] = ""
 
     return ([GetGraphResponse.from_dict(result_dict)], 200)

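The poll-etcd-then-sleep loop now appears three times in get_graph_schema alone: for the RGMapping key, for the gart_graph_schema_json fallback added here, and for the table schema. A sketch of how the pattern could be factored into one helper; etcd_get_with_retry and its defaults are hypothetical, not something this commit adds:

# Hypothetical refactor of the retry loops in get_graph_schema; not part of the commit.
import time

def etcd_get_with_retry(etcd_client, key, try_max_times=10, delay=0.2):
    """Poll etcd until the key has a value or the retry budget runs out.

    Returns the decoded string, or None if the key never appeared.
    """
    for _ in range(try_max_times):
        try:
            value, _ = etcd_client.get(key)
            if value is not None:
                return value.decode("utf-8")
        except Exception:
            pass  # transient etcd error: fall through and retry
        time.sleep(delay)
    return None

With a helper like this, the new fallback reduces to: try the RGMapping key first, and if it never appears, read etcd_prefix + "gart_graph_schema_json" and return its "schema" field under GRAPH_ID.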
7 changes: 5 additions & 2 deletions coordinator/flex/server/controllers/service_controller.py
@@ -68,8 +68,11 @@ def list_service_status(): # noqa: E501
     gremlin_service_name = os.getenv('GREMLIN_SERVICE_NAME', 'gremlin-service')
     gremlin_service_port = os.getenv('GIE_GREMLIN_PORT', '8182')
     gremlin_service_ip = get_external_ip_of_a_service(gremlin_service_name, k8s_namespace)
-    with open("/tmp/graph_id.txt", "r") as f:
-        graph_id = f.read()
+    try:
+        with open("/tmp/graph_id.txt", "r") as f:
+            graph_id = f.read()
+    except:
+        graph_id = "gart_graph"
     result_dict["graph_id"] = graph_id
     result_dict["status"] = "Running"
     result_dict["sdk_endpoints"] = {}
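This is the same try/except-around-open pattern that graph_controller.py now uses for its timestamp files, with a per-call default. A sketch of a shared helper that would capture it; the helper is hypothetical, since the diff keeps the logic inline and uses a bare except:, which could be narrowed to OSError:

# Hypothetical shared helper; this commit repeats the try/except inline instead.
def read_marker_file(path, default=""):
    """Return the contents of a small marker file under /tmp, or `default` if unreadable."""
    try:
        with open(path, "r") as f:
            return f.read()
    except OSError:  # narrower than the bare `except:` used in the diff
        return default

# Usage mirroring the handlers touched by this commit:
graph_id = read_marker_file("/tmp/graph_id.txt", default="gart_graph")
creation_time = read_marker_file("/tmp/graph_schema_create_time.txt", default="")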
4 changes: 3 additions & 1 deletion scripts/controller.py
@@ -610,7 +610,7 @@ def get_all_available_read_epochs_internal():
     num_fragment = os.getenv("SUBGRAPH_NUM", "1")
     latest_read_epoch = get_latest_read_epoch()
     if latest_read_epoch == 2**64 - 1:
-        return []
+        return [[], []]
     available_epochs = []
     available_epochs_internal = []
     for epoch in range(latest_read_epoch + 1):
@@ -659,6 +659,8 @@ def get_latest_read_epoch():
         etcd_key = etcd_prefix + "gart_latest_epoch_p" + str(idx)
         try:
             etcd_value, _ = etcd_client.get(etcd_key)
+            if etcd_value is None:
+                etcd_value = 2**64 - 1
             if latest_epoch > int(etcd_value):
                 latest_epoch = int(etcd_value)
         except Exception as e:
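In scripts/controller.py, 2**64 - 1 serves as a sentinel for "no readable epoch yet": get_latest_read_epoch now substitutes it when an etcd key has no value, and get_all_available_read_epochs_internal returns [[], []] instead of [] so the empty case keeps the two-list shape (available_epochs, available_epochs_internal) that the normal path builds. Assuming callers unpack that pair, the old return [] would have raised; a minimal illustration, where the caller shape is an assumption not shown in this diff:

# Illustration of why [[], []] is safer than [] here; the caller shape is assumed.
UINT64_MAX = 2**64 - 1  # sentinel meaning "no readable epoch yet"

def available_read_epochs_stub(latest_read_epoch):
    if latest_read_epoch == UINT64_MAX:
        return [[], []]  # keep the (epochs, epochs_internal) shape even when empty
    return [[0, 1], [(0, "t0"), (1, "t1")]]  # made-up payload for the normal path

# An assumed caller that unpacks the pair; `return []` would raise ValueError here.
epochs, epochs_internal = available_read_epochs_stub(UINT64_MAX)
print(epochs, epochs_internal)  # [] []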
