Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: http::HttpServer::handle_request() issues #110

Open
nick1udwig opened this issue Nov 14, 2024 · 3 comments
Open

feature: http::HttpServer::handle_request() issues #110

nick1udwig opened this issue Nov 14, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@nick1udwig
Copy link
Collaborator

nick1udwig commented Nov 14, 2024

Is your feature request related to a problem? Please describe.
There are two big issues I'm running into with handle_request().

  1. I want to, as a result of handling either an HTTP or WS request, .ws_push_all_channels(). However, I can't since I'm in a closure inside of a HttpServer method call that is borrowing it as mutable.
  2. I have some state I want to be able to use as a mutable reference in both the HTTP and the WS closure. But Rust complains:
          error[E0524]: two closures require unique access to `*message_archive` at the same time
          --> foo/src/lib.rs:145:9
           |
       74  |     server.handle_request(
           |            -------------- first borrow later used by call
       75  |         request,
       76  |         |request| {
           |         --------- first closure is constructed here
       ...
       124 |                         message_archive,
           |                         --------------- first borrow occurs due to use of `*message_archive` in closure
       ...
       145 |         |_channel, _message_type, blob| {
           |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ second closure is constructed here
       ...
       151 |                 message_archive,
           |                 --------------- second borrow occurs due to use of `*message_archive` in closure
    

Describe the solution you'd like

  1. One possible way around this would be to expose the ws_channels HashMap and add a function (not a method) ws_push_all_channels() that operated on the given HashMap.
  2. idk

Describe alternatives you've considered
What I'm probably going to have to do is go back to the old pattern of matching.

Notes
It seems like we probably didn't hit the second of these problems before because we used WS only one way (i.e. Kinode -> FE or FE -> Kinode, not both), and hence, the state need not be passed into two closures, only one.

@nick1udwig nick1udwig added the enhancement New feature or request label Nov 14, 2024
@nick1udwig
Copy link
Collaborator Author

nick1udwig commented Nov 14, 2024

For the record, here's my non-working code:

use std::collections::HashMap;

use crate::kinode::process::chat::{
    ChatMessage, Request as ChatRequest, Response as ChatResponse, SendRequest,
};
use kinode_process_lib::logging::{error, info, init_logging, Level};
use kinode_process_lib::{
    await_message, call_init, get_blob,
    http::server::{
        HttpBindingConfig, HttpResponse, HttpServer, HttpServerRequest, WsBindingConfig,
        WsMessageType,
    },
    println, Address, LazyLoadBlob, Message, Request, Response,
};

wit_bindgen::generate!({
    path: "target/wit",
    world: "chat-template-dot-os-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

const HTTP_API_PATH: &str = "/messages";
const WS_PATH: &str = "/";

const StatusCodeOK: u16 = 200;
const StatusCodeCREATED: u16 = 201;
const StatusCodeBAD_REQUEST: u16 = 400;
const StatusCodeMETHOD_NOT_ALLOWED: u16 = 405;

#[derive(Debug, serde::Serialize, serde::Deserialize, process_macros::SerdeJsonInto)]
struct NewMessage {
    chat: String,
    author: String,
    content: String,
}

type MessageArchive = HashMap<String, Vec<ChatMessage>>;

fn make_http_address(our: &Address) -> Address {
    Address::from((our.node(), "http_server", "distro", "sys"))
}

fn handle_http_server_request(
    our: &Address,
    body: &[u8],
    message_archive: &mut MessageArchive,
    server: &mut HttpServer,
) -> anyhow::Result<()> {
    let Ok(request) = serde_json::from_slice::<HttpServerRequest>(body) else {
        // Fail quietly if we can't parse the request
        info!("couldn't parse message from http_server: {body:?}");
        return Ok(());
    };

    server.handle_request(
        request,
        |request| {
            match request.method().unwrap().as_str() {
                // Get all messages
                "GET" => (
                    HttpResponse::new(StatusCodeOK).header("Content-Type", "application/json"),
                    Some(LazyLoadBlob {
                        mime: None,
                        bytes: serde_json::to_vec(&serde_json::json!({
                            "History": {
                                "messages": message_archive.clone()
                            }
                        }))
                        .unwrap(),
                    }),
                ),
                //{
                //    let mut headers = HashMap::new();
                //    headers.insert("Content-Type".to_string(), "application/json".to_string());

                //    send_response(
                //        StatusCode::OK,
                //        Some(headers),
                //        serde_json::to_vec(&ChatResponse::History {
                //            messages: message_archive.clone(),
                //        })
                //        .unwrap(),
                //    );
                //}
                // Send a message
                "POST" => {
                    let Some(blob) = get_blob() else {
                        return (HttpResponse::new(StatusCodeBAD_REQUEST), None);
                    };
                    handle_chat_request(
                        our,
                        &make_http_address(our),
                        &blob.bytes,
                        false,
                        message_archive,
                        None,
                    )
                    .unwrap()
                    .unwrap();
                    server.ws_push_all_channels(WS_PATH, WsMessageType::text, blob);

                    //(HttpResponse::new(StatusCode::CREATED as u16), None)
                    (HttpResponse::new(StatusCodeCREATED), None)
                    //// Send an http response via the http server
                    //send_response(StatusCode::CREATED, None, vec![]);
                }
                _ => (
                    //HttpResponse::new(StatusCode::METHOD_NOT_ALLOWED as u16),
                    HttpResponse::new(StatusCodeMETHOD_NOT_ALLOWED),
                    None,
                ),
                //{
                //    // Method not allowed
                //    send_response(StatusCode::METHOD_NOT_ALLOWED, None, vec![]);
                //}
            }
        },
        |_channel, _message_type, blob| {
            let blob = handle_chat_request(
                our,
                &make_http_address(our),
                &blob.bytes,
                false,
                message_archive,
                None,
            )
            .unwrap()
            .unwrap();
            server.ws_push_all_channels(WS_PATH, WsMessageType::text, blob);
        },
    );

    //match server_request {
    //    HttpServerRequest::WebSocketOpen { channel_id, .. } => {
    //        // Set our channel_id to the newly opened channel
    //        // Note: this code could be improved to support multiple channels
    //        *our_channel_id = channel_id;
    //    }
    //    HttpServerRequest::WebSocketPush { .. } => {
    //        let Some(blob) = get_blob() else {
    //            return Ok(());
    //        };

    //        handle_chat_request(
    //            our,
    //            &make_http_address(our),
    //            &blob.bytes,
    //            false,
    //            message_archive,
    //            server,
    //        )?;
    //    }
    //    HttpServerRequest::WebSocketClose(_channel_id) => {}
    //    HttpServerRequest::Http(request) => {
    //        match request.method()?.as_str() {
    //            // Get all messages
    //            "GET" => {
    //                let mut headers = HashMap::new();
    //                headers.insert("Content-Type".to_string(), "application/json".to_string());

    //                send_response(
    //                    StatusCode::OK,
    //                    Some(headers),
    //                    serde_json::to_vec(&ChatResponse::History {
    //                        messages: message_archive.clone(),
    //                    })
    //                    .unwrap(),
    //                );
    //            }
    //            // Send a message
    //            "POST" => {
    //                let Some(blob) = get_blob() else {
    //                    return Ok(());
    //                };
    //                handle_chat_request(
    //                    our,
    //                    &make_http_address(our),
    //                    &blob.bytes,
    //                    true,
    //                    message_archive,
    //                    server,
    //                )?;

    //                // Send an http response via the http server
    //                send_response(StatusCode::CREATED, None, vec![]);
    //            }
    //            _ => {
    //                // Method not allowed
    //                send_response(StatusCode::METHOD_NOT_ALLOWED, None, vec![]);
    //            }
    //        }
    //    }
    //};

    Ok(())
}

fn handle_chat_request(
    our: &Address,
    source: &Address,
    body: &[u8],
    is_http: bool,
    message_archive: &mut MessageArchive,
    server: Option<&HttpServer>,
) -> anyhow::Result<Option<LazyLoadBlob>> {
    let Ok(chat_request) = serde_json::from_slice::<ChatRequest>(body) else {
        // Fail silently if we can't parse the request
        return Ok(None);
    };

    match chat_request {
        ChatRequest::Send(SendRequest {
            ref target,
            ref message,
        }) => {
            // counterparty will be the other node in the chat with us
            let (counterparty, author) = if target == &our.node {
                (&source.node, source.node.clone())
            } else {
                (target, our.node.clone())
            };

            // If the target is not us, send a request to the target
            if target == &our.node {
                println!("{}: {}", source.node, message);
            } else {
                Request::new()
                    .target((target, "chat", "chat", "template.os"))
                    .body(body)
                    .send_and_await_response(5)??;
            }

            let new_message = ChatMessage {
                author: author.clone(),
                content: message.clone(),
            };

            // Insert message into archive, creating one for counterparty if it DNE
            message_archive
                .entry(counterparty.to_string())
                .or_insert(vec![new_message]);

            // If this is an HTTP request, handle the response in the calling function
            if !is_http {
                // If this is not an HTTP request, send a response to the other node
                Response::new()
                    .body(serde_json::to_vec(&ChatResponse::Send).unwrap())
                    .send()
                    .unwrap();
            }

            // Generate a blob for the new message
            let blob = LazyLoadBlob {
                mime: Some("application/json".to_string()),
                bytes: NewMessage {
                    chat: counterparty.to_string(),
                    author,
                    content: message.to_string(),
                }
                .into(),
            };

            // Send a WebSocket message to the http server in order to update the UI
            //send_ws_push(channel_id.clone(), WsMessageType::Text, blob);
            if let Some(server) = server {
                server.ws_push_all_channels(WS_PATH, WsMessageType::Text, blob);
                return Ok(None);
            }
            return Ok(Some(blob));
        }
        ChatRequest::History(ref node) => {
            Response::new()
                .body(ChatResponse::History(
                    message_archive
                        .get(node)
                        .map(|msgs| msgs.clone())
                        .unwrap_or_default(),
                ))
                .send()
                .unwrap();
            return Ok(None);
        }
    };
}

fn handle_message(
    our: &Address,
    message: &Message,
    message_archive: &mut MessageArchive,
    server: &mut HttpServer,
) -> anyhow::Result<()> {
    if !message.is_request() {
        info!("got response - {:?}", message);
        return Ok(());
    }

    let body = message.body();
    let source = message.source();

    if source == &make_http_address(our) {
        handle_http_server_request(our, body, message_archive, server)?;
    } else {
        handle_chat_request(our, source, body, false, message_archive, Some(server))?;
    }

    Ok(())
}

call_init!(init);
fn init(our: Address) {
    init_logging(&our, Level::DEBUG, Level::INFO, None, None).unwrap();
    info!("begin");

    let mut message_archive = HashMap::new();

    let mut server = HttpServer::new(5);

    // Bind UI files to routes; index.html is bound to "/"
    server
        .serve_ui(&our, "ui", vec!["/"], HttpBindingConfig::default())
        .expect("failed to serve UI");
    // Bind HTTP path /messages
    server
        .bind_http_path(HTTP_API_PATH, HttpBindingConfig::default())
        .expect("failed to bind messages API");
    server
        .bind_ws_path(WS_PATH, WsBindingConfig::default())
        .expect("failed to bind WS API");

    loop {
        match await_message() {
            Err(send_error) => error!("got SendError: {send_error}"),
            Ok(ref message) => {
                match handle_message(&our, message, &mut message_archive, &mut server) {
                    Ok(_) => {}
                    Err(e) => error!("got error while handling message: {e:?}"),
                }
            }
        }
    }
}

@nick1udwig nick1udwig mentioned this issue Nov 14, 2024
@nick1udwig
Copy link
Collaborator Author

This is what the code ended up as: https://github.com/kinode-dao/kit/blob/master/src/new/templates/rust/ui/chat/chat/src/lib.rs. Note that I end up matching on the HTTP server request here rather than using server.handle_request() and it works fine:

https://github.com/kinode-dao/kit/blob/master/src/new/templates/rust/ui/chat/chat/src/lib.rs#L51-L112

@nick1udwig
Copy link
Collaborator Author

Addressed 1 in #112; 2 still outstanding

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

1 participant