Skip to content

Commit

Permalink
Merge pull request #242 from quanweiZhou/fix-server-accept-0.7
Browse files Browse the repository at this point in the history
server: fix server exit once a accept failed
  • Loading branch information
Tim-Zhang authored Sep 24, 2024
2 parents 30c5913 + 52e831b commit 9954fe3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 25 deletions.
8 changes: 6 additions & 2 deletions src/sync/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ impl Server {
.spawn(move || {
loop {
trace!("listening...");
let pipe_connection = match listener.accept(&listener_quit_flag) {
if listener_quit_flag.load(Ordering::SeqCst) {
info!("listener shutdown for quit flag");
break;
}
let pipe_connection = match listener.accept() {
Ok(None) => {
continue;
}
Expand All @@ -394,7 +398,7 @@ impl Server {
}
Err(e) => {
error!("listener accept got {:?}", e);
break;
continue;
}
};

Expand Down
11 changes: 1 addition & 10 deletions src/sync/sys/unix/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use std::os::unix::prelude::AsRawFd;
use nix::Error;

use nix::unistd::*;
use std::sync::{Arc};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::common::{self, client_connect, SOCK_CLOEXEC};
#[cfg(target_os = "macos")]
use crate::common::set_fd_close_exec;
Expand Down Expand Up @@ -81,11 +79,7 @@ impl PipeListener {
// - Ok(Some(PipeConnection)) if a new connection is established
// - Ok(None) if spurious wake up with no new connection
// - Err(io::Error) if there is an error and listener loop should be shutdown
pub(crate) fn accept( &self, quit_flag: &Arc<AtomicBool>) -> std::result::Result<Option<PipeConnection>, io::Error> {
if quit_flag.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag"));
}

pub(crate) fn accept(&self) -> std::result::Result<Option<PipeConnection>, io::Error> {
let mut pollers = vec![
libc::pollfd {
fd: self.monitor_fd.0,
Expand Down Expand Up @@ -124,9 +118,6 @@ impl PipeListener {
return Ok(None);
}

if quit_flag.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag"));
}

#[cfg(target_os = "linux")]
let fd = match accept4(self.fd, SockFlag::SOCK_CLOEXEC) {
Expand Down
17 changes: 4 additions & 13 deletions src/sync/sys/windows/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::os::windows::ffi::OsStrExt;
use std::os::windows::fs::OpenOptionsExt;
use std::os::windows::io::{IntoRawHandle};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc};
use std::{io};

use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, INVALID_HANDLE_VALUE };
Expand Down Expand Up @@ -75,14 +74,7 @@ impl PipeListener {
// accept returns:
// - Ok(Some(PipeConnection)) if a new connection is established
// - Err(io::Error) if there is an error and listener loop should be shutdown
pub(crate) fn accept(&self, quit_flag: &Arc<AtomicBool>) -> std::result::Result<Option<PipeConnection>, io::Error> {
if quit_flag.load(Ordering::SeqCst) {
return Err(io::Error::new(
io::ErrorKind::Other,
"listener shutdown for quit flag",
));
}

pub(crate) fn accept(&self) -> std::result::Result<Option<PipeConnection>, io::Error> {
// Create a new pipe instance for every new client
let instance = self.new_instance()?;
let np = match PipeConnection::new(instance) {
Expand Down Expand Up @@ -376,6 +368,7 @@ fn handle_windows_error(e: io::Error) -> Error {
#[cfg(test)]
mod test {
use super::*;
use std::sync::Arc;
use windows_sys::Win32::Foundation::ERROR_FILE_NOT_FOUND;

#[test]
Expand All @@ -398,8 +391,7 @@ mod test {

let listener_server = listener.clone();
let thread = std::thread::spawn(move || {
let quit_flag = Arc::new(AtomicBool::new(false));
match listener_server.accept(&quit_flag) {
match listener_server.accept() {
Ok(Some(_)) => {
// pipe is working
}
Expand All @@ -422,8 +414,7 @@ mod test {

let listener_server = listener.clone();
let thread = std::thread::spawn(move || {
let quit_flag = Arc::new(AtomicBool::new(false));
match listener_server.accept(&quit_flag) {
match listener_server.accept() {
Ok(_) => {
panic!("should not get pipe on close")
}
Expand Down

0 comments on commit 9954fe3

Please sign in to comment.