From ffaaba29ea79e7659f95efc86408b09a990add4a Mon Sep 17 00:00:00 2001 From: Al Liu Date: Sun, 14 Apr 2024 03:23:46 +0800 Subject: [PATCH] Cleanup macros --- core/src/snapshot.rs | 154 ++++++++++++++++++++++--------------------- 1 file changed, 78 insertions(+), 76 deletions(-) diff --git a/core/src/snapshot.rs b/core/src/snapshot.rs index 99f6d58..487306b 100644 --- a/core/src/snapshot.rs +++ b/core/src/snapshot.rs @@ -142,6 +142,37 @@ enum SnapshotRecord<'a, I: Clone, A: Clone> { const MAX_INLINED_BYTES: usize = 64; +macro_rules! encode { + ($w:ident.$node: ident::$status: ident) => {{ + let node = $node.as_ref(); + let encoded_node_len = T::node_encoded_len(node); + let encoded_len = 4 + 1 + encoded_node_len; + if encoded_len <= MAX_INLINED_BYTES { + let mut buf = [0u8; MAX_INLINED_BYTES]; + buf[0] = Self::$status; + buf[1..5].copy_from_slice(&(encoded_node_len as u32).to_le_bytes()); + T::encode_node(node, &mut buf[5..]).map_err(invalid_data_io_error)?; + $w.write_all(&buf[..encoded_len]).map(|_| encoded_len) + } else { + let mut buf = BytesMut::with_capacity(encoded_len); + buf.put_u8(Self::$status); + buf.put_u32_le(encoded_node_len as u32); + T::encode_node(node, &mut buf).map_err(invalid_data_io_error)?; + $w.write_all(&buf).map(|_| encoded_len) + } + }}; + ($w:ident.$t: ident($status: ident)) => {{ + const N: usize = mem::size_of::() + mem::size_of::(); + let mut data = [0u8; N]; + data[0] = Self::$status; + data[1..N].copy_from_slice(&$t.to_le_bytes()); + $w.write_all(&data).map(|_| N) + }}; + ($w:ident.$ident: ident) => {{ + $w.write_all(&[Self::$ident]).map(|_| 1) + }}; +} + impl<'a, I, A> SnapshotRecord<'a, I, A> where I: Id, @@ -160,46 +191,15 @@ where &self, w: &mut W, ) -> std::io::Result { - macro_rules! encode { - ($node: ident::$status: ident) => {{ - let node = $node.as_ref(); - let encoded_node_len = T::node_encoded_len(node); - let encoded_len = 4 + 1 + encoded_node_len; - if encoded_len <= MAX_INLINED_BYTES { - let mut buf = [0u8; MAX_INLINED_BYTES]; - buf[0] = Self::$status; - buf[1..5].copy_from_slice(&(encoded_node_len as u32).to_le_bytes()); - T::encode_node(node, &mut buf[5..]).map_err(invalid_data_io_error)?; - w.write_all(&buf[..encoded_len]).map(|_| encoded_len) - } else { - let mut buf = BytesMut::with_capacity(encoded_len); - buf.put_u8(Self::$status); - buf.put_u32_le(encoded_node_len as u32); - T::encode_node(node, &mut buf).map_err(invalid_data_io_error)?; - w.write_all(&buf).map(|_| encoded_len) - } - }}; - ($t: ident($status: ident)) => {{ - const N: usize = mem::size_of::() + mem::size_of::(); - let mut data = [0u8; N]; - data[0] = Self::$status; - data[1..N].copy_from_slice(&$t.to_le_bytes()); - w.write_all(&data).map(|_| N) - }}; - ($ident: ident) => {{ - w.write_all(&[Self::$ident]).map(|_| 1) - }}; - } - match self { - Self::Alive(id) => encode!(id::ALIVE), - Self::NotAlive(id) => encode!(id::NOT_ALIVE), - Self::Clock(t) => encode!(t(CLOCK)), - Self::EventClock(t) => encode!(t(EVENT_CLOCK)), - Self::QueryClock(t) => encode!(t(QUERY_CLOCK)), - Self::Coordinate => encode!(COORDINATE), - Self::Leave => encode!(LEAVE), - Self::Comment => encode!(COMMENT), + Self::Alive(id) => encode!(w.id::ALIVE), + Self::NotAlive(id) => encode!(w.id::NOT_ALIVE), + Self::Clock(t) => encode!(w.t(CLOCK)), + Self::EventClock(t) => encode!(w.t(EVENT_CLOCK)), + Self::QueryClock(t) => encode!(w.t(QUERY_CLOCK)), + Self::Coordinate => encode!(w.COORDINATE), + Self::Leave => encode!(w.LEAVE), + Self::Comment => encode!(w.COMMENT), } } } @@ -387,6 +387,39 @@ where metric_labels: std::sync::Arc, } +// flushEvent is used to handle writing out an event +macro_rules! stream_flush_event { + ($this:ident <- $event:ident) => {{ + // Stop recording events after a leave is issued + if $this.leaving { + break; + } + + match &$event { + CrateEvent::Member(e) => $this.process_member_event(e), + CrateEvent::User(e) => $this.process_user_event(e), + CrateEvent::Query(e) => $this.process_query_event(e.ltime), + CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime), + } + }}; +} + +macro_rules! tee_stream_flush_event { + ($stream_tx:ident <- $event:ident -> $out_tx:ident) => {{ + // Forward to the internal stream, do not block + futures::select! { + _ = $stream_tx.send($event.clone()).fuse() => {} + default => {} + } + + // Forward the event immediately, do not block + futures::select! { + _ = $out_tx.send($event).fuse() => {} + default => {} + } + }}; +} + impl Snapshot where D: Delegate::ResolvedAddress>, @@ -486,27 +519,11 @@ where out_tx: Sender>, shutdown_rx: Receiver<()>, ) { - macro_rules! flush_event { - ($event:ident) => {{ - // Forward to the internal stream, do not block - futures::select! { - _ = stream_tx.send($event.clone()).fuse() => {} - default => {} - } - - // Forward the event immediately, do not block - futures::select! { - _ = out_tx.send($event).fuse() => {} - default => {} - } - }}; - } - loop { futures::select! { ev = in_rx.recv().fuse() => { if let Ok(ev) = ev { - flush_event!(ev) + tee_stream_flush_event!(stream_tx <- ev -> out_tx) } else { break; } @@ -522,7 +539,7 @@ where futures::select! { ev = in_rx.recv().fuse() => { if let Ok(ev) = ev { - flush_event!(ev) + tee_stream_flush_event!(stream_tx <- ev -> out_tx) } else { break; } @@ -540,23 +557,6 @@ where ) { let mut clock_ticker = ::interval(CLOCK_UPDATE_INTERVAL); - // flushEvent is used to handle writing out an event - macro_rules! flush_event { - ($this:ident <- $event:ident) => {{ - // Stop recording events after a leave is issued - if $this.leaving { - break; - } - - match &$event { - CrateEvent::Member(e) => $this.process_member_event(e), - CrateEvent::User(e) => $this.process_user_event(e), - CrateEvent::Query(e) => $this.process_query_event(e.ltime), - CrateEvent::InternalQuery { query, .. } => $this.process_query_event(query.ltime), - } - }}; - } - loop { futures::select! { _ = self.leave_rx.recv().fuse() => { @@ -580,7 +580,7 @@ where } ev = self.stream_rx.recv().fuse() => { if let Ok(ev) = ev { - flush_event!(self <- ev) + stream_flush_event!(self <- ev) } else { break; } @@ -606,7 +606,9 @@ where futures::select! { ev = self.stream_rx.recv().fuse() => { if let Ok(ev) = ev { - flush_event!(self <- ev) + stream_flush_event!(self <- ev) + } else { + break; } } _ = (&mut flush_timeout).fuse() => {