diff --git a/src/sync/server.rs b/src/sync/server.rs index a8e5883d..554d77eb 100644 --- a/src/sync/server.rs +++ b/src/sync/server.rs @@ -385,7 +385,11 @@ impl Server { .spawn(move || { loop { trace!("listening..."); - let pipe_connection = match listener.accept(&listener_quit_flag) { + if listener_quit_flag.load(Ordering::SeqCst) { + info!("listener shutdown for quit flag"); + break; + } + let pipe_connection = match listener.accept() { Ok(None) => { continue; } @@ -394,7 +398,7 @@ impl Server { } Err(e) => { error!("listener accept got {:?}", e); - break; + continue; } }; diff --git a/src/sync/sys/unix/net.rs b/src/sync/sys/unix/net.rs index 3fdf47b8..f7d94624 100644 --- a/src/sync/sys/unix/net.rs +++ b/src/sync/sys/unix/net.rs @@ -21,8 +21,6 @@ use std::os::unix::prelude::AsRawFd; use nix::Error; use nix::unistd::*; -use std::sync::{Arc}; -use std::sync::atomic::{AtomicBool, Ordering}; use crate::common::{self, client_connect, SOCK_CLOEXEC}; #[cfg(target_os = "macos")] use crate::common::set_fd_close_exec; @@ -81,11 +79,7 @@ impl PipeListener { // - Ok(Some(PipeConnection)) if a new connection is established // - Ok(None) if spurious wake up with no new connection // - Err(io::Error) if there is an error and listener loop should be shutdown - pub(crate) fn accept( &self, quit_flag: &Arc) -> std::result::Result, io::Error> { - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag")); - } - + pub(crate) fn accept(&self) -> std::result::Result, io::Error> { let mut pollers = vec![ libc::pollfd { fd: self.monitor_fd.0, @@ -117,16 +111,13 @@ impl PipeListener { error!("fatal error in listener_loop:{:?}", err); return Err(err); } else if returned < 1 { - return Ok(None) + return Ok(None); } if pollers[0].revents != 0 || pollers[pollers.len() - 1].revents == 0 { return Ok(None); } - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag")); - } #[cfg(target_os = "linux")] let fd = match accept4(self.fd, SockFlag::SOCK_CLOEXEC) { diff --git a/src/sync/sys/windows/net.rs b/src/sync/sys/windows/net.rs index e416c189..77ee40b8 100644 --- a/src/sync/sys/windows/net.rs +++ b/src/sync/sys/windows/net.rs @@ -1,36 +1,42 @@ /* - Copyright The containerd Authors. + Copyright The containerd Authors. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ -use crate::error::Result; use crate::error::Error; +use crate::error::Result; use std::cell::UnsafeCell; use std::ffi::OsStr; use std::fs::OpenOptions; +use std::io; use std::os::windows::ffi::OsStrExt; use std::os::windows::fs::OpenOptionsExt; -use std::os::windows::io::{IntoRawHandle}; +use std::os::windows::io::IntoRawHandle; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc}; -use std::{io}; -use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, INVALID_HANDLE_VALUE }; -use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX }; -use windows_sys::Win32::System::IO::{ GetOverlappedResult, OVERLAPPED }; -use windows_sys::Win32::System::Pipes::{ CreateNamedPipeW, ConnectNamedPipe,DisconnectNamedPipe, PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, PIPE_REJECT_REMOTE_CLIENTS }; +use windows_sys::Win32::Foundation::{ + CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, INVALID_HANDLE_VALUE, +}; +use windows_sys::Win32::Storage::FileSystem::{ + ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, +}; +use windows_sys::Win32::System::Pipes::{ + ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_REJECT_REMOTE_CLIENTS, + PIPE_UNLIMITED_INSTANCES, PIPE_WAIT, +}; use windows_sys::Win32::System::Threading::{CreateEventW, SetEvent}; +use windows_sys::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; const PIPE_BUFFER_SIZE: u32 = 65536; const WAIT_FOR_EVENT: i32 = 1; @@ -48,7 +54,7 @@ struct Overlapped { } impl Overlapped { - fn new_with_event(event: isize) -> Overlapped { + fn new_with_event(event: isize) -> Overlapped { let mut ol = Overlapped { inner: UnsafeCell::new(unsafe { std::mem::zeroed() }), }; @@ -68,21 +74,14 @@ impl PipeListener { first_instance: AtomicBool::new(true), shutting_down: AtomicBool::new(false), address: sockaddr.to_string(), - connection_event + connection_event, }) } // accept returns: // - Ok(Some(PipeConnection)) if a new connection is established // - Err(io::Error) if there is an error and listener loop should be shutdown - pub(crate) fn accept(&self, quit_flag: &Arc) -> std::result::Result, io::Error> { - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new( - io::ErrorKind::Other, - "listener shutdown for quit flag", - )); - } - + pub(crate) fn accept(&self) -> std::result::Result, io::Error> { // Create a new pipe instance for every new client let instance = self.new_instance()?; let np = match PipeConnection::new(instance) { @@ -94,11 +93,11 @@ impl PipeListener { )); } }; - + let ol = Overlapped::new_with_event(self.connection_event); trace!("listening for connection"); - let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr())}; + let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr()) }; if result != 0 { if let Some(error) = self.handle_shutdown(&np) { return Err(error); @@ -109,7 +108,14 @@ impl PipeListener { match io::Error::last_os_error() { e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => { let mut bytes_transfered = 0; - let res = unsafe {GetOverlappedResult(np.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) }; + let res = unsafe { + GetOverlappedResult( + np.named_pipe, + ol.as_mut_ptr(), + &mut bytes_transfered, + WAIT_FOR_EVENT, + ) + }; match res { 0 => { return Err(io::Error::last_os_error()); @@ -139,11 +145,9 @@ impl PipeListener { fn handle_shutdown(&self, np: &PipeConnection) -> Option { if self.shutting_down.load(Ordering::SeqCst) { - np.close().unwrap_or_else(|err| trace!("Failed to close the pipe {:?}", err)); - return Some(io::Error::new( - io::ErrorKind::Other, - "closing pipe", - )); + np.close() + .unwrap_or_else(|err| trace!("Failed to close the pipe {:?}", err)); + return Some(io::Error::new(io::ErrorKind::Other, "closing pipe")); } None } @@ -154,7 +158,7 @@ impl PipeListener { .chain(Some(0)) // add NULL termination .collect::>(); - let mut open_mode = PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED ; + let mut open_mode = PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED; if self.first_instance.load(Ordering::SeqCst) { open_mode |= FILE_FLAG_FIRST_PIPE_INSTANCE; @@ -192,18 +196,22 @@ pub struct PipeConnection { // This is required since a read and write be issued at the same time on a given named pipe instance. // // An event is created for the read and write operations. When the read or write is issued -// it either returns immediately or the thread is suspended until the event is signaled when +// it either returns immediately or the thread is suspended until the event is signaled when // the overlapped (async) operation completes and the event is triggered allow the thread to continue. -// -// Due to the implementation of the sync Server and client there is always only one read and one write +// +// Due to the implementation of the sync Server and client there is always only one read and one write // operation in flight at a time so we can reuse the same event. -// +// // For more information on overlapped and events: https://learn.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getoverlappedresult#remarks -// "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device." +// "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device." // "In this situation, there is no way to know which operation caused the object's state to be signaled." impl PipeConnection { pub(crate) fn new(h: isize) -> Result { - trace!("creating events for thread {:?} on pipe instance {}", std::thread::current().id(), h as i32); + trace!( + "creating events for thread {:?} on pipe instance {}", + std::thread::current().id(), + h as i32 + ); let read_event = create_event()?; let write_event = create_event()?; Ok(PipeConnection { @@ -218,22 +226,41 @@ impl PipeConnection { } pub fn read(&self, buf: &mut [u8]) -> Result { - trace!("starting read for thread {:?} on pipe instance {}", std::thread::current().id(), self.named_pipe as i32); + trace!( + "starting read for thread {:?} on pipe instance {}", + std::thread::current().id(), + self.named_pipe as i32 + ); let ol = Overlapped::new_with_event(self.read_event); let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32; - let mut bytes_read= 0; - let result = unsafe { ReadFile(self.named_pipe, buf.as_mut_ptr() as *mut _, len, &mut bytes_read,ol.as_mut_ptr()) }; + let mut bytes_read = 0; + let result = unsafe { + ReadFile( + self.named_pipe, + buf.as_mut_ptr() as *mut _, + len, + &mut bytes_read, + ol.as_mut_ptr(), + ) + }; if result > 0 && bytes_read > 0 { // Got result no need to wait for pending read to complete - return Ok(bytes_read as usize) + return Ok(bytes_read as usize); } // wait for pending operation to complete (thread will be suspended until event is signaled) match io::Error::last_os_error() { ref e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => { let mut bytes_transfered = 0; - let res = unsafe {GetOverlappedResult(self.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) }; + let res = unsafe { + GetOverlappedResult( + self.named_pipe, + ol.as_mut_ptr(), + &mut bytes_transfered, + WAIT_FOR_EVENT, + ) + }; match res { 0 => { return Err(handle_windows_error(io::Error::last_os_error())) @@ -250,21 +277,40 @@ impl PipeConnection { } pub fn write(&self, buf: &[u8]) -> Result { - trace!("starting write for thread {:?} on pipe instance {}", std::thread::current().id(), self.named_pipe as i32); + trace!( + "starting write for thread {:?} on pipe instance {}", + std::thread::current().id(), + self.named_pipe as i32 + ); let ol = Overlapped::new_with_event(self.write_event); let mut bytes_written = 0; let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32; - let result = unsafe { WriteFile(self.named_pipe, buf.as_ptr() as *const _,len, &mut bytes_written, ol.as_mut_ptr())}; + let result = unsafe { + WriteFile( + self.named_pipe, + buf.as_ptr() as *const _, + len, + &mut bytes_written, + ol.as_mut_ptr(), + ) + }; if result > 0 && bytes_written > 0 { // No need to wait for pending write to complete - return Ok(bytes_written as usize) + return Ok(bytes_written as usize); } // wait for pending operation to complete (thread will be suspended until event is signaled) match io::Error::last_os_error() { ref e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => { let mut bytes_transfered = 0; - let res = unsafe {GetOverlappedResult(self.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) }; + let res = unsafe { + GetOverlappedResult( + self.named_pipe, + ol.as_mut_ptr(), + &mut bytes_transfered, + WAIT_FOR_EVENT, + ) + }; match res { 0 => { return Err(handle_windows_error(io::Error::last_os_error())) @@ -296,7 +342,7 @@ impl PipeConnection { } pub struct ClientConnection { - address: String + address: String, } fn close_handle(handle: isize) -> Result<()> { @@ -328,14 +374,14 @@ impl ClientConnection { Ok(ClientConnection::new(sockaddr)) } - pub(crate) fn new(sockaddr: &str) -> ClientConnection { + pub(crate) fn new(sockaddr: &str) -> ClientConnection { ClientConnection { - address: sockaddr.to_string() + address: sockaddr.to_string(), } } pub fn ready(&self) -> std::result::Result, io::Error> { - // Windows is a "completion" based system so "readiness" isn't really applicable + // Windows is a "completion" based system so "readiness" isn't really applicable Ok(Some(())) } @@ -376,6 +422,7 @@ fn handle_windows_error(e: io::Error) -> Error { #[cfg(test)] mod test { use super::*; + use std::sync::Arc; use windows_sys::Win32::Foundation::ERROR_FILE_NOT_FOUND; #[test] @@ -398,8 +445,7 @@ mod test { let listener_server = listener.clone(); let thread = std::thread::spawn(move || { - let quit_flag = Arc::new(AtomicBool::new(false)); - match listener_server.accept(&quit_flag) { + match listener_server.accept() { Ok(Some(_)) => { // pipe is working } @@ -443,9 +489,7 @@ mod test { fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> { for _i in 0..count { let client = match ClientConnection::client_connect(address) { - Ok(c) => { - c - } + Ok(c) => c, Err(_) => { std::thread::sleep(std::time::Duration::from_millis(interval_in_ms)); continue; @@ -462,5 +506,5 @@ mod test { } } Err(Error::Others("timed out".to_string())) - } + } }