Skip to content

Commit

Permalink
server: fix server exit once a accept failed
Browse files Browse the repository at this point in the history
If the Accept error occurs, an error can be output to ensure that the
subsequent connect can be accepted normally.

Fixes: #239
Signed-off-by: Quanwei Zhou <[email protected]>
  • Loading branch information
quanweiZhou authored and quanwei.zqw committed Sep 23, 2024
1 parent b6e643f commit 9259662
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 75 deletions.
8 changes: 6 additions & 2 deletions src/sync/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ impl Server {
.spawn(move || {
loop {
trace!("listening...");
let pipe_connection = match listener.accept(&listener_quit_flag) {
if listener_quit_flag.load(Ordering::SeqCst) {
info!("listener shutdown for quit flag");
break;
}
let pipe_connection = match listener.accept() {
Ok(None) => {
continue;
}
Expand All @@ -394,7 +398,7 @@ impl Server {
}
Err(e) => {
error!("listener accept got {:?}", e);
break;
continue;
}
};

Expand Down
13 changes: 2 additions & 11 deletions src/sync/sys/unix/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use std::os::unix::prelude::AsRawFd;
use nix::Error;

use nix::unistd::*;
use std::sync::{Arc};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::common::{self, client_connect, SOCK_CLOEXEC};
#[cfg(target_os = "macos")]
use crate::common::set_fd_close_exec;
Expand Down Expand Up @@ -81,11 +79,7 @@ impl PipeListener {
// - Ok(Some(PipeConnection)) if a new connection is established
// - Ok(None) if spurious wake up with no new connection
// - Err(io::Error) if there is an error and listener loop should be shutdown
pub(crate) fn accept( &self, quit_flag: &Arc<AtomicBool>) -> std::result::Result<Option<PipeConnection>, io::Error> {
if quit_flag.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag"));
}

pub(crate) fn accept(&self) -> std::result::Result<Option<PipeConnection>, io::Error> {
let mut pollers = vec![
libc::pollfd {
fd: self.monitor_fd.0,
Expand Down Expand Up @@ -117,16 +111,13 @@ impl PipeListener {
error!("fatal error in listener_loop:{:?}", err);
return Err(err);
} else if returned < 1 {
return Ok(None)
return Ok(None);
}

if pollers[0].revents != 0 || pollers[pollers.len() - 1].revents == 0 {
return Ok(None);
}

if quit_flag.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag"));
}

#[cfg(target_os = "linux")]
let fd = match accept4(self.fd, SockFlag::SOCK_CLOEXEC) {
Expand Down
168 changes: 106 additions & 62 deletions src/sync/sys/windows/net.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
/*
Copyright The containerd Authors.
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

use crate::error::Result;
use crate::error::Error;
use crate::error::Result;
use std::cell::UnsafeCell;
use std::ffi::OsStr;
use std::fs::OpenOptions;
use std::io;
use std::os::windows::ffi::OsStrExt;
use std::os::windows::fs::OpenOptionsExt;
use std::os::windows::io::{IntoRawHandle};
use std::os::windows::io::IntoRawHandle;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc};
use std::{io};

use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, INVALID_HANDLE_VALUE };
use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX };
use windows_sys::Win32::System::IO::{ GetOverlappedResult, OVERLAPPED };
use windows_sys::Win32::System::Pipes::{ CreateNamedPipeW, ConnectNamedPipe,DisconnectNamedPipe, PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, PIPE_REJECT_REMOTE_CLIENTS };
use windows_sys::Win32::Foundation::{
CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, INVALID_HANDLE_VALUE,
};
use windows_sys::Win32::Storage::FileSystem::{
ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
};
use windows_sys::Win32::System::Pipes::{
ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_REJECT_REMOTE_CLIENTS,
PIPE_UNLIMITED_INSTANCES, PIPE_WAIT,
};
use windows_sys::Win32::System::Threading::{CreateEventW, SetEvent};
use windows_sys::Win32::System::IO::{GetOverlappedResult, OVERLAPPED};

const PIPE_BUFFER_SIZE: u32 = 65536;
const WAIT_FOR_EVENT: i32 = 1;
Expand All @@ -48,7 +54,7 @@ struct Overlapped {
}

impl Overlapped {
fn new_with_event(event: isize) -> Overlapped {
fn new_with_event(event: isize) -> Overlapped {
let mut ol = Overlapped {
inner: UnsafeCell::new(unsafe { std::mem::zeroed() }),
};
Expand All @@ -68,21 +74,14 @@ impl PipeListener {
first_instance: AtomicBool::new(true),
shutting_down: AtomicBool::new(false),
address: sockaddr.to_string(),
connection_event
connection_event,
})
}

// accept returns:
// - Ok(Some(PipeConnection)) if a new connection is established
// - Err(io::Error) if there is an error and listener loop should be shutdown
pub(crate) fn accept(&self, quit_flag: &Arc<AtomicBool>) -> std::result::Result<Option<PipeConnection>, io::Error> {
if quit_flag.load(Ordering::SeqCst) {
return Err(io::Error::new(
io::ErrorKind::Other,
"listener shutdown for quit flag",
));
}

pub(crate) fn accept(&self) -> std::result::Result<Option<PipeConnection>, io::Error> {
// Create a new pipe instance for every new client
let instance = self.new_instance()?;
let np = match PipeConnection::new(instance) {
Expand All @@ -94,11 +93,11 @@ impl PipeListener {
));
}
};

let ol = Overlapped::new_with_event(self.connection_event);

trace!("listening for connection");
let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr())};
let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr()) };
if result != 0 {
if let Some(error) = self.handle_shutdown(&np) {
return Err(error);
Expand All @@ -109,7 +108,14 @@ impl PipeListener {
match io::Error::last_os_error() {
e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => {
let mut bytes_transfered = 0;
let res = unsafe {GetOverlappedResult(np.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
let res = unsafe {
GetOverlappedResult(
np.named_pipe,
ol.as_mut_ptr(),
&mut bytes_transfered,
WAIT_FOR_EVENT,
)
};
match res {
0 => {
return Err(io::Error::last_os_error());
Expand Down Expand Up @@ -139,11 +145,9 @@ impl PipeListener {

fn handle_shutdown(&self, np: &PipeConnection) -> Option<io::Error> {
if self.shutting_down.load(Ordering::SeqCst) {
np.close().unwrap_or_else(|err| trace!("Failed to close the pipe {:?}", err));
return Some(io::Error::new(
io::ErrorKind::Other,
"closing pipe",
));
np.close()
.unwrap_or_else(|err| trace!("Failed to close the pipe {:?}", err));
return Some(io::Error::new(io::ErrorKind::Other, "closing pipe"));
}
None
}
Expand All @@ -154,7 +158,7 @@ impl PipeListener {
.chain(Some(0)) // add NULL termination
.collect::<Vec<_>>();

let mut open_mode = PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED ;
let mut open_mode = PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED;

if self.first_instance.load(Ordering::SeqCst) {
open_mode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
Expand Down Expand Up @@ -192,18 +196,22 @@ pub struct PipeConnection {
// This is required since a read and write be issued at the same time on a given named pipe instance.
//
// An event is created for the read and write operations. When the read or write is issued
// it either returns immediately or the thread is suspended until the event is signaled when
// it either returns immediately or the thread is suspended until the event is signaled when
// the overlapped (async) operation completes and the event is triggered allow the thread to continue.
//
// Due to the implementation of the sync Server and client there is always only one read and one write
//
// Due to the implementation of the sync Server and client there is always only one read and one write
// operation in flight at a time so we can reuse the same event.
//
//
// For more information on overlapped and events: https://learn.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getoverlappedresult#remarks
// "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device."
// "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device."
// "In this situation, there is no way to know which operation caused the object's state to be signaled."
impl PipeConnection {
pub(crate) fn new(h: isize) -> Result<PipeConnection> {
trace!("creating events for thread {:?} on pipe instance {}", std::thread::current().id(), h as i32);
trace!(
"creating events for thread {:?} on pipe instance {}",
std::thread::current().id(),
h as i32
);
let read_event = create_event()?;
let write_event = create_event()?;
Ok(PipeConnection {
Expand All @@ -218,22 +226,41 @@ impl PipeConnection {
}

pub fn read(&self, buf: &mut [u8]) -> Result<usize> {
trace!("starting read for thread {:?} on pipe instance {}", std::thread::current().id(), self.named_pipe as i32);
trace!(
"starting read for thread {:?} on pipe instance {}",
std::thread::current().id(),
self.named_pipe as i32
);
let ol = Overlapped::new_with_event(self.read_event);

let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
let mut bytes_read= 0;
let result = unsafe { ReadFile(self.named_pipe, buf.as_mut_ptr() as *mut _, len, &mut bytes_read,ol.as_mut_ptr()) };
let mut bytes_read = 0;
let result = unsafe {
ReadFile(
self.named_pipe,
buf.as_mut_ptr() as *mut _,
len,
&mut bytes_read,
ol.as_mut_ptr(),
)
};
if result > 0 && bytes_read > 0 {
// Got result no need to wait for pending read to complete
return Ok(bytes_read as usize)
return Ok(bytes_read as usize);
}

// wait for pending operation to complete (thread will be suspended until event is signaled)
match io::Error::last_os_error() {
ref e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => {
let mut bytes_transfered = 0;
let res = unsafe {GetOverlappedResult(self.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
let res = unsafe {
GetOverlappedResult(
self.named_pipe,
ol.as_mut_ptr(),
&mut bytes_transfered,
WAIT_FOR_EVENT,
)
};
match res {
0 => {
return Err(handle_windows_error(io::Error::last_os_error()))
Expand All @@ -250,21 +277,40 @@ impl PipeConnection {
}

pub fn write(&self, buf: &[u8]) -> Result<usize> {
trace!("starting write for thread {:?} on pipe instance {}", std::thread::current().id(), self.named_pipe as i32);
trace!(
"starting write for thread {:?} on pipe instance {}",
std::thread::current().id(),
self.named_pipe as i32
);
let ol = Overlapped::new_with_event(self.write_event);
let mut bytes_written = 0;
let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
let result = unsafe { WriteFile(self.named_pipe, buf.as_ptr() as *const _,len, &mut bytes_written, ol.as_mut_ptr())};
let result = unsafe {
WriteFile(
self.named_pipe,
buf.as_ptr() as *const _,
len,
&mut bytes_written,
ol.as_mut_ptr(),
)
};
if result > 0 && bytes_written > 0 {
// No need to wait for pending write to complete
return Ok(bytes_written as usize)
return Ok(bytes_written as usize);
}

// wait for pending operation to complete (thread will be suspended until event is signaled)
match io::Error::last_os_error() {
ref e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => {
let mut bytes_transfered = 0;
let res = unsafe {GetOverlappedResult(self.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
let res = unsafe {
GetOverlappedResult(
self.named_pipe,
ol.as_mut_ptr(),
&mut bytes_transfered,
WAIT_FOR_EVENT,
)
};
match res {
0 => {
return Err(handle_windows_error(io::Error::last_os_error()))
Expand Down Expand Up @@ -296,7 +342,7 @@ impl PipeConnection {
}

pub struct ClientConnection {
address: String
address: String,
}

fn close_handle(handle: isize) -> Result<()> {
Expand Down Expand Up @@ -328,14 +374,14 @@ impl ClientConnection {
Ok(ClientConnection::new(sockaddr))
}

pub(crate) fn new(sockaddr: &str) -> ClientConnection {
pub(crate) fn new(sockaddr: &str) -> ClientConnection {
ClientConnection {
address: sockaddr.to_string()
address: sockaddr.to_string(),
}
}

pub fn ready(&self) -> std::result::Result<Option<()>, io::Error> {
// Windows is a "completion" based system so "readiness" isn't really applicable
// Windows is a "completion" based system so "readiness" isn't really applicable
Ok(Some(()))
}

Expand Down Expand Up @@ -376,6 +422,7 @@ fn handle_windows_error(e: io::Error) -> Error {
#[cfg(test)]
mod test {
use super::*;
use std::sync::Arc;
use windows_sys::Win32::Foundation::ERROR_FILE_NOT_FOUND;

#[test]
Expand All @@ -398,8 +445,7 @@ mod test {

let listener_server = listener.clone();
let thread = std::thread::spawn(move || {
let quit_flag = Arc::new(AtomicBool::new(false));
match listener_server.accept(&quit_flag) {
match listener_server.accept() {
Ok(Some(_)) => {
// pipe is working
}
Expand Down Expand Up @@ -443,9 +489,7 @@ mod test {
fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
for _i in 0..count {
let client = match ClientConnection::client_connect(address) {
Ok(c) => {
c
}
Ok(c) => c,
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(interval_in_ms));
continue;
Expand All @@ -462,5 +506,5 @@ mod test {
}
}
Err(Error::Others("timed out".to_string()))
}
}
}

0 comments on commit 9259662

Please sign in to comment.