Skip to content

Commit

Permalink
feat(jetsocat): Windows named pipes and Unix sockets (#1022)
Browse files Browse the repository at this point in the history
  • Loading branch information
CBenoit authored Sep 24, 2024
1 parent 3b3a079 commit b13caba
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 4 deletions.
2 changes: 1 addition & 1 deletion jetsocat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ seahorse = "2.2"
humantime = "2.1"

# async
tokio = { version = "1.38", features = ["io-std", "io-util", "net", "fs", "time", "rt", "sync", "process", "rt-multi-thread", "macros"] }
tokio = { version = "1.38", features = ["io-std", "io-util", "net", "fs", "signal", "time", "rt", "sync", "process", "rt-multi-thread", "macros"] }
tokio-tungstenite = "0.21"
futures-util = "0.3"
transport = { path = "../crates/transport" }
Expand Down
43 changes: 40 additions & 3 deletions jetsocat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,20 @@ pub fn run<F: Future<Output = anyhow::Result<()>>>(f: F) -> anyhow::Result<()> {
.build()
.context("runtime build failed")?;

match rt.block_on(f) {
match rt.block_on(async {
tokio::select! {
res = f => res,
res = tokio::signal::ctrl_c() => res.context("ctrl-c event"),
}
}) {
Ok(()) => info!("Terminated successfully"),
Err(e) => {
error!("{:#}", e);
return Err(e);
}
}

rt.shutdown_timeout(std::time::Duration::from_millis(100)); // just to be safe
rt.shutdown_timeout(std::time::Duration::from_millis(100)); // Just to be safe.

Ok(())
}
Expand All @@ -111,7 +116,11 @@ const PIPE_FORMATS: &str = r#"Pipe formats:
`jet-tcp-accept://<ADDRESS>/<ASSOCIATION ID>/<CANDIDATE ID>`: TCP stream over JET protocol as server
`ws://<URL>`: WebSocket
`wss://<URL>`: WebSocket Secure
`ws-listen://<BINDING ADDRESS>`: WebSocket listener"#;
`ws-listen://<BINDING ADDRESS>`: WebSocket listener
`np://<SERVER NAME>/pipe/<PIPE NAME>: Connect to a named pipe (Windows)`
`np-listen://./pipe/<PIPE NAME>`: Open a named pipe and listen on it (Windows)
`np://<UNIX SOCKET PATH>: Connect to a UNIX socket (non-Windows)`
`np-listen://<UNIX SOCKET PATH>: Create a UNIX socket and listen on it (non-Windows)`"#;

// forward

Expand Down Expand Up @@ -515,6 +524,34 @@ fn parse_pipe_mode(arg: String) -> anyhow::Result<PipeMode> {
"ws-listen" => Ok(PipeMode::WebSocketListen {
bind_addr: value.to_owned(),
}),
"np" => {
#[cfg(windows)]
{
Ok(PipeMode::NamedPipe {
name: format!("\\\\{}", value.replace('/', "\\")),
})
}
#[cfg(unix)]
{
Ok(PipeMode::UnixSocket {
path: PathBuf::from(value.to_owned()),
})
}
}
"np-listen" => {
#[cfg(windows)]
{
Ok(PipeMode::NamedPipeListen {
name: format!("\\\\{}", value.replace('/', "\\")),
})
}
#[cfg(unix)]
{
Ok(PipeMode::UnixSocketListen {
path: PathBuf::from(value.to_owned()),
})
}
}
_ => anyhow::bail!("Unknown pipe scheme: {}", scheme),
}
}
Expand Down
112 changes: 112 additions & 0 deletions jetsocat/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ pub enum PipeMode {
WebSocketListen {
bind_addr: String,
},
#[cfg(windows)]
NamedPipe {
name: String,
},
#[cfg(windows)]
NamedPipeListen {
name: String,
},
#[cfg(unix)]
UnixSocket {
path: PathBuf,
},
#[cfg(unix)]
UnixSocketListen {
path: PathBuf,
},
}

pub struct Pipe {
Expand Down Expand Up @@ -282,6 +298,102 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result
_handle: None,
})
}
#[cfg(windows)]
PipeMode::NamedPipe { name } => {
use std::time::Duration;
use tokio::net::windows::named_pipe::ClientOptions;
use tokio::time;

const ERROR_PIPE_BUSY: i32 = 231;

info!(%name, "Open named pipe...");

let named_pipe = loop {
match ClientOptions::new().open(&name) {
Ok(named_pipe) => break named_pipe,
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY) => (),
Err(e) => return Err(anyhow::Error::new(e).context("named pipe open")),
}

time::sleep(Duration::from_millis(50)).await;
};

debug!("Connected");

Ok(Pipe {
name: "named-pipe",
stream: Box::new(named_pipe),
_handle: None,
})
}
#[cfg(windows)]
PipeMode::NamedPipeListen { name } => {
use tokio::net::windows::named_pipe::ServerOptions;

info!(%name, "Create named pipe...");

let named_pipe = ServerOptions::new()
.first_pipe_instance(true)
.create(&name)
.context("create named pipe")?;

info!(%name, "Wait for a client to connect");

named_pipe.connect().await.context("named pipe connect")?;

info!("Peer connected");

Ok(Pipe {
name: "named-pipe-listener",
stream: Box::new(named_pipe),
_handle: None,
})
}
#[cfg(unix)]
PipeMode::UnixSocket { path } => {
use tokio::net::UnixStream;

info!(path = %path.display(), "UNIX socket connect");

let stream = UnixStream::connect(path)
.await
.with_context(|| "UNIX socket connect failed")?;

debug!("Connected");

Ok(Pipe {
name: "unix-socket",
stream: Box::new(stream),
_handle: None,
})
}
#[cfg(unix)]
PipeMode::UnixSocketListen { path } => {
use tokio::net::UnixListener;

info!(path = %path.display(), "Listening on UNIX socket");

let listener = UnixListener::bind(&path).context("failed to bind UNIX listener")?;
let handle = Box::new(UnlinkSocketOnDrop(path));

let (socket, peer_addr) = listener.accept().await.context("UNIX listener couldn't accept")?;

info!(?peer_addr, "Accepted peer");

return Ok(Pipe {
name: "unix-socket-listener",
stream: Box::new(socket),
_handle: Some(handle),
});

struct UnlinkSocketOnDrop(PathBuf);

impl Drop for UnlinkSocketOnDrop {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
}
}
}

Expand Down

0 comments on commit b13caba

Please sign in to comment.