forked from plabayo/rama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttp_conn_state.rs
158 lines (144 loc) · 4.85 KB
/
http_conn_state.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
//! An example to show how to wrap states as a way
//! to prepare typed state for inner layers, while keeping the
//! typed state of the outer layer.
//!
//! One example where this is useful is a cache that a middleware
//! requires for the life cycle of a connection, without it leaking
//! into the entire app (service) life cycle.
//!
//! This example will create a server that listens on `127.0.0.1:8080`.
//!
//! # Run the example
//!
//! ```sh
//! cargo run --example http_conn_state
//! ```
//!
//! # Expected output
//!
//! The server will start and listen on `:8080`. You can use `curl` to check if the server is running:
//!
//! ```sh
//! curl -v http://127.0.0.1:8080
//! ```
//!
//! You should see an HTTP Status 200 OK with an HTML payload containing the
//! connection index and the count of requests within that connection.
use rama::{
http::{response::Html, server::HttpServer, Request},
rt::Executor,
service::{context::AsRef, service_fn, Context, ServiceBuilder},
tcp::server::TcpListener,
};
use std::{
convert::Infallible,
sync::{
atomic::{AtomicBool, AtomicUsize},
Arc,
},
time::Duration,
};
/// Application-wide metrics, held by `AppState`.
#[derive(Debug, Default)]
struct AppMetrics {
/// number of connection states created so far; `main`'s state-mapping closure
/// adds one every time it builds a `ConnState`, and `handle_index` reads it
/// to report the total connection count
connections: AtomicUsize,
}
/// Metrics for a single connection, held by `ConnState`.
#[derive(Debug)]
struct ConnMetrics {
/// connection index: the value of `AppMetrics::connections` right after it was
/// incremented for this connection in `main` (so the first index is 1)
pub index: usize,
/// amount of requests seen on this connection; starts at 0 and is
/// incremented by `handle_index` on every request it serves
pub requests: AtomicUsize,
}
/// State with the scope of the whole application; `main` hands a default
/// instance of it to `TcpListener::build_with_state`.
// NOTE(review): the `AsRef` derive is imported from `rama::service::context`; it
// presumably generates an `AsRef<AppMetrics>` impl for the field below — confirm
// against the rama documentation.
#[derive(Debug, AsRef, Default)]
struct AppState {
/// metrics with the scope of the app life cycle
pub app_metrics: AppMetrics,
}
/// State with the scope of a single connection, built from the app state by the
/// `map_state` closure in `main`.
// NOTE(review): `handle_index` requires its state to be `AsRef<AppMetrics>`,
// `AsRef<ConnMetrics>` and `AsRef<Arc<AtomicBool>>`; `#[as_ref(wrap)]` presumably
// forwards the `AsRef` impls of the wrapped `AppState` so that `ConnState` also
// satisfies `AsRef<AppMetrics>` — confirm against the rama derive documentation.
#[derive(Debug, AsRef)]
struct ConnState {
#[as_ref(wrap)]
/// reference to the app life cycle state
app: Arc<AppState>,
/// global state injected directly into the connection state, true if app is alive
alive: Arc<AtomicBool>,
/// metrics with the scope of the connection
conn_metrics: ConnMetrics,
}
/// Serves the metrics page for one request.
///
/// Three pieces of state are read through the `AsRef` bounds on `S`: the app-wide
/// `AppMetrics`, the per-connection `ConnMetrics` and the shared `alive` flag.
/// Each call adds one to the connection's request counter; the returned HTML shows
/// the alive flag, the connection index, the app-wide connection count and the
/// request count including the current request.
///
/// The error type is `Infallible`: this handler always returns `Ok`.
async fn handle_index<S>(ctx: Context<S>, _: Request) -> Result<Html<String>, Infallible>
where
    // NOTE: The generic bound is deliberately over-engineered for this example: it only
    // demonstrates that `AsRef` bounds reach state properties no matter how deeply they
    // are "nested". A non-generic middleware / service in a production codebase would
    // most likely just name the concrete state type instead.
    S: AsRef<AppMetrics> + AsRef<ConnMetrics> + AsRef<Arc<AtomicBool>> + Send + Sync + 'static,
{
    use std::sync::atomic::Ordering;

    // Resolve every view of the state up front; the type annotations select
    // which `AsRef` implementation is used.
    let state = ctx.state();
    let app_metrics: &AppMetrics = state.as_ref();
    let conn_metrics: &ConnMetrics = state.as_ref();
    let alive: &Arc<AtomicBool> = state.as_ref();

    let total_connections = app_metrics.connections.load(Ordering::SeqCst);
    // `fetch_add` returns the previous value, so add one to include this request.
    let requests_seen = conn_metrics.requests.fetch_add(1, Ordering::SeqCst) + 1;
    let alive_flag = alive.load(Ordering::SeqCst);
    let alive_label = if alive_flag { "yes" } else { "no" };

    let page = format!(
        r##"
<html>
<head>
<title>Rama — Http Conn State</title>
</head>
<body>
<h1>Metrics</h1>
<p>Alive: {is_alive}
<p>Connection <code>{conn_index}</code> of <code>{conn_count}</code></p>
<p>Request Count: <code>{request_count}</code></p>
</body>
</html>"##,
        is_alive = alive_label,
        conn_index = conn_metrics.index,
        conn_count = total_connections,
        request_count = requests_seen,
    );
    Ok(Html(page))
}
/// Starts the example server on `127.0.0.1:8080` and waits for graceful shutdown.
///
/// A task spawned on the graceful-shutdown handle binds a TCP listener that is built
/// with a default `AppState`. In front of the HTTP service sits a `map_state` layer
/// whose closure turns the app state into a `ConnState`: it bumps the app-wide
/// connection counter, uses the new counter value as the connection index and starts
/// the connection's request counter at zero.
///
/// After spawning the task, `main` awaits `shutdown_with_limit` with a limit of
/// 30 seconds.
///
/// # Panics
///
/// Panics if the TCP listener cannot be bound or if `shutdown_with_limit` returns
/// an error.
#[tokio::main]
async fn main() {
    let graceful = rama::graceful::Shutdown::default();

    graceful.spawn_task_fn(|guard| async move {
        let exec = Executor::graceful(guard.clone());
        let tcp_http_service = HttpServer::auto(exec).service(service_fn(handle_index));

        // example of data that can be stored as part of the state mapping closure
        let alive = Arc::new(AtomicBool::new(true));

        TcpListener::build_with_state(AppState::default())
            .bind("127.0.0.1:8080")
            .await
            .expect("bind TCP Listener")
            .serve_graceful(
                guard,
                ServiceBuilder::new()
                    .map_state(move |app: Arc<AppState>| {
                        // `fetch_add` returns the previous value, so the first
                        // connection gets index 1.
                        let index = app
                            .app_metrics
                            .connections
                            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
                            + 1;
                        Arc::new(ConnState {
                            app,
                            // Hand out a new handle to the shared flag instead of moving
                            // the captured one out of the closure: this keeps the closure
                            // callable more than once (`Fn` rather than `FnOnce`).
                            alive: Arc::clone(&alive),
                            conn_metrics: ConnMetrics {
                                index,
                                requests: AtomicUsize::new(0),
                            },
                        })
                    })
                    .service(tcp_http_service),
            )
            .await;
    });

    graceful
        .shutdown_with_limit(Duration::from_secs(30))
        .await
        .expect("graceful shutdown");
}