From 079a69396a85e568bba7fe563bd3c1d776145bdf Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Fri, 31 Jan 2025 15:12:54 +0100 Subject: [PATCH 1/5] Add debug asserts to UntypedArenaPtr::build_with, ::get_tag and raw_ptr_as_cell! These two functions are pretty unsafe, but having these assertions makes it easier to catch UB in testing. --- src/macros.rs | 1 + src/types.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/macros.rs b/src/macros.rs index 30a863cac..7778c89e6 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -175,6 +175,7 @@ macro_rules! raw_ptr_as_cell { // TODO use <*{const,mut} _>::addr instead of as when the strict_provenance feature is stable rust-lang/rust#95228 // we might need <*{const,mut} _>::expose_provenance for strict provenance, dependening on how we recreate a pointer later let ptr : *const _ = $ptr; + debug_assert!(!$ptr.is_null()); HeapCellValue::from_ptr_addr(ptr as usize) }}; } diff --git a/src/types.rs b/src/types.rs index 49ff0a6e9..d68daac7c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -656,6 +656,7 @@ pub struct UntypedArenaPtr { impl UntypedArenaPtr { #[inline(always)] pub fn build_with(ptr: usize) -> Self { + debug_assert!(ptr != 0); UntypedArenaPtr::new().with_ptr(ptr as u64) } } @@ -698,6 +699,7 @@ impl UntypedArenaPtr { #[inline] pub fn get_tag(self) -> ArenaHeaderTag { unsafe { + debug_assert!(!self.get_ptr().is_null()); let header = *(self.get_ptr() as *const ArenaHeader); header.get_tag() } From fdc35f8b40d2e43438d66c32716af5b846723cb2 Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Fri, 31 Jan 2025 17:26:40 +0100 Subject: [PATCH 2/5] Fix UB caused by interactions with null streams --- src/machine/lib_machine/mod.rs | 2 +- src/machine/mod.rs | 2 +- src/machine/streams.rs | 96 +++++++++++++++++++++++++++++++++- src/machine/system_calls.rs | 83 ++++++++++++++--------------- src/macros.rs | 6 --- 5 files changed, 136 insertions(+), 53 deletions(-) diff --git a/src/machine/lib_machine/mod.rs b/src/machine/lib_machine/mod.rs index 24f22fe78..35668e64e 100644 --- a/src/machine/lib_machine/mod.rs +++ b/src/machine/lib_machine/mod.rs @@ -535,7 +535,7 @@ impl Machine { /// Consults a module into the [`Machine`] from a string. pub fn consult_module_string(&mut self, module_name: &str, program: impl Into) { let stream = Stream::from_owned_string(program.into(), &mut self.machine_st.arena); - self.machine_st.registers[1] = stream_as_cell!(stream); + self.machine_st.registers[1] = stream.into(); self.machine_st.registers[2] = atom_as_cell!(&atom_table::AtomTable::build_with( &self.machine_st.atom_tbl, module_name diff --git a/src/machine/mod.rs b/src/machine/mod.rs index a62c0caa8..0313a2d90 100644 --- a/src/machine/mod.rs +++ b/src/machine/mod.rs @@ -291,7 +291,7 @@ impl Machine { } fn load_file(&mut self, path: &str, stream: Stream) { - self.machine_st.registers[1] = stream_as_cell!(stream); + self.machine_st.registers[1] = stream.into(); self.machine_st.registers[2] = atom_as_cell!(AtomTable::build_with(&self.machine_st.atom_tbl, path)); diff --git a/src/machine/streams.rs b/src/machine/streams.rs index d434c569a..7b616b920 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -982,6 +982,20 @@ fn cursor_position( } } +impl From for HeapCellValue { + #[inline(always)] + fn from(stream: Stream) -> Self { + if stream.is_null_stream() { + let res = atom!("null_stream"); + atom_as_cell!(res) + } else { + let res = stream.as_ptr(); + debug_assert!(!res.is_null()); + raw_ptr_as_cell!(res) + } + } +} + impl Stream { #[inline] pub(crate) fn position(&mut self) -> Option<(u64, usize)> { @@ -1626,7 +1640,7 @@ impl MachineState { match_untyped_arena_ptr!(ptr, (ArenaHeaderTag::Stream, stream) => { return if stream.is_null_stream() { - Err(self.open_permission_error(stream_as_cell!(stream), caller, arity)) + Err(self.open_permission_error(HeapCellValue::from(stream), caller, arity)) } else { Ok(stream) }; @@ -1689,7 +1703,7 @@ impl MachineState { if let Some(alias) = stream.options().get_alias() { atom_as_cell!(alias) } else { - stream_as_cell!(stream) + stream.into() }, ); @@ -1893,3 +1907,81 @@ impl MachineState { } } } + +#[cfg(test)] +mod test { + use super::*; + use crate::machine::config::*; + + #[test] + #[cfg_attr(miri, ignore)] + fn current_input_null_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine.run_query("current_input(S).").collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn read_null_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine.run_query("get_code(C).").collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_err()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn current_output_null_stream() { + // TODO: switch to a proper solution for configuring the machine with null streams + // once `StreamConfig` supports it. + let mut machine = MachineBuilder::new().build(); + machine.user_output = Stream::Null(StreamOptions::default()); + machine.configure_streams(); + + let results = machine.run_query("current_output(S).").collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn write_null_stream() { + // TODO: switch to a proper solution for configuring the machine with null streams + // once `StreamConfig` supports it. + let mut machine = MachineBuilder::new().build(); + machine.user_output = Stream::Null(StreamOptions::default()); + machine.configure_streams(); + + let results = machine.run_query("write(hello).").collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_err()); + } + + /// A variant of the [`write_null_stream`] that tries to write to a (null) input stream. + #[test] + #[cfg_attr(miri, ignore)] + fn write_null_input_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query("current_input(Stream), write(Stream, hello).") + .collect::>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_err()); + } +} diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index 14f9094b8..11fe5c5f1 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -1881,7 +1881,7 @@ impl Machine { let stream = self.user_input; if let Some(var) = addr.as_var() { - self.machine_st.bind(var, stream_as_cell!(stream)); + self.machine_st.bind(var, stream.into()); return Ok(()); } @@ -1916,7 +1916,7 @@ impl Machine { let stream = self.user_output; if let Some(var) = addr.as_var() { - self.machine_st.bind(var, stream_as_cell!(stream)); + self.machine_st.bind(var, stream.into()); return Ok(()); } @@ -3273,7 +3273,7 @@ impl Machine { match stream.write_all(&bytes) { Ok(_) => {} _ => { - let addr = stream_as_cell!(stream); + let addr = stream.into(); let err = self .machine_st .existence_error(ExistenceError::Stream(addr)); @@ -3323,7 +3323,7 @@ impl Machine { _ => { let err = self .machine_st - .existence_error(ExistenceError::Stream(stream_as_cell!(stream))); + .existence_error(ExistenceError::Stream(stream.into())); return Err(self.machine_st.error_form(err, stub_gen())); } @@ -3336,9 +3336,9 @@ impl Machine { return Ok(()); } _ => { - let err = self.machine_st.existence_error(ExistenceError::Stream( - stream_as_cell!(stream), - )); + let err = self + .machine_st + .existence_error(ExistenceError::Stream(stream.into())); return Err(self.machine_st.error_form(err, stub_gen())); } @@ -3730,7 +3730,7 @@ impl Machine { self.indices.streams = self.indices.streams.sub(&null_streams); if let Some(first_stream) = first_stream { - let stream = stream_as_cell!(first_stream); + let stream = first_stream.into(); let var = self.deref_register(1).as_var().unwrap(); @@ -3761,8 +3761,7 @@ impl Machine { if let Some(next_stream) = next_stream { let var = self.deref_register(2).as_var().unwrap(); - let next_stream = stream_as_cell!(next_stream); - self.machine_st.bind(var, next_stream); + self.machine_st.bind(var, next_stream.into()); } else { self.machine_st.fail = true; } @@ -3779,7 +3778,7 @@ impl Machine { if !stream.is_output_stream() { let stub = functor_stub(atom!("flush_output"), 1); - let addr = stream_as_cell!(stream); + let addr = HeapCellValue::from(stream); let err = self.machine_st @@ -3899,7 +3898,7 @@ impl Machine { if close_result.is_err() { let stub = functor_stub(atom!("close"), 1); - let addr = stream_as_cell!(stream); + let addr = stream.into(); let err = self .machine_st .existence_error(ExistenceError::Stream(addr)); @@ -4472,10 +4471,9 @@ impl Machine { self.indices.streams.insert(stream); - let stream = stream_as_cell!(stream); - let stream_addr = self.deref_register(2); - self.machine_st.bind(stream_addr.as_var().unwrap(), stream); + self.machine_st + .bind(stream_addr.as_var().unwrap(), stream.into()); } Err(_) => { self.machine_st.fail = true; @@ -4689,7 +4687,7 @@ impl Machine { *stream.options_mut() = StreamOptions::default(); stream.options_mut().set_stream_type(StreamType::Binary); self.indices.streams.insert(stream); - let stream = stream_as_cell!(stream); + let stream: HeapCellValue = stream.into(); let handle: TypedArenaPtr = arena_alloc!(request.response, &mut self.machine_st.arena); @@ -4792,27 +4790,26 @@ impl Machine { read_heap_cell!(culprit, (HeapCellValueTag::Cons, cons_ptr) => { - match_untyped_arena_ptr!(cons_ptr, - (ArenaHeaderTag::HttpResponse, http_response) => { - let mut stream = Stream::from_http_sender( - http_response, - status_code, - headers, - &mut self.machine_st.arena + match_untyped_arena_ptr!(cons_ptr, + (ArenaHeaderTag::HttpResponse, http_response) => { + let mut stream = Stream::from_http_sender( + http_response, + status_code, + headers, + &mut self.machine_st.arena + ); + *stream.options_mut() = StreamOptions::default(); + stream.options_mut().set_stream_type(StreamType::Binary); + self.indices.streams.insert(stream); + self.machine_st.bind(stream_addr.as_var().unwrap(), stream.into()); + } + _ => { + unreachable!(); + } ); - *stream.options_mut() = StreamOptions::default(); - stream.options_mut().set_stream_type(StreamType::Binary); - self.indices.streams.insert(stream); - let stream = stream_as_cell!(stream); - self.machine_st.bind(stream_addr.as_var().unwrap(), stream); - } - _ => { - unreachable!(); - } - ); } _ => { - unreachable!(); + unreachable!(); } ); @@ -5125,7 +5122,7 @@ impl Machine { let stream_var = self.deref_register(3); self.machine_st - .bind(stream_var.as_var().unwrap(), stream_as_cell!(stream)); + .bind(stream_var.as_var().unwrap(), stream.into()); } else { let err = self .machine_st @@ -6559,7 +6556,7 @@ impl Machine { self.indices.streams.insert(stream); - stream_as_cell!(stream) + HeapCellValue::from(stream) } Err(ErrorKind::PermissionDenied) => { return Err(self.machine_st.open_permission_error( @@ -6715,14 +6712,13 @@ impl Machine { self.indices.streams.insert(tcp_stream); - let tcp_stream = stream_as_cell!(tcp_stream); let client = atom_as_cell!(client); let client_addr = self.deref_register(2); let stream_addr = self.deref_register(3); self.machine_st.bind(client_addr.as_var().unwrap(), client); - self.machine_st.bind(stream_addr.as_var().unwrap(), tcp_stream); + self.machine_st.bind(stream_addr.as_var().unwrap(), tcp_stream.into()); } None => { self.machine_st.fail = true; @@ -6770,10 +6766,11 @@ impl Machine { let stream = Stream::from_tls_stream(addr, stream, &mut self.machine_st.arena); self.indices.streams.insert(stream); - self.machine_st.heap.push(stream_as_cell!(stream)); + // FIXME: why are we pushing a random, unreferenced cell on the heap? + self.machine_st.heap.push(stream.into()); let stream_addr = self.deref_register(3); self.machine_st - .bind(stream_addr.as_var().unwrap(), stream_as_cell!(stream)); + .bind(stream_addr.as_var().unwrap(), stream.into()); Ok(()) } else { @@ -6826,7 +6823,7 @@ impl Machine { let stream_addr = self.deref_register(4); self.machine_st - .bind(stream_addr.as_var().unwrap(), stream_as_cell!(stream)); + .bind(stream_addr.as_var().unwrap(), stream.into()); } else { unreachable!(); } @@ -6875,7 +6872,7 @@ impl Machine { let err = self.machine_st.permission_error( Permission::Reposition, atom!("stream"), - stream_as_cell!(stream), + HeapCellValue::from(stream), ); return Err(self.machine_st.error_form(err, stub)); @@ -8099,7 +8096,7 @@ impl Machine { let lib_stream = Stream::from_static_string(library, &mut self.machine_st.arena); unify!( self.machine_st, - stream_as_cell!(lib_stream), + HeapCellValue::from(lib_stream), self.machine_st.registers[2] ); diff --git a/src/macros.rs b/src/macros.rs index 7778c89e6..634959304 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -218,12 +218,6 @@ macro_rules! string_as_pstr_cell { }}; } -macro_rules! stream_as_cell { - ($ptr:expr) => { - raw_ptr_as_cell!($ptr.as_ptr()) - }; -} - macro_rules! cell_as_stream { ($cell:expr) => {{ let ptr = cell_as_untyped_arena_ptr!($cell); From 7108e87e92f1ab16dbb6c0ee4e6a954c943ab84e Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Sun, 2 Feb 2025 17:38:59 +0100 Subject: [PATCH 3/5] Make Stream::Null behave like /dev/null --- src/machine/mod.rs | 4 +++ src/machine/streams.rs | 74 +++++++++++++++++++++++------------------- 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/src/machine/mod.rs b/src/machine/mod.rs index 0313a2d90..107240300 100644 --- a/src/machine/mod.rs +++ b/src/machine/mod.rs @@ -509,6 +509,10 @@ impl Machine { .insert(atom!("user_error"), self.user_error); self.indices.streams.insert(self.user_error); + + self.indices + .stream_aliases + .insert(atom!("null_stream"), Stream::Null(StreamOptions::default())); } #[inline(always)] diff --git a/src/machine/streams.rs b/src/machine/streams.rs index 7b616b920..443c01830 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -836,13 +836,13 @@ impl Read for Stream { ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, )), - Stream::OutputFile(_) - | Stream::StandardError(_) - | Stream::StandardOutput(_) - | Stream::Null(_) => Err(std::io::Error::new( - ErrorKind::PermissionDenied, - StreamError::ReadFromOutputStream, - )), + Stream::Null(_) => Ok(buf.len()), + Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => { + Err(std::io::Error::new( + ErrorKind::PermissionDenied, + StreamError::ReadFromOutputStream, + )) + } } } } @@ -864,13 +864,10 @@ impl Write for Stream { ErrorKind::PermissionDenied, StreamError::WriteToInputStream, )), - Stream::StaticString(_) - | Stream::Readline(_) - | Stream::InputFile(..) - | Stream::Null(_) => Err(std::io::Error::new( - ErrorKind::PermissionDenied, - StreamError::WriteToInputStream, - )), + Stream::Null(_) => Ok(buf.len()), + Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(..) => Err( + std::io::Error::new(ErrorKind::PermissionDenied, StreamError::WriteToInputStream), + ), } } @@ -890,13 +887,10 @@ impl Write for Stream { ErrorKind::PermissionDenied, StreamError::FlushToInputStream, )), - Stream::StaticString(_) - | Stream::Readline(_) - | Stream::InputFile(_) - | Stream::Null(_) => Err(std::io::Error::new( - ErrorKind::PermissionDenied, - StreamError::FlushToInputStream, - )), + Stream::Null(_) => Ok(()), + Stream::StaticString(_) | Stream::Readline(_) | Stream::InputFile(_) => Err( + std::io::Error::new(ErrorKind::PermissionDenied, StreamError::FlushToInputStream), + ), } } } @@ -1144,6 +1138,7 @@ impl Stream { } } } + Stream::Null(_) => AtEndOfStream::At, #[cfg(feature = "http")] Stream::HttpRead(stream_layout) => { if stream_layout @@ -1356,7 +1351,8 @@ impl Stream { | Stream::Byte(_) | Stream::Readline(_) | Stream::StaticString(_) - | Stream::InputFile(..) => true, + | Stream::InputFile(..) + | Stream::Null(_) => true, _ => false, } } @@ -1372,7 +1368,8 @@ impl Stream { | Stream::StandardOutput(_) | Stream::NamedTcp(..) | Stream::Byte(_) - | Stream::OutputFile(..) => true, + | Stream::OutputFile(..) + | Stream::Null(_) => true, _ => false, } } @@ -1607,7 +1604,7 @@ impl MachineState { debug_assert_eq!(arity, 0); return match stream_aliases.get(&name) { - Some(stream) if !stream.is_null_stream() => Ok(*stream), + Some(stream) => Ok(*stream), _ => { let stub = functor_stub(caller, arity); let addr = atom_as_cell!(name); @@ -1625,7 +1622,7 @@ impl MachineState { debug_assert_eq!(arity, 0); return match stream_aliases.get(&name) { - Some(stream) if !stream.is_null_stream() => Ok(*stream), + Some(stream) => Ok(*stream), _ => { let stub = functor_stub(caller, arity); let addr = atom_as_cell!(name); @@ -1639,11 +1636,10 @@ impl MachineState { (HeapCellValueTag::Cons, ptr) => { match_untyped_arena_ptr!(ptr, (ArenaHeaderTag::Stream, stream) => { - return if stream.is_null_stream() { - Err(self.open_permission_error(HeapCellValue::from(stream), caller, arity)) - } else { - Ok(stream) - }; + if stream.is_null_stream() { + unreachable!("Null streams have no Cons representation"); + } + return Ok(stream); } (ArenaHeaderTag::Dropped, _value) => { let stub = functor_stub(caller, arity); @@ -1936,7 +1932,11 @@ mod test { let results = machine.run_query("get_code(C).").collect::>(); assert_eq!(results.len(), 1); - assert!(results[0].is_err()); + assert!( + results[0].is_ok(), + "Expected read to succeed, got {:?}", + results[0] + ); } #[test] @@ -1966,7 +1966,11 @@ mod test { let results = machine.run_query("write(hello).").collect::>(); assert_eq!(results.len(), 1); - assert!(results[0].is_err()); + assert!( + results[0].is_ok(), + "Expected write to succeed, got {:?}", + results[0] + ); } /// A variant of the [`write_null_stream`] that tries to write to a (null) input stream. @@ -1982,6 +1986,10 @@ mod test { .collect::>(); assert_eq!(results.len(), 1); - assert!(results[0].is_err()); + assert!( + results[0].is_ok(), + "Expected write to succeed, got {:?}", + results[0] + ); } } From 8966e175f1a60b594bbc21752838f66920db75cf Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Fri, 7 Feb 2025 14:35:27 +0100 Subject: [PATCH 4/5] [fixup] return that reading from a null stream wrote 0 bytes to the buffer --- src/machine/streams.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/machine/streams.rs b/src/machine/streams.rs index 443c01830..253bd47ea 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -836,7 +836,7 @@ impl Read for Stream { ErrorKind::PermissionDenied, StreamError::ReadFromOutputStream, )), - Stream::Null(_) => Ok(buf.len()), + Stream::Null(_) => Ok(0), Stream::OutputFile(_) | Stream::StandardError(_) | Stream::StandardOutput(_) => { Err(std::io::Error::new( ErrorKind::PermissionDenied, From f45b0bcfe8cb0bebcdcc7994f9b63661cfc7ad77 Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Fri, 7 Feb 2025 14:59:32 +0100 Subject: [PATCH 5/5] Remove redundant alias resolution in at_end_of_stream/1, add corresponding tests for null streams Also fixed at_end_of_stream/0 leaving a choicepoint. --- src/lib/builtins.pl | 10 +++--- src/machine/mod.rs | 5 ++- src/machine/streams.rs | 75 +++++++++++++++++++++++++++++++++++++++--- 3 files changed, 78 insertions(+), 12 deletions(-) diff --git a/src/lib/builtins.pl b/src/lib/builtins.pl index 745cbc487..f6df8da75 100644 --- a/src/lib/builtins.pl +++ b/src/lib/builtins.pl @@ -2188,12 +2188,10 @@ %% at_end_of_stream(+Stream). % % True iff the stream Stream has ended -at_end_of_stream(S_or_a) :- - ( var(S_or_a) -> +at_end_of_stream(S) :- + ( var(S) -> throw(error(instantiation_error, at_end_of_stream/1)) - ; atom(S_or_a) -> - stream_property(S, alias(S_or_a)) - ; S = S_or_a + ; true ), stream_property(S, end_of_stream(E)), ( E = at -> true ; E = past ). @@ -2205,7 +2203,7 @@ current_input(S), stream_property(S, end_of_stream(E)), !, - ( E = at ; E = past ). + ( E = at -> true ; E = past ). %% set_stream_position(+Stream, +Position). % diff --git a/src/machine/mod.rs b/src/machine/mod.rs index 107240300..e2306283b 100644 --- a/src/machine/mod.rs +++ b/src/machine/mod.rs @@ -510,9 +510,12 @@ impl Machine { self.indices.streams.insert(self.user_error); + let mut null_options = StreamOptions::default(); + null_options.set_alias_to_atom_opt(Some(atom!("null_stream"))); + self.indices .stream_aliases - .insert(atom!("null_stream"), Stream::Null(StreamOptions::default())); + .insert(atom!("null_stream"), Stream::Null(null_options)); } #[inline(always)] diff --git a/src/machine/streams.rs b/src/machine/streams.rs index 253bd47ea..54efbd39c 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -1908,6 +1908,14 @@ impl MachineState { mod test { use super::*; use crate::machine::config::*; + use crate::LeafAnswer; + + fn is_successful(answer: &Result) -> bool { + matches!( + answer, + Ok(LeafAnswer::True) | Ok(LeafAnswer::LeafAnswer { .. }) + ) + } #[test] #[cfg_attr(miri, ignore)] @@ -1919,7 +1927,7 @@ mod test { let results = machine.run_query("current_input(S).").collect::>(); assert_eq!(results.len(), 1); - assert!(results[0].is_ok()); + assert!(is_successful(&results[0])); } #[test] @@ -1933,7 +1941,7 @@ mod test { assert_eq!(results.len(), 1); assert!( - results[0].is_ok(), + is_successful(&results[0]), "Expected read to succeed, got {:?}", results[0] ); @@ -1951,7 +1959,7 @@ mod test { let results = machine.run_query("current_output(S).").collect::>(); assert_eq!(results.len(), 1); - assert!(results[0].is_ok()); + assert!(is_successful(&results[0])); } #[test] @@ -1967,7 +1975,28 @@ mod test { assert_eq!(results.len(), 1); assert!( - results[0].is_ok(), + is_successful(&results[0]), + "Expected write to succeed, got {:?}", + results[0] + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn put_code_null_stream() { + // TODO: switch to a proper solution for configuring the machine with null streams + // once `StreamConfig` supports it. + let mut machine = MachineBuilder::new().build(); + machine.user_output = Stream::Null(StreamOptions::default()); + machine.configure_streams(); + + let results = machine + .run_query("put_code(user_output, 65).") + .collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), "Expected write to succeed, got {:?}", results[0] ); @@ -1987,9 +2016,45 @@ mod test { assert_eq!(results.len(), 1); assert!( - results[0].is_ok(), + is_successful(&results[0]), "Expected write to succeed, got {:?}", results[0] ); } + + #[test] + #[cfg_attr(miri, ignore)] + fn at_end_of_stream_0_null_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine.run_query("at_end_of_stream.").collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), + "Expected at_end_of_stream to succeed, got {:?}", + results[0] + ); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn at_end_of_stream_1_null_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query("current_input(Stream), at_end_of_stream(Stream).") + .collect::>(); + + assert_eq!(results.len(), 1); + assert!( + is_successful(&results[0]), + "Expected at_end_of_stream to succeed, got {:?}", + results[0] + ); + } }