diff --git a/Cargo.toml b/Cargo.toml index 04ea12039..ae1eea33c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,19 +1,44 @@ [package] -name = "changes-stream" +name = "changes-stream2" description = "couchdb follower" -version = "0.1.0" -authors = ["Ashley Williams "] +version = "0.2.19" +authors = [ + "Ashley Williams ", + "René Rössler ", +] +repository = "https://github.com/elwerene/changes-stream-rust.git" license = "MIT" +edition = "2021" [dependencies] -futures = "0.1.10" -tokio-core = "0.1.4" -serde = "0.8" -serde_json = "0.8" -serde_derive = "0.8" +bytes = "1" +futures-util = "0.3" +serde_json = "1" +log = "0.4" +metrics = { version = "0.24", optional = true } +regex = { version = "1", optional = true } +reqwest = { version = "0.12", default-features = false, features = [ + "stream", + "json", +] } +serde = "1" +serde_derive = "1" +thiserror = "2" +tokio = "1" +url = "2" -[dependencies.hyper] -git = "https://github.com/hyperium/hyper.git" +[dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -[dependencies.hyper-tls] -git = "https://github.com/hyperium/hyper-tls.git" +[features] +default = ["native-tls"] + +# change.doc as serde_json::value::RawValue +raw_value_doc = ["serde_json/raw_value"] + +# metrics +metrics = ["dep:metrics", "dep:regex"] + +# tls library selection +native-tls = ["reqwest/native-tls"] +rustls-tls = ["reqwest/rustls-tls"] diff --git a/README.md b/README.md index 43ae7863a..e01455f7a 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,18 @@ -# changes-stream-rust +# changes_stream2 -[![travis badge](https://travis-ci.org/ashleygwilliams/changes-stream-rust.svg?branch=master)](https://travis-ci.org/ashleygwilliams/changes-stream-rust) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)]( +https://github.com/elwerene/changes-stream-rust/blob/master/LICENSE) +[![Cargo](https://img.shields.io/crates/v/changes-stream2.svg)]( +https://crates.io/crates/changes-stream2) +[![Documentation](https://docs.rs/changes-stream2/badge.svg)]( +https://docs.rs/changes-stream2) +Fork of https://github.com/ashleygwilliams/changes-stream-rust / https://crates.io/crates/changes-stream. -an implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust. +An implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust. -this code reads in a readable stream from an endpoint and returns each chunk in JSON. +This code reads in a readable stream from an endpoint, parses each line and returns CouchDB changes events as defined in [src/event.rs](/src/event.rs). -this code works off of the [`tokio` branch] of [`hyper`] to take advantage of new Rust Futures. - -[`tokio` branch]: https://github.com/hyperium/hyper/tree/tokio -[`hyper`]: https:///crates.io/crates/hyper - -`changes-stream-rust` only works on nightly because it uses [`serde`]. - -[`serde`]: https://crates.io/crates/serde ## usage @@ -22,28 +20,45 @@ in your `Cargo.toml`: ```toml [dependencies] -changes-stream = { git = "https://github.com/ashleygwilliams/changes-stream-rust.git" } +changes-stream2 = "0.2" ``` from [examples/follower.rs](/examples/follower.rs): ```rust -extern crate changes_stream; -extern crate futures; +use changes_stream2::{ChangesStream, Event}; +use futures_util::stream::StreamExt; -use std::io; -use std::io::Write; +#[tokio::main] +async fn main() { + let url = "https://replicate.npmjs.com/_changes".to_string(); + let mut changes = ChangesStream::new(url).await.unwrap(); + while let Some(event) = changes.next().await { + match event { + Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), + Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq), + Err(err) => println!("Error: {:?}", err), + } + } +} +``` -use changes_stream::ChangesStream; +## features -fn main() { - let url = "https://replicate.npmjs.com/_changes".to_string(); - let mut changes = ChangesStream::new(url); +### metrics - changes.on(|change| { - io::stdout().write_all(&change).unwrap(); - }); +Enables metrics collection of the changes stream as counter values. The name is generated from the host and path of the url(database name). The metrics are: + * `couchdb_changes_{name}_bytes`: Total bytes read from the changes stream + * `couchdb_changes_{name}_entries`: Total parsed change entries - changes.run(); -} -``` +### raw_value_doc + +Changes the type of ChangeEvent::Doc from `serde_json::Map` to `serde_json::value::RawValue`. + +### native-tls + +Use the native-tls crate for TLS connections. This is the default. + +### rustls-tls + +Use the rustls crate for TLS connections. \ No newline at end of file diff --git a/examples/follower.rs b/examples/follower.rs index d256463d3..aa1c377f8 100644 --- a/examples/follower.rs +++ b/examples/follower.rs @@ -1,14 +1,15 @@ -extern crate serde_json; -extern crate changes_stream; -extern crate futures; +use changes_stream2::{ChangesStream, Event}; +use futures_util::stream::StreamExt; -use changes_stream::ChangesStream; - -fn main() { - let url = String::from("https://replicate.npmjs.com/_changes"); - let mut changes = ChangesStream::new(url); - changes.on(|change| { - println!("{}: {}", change.seq, change.id); - }); - changes.run(); +#[tokio::main] +async fn main() { + let url = "https://replicate.npmjs.com/_changes".to_string(); + let mut changes = ChangesStream::new(url).await.unwrap(); + while let Some(event) = changes.next().await { + match event { + Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), + Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq), + Err(err) => println!("Error: {:?}", err), + } + } } diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 000000000..e0a6a25d2 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,20 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Could not parse the url")] + InvalidUrl(#[from] url::ParseError), + #[error("Request failed")] + RequestFailed(#[from] reqwest::Error), + #[error("Server answered with non-ok status: {status}. body: {body}")] + InvalidResponse { + status: reqwest::StatusCode, + body: String, + }, + #[error("Could not parse server response: {json}")] + ParsingFailed { + #[source] + error: serde_json::Error, + json: String, + }, +} diff --git a/src/event.rs b/src/event.rs index a9ab5c3bb..c319745c8 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,11 +1,35 @@ +use serde_derive::{Deserialize, Serialize}; +use serde_json::Value; + #[derive(Serialize, Deserialize, Debug)] -pub struct Event { - pub seq: u64, +pub enum Event { + Change(ChangeEvent), + Finished(FinishedEvent), +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ChangeEvent { + pub seq: Value, pub id: String, - pub changes: Vec + pub changes: Vec, + + #[serde(default)] + pub deleted: bool, + + #[serde(default)] + #[cfg(feature = "raw_value_doc")] + pub doc: Option>, + #[cfg(not(feature = "raw_value_doc"))] + pub doc: Option>, } #[derive(Serialize, Deserialize, Debug)] +pub struct FinishedEvent { + pub last_seq: Value, + pub pending: Option, // not available on CouchDB 1.0 +} + +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Change { pub rev: String, } diff --git a/src/lib.rs b/src/lib.rs index ad06e5745..02e07545c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,224 +1,206 @@ -//! This is documentation for the `changes-stream` crate. -//! -//! The `changes-stream` crate is designed to give you a readable stream of -//! chunked data, upon which you can register multiple handlers, that are -//! called on Read of the data chunk. - -#[macro_use] -extern crate serde_derive; - -extern crate serde; -extern crate serde_json; -extern crate futures; -extern crate hyper; -extern crate hyper_tls; -extern crate tokio_core; - -use futures::Future; -use futures::stream::Stream; -use hyper::Client; -use std::cell::RefCell; +//! The `changes-stream` crate is designed to give you a +//! futures::Stream of CouchDB changes stream events. +use bytes::{buf::IntoIter, Bytes}; +use futures_util::stream::Stream; +use log::error; +use serde_json::Value; +use std::{mem::replace, pin::Pin, task::Poll}; + +mod error; mod event; -use event::Event; - -const DELIMITER: &'static str = ",\n"; -const PROLOGUE: &'static str = "{\"results\":["; - -/// A structure to generate a readable stream on which you can register handlers. -/// -/// Internally, the `ChangesStream` struct holds 3 members: -/// -/// | Member | Type | Notes | -/// |-------------|---------------------------------------|-------------------------------------------------------------------------| -/// | `db` | `String` | A url pointing to the data you'd like to stream. | -/// | `lp` | [`tokio_core::reactor::Core`] | The event loop | -/// | `handlers` | `Vec where F: Fn(&`[`hyper::Chunk`]`)` | A vector of handlers to be called on each Chunk from the Stream on Read | -/// -/// [`tokio_core::reactor::Core`]: ../tokio_core/reactor/struct.Core.html -/// [`hyper::Chunk`]: ../hyper/struct.Chunk.html +pub use error::Error; +pub use event::{Change, ChangeEvent, Event, FinishedEvent}; + +/// A structure which implements futures::Stream pub struct ChangesStream { - db: hyper::Uri, - lp: tokio_core::reactor::Core, - handlers: Vec>, + /// metrics bytes counter + #[cfg(feature = "metrics")] + bytes: metrics::Counter, + /// metrics entries counter + #[cfg(feature = "metrics")] + entries: metrics::Counter, + /// Source of http chunks provided by reqwest + source: Pin> + Send>>, + /// Buffer of current line and current chunk iterator + buf: (Vec, Option>), } impl ChangesStream { - /// Constructs a new `ChangesStream` struct + /// Constructs a new `ChangesStream` struct with a post payload /// /// Takes a single argument, `db`, which represents the /// url of the data you wish to stream. /// - /// Every `ChangesStream` struct is initialized with - /// an event loop ([`tokio_core::reactor::Core`]) and an - /// empty vector of handlers. See above for more details. - /// - /// [`tokio_core::reactor::Core`]: ../tokio_core/reactor/struct.Core.html - /// /// For example, to create a new `ChangesStream` struct /// for the npmjs registry, you would write: /// /// ```no_run - /// # extern crate serde_json; - /// # extern crate changes_stream; - /// # extern crate futures; + /// # use changes_stream2::{ChangesStream, Event}; + /// # use futures_util::stream::StreamExt; /// # - /// # - /// # use changes_stream::ChangesStream; - /// # - /// # fn main() { - /// let url = "https://replicate.npmjs.com/_changes".to_string(); - /// let mut changes = ChangesStream::new(url); - /// # - /// # changes.on(|change| { - /// # let data = serde_json::to_string(change).unwrap(); - /// # println!("{}", data); - /// # }); - /// # - /// # changes.run(); + /// # #[tokio::main] + /// # async fn main() { + /// # let url = "https://replicate.npmjs.com/_changes?filter=_selector".to_string(); + /// # let mut changes = ChangesStream::with_post_payload(url, &serde_json::json!({"selector": { "_id": { "$regex": "^_design/" }}})).await.unwrap(); + /// # while let Some(event) = changes.next().await { + /// # match event { + /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), + /// # Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq), + /// # Err(err) => println!("Error: {:?}", err), + /// # } + /// # } /// # } /// ``` - pub fn new(db: String) -> ChangesStream { - ChangesStream { - db: db.parse().unwrap(), - lp: tokio_core::reactor::Core::new().unwrap(), - handlers: vec![], + pub async fn with_post_payload(db: String, payload: &Value) -> Result { + let url = url::Url::parse(&db)?; + #[cfg(feature = "metrics")] + let database = regex::Regex::new(r"(?m)[_/]+") + .unwrap() + .replace_all( + &format!("{}_{}", url.host_str().unwrap_or_default(), url.path()), + "_", + ) + .to_string(); + + let client = reqwest::Client::new(); + let res = client.post(url).json(payload).send().await?; + let status = res.status(); + if !status.is_success() { + let body = res.text().await.unwrap_or_default(); + return Err(Error::InvalidResponse { status, body }); } + let source = Pin::new(Box::new(res.bytes_stream())); + + #[cfg(feature = "metrics")] + let (bytes, entries) = { + let bytes_name = "couchdb_changes_bytes_total"; + let entries_name = "couchdb_changes_entries_total"; + metrics::describe_counter!(bytes_name, metrics::Unit::Bytes, "Changes stream bytes"); + metrics::describe_counter!( + entries_name, + metrics::Unit::Count, + "Changes stream entries" + ); + ( + metrics::counter!(bytes_name, "database" => database.clone()), + metrics::counter!(entries_name, "database" => database), + ) + }; + + Ok(Self { + #[cfg(feature = "metrics")] + bytes, + #[cfg(feature = "metrics")] + entries, + source, + buf: (Vec::new(), None), + }) } - /// Registers a handler. A handler is simply a function - /// you'd like to call on a chunk from the stream at the - /// time the chunk is read. - /// - /// `.on()` takes a single argument, a closure. The - /// closure you pass should take a single [`hyper::Chunk`] - /// as an argument. + /// Constructs a new `ChangesStream` struct /// - /// [`hyper::Chunk`]: ../hyper/struct.Chunk.html + /// Takes a single argument, `db`, which represents the + /// url of the data you wish to stream. /// - /// For example, to write the data in a chunk to standard - /// out, you would write: + /// For example, to create a new `ChangesStream` struct + /// for the npmjs registry, you would write: /// /// ```no_run - /// # extern crate serde_json; - /// # extern crate changes_stream; - /// # extern crate futures; + /// # use changes_stream2::{ChangesStream, Event}; + /// # use futures_util::stream::StreamExt; /// # - /// # use changes_stream::ChangesStream; - /// # - /// # fn main() { - /// # let url = "https://replicate.npmjs.com/_changes".to_string(); - /// # let mut changes = ChangesStream::new(url); - /// # - /// changes.on(|change| { - /// let data = serde_json::to_string(change).unwrap(); - /// println!("{}", data); - /// }); - /// # - /// # changes.run(); + /// # #[tokio::main] + /// # async fn main() { + /// # let url = "https://replicate.npmjs.com/_changes".to_string(); + /// # let mut changes = ChangesStream::new(url).await.unwrap(); + /// # while let Some(event) = changes.next().await { + /// # match event { + /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), + /// # Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq), + /// # Err(err) => println!("Error: {:?}", err), + /// # } + /// # } /// # } /// ``` - pub fn on(&mut self, handler: F) { - self.handlers.push(Box::new(handler)); + pub async fn new(db: String) -> Result { + Self::with_post_payload(db, &serde_json::json!({})).await } +} - /// Runs the `ChangesStream` struct's event loop, `lp`. - /// - /// Call this after you have regsitered all handlers using - /// `on`. - /// - /// Takes no arguments. - /// - /// For example: - /// - /// ```no_run - /// # extern crate serde_json; - /// # extern crate changes_stream; - /// # extern crate futures; - /// # - /// # use changes_stream::ChangesStream; - /// # - /// # fn main() { - /// # let url = "https://replicate.npmjs.com/_changes".to_string(); - /// # let mut changes = ChangesStream::new(url); - /// # - /// # changes.on(|change| { - /// # let data = serde_json::to_string(change).unwrap(); - /// # println!("{}", data); - /// # }); - /// # - /// changes.run(); - /// # } - /// ``` - pub fn run(mut self) { - let handle = self.lp.handle(); - let client = Client::configure() - // 4 is number of threads to use for dns resolution - .connector(hyper_tls::HttpsConnector::new(4, &handle)) - .build(&handle); - - let handlers = self.handlers; - self.lp - .run(client.get(self.db).and_then(move |res| { - assert!(res.status().is_success()); - - // Buffer up incomplete json lines. - let buffer: Vec = vec![]; - let buffer_cell = RefCell::new(buffer); - - res.body().for_each(move |chunk| { - if chunk.starts_with(PROLOGUE.as_bytes()) { - return Ok(()); - } - let mut source = chunk.to_vec(); - let mut borrowed = buffer_cell.borrow_mut(); - if borrowed.len() > 0 { - source = [borrowed.clone(), chunk.to_vec()].concat(); - borrowed.clear(); +impl Stream for ChangesStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + 'main: loop { + if self.buf.1.is_none() { + match Stream::poll_next(self.source.as_mut(), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(Ok(chunk))) => { + #[cfg(feature = "metrics")] + self.bytes.increment(chunk.len() as u64); + + self.buf.1 = Some(chunk.into_iter()) } - if chunk.starts_with(DELIMITER.as_bytes()) { - source = chunk[2..].to_vec(); + Poll::Ready(Some(Err(err))) => { + error!("Error getting next chunk: {:?}", err); + return Poll::Ready(None); } - - match serde_json::from_slice(source.as_slice()) { - Err(_) => { - // We probably have an incomplete chunk of json. Buffer it & move on. - borrowed.append(&mut chunk.to_vec()); - } - Ok(json) => { - let event: Event = json; - for handler in &handlers { - handler(&event); - } + } + } else { + let (line, chunk_iter) = &mut self.buf; + let iter = chunk_iter.as_mut().unwrap(); + + loop { + if let Some(byte) = iter.next() { + if byte == 0x0A { + // Found '\n', end of line + break; } + line.push(byte); + } else { + // We need another chunk to fill the line + *chunk_iter = None; + continue 'main; } - Ok(()) - }) - })) - .unwrap(); - } - - pub fn run_with(self, mut lp: tokio_core::reactor::Core) { - let handle = self.lp.handle(); - let client = Client::configure() - // 4 is number of threads to use for dns resolution - .connector(hyper_tls::HttpsConnector::new(4, &handle)) - .build(&handle); - - let handlers = self.handlers; - lp - .run(client.get(self.db).and_then(move |res| { - assert!(res.status().is_success()); - - res.body().for_each(move |chunk| { - let event: Event = serde_json::from_slice(&chunk).unwrap(); - for handler in &handlers { - handler(&event); - } - Ok(()) - }) - })) - .unwrap(); - + } + + let line = replace(line, Vec::with_capacity(line.len() * 2)); + if line.len() < 14 { + // skip prologue, epilogue and empty lines (continuous mode) + continue; + } + + let mut len = line.len(); + if line[len - 1] == 0x0D { + // 0x0D is '\r'. CouchDB >= 2.0 sends "\r\n" + len -= 1; + } + if line[len - 1] == 0x2C { + // 0x2C is ',' + len -= 1; + } + + let result = serde_json::from_slice::(&line[..len]) + .map(Event::Change) + .or_else(|error| { + serde_json::from_slice::(&line[..len]) + .map(Event::Finished) + .map_err(|_err| Error::ParsingFailed { + error, + json: String::from_utf8(line).unwrap_or_default(), + }) + }); + + #[cfg(feature = "metrics")] + self.entries.increment(1); + + return Poll::Ready(Some(result)); + } + } } } diff --git a/tests/changes-stream.rs b/tests/changes-stream.rs deleted file mode 100644 index 7b3564ab3..000000000 --- a/tests/changes-stream.rs +++ /dev/null @@ -1,9 +0,0 @@ -extern crate changes_stream; - -use changes_stream::ChangesStream; - -#[test] -fn test_new() { - let db = String::from("https://replicate.npmjs.com/_changes"); - let mut changes = ChangesStream::new(db); -}