Skip to content

Commit

Permalink
Add a stub Python version of default organizer. Minor org<>srv integr…
Browse files Browse the repository at this point in the history
…ation tweaks.
  • Loading branch information
elonen committed Oct 23, 2023
1 parent d76b405 commit c762760
Show file tree
Hide file tree
Showing 19 changed files with 317 additions and 27 deletions.
4 changes: 4 additions & 0 deletions organizer/default/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@ Cargo.lock
target
dist_deb
Dockerfile
_venv
.vscode
__pycache__

42 changes: 42 additions & 0 deletions organizer/default/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

import json
import logging
import sys


def make_logger(name: str, debug: bool = False, json: bool = False):
"""
Create a logger with the given name.
:param name: Name of the logger
:param debug: Whether to enable debug logging
:param json: Whether to log in JSON format
"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG if debug else logging.INFO)
formatter = _JsonFormatter() if json else logging.Formatter(f'%(levelname)s [%(name)s] %(message)s') # no timestamp, it's already logged by the server

# Create a stream handler for stdout (for levels below ERROR)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.DEBUG if debug else logging.INFO)
stdout_handler.addFilter(lambda record: record.levelno < logging.ERROR)
stdout_handler.setFormatter(formatter)
logger.addHandler(stdout_handler)

# Create a stream handler for stderr (for levels ERROR and above)
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.ERROR)
stderr_handler.setFormatter(formatter)
logger.addHandler(stderr_handler)

return logger


class _JsonFormatter(logging.Formatter):
def format(self, record):
log_record = {
'time': self.formatTime(record, self.datefmt),
'level': record.levelname,
'name': record.name,
'message': record.getMessage(),
}
return json.dumps(log_record)
207 changes: 207 additions & 0 deletions organizer/default/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import asyncio
import json
import re
from textwrap import dedent
from grpclib import GRPCError
import grpclib.client
from grpclib.server import Server

import sys
from docopt import docopt
import clapshot_grpc.clapshot as ͼ # most types are in this module, use ͼ for short
import clapshot_grpc.clapshot.organizer as org
from grpclib.const import Status as GrpcStatus

try:
from typing import override # type: ignore # Python 3.12+
except ImportError:
def override(func):
return func

from logger import make_logger


# Define the version of your program
VERSION = "0.1.0"

PATH_COOKIE_NAME = "folder_path"
USER_ID_NODE_TYPE = "user_id"
FOLDER_NODE_TYPE = "folder"
PARENT_FOLDER_EDGE_TYPE = "parent_folder"
OWNER_EDGE_TYPE = "owner"


async def main():
doc = """
Default/example Clapshot Organizer plugin.
This gRPC server can bind to Unix socket or TCP address.
Usage:
{NAME} [options] <bind>
{NAME} (-h | --help)
{NAME} (-v | --version)
Required:
<bind> Unix socket or IP address to bind to.
e.g. '/tmp/organizer.sock' or '[::1]:50051'
Options:
-d --debug Enable debug logging
-j --json Log in JSON format
-t --tcp Use TCP instead of Unix socket
-h --help Show this screen
-v --version Show version
"""
global debug_logging, json_logging
arguments = docopt(doc.format(NAME=sys.argv[0]), version=VERSION)

bind_addr = arguments["<bind>"]
logger=make_logger("py", debug=arguments["--debug"], json=arguments["--json"])

server = Server([OrganizerInbound(logger)])
if arguments["--tcp"]:
assert re.match(r"^\d+\.\d+\.\d+\.\d+:\d+$", bind_addr) or re.match(r"^\[[0-9a-f:]*\]:\d+$", bind_addr), \
"bind_addr must be in the format of 'ip_listen_address:port' when using TCP"
host, port = bind_addr.split(":")
await server.start(host=host, port=int(port))
else:
await server.start(path=bind_addr) # unix socket
logger.info(f"Organizer listening on '{bind_addr}'")

await server.wait_closed()
logger.info("Organizer stopped listening.")


# -------------------------------------------------------------------------------------------------

class OrganizerInbound(org.OrganizerInboundBase):
srv: org.OrganizerOutboundStub # connection back to Clapshot server

def __init__(self, logger):
self.log = logger


@override
async def handshake(self, server_info: org.ServerInfo) -> ͼ.Empty:
'''
Receive handshake from Clapshot server.
We must connect back to it and send hanshake to establish a bidirectional connection.
'''
self.log.info(f"Got handshake from server.")
self.log.debug(f"Server info: {json.dumps(server_info.to_dict())}")
try:
if tcp := server_info.backchannel.tcp:
backchannel = grpclib.client.Channel(host=tcp.host, port=tcp.port)
else:
backchannel = grpclib.client.Channel(path=server_info.backchannel.unix.path)

self.log.info("Connecting back to Clapshot server...")
self.srv = org.OrganizerOutboundStub(backchannel)
self.srv.handshake
await self.srv.handshake(org.OrganizerInfo())
self.log.info("Clapshot server connected.")

except ConnectionRefusedError as e:
self.log.error(f"Return connection to Clapshot server refused: {e}")
raise GRPCError(GrpcStatus.UNKNOWN, "Failed to connect back to you (the Clapshot server)")
return ͼ.Empty()


@override
async def on_start_user_session(self, req: org.OnStartUserSessionRequest) -> org.OnStartUserSessionResult:
'''
New user session started. Send the clien a list of actions that this organizer plugin supports.
'''
self.log.info("on_start_user_session")

await self.srv.client_define_actions(
org.ClientDefineActionsRequest(
sid = req.ses.sid,
actions = {
# --- "New folder" popup ---
"new_folder": ͼ.ActionDef(
# how to display it in UI
ui_props=ͼ.ActionUiProps(
label = "New folder",
icon = ͼ.Icon(fa_class=ͼ.IconFaClass(classes="fa fa-folder-plus")),
natural_desc = "Create a new folder",
),
# what to do when user clicks it (=show a browser dialog and make a call back to this plugin)
action = ͼ.ScriptCall(
lang = ͼ.ScriptCallLang.JAVASCRIPT,
code = dedent(r'''
var folder_name = (await prompt("Name for the new folder", ""))?.trim();
if (folder_name) { await call_organizer("new_folder", {name: folder_name}); }
'''))
)
}))

return org.OnStartUserSessionResult()


@override
async def navigate_page(self, navigate_page_request: org.NavigatePageRequest) -> org.ClientShowPageRequest:
self.log.info("navigate_page")
raise GRPCError(GrpcStatus.UNIMPLEMENTED)


@override
async def cmd_from_client(self, cmd: org.CmdFromClientRequest) -> ͼ.Empty:
self.log.info("cmd_from_client: " + str(cmd.to_dict()))
raise GRPCError(GrpcStatus.UNIMPLEMENTED)


@override
async def authz_user_action(
self, authz_user_action_request: org.AuthzUserActionRequest) -> org.AuthzResult:
raise GRPCError(GrpcStatus.UNIMPLEMENTED) # = let Clapshot server decice

# -------------------------------------------------------------------------------------------------

@override
async def list_tests(self, clapshot_empty: ͼ.Empty) -> org.ListTestsResult:
self.log.info("list_tests")
return org.ListTestsResult(test_names=[])


@override
async def run_test(self, run_test_request: org.RunTestRequest) -> org.RunTestResult:
self.log.info("run_test")
raise GRPCError(GrpcStatus.UNIMPLEMENTED)

# -------------------------------------------------------------------------------------------------

async def _get_current_folder_path(self, ses: org.UserSessionData) -> list[org.PropNode]:
'''
User's current folder path is stored in a cookie as a JSON list of folder IDs.
Read it, get the folder nodes from DB, and return them.
'''
ck = ses.cookies or {}
try:
if folder_ids := json.loads(ck.get(PATH_COOKIE_NAME) or '[]'):
folder_nodes = await self.srv.db_get_prop_nodes(org.DbGetPropNodesRequest(
node_type = FOLDER_NODE_TYPE,
ids = org.IdList(ids = folder_ids)))
if len(folder_nodes.items) == len(folder_ids):
return [folder_nodes.items[id] for id in folder_ids]
else:
# Some folder weren't found in DB. Clear cookie.
await self.srv.client_set_cookies(org.ClientSetCookiesRequest(cookies = {PATH_COOKIE_NAME: ''}, sid = ses.sid))
await self.srv.client_show_user_message(org.ClientShowUserMessageRequest(
sid = ses.sid,
msg = ͼ.UserMessage(
message = "Some unknown folder IDs in folder_path cookie. Clearing it.",
user_id = ses.user.id,
type = ͼ.UserMessageType.ERROR)))
return []
except json.JSONDecodeError as e:
self.log.error(f"Failed to parse folder_path cookie: {e}. Falling back to empty folder path.")
return []


if __name__ == '__main__':
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
except KeyboardInterrupt:
print("EXIT signaled.")
4 changes: 4 additions & 0 deletions organizer/default/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
docopt
../../protobuf/libs/python/dist/clapshot_grpc-0.0.1.tar.gz
grpclib
betterproto==2.0.0b6
4 changes: 4 additions & 0 deletions organizer/default/run-py-org.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
DIR=$(dirname "$0")
$DIR/_venv/bin/python $DIR/main.py $@

6 changes: 5 additions & 1 deletion organizer/default/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub const VERSION: &'static str = env!("CARGO_PKG_VERSION");
pub const NAME: &'static str = env!("CARGO_PKG_NAME");


// Implement inbound RCP methods
// Implement inbound RCP methods (from organizer.proto)

#[tonic::async_trait]
impl org::organizer_inbound_server::OrganizerInbound for DefaultOrganizer
Expand Down Expand Up @@ -150,6 +150,10 @@ impl org::organizer_inbound_server::OrganizerInbound for DefaultOrganizer
}
}

// ------------------------------------------------------------------
// Unit / integration tests
// ------------------------------------------------------------------

async fn list_tests(&self, _req: Request<proto::Empty>) -> RpcResponseResult<org::ListTestsResult>
{
Ok(Response::new(org::ListTestsResult {
Expand Down
2 changes: 2 additions & 0 deletions organizer/default/src/ui_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ if (folder_name) {
}
}

// ---------------------------------------------------------------------------

/// Build folder view page.
/// Reads folder_path cookie and builds a list of folders and videos in the folder.
Expand Down Expand Up @@ -150,6 +151,7 @@ fn folder_node_to_page_item(folder: &org::PropNode) -> proto::page_item::folder_
}
}

// ---------------------------------------------------------------------------


/// Build folder view page.
Expand Down
2 changes: 1 addition & 1 deletion protobuf/libs/python/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ generate:

package: protobufs
echo "from setuptools import setup, find_packages" > $(DIST_DIR)/setup.py
echo "setup(name='$(PACKAGE_NAME)', version='$(PACKAGE_VERSION)', packages=find_packages(), install_requires=['betterproto>=2.0.0', 'grpclib'])" >> $(DIST_DIR)/setup.py
echo "setup(name='$(PACKAGE_NAME)', version='$(PACKAGE_VERSION)', packages=find_packages(), install_requires=[])" >> $(DIST_DIR)/setup.py
cd $(DIST_DIR) && $(VENV_PYTHON) setup.py sdist
cp $(DIST_DIR)/dist/*.tar.gz $(DIST_DIR)

Expand Down
39 changes: 30 additions & 9 deletions protobuf/libs/rust/src/subprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct ProcHandle {

/// Execute a shell command (pass to 'sh') in a subprocess,
/// and log its stdout and stderr.
///
///
/// Returns a handle that will kill the subprocess when dropped.
pub fn spawn_shell(cmd_str: &str, name: &str, span: tracing::Span) -> anyhow::Result<ProcHandle>
{
Expand All @@ -37,15 +37,36 @@ pub fn spawn_shell(cmd_str: &str, name: &str, span: tracing::Span) -> anyhow::Re
for line in reader.lines() {
match line {
Ok(line) => {
match level {
tracing::Level::INFO => info!("[{}] {}", name, line),
tracing::Level::ERROR => error!("[{}] {}", name, line),
_ => panic!("Unsupported log level"),
}}
// If the line from organizer starts with a log level, use it.
// Otherwise, use the default log level (INFO for stdout, ERROR for stderr).
match line.split_once(" ") {
Some((level_str, msg_str)) => {
let level_override = match level_str {
"DEBUG" => tracing::Level::DEBUG,
"INFO" => tracing::Level::INFO,
"WARN" | "WARNING" => tracing::Level::WARN,
"ERROR" | "CRITICAL" | "FATAL" => tracing::Level::ERROR,
_ => match level {
tracing::Level::INFO => tracing::Level::INFO,
tracing::Level::ERROR => tracing::Level::ERROR,
_ => panic!("Unsupported log level"),
}
};
match level_override {
tracing::Level::DEBUG => debug!("[{}] {}", name, msg_str),
tracing::Level::INFO => info!("[{}] {}", name, msg_str),
tracing::Level::WARN => warn!("[{}] {}", name, msg_str),
tracing::Level::ERROR => error!("[{}] {}", name, msg_str),
_ => panic!("Unsupported log level"),
}
}
None => info!("[{}] {}", name, line),
}
}
Err(e) => {
error!("Failed to read {}. Bailing. -- {:?}", name, e);
break;
}
}
}
}
debug!("Thread to read {}->log exiting", name);
Expand All @@ -68,7 +89,7 @@ pub fn spawn_shell(cmd_str: &str, name: &str, span: tracing::Span) -> anyhow::Re
}

/// Terminate the subprocess when Handle is dropped.
impl Drop for ProcHandle
impl Drop for ProcHandle
{
fn drop(&mut self) {
self.span.in_scope(|| {
Expand All @@ -88,7 +109,7 @@ impl Drop for ProcHandle
Ok(status) => { info!("Shell exited with status: {}", status); }
Err(e) => { warn!("Failed to wait for shell: {:?}", e); }
}
}
}
});
}
}
2 changes: 1 addition & 1 deletion protobuf/proto/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ message ServerToClientCmd {
optional string drawing = 5; // data-uri of an image
}
message SetCookies {
Cookies cookies = 1; // Cookies to set. Use empty string to delete a cookie.
map<string, string> cookies = 1; // Cookies to set. Use empty string to delete a cookie.
google.protobuf.Timestamp expire_time = 2;
}

Expand Down
Loading

0 comments on commit c762760

Please sign in to comment.