-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathgraceful_shutdown.rs
95 lines (82 loc) · 4.04 KB
/
graceful_shutdown.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#![deny(warnings)]
use std::convert::Infallible;
use std::net::SocketAddr;
use std::time::Duration;
use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use tokio::net::TcpListener;
use tokio::pin;
#[path = "../benches/support/mod.rs"]
mod support;
use support::TokioIo;
/// Request handler that ignores the incoming request, waits six seconds and
/// then replies with a fixed `Hello World!` body.
///
/// The six second delay stands in for slow request processing. It is chosen to
/// outlast the first 5 second connection timeout configured in `main`, while
/// still completing inside the extra 2 second window that follows the call to
/// `graceful_shutdown` (5 + 2 = 7 seconds in total).
async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    // Simulated processing time; see the doc comment for why it is 6 seconds.
    let processing_time = Duration::from_secs(6);

    println!("in hello before sleep");
    tokio::time::sleep(processing_time).await;
    println!("in hello after sleep");

    let body = Full::new(Bytes::from("Hello World!"));
    Ok(Response::new(body))
}
/// Runs an HTTP/1 server on 127.0.0.1:3000 that limits how long each
/// connection is served.
///
/// Every accepted TCP connection is handed to its own Tokio task. That task
/// races the connection future against a sequence of timeouts (5 seconds, then
/// 2 seconds). Whenever a timeout wins the race, `graceful_shutdown` is called
/// on the connection and the next timeout in the sequence starts. The task
/// ends as soon as the connection future completes, or once every timeout has
/// been used up, at which point the connection is dropped.
///
/// # Errors
///
/// Returns an error if binding the listener or accepting a connection fails.
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    pretty_env_logger::init();

    // Listen on localhost only.
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    // Timeout schedule applied to every connection: the first entry is how long
    // a connection may run before graceful shutdown is requested, the second is
    // how long an in-flight request then gets to finish.
    let connection_timeouts = vec![Duration::from_secs(5), Duration::from_secs(2)];

    let listener = TcpListener::bind(addr).await?;
    println!("Listening on http://{}", addr);

    loop {
        // Wait for the next client and wrap its stream so that hyper, which
        // expects `hyper::rt` IO traits, can drive a `tokio::io` type.
        let (stream, peer) = listener.accept().await?;
        let io = TokioIo::new(stream);
        println!("accepted connection from {:?}", peer);

        // Each task owns its own copy of the schedule.
        let timeouts = connection_timeouts.clone();

        // Serve the connection on a separate task so this loop can go straight
        // back to accepting new connections.
        tokio::task::spawn(async move {
            let conn = http1::Builder::new().serve_connection(io, service_fn(hello));
            // The connection must be pinned to be polled by reference in
            // `tokio::select!` across several loop iterations.
            pin!(conn);

            for (iter, timeout) in timeouts.into_iter().enumerate() {
                println!("iter = {} sleep_duration = {:?}", iter, timeout);

                // Race the connection against the current timeout:
                // `Some(result)` means the connection finished first,
                // `None` means the timeout elapsed first.
                let outcome = tokio::select! {
                    res = conn.as_mut() => Some(res),
                    _ = tokio::time::sleep(timeout) => None,
                };

                match outcome {
                    Some(Ok(())) => {
                        println!("after polling conn, no error");
                        break;
                    }
                    Some(Err(e)) => {
                        println!("error serving connection: {:?}", e);
                        break;
                    }
                    None => {
                        // Timed out: request a graceful shutdown and move on to
                        // the next timeout in the schedule, if any.
                        println!("iter = {} got timeout_interval, calling conn.graceful_shutdown", iter);
                        conn.as_mut().graceful_shutdown();
                    }
                }
            }
        });
    }
}