Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 0.2 #19

Open
wants to merge 36 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
340cdae
Update dependency versions, use tokio-native-tls.
elwerene Jan 7, 2021
9ee9fe3
Switch to reqwest for basic_auth support.
elwerene Jan 11, 2021
7d8c887
error handling
elwerene Jan 11, 2021
c53fc1a
update Readme
elwerene Jan 11, 2021
61a3525
fix path
elwerene Jan 11, 2021
2a6b180
add Send to source to make the whole Stream Send
elwerene Jan 11, 2021
a38c9f5
make types public
elwerene Jan 14, 2021
4811585
make pending optional
elwerene Jan 14, 2021
607ac05
use Map for doc
elwerene Jan 18, 2021
b8df6a9
handle stream error
elwerene Mar 11, 2021
bac1567
update dependencies
elwerene Mar 15, 2021
9d3b11f
rename to changes-stream2
elwerene Mar 15, 2021
7d1d3a5
add note, that this is a fork
elwerene Mar 15, 2021
38dd74e
add features so we can switch to rustls
elwerene Nov 23, 2021
79cf5a9
continue searching for newline characters
elwerene Dec 10, 2021
b98bf7b
add badges
elwerene Jan 6, 2022
b9c4605
update reqwest, more readable buffer code
elwerene Apr 30, 2024
c893fe5
optimize away some copy operations
elwerene Apr 30, 2024
4e11a7a
update edition, dependency versions and bump version
elwerene Apr 30, 2024
80aafe0
support to deserialize a change doc as serde_json::value::RawValue
elwerene May 7, 2024
c02d63b
add metrics support (optional)
elwerene May 7, 2024
b09833f
add features overview to Readme
elwerene May 7, 2024
0c6bb86
expect a change event to optimize parsing a bit
elwerene May 8, 2024
0204114
add _total postfix
elwerene May 8, 2024
ed3add6
bump version
elwerene May 8, 2024
a9c4453
change error to use thiserror
elwerene May 14, 2024
98ffe1c
support for post request bodys to support _selector filters.
elwerene May 21, 2024
c7324d8
prefix metrics names with "couchdb_changes_"
elwerene May 21, 2024
a1e99b6
fix Readme
elwerene May 21, 2024
dfa461d
update metrics
elwerene Jun 6, 2024
1c681f6
change how metrics are reported
elwerene Jun 7, 2024
30f5ab8
fix metrics
elwerene Jun 7, 2024
a275b57
change error response to include body
elwerene Jul 8, 2024
aa8ded5
add body to error message
elwerene Jul 8, 2024
a28febf
update dependencies
elwerene Nov 21, 2024
963d893
new version
elwerene Nov 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,44 @@
[package]
name = "changes-stream"
name = "changes-stream2"
description = "couchdb follower"
version = "0.1.0"
authors = ["Ashley Williams <[email protected]>"]
version = "0.2.19"
authors = [
"Ashley Williams <[email protected]>",
"René Rössler <[email protected]>",
]
repository = "https://github.com/elwerene/changes-stream-rust.git"
license = "MIT"
edition = "2021"

[dependencies]
futures = "0.1.10"
tokio-core = "0.1.4"
serde = "0.8"
serde_json = "0.8"
serde_derive = "0.8"
bytes = "1"
futures-util = "0.3"
serde_json = "1"
log = "0.4"
metrics = { version = "0.24", optional = true }
regex = { version = "1", optional = true }
reqwest = { version = "0.12", default-features = false, features = [
"stream",
"json",
] }
serde = "1"
serde_derive = "1"
thiserror = "2"
tokio = "1"
url = "2"

[dependencies.hyper]
git = "https://github.com/hyperium/hyper.git"
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

[dependencies.hyper-tls]
git = "https://github.com/hyperium/hyper-tls.git"
[features]
default = ["native-tls"]

# change.doc as serde_json::value::RawValue
raw_value_doc = ["serde_json/raw_value"]

# metrics
metrics = ["dep:metrics", "dep:regex"]

# tls library selection
native-tls = ["reqwest/native-tls"]
rustls-tls = ["reqwest/rustls-tls"]
69 changes: 42 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,49 +1,64 @@
# changes-stream-rust
# changes_stream2

[![travis badge](https://travis-ci.org/ashleygwilliams/changes-stream-rust.svg?branch=master)](https://travis-ci.org/ashleygwilliams/changes-stream-rust)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](
https://github.com/elwerene/changes-stream-rust/blob/master/LICENSE)
[![Cargo](https://img.shields.io/crates/v/changes-stream2.svg)](
https://crates.io/crates/changes-stream2)
[![Documentation](https://docs.rs/changes-stream2/badge.svg)](
https://docs.rs/changes-stream2)

Fork of https://github.com/ashleygwilliams/changes-stream-rust / https://crates.io/crates/changes-stream.

an implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust.
An implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust.

this code reads in a readable stream from an endpoint and returns each chunk in JSON.
This code reads in a readable stream from an endpoint, parses each line and returns CouchDB changes events as defined in [src/event.rs](/src/event.rs).

this code works off of the [`tokio` branch] of [`hyper`] to take advantage of new Rust Futures.

[`tokio` branch]: https://github.com/hyperium/hyper/tree/tokio
[`hyper`]: https:///crates.io/crates/hyper

`changes-stream-rust` only works on nightly because it uses [`serde`].

[`serde`]: https://crates.io/crates/serde

## usage

in your `Cargo.toml`:

```toml
[dependencies]
changes-stream = { git = "https://github.com/ashleygwilliams/changes-stream-rust.git" }
changes-stream2 = "0.2"
```

from [examples/follower.rs](/examples/follower.rs):

```rust
extern crate changes_stream;
extern crate futures;
use changes_stream2::{ChangesStream, Event};
use futures_util::stream::StreamExt;

use std::io;
use std::io::Write;
#[tokio::main]
async fn main() {
let url = "https://replicate.npmjs.com/_changes".to_string();
let mut changes = ChangesStream::new(url).await.unwrap();
while let Some(event) = changes.next().await {
match event {
Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
Err(err) => println!("Error: {:?}", err),
}
}
}
```

use changes_stream::ChangesStream;
## features

fn main() {
let url = "https://replicate.npmjs.com/_changes".to_string();
let mut changes = ChangesStream::new(url);
### metrics

changes.on(|change| {
io::stdout().write_all(&change).unwrap();
});
Enables metrics collection of the changes stream as counter values. The name is generated from the host and path of the url(database name). The metrics are:
* `couchdb_changes_{name}_bytes`: Total bytes read from the changes stream
* `couchdb_changes_{name}_entries`: Total parsed change entries

changes.run();
}
```
### raw_value_doc

Changes the type of ChangeEvent::Doc from `serde_json::Map<String, serde_json::Value>` to `serde_json::value::RawValue`.

### native-tls

Use the native-tls crate for TLS connections. This is the default.

### rustls-tls

Use the rustls crate for TLS connections.
25 changes: 13 additions & 12 deletions examples/follower.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
extern crate serde_json;
extern crate changes_stream;
extern crate futures;
use changes_stream2::{ChangesStream, Event};
use futures_util::stream::StreamExt;

use changes_stream::ChangesStream;

fn main() {
let url = String::from("https://replicate.npmjs.com/_changes");
let mut changes = ChangesStream::new(url);
changes.on(|change| {
println!("{}: {}", change.seq, change.id);
});
changes.run();
#[tokio::main]
async fn main() {
let url = "https://replicate.npmjs.com/_changes".to_string();
let mut changes = ChangesStream::new(url).await.unwrap();
while let Some(event) = changes.next().await {
match event {
Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id),
Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq),
Err(err) => println!("Error: {:?}", err),
}
}
}
20 changes: 20 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
#[error("Could not parse the url")]
InvalidUrl(#[from] url::ParseError),
#[error("Request failed")]
RequestFailed(#[from] reqwest::Error),
#[error("Server answered with non-ok status: {status}. body: {body}")]
InvalidResponse {
status: reqwest::StatusCode,
body: String,
},
#[error("Could not parse server response: {json}")]
ParsingFailed {
#[source]
error: serde_json::Error,
json: String,
},
}
30 changes: 27 additions & 3 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,35 @@
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Serialize, Deserialize, Debug)]
pub struct Event {
pub seq: u64,
pub enum Event {
Change(ChangeEvent),
Finished(FinishedEvent),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChangeEvent {
pub seq: Value,
pub id: String,
pub changes: Vec<Change>
pub changes: Vec<Change>,

#[serde(default)]
pub deleted: bool,

#[serde(default)]
#[cfg(feature = "raw_value_doc")]
pub doc: Option<Box<serde_json::value::RawValue>>,
#[cfg(not(feature = "raw_value_doc"))]
pub doc: Option<serde_json::Map<String, Value>>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct FinishedEvent {
pub last_seq: Value,
pub pending: Option<u64>, // not available on CouchDB 1.0
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Change {
pub rev: String,
}
Loading