diff --git a/src/publish.rs b/src/publish.rs index 8c9725d..3760e4f 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -323,11 +323,15 @@ pub async fn receive_url_collector_data(recv: &mut RecvStream) -> Result let mut ts_buf = [0; std::mem::size_of::()]; frame::recv_bytes(recv, &mut ts_buf).await?; + let mut source_buf = Vec::new(); + frame::recv_raw(recv, &mut source_buf).await?; + let mut record_buf = Vec::new(); frame::recv_raw(recv, &mut record_buf).await?; let mut result_buf: Vec = Vec::new(); result_buf.extend_from_slice(&ts_buf); + result_buf.extend_from_slice(&source_buf); result_buf.extend_from_slice(&record_buf); Ok(result_buf) @@ -699,9 +703,13 @@ mod tests { post_body: Vec::new(), }; let raw_event = bincode::serialize(&http).unwrap(); + let source = bincode::serialize(&"hello").unwrap(); let raw_len = u32::try_from(raw_event.len()).unwrap().to_le_bytes(); + let source_len = u32::try_from(source.len()).unwrap().to_le_bytes(); let mut send_buf: Vec = Vec::new(); send_buf.extend_from_slice(&6666_i64.to_le_bytes()); + send_buf.extend_from_slice(&source_len); + send_buf.extend_from_slice(&source); send_buf.extend_from_slice(&raw_len); send_buf.extend_from_slice(&raw_event); send_bytes(&mut channel.server.send, &mut send_buf) @@ -713,6 +721,7 @@ mod tests { .unwrap(); let mut result_buf: Vec = Vec::new(); result_buf.extend_from_slice(&6666_i64.to_le_bytes()); + result_buf.extend_from_slice(&source); result_buf.extend_from_slice(&raw_event); assert_eq!(data, result_buf);