diff --git a/organizer/default/.gitignore b/organizer/default/.gitignore
index 726cb4e..5674bdc 100644
--- a/organizer/default/.gitignore
+++ b/organizer/default/.gitignore
@@ -2,3 +2,7 @@ Cargo.lock
 target
 dist_deb
 Dockerfile
+_venv
+.vscode
+__pycache__
+
diff --git a/organizer/default/logger.py b/organizer/default/logger.py
new file mode 100644
index 0000000..6666367
--- /dev/null
+++ b/organizer/default/logger.py
@@ -0,0 +1,42 @@
+
+import json
+import logging
+import sys
+
+
+def make_logger(name: str, debug: bool = False, json: bool = False):
+    """
+    Create a logger with the given name.
+    :param name: Name of the logger
+    :param debug: Whether to enable debug logging
+    :param json: Whether to log in JSON format
+    """
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.DEBUG if debug else logging.INFO)
+    formatter = _JsonFormatter() if json else logging.Formatter(f'%(levelname)s [%(name)s] %(message)s')  # no timestamp, it's already logged by the server
+
+    # Create a stream handler for stdout (for levels below ERROR)
+    stdout_handler = logging.StreamHandler(sys.stdout)
+    stdout_handler.setLevel(logging.DEBUG if debug else logging.INFO)
+    stdout_handler.addFilter(lambda record: record.levelno < logging.ERROR)
+    stdout_handler.setFormatter(formatter)
+    logger.addHandler(stdout_handler)
+
+    # Create a stream handler for stderr (for levels ERROR and above)
+    stderr_handler = logging.StreamHandler(sys.stderr)
+    stderr_handler.setLevel(logging.ERROR)
+    stderr_handler.setFormatter(formatter)
+    logger.addHandler(stderr_handler)
+
+    return logger
+
+
+class _JsonFormatter(logging.Formatter):
+    def format(self, record):
+        log_record = {
+            'time': self.formatTime(record, self.datefmt),
+            'level': record.levelname,
+            'name': record.name,
+            'message': record.getMessage(),
+        }
+        return json.dumps(log_record)
diff --git a/organizer/default/main.py b/organizer/default/main.py
new file mode 100644
index 0000000..f02291d
--- /dev/null
+++ b/organizer/default/main.py
@@ -0,0 +1,207 @@
+import asyncio
+import json
+import re
+from textwrap import dedent
+from grpclib import GRPCError
+import grpclib.client
+from grpclib.server import Server
+
+import sys
+from docopt import docopt
+import clapshot_grpc.clapshot as ͼ  # most types are in this module, use ͼ for short
+import clapshot_grpc.clapshot.organizer as org
+from grpclib.const import Status as GrpcStatus
+
+try:
+    from typing import override  # type: ignore  # Python 3.12+
+except ImportError:
+    def override(func):
+        return func
+
+from logger import make_logger
+
+
+# Define the version of your program
+VERSION = "0.1.0"
+
+PATH_COOKIE_NAME = "folder_path"
+USER_ID_NODE_TYPE = "user_id"
+FOLDER_NODE_TYPE = "folder"
+PARENT_FOLDER_EDGE_TYPE = "parent_folder"
+OWNER_EDGE_TYPE = "owner"
+
+
+async def main():
+    doc = """
+    Default/example Clapshot Organizer plugin.
+    This gRPC server can bind to a Unix socket or a TCP address.
+
+    Usage:
+      {NAME} [options] <bind>
+      {NAME} (-h | --help)
+      {NAME} (-v | --version)
+
+    Required:
+        <bind>              Unix socket or IP address to bind to.
+                            e.g. '/tmp/organizer.sock' or '[::1]:50051'
+
+    Options:
+        -d --debug          Enable debug logging
+        -j --json           Log in JSON format
+        -t --tcp            Use TCP instead of Unix socket
+        -h --help           Show this screen
+        -v --version        Show version
+    """
+    global debug_logging, json_logging
+    arguments = docopt(doc.format(NAME=sys.argv[0]), version=VERSION)
+
+    bind_addr = arguments["<bind>"]
+    logger = make_logger("py", debug=arguments["--debug"], json=arguments["--json"])
+
+    server = Server([OrganizerInbound(logger)])
+    if arguments["--tcp"]:
+        assert re.match(r"^\d+\.\d+\.\d+\.\d+:\d+$", bind_addr) or re.match(r"^\[[0-9a-f:]*\]:\d+$", bind_addr), \
+            "bind_addr must be in the format of 'ip_listen_address:port' when using TCP"
+        host, port = bind_addr.split(":")
+        await server.start(host=host, port=int(port))
+    else:
+        await server.start(path=bind_addr)  # unix socket
+    logger.info(f"Organizer listening on '{bind_addr}'")
+
+    await server.wait_closed()
+    logger.info("Organizer stopped listening.")
+
+
+# -------------------------------------------------------------------------------------------------
+
+class OrganizerInbound(org.OrganizerInboundBase):
+    srv: org.OrganizerOutboundStub  # connection back to Clapshot server
+
+    def __init__(self, logger):
+        self.log = logger
+
+
+    @override
+    async def handshake(self, server_info: org.ServerInfo) -> ͼ.Empty:
+        '''
+        Receive handshake from Clapshot server.
+        We must connect back to it and send a handshake to establish a bidirectional connection.
+        '''
+        self.log.info(f"Got handshake from server.")
+        self.log.debug(f"Server info: {json.dumps(server_info.to_dict())}")
+        try:
+            if tcp := server_info.backchannel.tcp:
+                backchannel = grpclib.client.Channel(host=tcp.host, port=tcp.port)
+            else:
+                backchannel = grpclib.client.Channel(path=server_info.backchannel.unix.path)
+
+            self.log.info("Connecting back to Clapshot server...")
+            self.srv = org.OrganizerOutboundStub(backchannel)
+            self.srv.handshake
+            await self.srv.handshake(org.OrganizerInfo())
+            self.log.info("Clapshot server connected.")
+
+        except ConnectionRefusedError as e:
+            self.log.error(f"Return connection to Clapshot server refused: {e}")
+            raise GRPCError(GrpcStatus.UNKNOWN, "Failed to connect back to you (the Clapshot server)")
+        return ͼ.Empty()
+
+
+    @override
+    async def on_start_user_session(self, req: org.OnStartUserSessionRequest) -> org.OnStartUserSessionResult:
+        '''
+        New user session started. Send the client a list of actions that this organizer plugin supports.
+        '''
+        self.log.info("on_start_user_session")
+
+        await self.srv.client_define_actions(
+            org.ClientDefineActionsRequest(
+                sid = req.ses.sid,
+                actions = {
+                    # --- "New folder" popup ---
+                    "new_folder": ͼ.ActionDef(
+                        # how to display it in UI
+                        ui_props=ͼ.ActionUiProps(
+                            label = "New folder",
+                            icon = ͼ.Icon(fa_class=ͼ.IconFaClass(classes="fa fa-folder-plus")),
+                            natural_desc = "Create a new folder",
+                        ),
+                        # what to do when user clicks it (=show a browser dialog and make a call back to this plugin)
+                        action = ͼ.ScriptCall(
+                            lang = ͼ.ScriptCallLang.JAVASCRIPT,
+                            code = dedent(r'''
+                                var folder_name = (await prompt("Name for the new folder", ""))?.trim();
+                                if (folder_name) { await call_organizer("new_folder", {name: folder_name}); }
+                            '''))
+                    )
+                }))
+
+        return org.OnStartUserSessionResult()
+
+
+    @override
+    async def navigate_page(self, navigate_page_request: org.NavigatePageRequest) -> org.ClientShowPageRequest:
+        self.log.info("navigate_page")
+        raise GRPCError(GrpcStatus.UNIMPLEMENTED)
+
+
+    @override
+    async def cmd_from_client(self, cmd: org.CmdFromClientRequest) -> ͼ.Empty:
+        self.log.info("cmd_from_client: " + str(cmd.to_dict()))
+        raise GRPCError(GrpcStatus.UNIMPLEMENTED)
+
+
+    @override
+    async def authz_user_action(
+            self, authz_user_action_request: org.AuthzUserActionRequest) -> org.AuthzResult:
+        raise GRPCError(GrpcStatus.UNIMPLEMENTED)  # = let Clapshot server decide
+
+    # -------------------------------------------------------------------------------------------------
+
+    @override
+    async def list_tests(self, clapshot_empty: ͼ.Empty) -> org.ListTestsResult:
+        self.log.info("list_tests")
+        return org.ListTestsResult(test_names=[])
+
+
+    @override
+    async def run_test(self, run_test_request: org.RunTestRequest) -> org.RunTestResult:
+        self.log.info("run_test")
+        raise GRPCError(GrpcStatus.UNIMPLEMENTED)
+
+    # -------------------------------------------------------------------------------------------------
+
+    async def _get_current_folder_path(self, ses: org.UserSessionData) -> list[org.PropNode]:
+        '''
+        User's current folder path is stored in a cookie as a JSON list of folder IDs.
+        Read it, get the folder nodes from DB, and return them.
+        '''
+        ck = ses.cookies or {}
+        try:
+            if folder_ids := json.loads(ck.get(PATH_COOKIE_NAME) or '[]'):
+                folder_nodes = await self.srv.db_get_prop_nodes(org.DbGetPropNodesRequest(
+                    node_type = FOLDER_NODE_TYPE,
+                    ids = org.IdList(ids = folder_ids)))
+                if len(folder_nodes.items) == len(folder_ids):
+                    return [folder_nodes.items[id] for id in folder_ids]
+                else:
+                    # Some folders weren't found in DB. Clear cookie.
+                    await self.srv.client_set_cookies(org.ClientSetCookiesRequest(cookies = {PATH_COOKIE_NAME: ''}, sid = ses.sid))
+                    await self.srv.client_show_user_message(org.ClientShowUserMessageRequest(
+                        sid = ses.sid,
+                        msg = ͼ.UserMessage(
+                            message = "Some unknown folder IDs in folder_path cookie. Clearing it.",
+                            user_id = ses.user.id,
+                            type = ͼ.UserMessageType.ERROR)))
+                    return []
+        except json.JSONDecodeError as e:
Falling back to empty folder path.") + return [] + + +if __name__ == '__main__': + try: + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + except KeyboardInterrupt: + print("EXIT signaled.") diff --git a/organizer/default/requirements.txt b/organizer/default/requirements.txt new file mode 100644 index 0000000..6448418 --- /dev/null +++ b/organizer/default/requirements.txt @@ -0,0 +1,4 @@ +docopt +../../protobuf/libs/python/dist/clapshot_grpc-0.0.1.tar.gz +grpclib +betterproto==2.0.0b6 diff --git a/organizer/default/run-py-org.sh b/organizer/default/run-py-org.sh new file mode 100755 index 0000000..3594428 --- /dev/null +++ b/organizer/default/run-py-org.sh @@ -0,0 +1,4 @@ +#!/bin/bash +DIR=$(dirname "$0") +$DIR/_venv/bin/python $DIR/main.py $@ + diff --git a/organizer/default/src/lib.rs b/organizer/default/src/lib.rs index da53074..5ff944a 100644 --- a/organizer/default/src/lib.rs +++ b/organizer/default/src/lib.rs @@ -41,7 +41,7 @@ pub const VERSION: &'static str = env!("CARGO_PKG_VERSION"); pub const NAME: &'static str = env!("CARGO_PKG_NAME"); -// Implement inbound RCP methods +// Implement inbound RCP methods (from organizer.proto) #[tonic::async_trait] impl org::organizer_inbound_server::OrganizerInbound for DefaultOrganizer @@ -150,6 +150,10 @@ impl org::organizer_inbound_server::OrganizerInbound for DefaultOrganizer } } + // ------------------------------------------------------------------ + // Unit / integration tests + // ------------------------------------------------------------------ + async fn list_tests(&self, _req: Request) -> RpcResponseResult { Ok(Response::new(org::ListTestsResult { diff --git a/organizer/default/src/ui_components.rs b/organizer/default/src/ui_components.rs index 81c70f4..7df5a89 100644 --- a/organizer/default/src/ui_components.rs +++ b/organizer/default/src/ui_components.rs @@ -42,6 +42,7 @@ if (folder_name) { } } +// --------------------------------------------------------------------------- /// Build folder view page. /// Reads folder_path cookie and builds a list of folders and videos in the folder. @@ -150,6 +151,7 @@ fn folder_node_to_page_item(folder: &org::PropNode) -> proto::page_item::folder_ } } +// --------------------------------------------------------------------------- /// Build folder view page. diff --git a/protobuf/libs/python/Makefile b/protobuf/libs/python/Makefile index a165414..e42b87b 100644 --- a/protobuf/libs/python/Makefile +++ b/protobuf/libs/python/Makefile @@ -36,7 +36,7 @@ generate: package: protobufs echo "from setuptools import setup, find_packages" > $(DIST_DIR)/setup.py - echo "setup(name='$(PACKAGE_NAME)', version='$(PACKAGE_VERSION)', packages=find_packages(), install_requires=['betterproto>=2.0.0', 'grpclib'])" >> $(DIST_DIR)/setup.py + echo "setup(name='$(PACKAGE_NAME)', version='$(PACKAGE_VERSION)', packages=find_packages(), install_requires=[])" >> $(DIST_DIR)/setup.py cd $(DIST_DIR) && $(VENV_PYTHON) setup.py sdist cp $(DIST_DIR)/dist/*.tar.gz $(DIST_DIR) diff --git a/protobuf/libs/rust/src/subprocess.rs b/protobuf/libs/rust/src/subprocess.rs index c2165f9..ed8eb6e 100644 --- a/protobuf/libs/rust/src/subprocess.rs +++ b/protobuf/libs/rust/src/subprocess.rs @@ -12,7 +12,7 @@ pub struct ProcHandle { /// Execute a shell command (pass to 'sh') in a subprocess, /// and log its stdout and stderr. -/// +/// /// Returns a handle that will kill the subprocess when dropped. 
 pub fn spawn_shell(cmd_str: &str, name: &str, span: tracing::Span) -> anyhow::Result<ProcHandle>
 {
@@ -37,15 +37,36 @@ pub fn spawn_shell(cmd_str: &str, name: &str, span: tracing::Span) -> anyhow::Re
         for line in reader.lines() {
             match line {
                 Ok(line) => {
-                    match level {
-                        tracing::Level::INFO => info!("[{}] {}", name, line),
-                        tracing::Level::ERROR => error!("[{}] {}", name, line),
-                        _ => panic!("Unsupported log level"),
-                    }}
+                    // If the line from organizer starts with a log level, use it.
+                    // Otherwise, use the default log level (INFO for stdout, ERROR for stderr).
+                    match line.split_once(" ") {
+                        Some((level_str, msg_str)) => {
+                            let level_override = match level_str {
+                                "DEBUG" => tracing::Level::DEBUG,
+                                "INFO" => tracing::Level::INFO,
+                                "WARN" | "WARNING" => tracing::Level::WARN,
+                                "ERROR" | "CRITICAL" | "FATAL" => tracing::Level::ERROR,
+                                _ => match level {
+                                    tracing::Level::INFO => tracing::Level::INFO,
+                                    tracing::Level::ERROR => tracing::Level::ERROR,
+                                    _ => panic!("Unsupported log level"),
+                                }
+                            };
+                            match level_override {
+                                tracing::Level::DEBUG => debug!("[{}] {}", name, msg_str),
+                                tracing::Level::INFO => info!("[{}] {}", name, msg_str),
+                                tracing::Level::WARN => warn!("[{}] {}", name, msg_str),
+                                tracing::Level::ERROR => error!("[{}] {}", name, msg_str),
+                                _ => panic!("Unsupported log level"),
+                            }
+                        }
+                        None => info!("[{}] {}", name, line),
+                    }
+                }
                 Err(e) => {
                     error!("Failed to read {}. Bailing. -- {:?}", name, e);
                     break;
-                }
+                }
             }
         }
         debug!("Thread to read {}->log exiting", name);
@@ -68,7 +89,7 @@ pub fn spawn_shell(cmd_str: &str, name: &str, span: tracing::Span) -> anyhow::Re
 }
 
 /// Terminate the subprocess when Handle is dropped.
-impl Drop for ProcHandle 
+impl Drop for ProcHandle
 {
     fn drop(&mut self) {
         self.span.in_scope(|| {
@@ -88,7 +109,7 @@ impl Drop for ProcHandle
                 Ok(status) => { info!("Shell exited with status: {}", status); }
                 Err(e) => { warn!("Failed to wait for shell: {:?}", e); }
             }
-        }
+        }
         });
     }
 }
diff --git a/protobuf/proto/client.proto b/protobuf/proto/client.proto
index d32a19f..0287af7 100644
--- a/protobuf/proto/client.proto
+++ b/protobuf/proto/client.proto
@@ -44,7 +44,7 @@ message ServerToClientCmd {
         optional string drawing = 5;    // data-uri of an image
     }
 
     message SetCookies {
-        Cookies cookies = 1;                // Cookies to set. Use empty string to delete a cookie.
+        map<string, string> cookies = 1;    // Cookies to set. Use empty string to delete a cookie.
         google.protobuf.Timestamp expire_time = 2;
     }
 
diff --git a/protobuf/proto/common.proto b/protobuf/proto/common.proto
index d891f1f..fb2016c 100644
--- a/protobuf/proto/common.proto
+++ b/protobuf/proto/common.proto
@@ -7,10 +7,6 @@ import "google/protobuf/struct.proto";
 
 message Empty {}
 
-message Cookies {
-    map<string, string> cookies = 1;
-}
-
 // ---------------------------------------------------------
 // Video metadata
 // ---------------------------------------------------------
diff --git a/protobuf/proto/organizer.proto b/protobuf/proto/organizer.proto
index d45efd8..5c07c16 100644
--- a/protobuf/proto/organizer.proto
+++ b/protobuf/proto/organizer.proto
@@ -128,7 +128,7 @@ message SemanticVersionNumber {
 message UserSessionData {
     string sid = 1;
     UserInfo user = 2;
-    Cookies cookies = 3;
+    map<string, string> cookies = 3;
 }
 
 // ---------------------------------------------------------
@@ -246,7 +246,7 @@ message ClientOpenVideoRequest {
 
 message ClientSetCookiesRequest {
     string sid = 1;
-    Cookies cookies = 2;
+    map<string, string> cookies = 2;
     optional google.protobuf.Timestamp expire_time = 3;
 }
 
diff --git a/server/src/api_server/file_upload.rs b/server/src/api_server/file_upload.rs
index 2de9fce..b605bd8 100644
--- a/server/src/api_server/file_upload.rs
+++ b/server/src/api_server/file_upload.rs
@@ -50,7 +50,8 @@ pub async fn handle_multipart_upload(
     let org_session = proto::org::UserSessionData {
         sid: "".to_string(),
         user: Some(proto::UserInfo { id: user_id.clone(), name: Some(user_name.clone()) }),
-        cookies: Some(proto::Cookies { cookies }) };
+        cookies
+    };
 
     match org_authz_with_default(&org_session, "upload video", true, &server, &Some(organizer), true,
         AuthzTopic::Other(None, authz_req::other_op::Op::UploadVideo)).await {
diff --git a/server/src/api_server/mod.rs b/server/src/api_server/mod.rs
index 89a2715..c5fe01e 100644
--- a/server/src/api_server/mod.rs
+++ b/server/src/api_server/mod.rs
@@ -110,7 +110,7 @@ async fn handle_ws_session(
                 id: user_id.clone(),
                 name: Some(username.clone()),
             }),
-            cookies: Some(proto::Cookies { cookies }),
+            cookies,
         }
     };
 
@@ -223,7 +223,7 @@ async fn handle_ws_session(
 
         let (cmd, data) = match parse_msg(&msg) {
             Ok((cmd, data, cookies)) => {
-                ses.org_session.cookies = Some(lib_clapshot_grpc::proto::Cookies { cookies });
+                ses.org_session.cookies = cookies;
                 (cmd, data)
             },
             Err(e) => {
diff --git a/server/src/grpc/grpc_client.rs b/server/src/grpc/grpc_client.rs
index a02bd29..2fd1f83 100644
--- a/server/src/grpc/grpc_client.rs
+++ b/server/src/grpc/grpc_client.rs
@@ -46,6 +46,8 @@ pub async fn connect(uri: OrganizerURI) -> anyhow::Result
 
 pub fn prepare_organizer(
     org_uri: &Option,
     cmd: &Option,
+    debug: bool,
+    json: bool,
     data_dir: &Path)
     -> anyhow::Result<(Option, Option)>
@@ -67,7 +69,7 @@ pub fn prepare_organizer(
             .join("grpc-srv-to-org.sock");
         org_uri = Some(OrganizerURI::UnixSocket(unix_sock));
     };
-        Some(spawn_organizer(&cmd.as_str(), org_uri.clone().unwrap())?)
+        Some(spawn_organizer(&cmd.as_str(), org_uri.clone().unwrap(), debug, json)?)
     } else { None };
 
     Ok((org_uri, org_hdl))
@@ -75,12 +77,12 @@ pub fn prepare_organizer(
 }
 
 /// Spawn organizer gRPC server as a subprocess.
 /// Dropping the returned handle will signal/kill the subprocess.
-fn spawn_organizer(cmd: &str, uri: OrganizerURI)
+fn spawn_organizer(cmd: &str, uri: OrganizerURI, debug: bool, json: bool)
     -> anyhow::Result<ProcHandle>
 {
     assert!(cmd != "", "Empty organizer command");
-    let cmd = match uri {
+    let mut cmd = match uri {
         OrganizerURI::UnixSocket(path) => {
             unix_socket::delete_old(&path)?;
             format!("{} {}", cmd, path.display())
@@ -90,5 +92,7 @@ fn spawn_organizer(cmd: &str, uri: OrganizerURI)
         },
     };
 
+    if debug { cmd += " --debug"; }
+    if json { cmd += " --json"; }
     spawn_shell(&cmd, "organizer", info_span!("ORG"))
 }
diff --git a/server/src/grpc/grpc_server.rs b/server/src/grpc/grpc_server.rs
index a6ecb95..518a435 100644
--- a/server/src/grpc/grpc_server.rs
+++ b/server/src/grpc/grpc_server.rs
@@ -101,7 +101,6 @@ impl org::organizer_outbound_server::OrganizerOutbound for OrganizerOutboundImpl
     async fn client_set_cookies(&self, req: Request) -> RpcResult
     {
         let req = req.into_inner();
-        rpc_expect_field(&req.cookies, "filter")?;
         to_rpc_empty(self.server.emit_cmd(client_cmd!(SetCookies, {cookies: req.cookies, expire_time: req.expire_time}),
             SendTo::UserSession(&req.sid)))
     }
diff --git a/server/src/log.rs b/server/src/log.rs
index b6f8fca..a306be9 100644
--- a/server/src/log.rs
+++ b/server/src/log.rs
@@ -16,7 +16,7 @@ pub fn setup_logging(time_offset: time::UtcOffset, debug: bool, log_file: &str,
     };
 
     if std::env::var_os("RUST_LOG").is_none() {
-        std::env::set_var("RUST_LOG", if debug {"debug,clapshot_server=debug,h2=info"} else {"info,clapshot_server=info"});
+        std::env::set_var("RUST_LOG", if debug {"debug,clapshot_server=debug,h2=info,hyper::proto::h1=info"} else {"info,clapshot_server=info"});
     };
 
     let minute_offset = time_offset.whole_minutes() % 60;
diff --git a/server/src/main.rs b/server/src/main.rs
index b76f0de..be2b466 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -122,6 +122,8 @@ fn main() -> anyhow::Result<()>
     let (org_uri, _org_hdl) = prepare_organizer(
         &args.flag_org_in_uri,
         &args.flag_org_cmd,
+        args.flag_debug,
+        args.flag_json,
         &args.flag_data_dir)?;
 
     let cors_origins: Vec = args.flag_cors
diff --git a/server/src/tests/integration_test.rs b/server/src/tests/integration_test.rs
index 4a59b69..9ba465e 100644
--- a/server/src/tests/integration_test.rs
+++ b/server/src/tests/integration_test.rs
@@ -109,7 +109,7 @@ mod integration_test
         let target_bitrate = $bitrate;
         let grpc_server_bind = crate::grpc::grpc_server::make_grpc_server_bind(&None, &$data_dir)?;
 
-        let (org_uri, _org_hdl) = prepare_organizer(&None, &$org_cmd, &$data_dir.path())?;
+        let (org_uri, _org_hdl) = prepare_organizer(&None, &$org_cmd, true, false, &$data_dir.path())?;
 
         let terminate_flag = Arc::new(AtomicBool::new(false));
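
Side note, not part of the patch: the level-prefix contract between the new Python logger and the server-side spawn_shell() parser can be sanity-checked locally with a snippet like the one below. It only assumes that organizer/default/logger.py is importable from the working directory; all names come from the diff above.

    # quick manual check of organizer/default/logger.py
    from logger import make_logger

    log = make_logger("py", debug=True)   # plain format: "LEVEL [name] message"
    log.debug("probing backchannel")      # stdout -> "DEBUG [py] probing backchannel"
    log.info("Organizer listening on '/tmp/organizer.sock'")    # stdout, "INFO" prefix
    log.error("Return connection to Clapshot server refused")   # stderr, "ERROR" prefix
    # spawn_shell() in subprocess.rs splits each captured line on the first space and maps
    # DEBUG/INFO/WARN/WARNING/ERROR/CRITICAL/FATAL to the matching tracing level, so these
    # prefixes decide the level shown in the Clapshot server log.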