diff --git a/jetsocat/Cargo.toml b/jetsocat/Cargo.toml index 1b480643..920d7ea8 100644 --- a/jetsocat/Cargo.toml +++ b/jetsocat/Cargo.toml @@ -36,7 +36,7 @@ seahorse = "2.2" humantime = "2.1" # async -tokio = { version = "1.38", features = ["io-std", "io-util", "net", "fs", "time", "rt", "sync", "process", "rt-multi-thread", "macros"] } +tokio = { version = "1.38", features = ["io-std", "io-util", "net", "fs", "signal", "time", "rt", "sync", "process", "rt-multi-thread", "macros"] } tokio-tungstenite = "0.21" futures-util = "0.3" transport = { path = "../crates/transport" } diff --git a/jetsocat/src/main.rs b/jetsocat/src/main.rs index 1cca60c1..2d2858d3 100644 --- a/jetsocat/src/main.rs +++ b/jetsocat/src/main.rs @@ -77,7 +77,12 @@ pub fn run>>(f: F) -> anyhow::Result<()> { .build() .context("runtime build failed")?; - match rt.block_on(f) { + match rt.block_on(async { + tokio::select! { + res = f => res, + res = tokio::signal::ctrl_c() => res.context("ctrl-c event"), + } + }) { Ok(()) => info!("Terminated successfully"), Err(e) => { error!("{:#}", e); @@ -85,7 +90,7 @@ pub fn run>>(f: F) -> anyhow::Result<()> { } } - rt.shutdown_timeout(std::time::Duration::from_millis(100)); // just to be safe + rt.shutdown_timeout(std::time::Duration::from_millis(100)); // Just to be safe. Ok(()) } @@ -111,7 +116,11 @@ const PIPE_FORMATS: &str = r#"Pipe formats: `jet-tcp-accept://
//`: TCP stream over JET protocol as server `ws://`: WebSocket `wss://`: WebSocket Secure - `ws-listen://`: WebSocket listener"#; + `ws-listen://`: WebSocket listener + `np:///pipe/: Connect to a named pipe (Windows)` + `np-listen://./pipe/`: Open a named pipe and listen on it (Windows) + `np://: Connect to a UNIX socket (non-Windows)` + `np-listen://: Create a UNIX socket and listen on it (non-Windows)`"#; // forward @@ -515,6 +524,34 @@ fn parse_pipe_mode(arg: String) -> anyhow::Result { "ws-listen" => Ok(PipeMode::WebSocketListen { bind_addr: value.to_owned(), }), + "np" => { + #[cfg(windows)] + { + Ok(PipeMode::NamedPipe { + name: format!("\\\\{}", value.replace('/', "\\")), + }) + } + #[cfg(unix)] + { + Ok(PipeMode::UnixSocket { + path: PathBuf::from(value.to_owned()), + }) + } + } + "np-listen" => { + #[cfg(windows)] + { + Ok(PipeMode::NamedPipeListen { + name: format!("\\\\{}", value.replace('/', "\\")), + }) + } + #[cfg(unix)] + { + Ok(PipeMode::UnixSocketListen { + path: PathBuf::from(value.to_owned()), + }) + } + } _ => anyhow::bail!("Unknown pipe scheme: {}", scheme), } } diff --git a/jetsocat/src/pipe.rs b/jetsocat/src/pipe.rs index e540b7ce..2b837e55 100644 --- a/jetsocat/src/pipe.rs +++ b/jetsocat/src/pipe.rs @@ -39,6 +39,22 @@ pub enum PipeMode { WebSocketListen { bind_addr: String, }, + #[cfg(windows)] + NamedPipe { + name: String, + }, + #[cfg(windows)] + NamedPipeListen { + name: String, + }, + #[cfg(unix)] + UnixSocket { + path: PathBuf, + }, + #[cfg(unix)] + UnixSocketListen { + path: PathBuf, + }, } pub struct Pipe { @@ -282,6 +298,102 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option) -> Result _handle: None, }) } + #[cfg(windows)] + PipeMode::NamedPipe { name } => { + use std::time::Duration; + use tokio::net::windows::named_pipe::ClientOptions; + use tokio::time; + + const ERROR_PIPE_BUSY: i32 = 231; + + info!(%name, "Open named pipe..."); + + let named_pipe = loop { + match ClientOptions::new().open(&name) { + Ok(named_pipe) => break named_pipe, + Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY) => (), + Err(e) => return Err(anyhow::Error::new(e).context("named pipe open")), + } + + time::sleep(Duration::from_millis(50)).await; + }; + + debug!("Connected"); + + Ok(Pipe { + name: "named-pipe", + stream: Box::new(named_pipe), + _handle: None, + }) + } + #[cfg(windows)] + PipeMode::NamedPipeListen { name } => { + use tokio::net::windows::named_pipe::ServerOptions; + + info!(%name, "Create named pipe..."); + + let named_pipe = ServerOptions::new() + .first_pipe_instance(true) + .create(&name) + .context("create named pipe")?; + + info!(%name, "Wait for a client to connect"); + + named_pipe.connect().await.context("named pipe connect")?; + + info!("Peer connected"); + + Ok(Pipe { + name: "named-pipe-listener", + stream: Box::new(named_pipe), + _handle: None, + }) + } + #[cfg(unix)] + PipeMode::UnixSocket { path } => { + use tokio::net::UnixStream; + + info!(path = %path.display(), "UNIX socket connect"); + + let stream = UnixStream::connect(path) + .await + .with_context(|| "UNIX socket connect failed")?; + + debug!("Connected"); + + Ok(Pipe { + name: "unix-socket", + stream: Box::new(stream), + _handle: None, + }) + } + #[cfg(unix)] + PipeMode::UnixSocketListen { path } => { + use tokio::net::UnixListener; + + info!(path = %path.display(), "Listening on UNIX socket"); + + let listener = UnixListener::bind(&path).context("failed to bind UNIX listener")?; + let handle = Box::new(UnlinkSocketOnDrop(path)); + + let (socket, peer_addr) = listener.accept().await.context("UNIX listener couldn't accept")?; + + info!(?peer_addr, "Accepted peer"); + + return Ok(Pipe { + name: "unix-socket-listener", + stream: Box::new(socket), + _handle: Some(handle), + }); + + struct UnlinkSocketOnDrop(PathBuf); + + impl Drop for UnlinkSocketOnDrop { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.0); + } + } + } } }