diff --git a/test-utils/src/dev/clickhouse.rs b/test-utils/src/dev/clickhouse.rs index 12fa64869a..8aa7b319c4 100644 --- a/test-utils/src/dev/clickhouse.rs +++ b/test-utils/src/dev/clickhouse.rs @@ -444,7 +444,7 @@ impl ClickHouseProcess { // that we don't return from this until the server is actually ready to // accept connections. let ports = { - let new_ports = wait_for_ports(data_dir.log_path()).await?; + let new_ports = wait_for_ports(&data_dir.log_path()).await?; // If either port was specified, add an additional check that we // recovered exactly that port. ports.assert_consistent(&new_ports); @@ -541,7 +541,7 @@ impl ClickHouseProcess { } let data_path = data_dir.root_path().to_path_buf(); wait_for_ready( - data_dir.log_path(), + &data_dir.log_path(), CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY, ) @@ -632,7 +632,7 @@ impl ClickHouseProcess { let data_path = data_dir.root_path().to_path_buf(); let address = ipv6_localhost_on(port); wait_for_ready( - data_dir.keeper_log_path(), + &data_dir.keeper_log_path(), CLICKHOUSE_KEEPER_TIMEOUT, KEEPER_READY, ) @@ -1148,90 +1148,44 @@ pub enum NodeKind { // from this function successfully, but the server itself is not yet // ready to accept connections. pub async fn wait_for_ports( - log_path: Utf8PathBuf, + log_path: &Utf8Path, ) -> Result { - let p = poll::wait_for_condition( - || async { - let result = - discover_local_listening_ports(&log_path, CLICKHOUSE_TIMEOUT) - .await; - match result { - // Successfully extracted the ports, return them. - Ok(ports) => Ok(ports), - Err(e) => { - match e { - ClickHouseError::Io(ref inner) => { - if matches!( - inner.kind(), - std::io::ErrorKind::NotFound - ) { - return Err(poll::CondCheckError::NotYet); - } - } - _ => {} - } - Err(poll::CondCheckError::from(e)) - } - } - }, - &Duration::from_millis(500), - &CLICKHOUSE_TIMEOUT, - ) - .await - .context("waiting to discover ClickHouse ports")?; - Ok(p) -} - -// Parse the ClickHouse log file at the given path, looking for a line -// reporting the port number of the HTTP and native TCP servers. -async fn discover_local_listening_ports( - path: &Utf8Path, - timeout: Duration, -) -> Result { - let timeout = Instant::now() + timeout; - tokio::time::timeout_at(timeout, find_clickhouse_ports_in_log(path)) + wait_for_ready(log_path, CLICKHOUSE_TIMEOUT, CLICKHOUSE_READY).await?; + find_clickhouse_ports_in_log(log_path) .await - .map_err(|_| ClickHouseError::Timeout)? + .context("finding ClickHouse ports in log") } // Parse the ClickHouse log for the HTTP and native TCP port numbers. // -// NOTE: This function loops forever until the expected lines are found. It -// should be run under a timeout, or some other mechanism for cancelling it. +// # Panics +// +// This function panics if it reaches EOF without discovering the ports. This +// means the function can only be called after `wait_for_ready()` or a similar +// method. async fn find_clickhouse_ports_in_log( path: &Utf8Path, ) -> Result { - let mut reader = BufReader::new(File::open(path).await?); + let reader = BufReader::new(File::open(path).await?); let mut lines = reader.lines(); let mut ports = ClickHousePorts::zero(); - 'line_search: loop { - let line = lines.next_line().await?; - match line { - Some(line) => { - if let Some(http_port) = - find_port_after_needle(&line, CLICKHOUSE_HTTP_PORT_NEEDLE)? - { - ports.http = http_port; - } else if let Some(native_port) = - find_port_after_needle(&line, CLICKHOUSE_TCP_PORT_NEEDLE)? - { - ports.native = native_port; - } else { - continue 'line_search; - } - if !ports.any_zero() { - return Ok(ports); - } - } - None => { - // Reached EOF, just sleep for an interval and check again. - sleep(Duration::from_millis(10)).await; - - // We might have gotten a partial line; close the file, reopen - // it, and start reading again from the beginning. - reader = BufReader::new(File::open(path).await?); - lines = reader.lines(); - } + loop { + let line = lines.next_line().await?.expect( + "Reached EOF in ClickHouse log file without discovering ports", + ); + if let Some(http_port) = + find_port_after_needle(&line, CLICKHOUSE_HTTP_PORT_NEEDLE)? + { + ports.http = http_port; + } else if let Some(native_port) = + find_port_after_needle(&line, CLICKHOUSE_TCP_PORT_NEEDLE)? + { + ports.native = native_port; + } else { + continue; + } + if !ports.any_zero() { + return Ok(ports); } } } @@ -1265,13 +1219,13 @@ fn find_port_after_needle( // Wait for the ClickHouse log file to report it is ready to receive connections pub async fn wait_for_ready( - log_path: Utf8PathBuf, + log_path: &Utf8Path, timeout: Duration, needle: &str, ) -> Result<(), anyhow::Error> { let p = poll::wait_for_condition( || async { - let result = discover_ready(&log_path, timeout, needle).await; + let result = discover_ready(log_path, timeout, needle).await; match result { Ok(ready) => Ok(ready), Err(e) => { @@ -1344,12 +1298,10 @@ async fn clickhouse_ready_from_log( #[cfg(test)] mod tests { - use crate::dev::clickhouse::CLICKHOUSE_TCP_PORT_NEEDLE; - use super::{ - discover_local_listening_ports, discover_ready, ClickHouseError, - ClickHousePorts, CLICKHOUSE_HTTP_PORT_NEEDLE, CLICKHOUSE_READY, - CLICKHOUSE_TIMEOUT, + discover_ready, wait_for_ports, ClickHouseError, ClickHousePorts, + CLICKHOUSE_HTTP_PORT_NEEDLE, CLICKHOUSE_READY, + CLICKHOUSE_TCP_PORT_NEEDLE, CLICKHOUSE_TIMEOUT, }; use camino_tempfile::NamedUtf8TempFile; use std::process::Stdio; @@ -1371,8 +1323,9 @@ mod tests { } #[tokio::test] - async fn test_discover_local_listening_ports() { - // Write some data to a fake log file + async fn wait_for_ports_finds_actual_ports() { + // Test the nominal case, where ClickHouse writes out the lines with + // the ports, and then the sentinel indicating readiness. let mut file = NamedUtf8TempFile::new().unwrap(); writeln!(file, "A garbage line").unwrap(); writeln!( @@ -1382,15 +1335,59 @@ mod tests { ) .unwrap(); writeln!(file, "Another garbage line").unwrap(); - writeln!(file, "{}:{}", CLICKHOUSE_TCP_PORT_NEEDLE, EXPECTED_TCP_PORT,) + writeln!(file, "{}:{}", CLICKHOUSE_TCP_PORT_NEEDLE, EXPECTED_TCP_PORT) .unwrap(); - writeln!(file, "Yet another garbage line").unwrap(); + writeln!(file, "{}", CLICKHOUSE_READY).unwrap(); file.flush().unwrap(); + let ports = wait_for_ports(&file.path()).await.unwrap(); + assert_eq!(ports.http, EXPECTED_HTTP_PORT); + assert_eq!(ports.native, EXPECTED_TCP_PORT); + } - let ports = - discover_local_listening_ports(file.path(), CLICKHOUSE_TIMEOUT) - .await - .unwrap(); + #[should_panic] + #[tokio::test] + async fn wait_for_ports_panics_with_sentinel_but_no_ports() { + let mut file = NamedUtf8TempFile::new().unwrap(); + writeln!(file, "A garbage line").unwrap(); + writeln!( + file, + "{}:{}", + CLICKHOUSE_HTTP_PORT_NEEDLE, EXPECTED_HTTP_PORT, + ) + .unwrap(); + writeln!(file, "Another garbage line").unwrap(); + writeln!(file, "{}", CLICKHOUSE_READY).unwrap(); + file.flush().unwrap(); + wait_for_ports(&file.path()).await.unwrap(); + } + + #[tokio::test] + async fn wait_for_ports_waits_for_sentinel_line() { + let file = Arc::new(Mutex::new(NamedUtf8TempFile::new().unwrap())); + // Start a task that slowly writes lines into the file. This ensures + // that we wait for a while until the sentinel line is written. + let file_ = file.clone(); + spawn(async move { + for line in [ + String::from("A garbage line"), + format!( + "{}:{}", + CLICKHOUSE_HTTP_PORT_NEEDLE, EXPECTED_HTTP_PORT + ), + String::from("Another garbage line"), + format!("{}:{}", CLICKHOUSE_TCP_PORT_NEEDLE, EXPECTED_TCP_PORT), + String::from(CLICKHOUSE_READY), + ] { + { + let mut f = file_.lock().await; + writeln!(f, "{}", line).unwrap(); + f.flush().unwrap(); + } + sleep(Duration::from_millis(100)).await; + } + }); + let path = file.lock().await.path().to_owned(); + let ports = wait_for_ports(&path).await.unwrap(); assert_eq!(ports.http, EXPECTED_HTTP_PORT); assert_eq!(ports.native, EXPECTED_TCP_PORT); } @@ -1439,147 +1436,6 @@ mod tests { )); } - // A regression test for #131. - // - // The function `discover_local_listening_ports` initially read from the log - // file until EOF, but there's no guarantee that ClickHouse has written the - // port we're searching for before the reader consumes the whole file. This - // test confirms that the file is read until the line is found, ignoring - // EOF, at least until the timeout is hit. - #[tokio::test] - async fn test_discover_local_listening_ports_slow_write() { - // In this case the writer is slightly "slower" than the reader. - let writer_interval = Duration::from_millis(20); - let ports = - read_log_file(CLICKHOUSE_TIMEOUT, writer_interval).await.unwrap(); - assert_eq!(ports.http, EXPECTED_HTTP_PORT); - assert_eq!(ports.native, EXPECTED_TCP_PORT); - } - - // An extremely slow write test, to verify the timeout handling. - #[tokio::test] - async fn test_discover_local_listening_ports_timeout() { - // In this case, the writer is _much_ slower than the reader, so that the reader times out - // entirely before finding the desired line. - let reader_timeout = Duration::from_millis(1); - let writer_interval = Duration::from_millis(100); - assert!(read_log_file(reader_timeout, writer_interval).await.is_err()); - } - - // Implementation of the above tests, simulating simultaneous - // reading/writing of the log file - // - // This uses Tokio's test utilities to manage time, rather than relying on - // timeouts. - async fn read_log_file( - reader_timeout: Duration, - writer_interval: Duration, - ) -> Result { - async fn write_and_wait( - file: &mut NamedUtf8TempFile, - line: String, - interval: Duration, - ) { - println!( - "Writing to log file: {:?}, contents: '{}'", - file.path(), - line - ); - write!(file, "{}", line).unwrap(); - file.flush().unwrap(); - sleep(interval).await; - } - - // Start a task that slowly writes lines to the log file. - // - // NOTE: This looks overly complicated, and it is. We have to wrap this - // in a mutex because both this function, and the writer task we're - // spawning, need access to the file. They may complete in any order, - // and so it's not possible to give one of them ownership over the - // `NamedTempFile`. If the owning task completes, that may delete the - // file before the other task accesses it. So we need interior - // mutability (because one of the references is mutable for writing), - // and _this_ scope must own it. - let file = Arc::new(Mutex::new(NamedUtf8TempFile::new()?)); - let path = file.lock().await.path().to_path_buf(); - let writer_file = file.clone(); - let writer_task = spawn(async move { - let mut file = writer_file.lock().await; - write_and_wait( - &mut file, - "A garbage line\n".to_string(), - writer_interval, - ) - .await; - - // Ensure we can still parse the line even if our buf reader hits - // EOF in the middle of the line - // (https://github.com/oxidecomputer/omicron/issues/3580). - write_and_wait( - &mut file, - (&CLICKHOUSE_HTTP_PORT_NEEDLE[..30]).to_string(), - writer_interval, - ) - .await; - write_and_wait( - &mut file, - format!( - "{}:{}\n", - &CLICKHOUSE_HTTP_PORT_NEEDLE[30..], - EXPECTED_HTTP_PORT - ), - writer_interval, - ) - .await; - write_and_wait( - &mut file, - "Another garbage line\n".to_string(), - writer_interval, - ) - .await; - write_and_wait( - &mut file, - format!( - "{}:{}\n", - &CLICKHOUSE_TCP_PORT_NEEDLE, EXPECTED_TCP_PORT - ), - writer_interval, - ) - .await; - write_and_wait( - &mut file, - "Yet another line of junk\n".to_string(), - writer_interval, - ) - .await; - }); - println!("Starting reader task"); - let reader_task = discover_local_listening_ports(&path, reader_timeout); - - // "Run" the test. - // - // Note that the futures for the reader/writer tasks must be pinned to - // the stack, so that they may be polled on multiple passes through the - // select loop without consuming them. - tokio::pin!(writer_task); - tokio::pin!(reader_task); - let mut poll_writer = true; - let reader_result = loop { - tokio::select! { - reader_result = &mut reader_task => { - println!("Reader finished"); - break reader_result; - }, - writer_result = &mut writer_task, if poll_writer => { - println!("Writer finished"); - let _ = writer_result.unwrap(); - poll_writer = false; - }, - } - }; - reader_result - } - #[test] fn test_clickhouse_ports_assert_consistent() { let second = ClickHousePorts { http: 1, native: 1 };