Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add broker api #12

Merged
merged 3 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/secretnote/src/modules/server/server-manager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ServerConnection, URL } from '@difizen/libro-jupyter';
import { ServerConnection } from '@difizen/libro-jupyter';
import { Emitter, inject, prop, singleton } from '@difizen/mana-app';

import { RequestService } from '@/utils';
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import json
from typing import List, Tuple, Type

from jupyter_client.jsonutil import json_default
from jupyter_server.base.handlers import APIHandler, JupyterHandler
from tornado import web

from .manager import node_manager


class NodeHandler(APIHandler):
@web.authenticated
async def get(self, node_id):
node = node_manager.get_node(id=node_id)
if node is None:
raise web.HTTPError(404, "node not found.")
self.finish(json.dumps(node, default=json_default))

@web.authenticated
async def patch(self, node_id):
model = self.get_json_body()

if model is None:
raise web.HTTPError(400, "no request body provided.")

try:
node_manager.update_node(node_id, model)
except Exception as e:
raise web.HTTPError(400, str(e)) # noqa: B904

self.finish(json.dumps(model, default=json_default))

@web.authenticated
async def delete(self, node_id):
node_manager.remove_node(node_id)
self.set_status(204)
self.finish()


_node_id_regex = r"(?P<node_id>\d+)"

broker_handlers: List[Tuple[str, Type[JupyterHandler]]] = [
(r"/api/broker", NodeHandler),
]
316 changes: 316 additions & 0 deletions pyprojects/secretnote/src/secretnote/server/services/broker/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
import json
from typing import Any, List

from tornado import httpclient

BROKER_SERVICE_PATH = {
"query": "/intra/query",
"submit_query": "/intra/query/submit",
"fetch_result": "/intra/query/fetch",
"create_project": "/intra/project/create",
"list_projects": "/intra/project/list",
"invite_member": "/intra/member/invite",
"list_invitations": "/intra/invitation/list",
"process_invitation": "/intra/invitation/process",
"create_table": "/intra/table/create",
"list_tables": "/intra/table/list",
"drop_table": "/intra/table/drop",
"grant_ccl": "/intra/ccl/grant",
"revoke_ccl": "/intra/ccl/revoke",
"show_ccl": "/intra/ccl/show",
}


class BrokerManager:
global_project_id = "secretnote"

def __init__(self):
pass

def request(self, url: str, method="GET", body=None):
if body is None:
body = {}
http_client = httpclient.HTTPClient()
http_request_body = json.dumps(body)

try:
http_request = httpclient.HTTPRequest(
url=url,
method=method,
body=http_request_body,
headers={"Content-Type": "application/json"},
)
response = http_client.fetch(http_request)
return json.loads(response.body)
except httpclient.HTTPError as e:
# HTTPError is raised for non-200 responses; the response
# can be found in e.response.
print("Error: " + str(e))
except Exception as e:
# Other errors are possible, such as IOError.
print("Error: " + str(e))
http_client.close()

def get_request_status(self, response):
code = 0
message = ""
status = response.get("status", None)

if status is not None:
code = status.get("code", 0)
message = status.get("message", "")
else:
message = "no status found."
code = 500

return code, message

def create_project(self, project_id: str, address: str):
url = f"{address}{BROKER_SERVICE_PATH['create_project']}"
body = {
"project_id": project_id,
"conf": {"spu_runtime_cfg": {"protocol": "SEMI2K", "field": "FM64"}},
}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response.get("project_id", "")

def get_project_list(self, address: str):
url = f"{address}{BROKER_SERVICE_PATH['list_projects']}"
body = {"ids": []}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response.get("projects", [])

def invite_member(self, invitee: str, address: str, project_id=global_project_id):
url = f"{address}{BROKER_SERVICE_PATH['invite_member']}"
body = {
"project_id": project_id,
"invitee": invitee,
"postscript": "",
"method": "PUSH",
}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response

def get_invite_list(self, address: str):
url = f"{address}{BROKER_SERVICE_PATH['list_invitations']}"
body = {}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response.get("invitations", [])

def process_invite(self, invitation_id: str, respond: str, address: str):
url = f"{address}{BROKER_SERVICE_PATH['process_invitation']}"
body = {
"invitation_id": invitation_id,
"respond": respond,
"respond_comment": "",
}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

def create_table(
self,
table_name: str,
ref_table: str,
columns: List[Any],
address: str,
project_id=global_project_id,
):
url = f"{address}{BROKER_SERVICE_PATH['create_table']}"
body = {
"project_id": project_id,
"table_name": table_name,
"ref_table": ref_table,
"db_type": "mysql",
"columns": columns,
}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

def get_table_list(self, address: str, project_id=global_project_id):
url = f"{address}{BROKER_SERVICE_PATH['list_tables']}"
body = {"project_id": project_id, "names": []}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response.get("tables", [])

def delete_table(self, table_name: str, address: str, project_id=global_project_id):
url = f"{address}{BROKER_SERVICE_PATH['drop_table']}"
body = {"project_id": project_id, "table_name": table_name}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

def grant_ccl(
self, ccl_list: List[Any], address: str, project_id=global_project_id
):
url = f"{address}{BROKER_SERVICE_PATH['grant_ccl']}"
body = {
"project_id": project_id,
"column_control_list": ccl_list,
}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

def revoke_ccl(
self, ccl_list: List[Any], address: str, project_id=global_project_id
):
url = f"{address}{BROKER_SERVICE_PATH['revoke_ccl']}"
body = {
"project_id": project_id,
"column_control_list": ccl_list,
}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

def get_ccl_list(
self,
party: List[str],
table_name: List[str],
address: str,
project_id=global_project_id,
):
url = f"{address}{BROKER_SERVICE_PATH['show_ccl']}"
body = {"project_id": project_id, "tables": table_name, "dest_parties": party}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response.get("column_control_list", [])

def query(self, query: str, address: str, project_id=global_project_id):
url = f"{address}{BROKER_SERVICE_PATH['query']}"
body = {
"project_id": project_id,
"query": query,
}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response.get("out_columns", [])

def create_query_job(self, query: str, address: str, project_id=global_project_id):
url = f"{address}{BROKER_SERVICE_PATH['submit_query']}"
body = {
"project_id": project_id,
"query": query,
}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response.get("job_id", "")

def get_job_result(self, job_id: str, address: str):
url = f"{address}{BROKER_SERVICE_PATH['fetch_result']}"
body = {"job_id": job_id}
response = self.request(
url=url,
method="POST",
body=body,
)
code, message = self.get_request_status(response)

if code != 0:
raise Exception(message)

return response.get("out_columns", [])


broker_manager = BrokerManager()
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from jupyter_server.base.handlers import APIHandler, JupyterHandler
from tornado import web

from .nodemanager import node_manager
from .manager import node_manager


class NodeRootHandler(APIHandler):
Expand Down
Loading