Skip to content

Commit

Permalink
Add UDS support
Browse files Browse the repository at this point in the history
  • Loading branch information
yzsolt committed Dec 13, 2024
1 parent e50334c commit 4644ed8
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 12 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ Once the handler sends a request, these settings become immutable and cannot be
|Http2MaxConcurrentResetStreams|Gets or sets the maximum number of HTTP2 concurrent locally reset streams. See the documentation of h2::client::Builder::max_concurrent_reset_streams for more details. The default value is determined by the h2 crate.|
|Http2MaxSendBufferSize|Gets or sets the maximum write buffer size for each HTTP/2 stream. Default is currently 1MB, but may change.|
|Http2InitialMaxSendStreams|Gets or sets the initial maximum of locally initiated (send) streams. This value will be overwritten by the value included in the initial SETTINGS frame received from the peer as part of a connection preface.|
|UnixDomainSocketPath|Gets or sets the path to a Unix Domain Socket to be used as HTTP communication channel instead of the default TCP.|

Most of them expose [hyper client settings](https://docs.rs/hyper-util/latest/hyper_util/client/legacy/struct.Builder.html), so please check those as well.

Expand Down Expand Up @@ -301,6 +302,22 @@ using var httpHandler = new YetAnotherHttpHandler()
};
```

### Using Unix Domain Sockets as HTTP transport layer

Unix Domain Sockets can be used as the HTTP transport layer for local usecases (e.g. IPC based on gRPC), instead of network-based TCP.

Set the `UnixDomainSocketPath` property to enable UDS-based communication to a server listening at the given path:

```csharp
using var handler = new YetAnotherHttpHandler() { Http2Only = true, UnixDomainSocketPath = "/tmp/example.sock" };
using var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions() { HttpHandler = handler });
```

Note:
- HTTPS is not supported over UDS. All HTTPS related configuration properties are ignored if UDS is enabled.
- The grpc-dotnet library doesn't handle non-HTTP schemes (like "unix://"), so keep passing an HTTP URI to `GrpcChannel`, e.g. http://localhost.
The actual HTTP requests will be redirected to `UnixDomainSocketPath` by YetAnotherHttpHandler internally.

## Development
### Build & Tests

Expand Down
31 changes: 30 additions & 1 deletion native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion native/yaha_native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ http-body-util = "0.1.1"
tokio-stream = "0.1.15"
futures-channel = "0.3.30"
futures-util = "0.3.30"

hyperlocal = "0.9.1"

[features]
default = [ "rustls" ]
Expand Down
17 changes: 14 additions & 3 deletions native/yaha_native/src/binding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,18 @@ pub extern "C" fn yaha_client_config_http2_initial_max_send_streams(
.http2_initial_max_send_streams(initial);
}

#[no_mangle]
pub extern "C" fn yaha_client_config_unix_domain_socket_path(
ctx: *mut YahaNativeContext,
uds_path: *const StringBuffer,
) {
let ctx = YahaNativeContextInternal::from_raw_context(ctx);

let uds_socket_path = unsafe { (*uds_path).to_str() };
ctx.uds_socket_path.get_or_insert(uds_socket_path.into());
}


#[no_mangle]
pub extern "C" fn yaha_build_client(ctx: *mut YahaNativeContext) {
let ctx = YahaNativeContextInternal::from_raw_context(ctx);
Expand Down Expand Up @@ -497,8 +509,7 @@ pub extern "C" fn yaha_request_begin(
(req_ctx.seq, builder.body(body).unwrap())
};

//
if ctx.client.as_ref().is_none() {
if ctx.tcp_client.is_none() && ctx.uds_client.is_none() {
LAST_ERROR.with(|v| {
*v.borrow_mut() = Some("The client has not been built. You need to build it before sending the request. ".to_string());
});
Expand All @@ -512,7 +523,7 @@ pub extern "C" fn yaha_request_begin(
(ctx.on_complete)(seq, state, CompletionReason::Aborted, 0);
return;
}
res = ctx.client.as_ref().unwrap().request(req) => {
res = ctx.request(req) => {
if let Err(err) = res {
complete_with_error(ctx, seq, state, err);
return;
Expand Down
44 changes: 37 additions & 7 deletions native/yaha_native/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ use std::{
num::NonZeroIsize,
sync::{Arc, Mutex},
time::Duration,
path::PathBuf,
};
use futures_channel::mpsc::Sender;
use http_body_util::combinators::BoxBody;
use tokio::runtime::{Handle, Runtime};

use hyper::{
body::Bytes,
StatusCode,
body::Bytes,
Request, StatusCode
};

use hyper_util::{
client::{self, legacy::{connect::HttpConnector, Client}},
client::{self, legacy::{connect::HttpConnector, Client, ResponseFuture}},
rt::{TokioExecutor, TokioTimer},
};

Expand All @@ -23,6 +24,7 @@ use hyper_rustls::ConfigBuilderExt;
use hyper_rustls::HttpsConnector;
#[cfg(feature = "native")]
use hyper_tls::HttpsConnector;
use hyperlocal::UnixConnector;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -61,10 +63,12 @@ pub struct YahaNativeContextInternal<'a> {
pub connect_timeout: Option<Duration>,
pub client_auth_certificates: Option<Vec<CertificateDer<'a>>>,
pub client_auth_key: Option<PrivateKeyDer<'a>>,
pub client: Option<Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, hyper::Error>>>,
pub tcp_client: Option<Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, hyper::Error>>>,
pub on_status_code_and_headers_receive: OnStatusCodeAndHeadersReceive,
pub on_receive: OnReceive,
pub on_complete: OnComplete,
pub uds_client: Option<Client<UnixConnector, BoxBody<Bytes, hyper::Error>>>,
pub uds_socket_path: Option<PathBuf>,
}

impl YahaNativeContextInternal<'_> {
Expand All @@ -80,7 +84,8 @@ impl YahaNativeContextInternal<'_> {
) -> Self {
YahaNativeContextInternal {
runtime: runtime_handle,
client: None,
tcp_client: None,
uds_client: None,
client_builder: Some(Client::builder(TokioExecutor::new())),
skip_certificate_verification: None,
server_certificate_verification_handler: None,
Expand All @@ -92,13 +97,20 @@ impl YahaNativeContextInternal<'_> {
on_status_code_and_headers_receive,
on_receive,
on_complete,
uds_socket_path: None,
}
}

pub fn build_client(&mut self) {
let mut builder = self.client_builder.take().unwrap();
let https = self.new_connector();
self.client = Some(builder.timer(TokioTimer::new()).build(https));
builder.timer(TokioTimer::new());

if self.uds_socket_path.is_some() {
self.uds_client = Some(builder.build(UnixConnector));
} else {
let https = self.new_connector();
self.tcp_client = Some(builder.build(https));
}
}

#[cfg(feature = "rustls")]
Expand Down Expand Up @@ -180,6 +192,24 @@ impl YahaNativeContextInternal<'_> {
let https = HttpsConnector::new();
https
}

pub fn request(&self, mut req: Request<BoxBody<Bytes, hyper::Error>>) -> ResponseFuture {
// Precondition (`uds_client` or `tcp_client` is set) ensured by `Self::build_client` and `yaha_request_begin`
if let Some(uds_socket_path) = &self.uds_socket_path {
// Transform HTTP URIs to the format expected by hyperlocal
let path_and_query = req
.uri()
.path_and_query()
.map(|pq| pq.as_str())
.unwrap_or("/");
let uds_uri = hyperlocal::Uri::new(uds_socket_path, path_and_query);
*req.uri_mut() = uds_uri.into();

self.uds_client.as_ref().unwrap().request(req)
} else {
self.tcp_client.as_ref().unwrap().request(req)
}
}
}

#[cfg(feature = "rustls")]
Expand Down
10 changes: 10 additions & 0 deletions src/YetAnotherHttpHandler/NativeHttpHandlerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ private unsafe void Initialize(YahaNativeContext* ctx, NativeClientSettings sett
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Info($"Option '{nameof(settings.Http2InitialMaxSendStreams)}' = {http2InitialMaxSendStreams}");
NativeMethods.yaha_client_config_http2_initial_max_send_streams(ctx, (nuint)http2InitialMaxSendStreams);
}
if (settings.UnixDomainSocketPath is { } unixDomainSocketPath)
{
if (YahaEventSource.Log.IsEnabled()) YahaEventSource.Log.Info($"Option '{nameof(settings.UnixDomainSocketPath)}' = {unixDomainSocketPath}");
var strBytes = Encoding.UTF8.GetBytes(unixDomainSocketPath);
fixed (byte* buffer = strBytes)
{
var sb = new StringBuffer(buffer, strBytes.Length);
NativeMethods.yaha_client_config_unix_domain_socket_path(ctx, &sb);
}
}

NativeMethods.yaha_build_client(ctx);

Expand Down
3 changes: 3 additions & 0 deletions src/YetAnotherHttpHandler/NativeMethods.Uwp.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ internal static unsafe partial class NativeMethods
[DllImport(__DllName, EntryPoint = "yaha_client_config_http2_initial_max_send_streams", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void yaha_client_config_http2_initial_max_send_streams(YahaNativeContext* ctx, nuint initial);

[DllImport(__DllName, EntryPoint = "yaha_client_config_unix_domain_socket_path", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void yaha_client_config_unix_domain_socket_path(YahaNativeContext* ctx, StringBuffer* uds_path);

[DllImport(__DllName, EntryPoint = "yaha_build_client", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void yaha_build_client(YahaNativeContext* ctx);

Expand Down
3 changes: 3 additions & 0 deletions src/YetAnotherHttpHandler/NativeMethods.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ internal static unsafe partial class NativeMethods
[DllImport(__DllName, EntryPoint = "yaha_client_config_http2_initial_max_send_streams", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void yaha_client_config_http2_initial_max_send_streams(YahaNativeContext* ctx, nuint initial);

[DllImport(__DllName, EntryPoint = "yaha_client_config_unix_domain_socket_path", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void yaha_client_config_unix_domain_socket_path(YahaNativeContext* ctx, StringBuffer* uds_path);

[DllImport(__DllName, EntryPoint = "yaha_build_client", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void yaha_build_client(YahaNativeContext* ctx);

Expand Down
7 changes: 7 additions & 0 deletions src/YetAnotherHttpHandler/YetAnotherHttpHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public class YetAnotherHttpHandler : HttpMessageHandler
/// </remarks>
public ulong? Http2InitialMaxSendStreams { get => _settings.Http2InitialMaxSendStreams; set => _settings.Http2InitialMaxSendStreams = value; }

/// <summary>
/// Gets or sets the path to a Unix Domain Socket to be used as HTTP communication channel instead of the default TCP.
/// </summary>
public string? UnixDomainSocketPath { get => _settings.UnixDomainSocketPath; set => _settings.UnixDomainSocketPath = value; }

private NativeHttpHandlerCore SetupHandler()
{
var settings = _settings.Clone();
Expand Down Expand Up @@ -236,6 +241,7 @@ internal class NativeClientSettings
public ulong? Http2MaxConcurrentResetStreams { get; set; }
public ulong? Http2MaxSendBufferSize { get; set; }
public ulong? Http2InitialMaxSendStreams { get; set; }
public string? UnixDomainSocketPath { get; set; }

public NativeClientSettings Clone()
{
Expand All @@ -261,6 +267,7 @@ public NativeClientSettings Clone()
Http2MaxConcurrentResetStreams = this.Http2MaxConcurrentResetStreams,
Http2MaxSendBufferSize = this.Http2MaxSendBufferSize,
Http2InitialMaxSendStreams = this.Http2InitialMaxSendStreams,
UnixDomainSocketPath = this.UnixDomainSocketPath,
};
}
}
Expand Down

0 comments on commit 4644ed8

Please sign in to comment.