Skip to content

Commit 5c9a66f

Browse files
authored
Merge pull request #54 from huntc/logged-cli
Introduces the logged CLI
2 parents 6cc5686 + 93a67fa commit 5c9a66f

File tree

11 files changed

+357
-12
lines changed

11 files changed

+357
-12
lines changed

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ members = [
66
"streambed-confidant",
77
"streambed-kafka",
88
"streambed-logged",
9+
"streambed-logged-cli",
910
"streambed-patterns",
1011
"streambed-storage",
1112
"streambed-test",
@@ -15,7 +16,7 @@ members = [
1516
]
1617

1718
[workspace.package]
18-
version = "0.9.1" # WHEN CHANGING THIS, CHANGE THE "STREAMBED" DEPENDENCIES BELOW ALSO.
19+
version = "0.9.1" # WHEN CHANGING THIS, CHANGE THE "STREAMBED" DEPENDENCIES BELOW ALSO.
1920
edition = "2021"
2021
rust-version = "1.70.0"
2122
license = "Apache-2.0"

streambed-confidant/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ assert!(ss.get_secret("some.secret").await.unwrap().is_some());
4343
The primary functional use-cases of confidant are:
4444

4545
* retrieve and store a user's secrets (user meaning operating system
46-
user); and
46+
user); and
4747
* share secrets with other users.
4848

4949
The primary operational use-cases of confidant are:
5050

5151
* to be hosted by resource-constrained devices, typically with less
52-
than 128MiB memory and 8GB of storage.
52+
than 128MiB memory and 8GB of storage.
5353

5454
## What is confidant?
5555

streambed-kafka/src/args.rs

-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ pub struct CommitLogArgs {
1212
#[clap(env, long, default_value = "100ms")]
1313
pub cl_idle_timeout: humantime::Duration,
1414

15-
/// A namespace to connect to
16-
#[clap(env, long)]
17-
pub cl_namespace: Option<String>,
18-
1915
/// A namespace to use when communicating with the Commit Log
2016
#[clap(env, long, default_value = "default")]
2117
pub cl_ns: String,

streambed-logged-cli/Cargo.toml

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "streambed-logged-cli"
3+
version.workspace = true
4+
edition.workspace = true
5+
rust-version.workspace = true
6+
license.workspace = true
7+
repository.workspace = true
8+
9+
[dependencies]
10+
clap = { workspace = true }
11+
env_logger = { workspace = true }
12+
git-version = { workspace = true }
13+
humantime = { workspace = true }
14+
serde_json = { workspace = true }
15+
smol_str = { workspace = true }
16+
tokio = { workspace = true }
17+
tokio-stream = { workspace = true }
18+
19+
streambed = { path = "../streambed" }
20+
streambed-logged = { path = "../streambed-logged" }
21+
22+
[[bin]]
23+
name = "logged"
24+
path = "src/main.rs"

streambed-logged-cli/README.md

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
logged
2+
===
3+
4+
The `logged` command provides a utility for conveniently operating on file-based commit logs. Functions such as the ability to consume a JSON file of records, or produce them, are available. No assumptions are made regarding the structure of a record's value (payload), or whether it is encrypted or not. The expectation is that a separate tool for that concern is used in a pipeline if required.
5+
6+
Running with an example write followed by a read
7+
---
8+
9+
First build the executable:
10+
11+
```
12+
cargo build --bin logged --release
13+
```
14+
15+
...make it available on the PATH:
16+
17+
```
18+
export PATH="$PWD/target/release":$PATH
19+
```
20+
21+
...then write some data to a topic named `my-topic`:
22+
23+
```
24+
echo '{"topic":"my-topic","headers":[],"key":0,"value":"SGkgdGhlcmU=","partition":0}' | \
25+
logged --root-path=/tmp produce --file -
26+
```
27+
28+
...then read it back:
29+
30+
```
31+
logged --root-path=/tmp subscribe --subscription my-topic --idle-timeout=100ms
32+
```
33+
34+
Use `--help` to discover all of the options.

streambed-logged-cli/src/errors.rs

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::{error::Error, fmt, io};
2+
3+
use streambed::commit_log::ProducerError;
4+
5+
// The error type for the logged CLI: either an I/O failure while reading
// or writing a stream/file, or a failure reported by the commit log when
// producing a record.
#[derive(Debug)]
pub enum Errors {
    // Underlying I/O problem (file open/create, read, write).
    Io(io::Error),
    // The commit log rejected a produce request.
    Producer(ProducerError),
}
10+
11+
impl From<io::Error> for Errors {
12+
fn from(value: io::Error) -> Self {
13+
Self::Io(value)
14+
}
15+
}
16+
17+
impl fmt::Display for Errors {
18+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19+
match self {
20+
Self::Io(e) => e.fmt(f),
21+
Self::Producer(_) => f.write_str("CannotProduce"),
22+
}
23+
}
24+
}
25+
26+
impl Error for Errors {
27+
fn source(&self) -> Option<&(dyn Error + 'static)> {
28+
match self {
29+
Self::Io(e) => e.source(),
30+
Self::Producer(_) => None,
31+
}
32+
}
33+
}

streambed-logged-cli/src/main.rs

+200
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
use std::{
2+
error::Error,
3+
fmt, fs,
4+
io::{self, BufReader, BufWriter, Read, Write},
5+
path::PathBuf,
6+
};
7+
8+
use clap::{Args, Parser, Subcommand};
9+
use errors::Errors;
10+
use git_version::git_version;
11+
use streambed::commit_log::{ConsumerOffset, Subscription};
12+
use streambed_logged::FileLog;
13+
14+
pub mod errors;
15+
pub mod producer;
16+
pub mod subscriber;
17+
18+
/// A utility for conveniently operating on file-based commit logs.
19+
/// Functions such as the ability to consume a JSON file of records,
20+
/// or produce them, are available.
21+
/// No assumptions are made regarding the structure of a record's
22+
/// value (payload), or whether it is encrypted or not. The expectation
23+
/// is that a separate tool for that concern is used in a pipeline.
24+
#[derive(Parser, Debug)]
25+
#[clap(author, about, long_about = None, version = git_version ! ())]
26+
struct ProgramArgs {
27+
/// The location of all topics in the Commit Log
28+
#[clap(env, long, default_value = "/var/lib/logged")]
29+
pub root_path: PathBuf,
30+
31+
#[command(subcommand)]
32+
pub command: Command,
33+
}
34+
35+
// The CLI's sub-commands; clap derives one sub-command per variant and
// takes its help text from the doc comment on the corresponding Args struct.
#[derive(Subcommand, Debug)]
enum Command {
    // Append records parsed from a JSON stream to the commit log.
    Produce(ProduceCommand),
    // Stream records out of the commit log as JSON.
    Subscribe(SubscribeCommand),
}
40+
41+
/// Consume JSON records from a stream until EOF and append them to the log.
#[derive(Args, Debug)]
struct ProduceCommand {
    /// The file to consume records from, or `-` to indicate STDIN.
    // The `-` sentinel is detected in main by comparing the raw OsStr.
    #[clap(env, short, long)]
    pub file: PathBuf,
}
48+
49+
/// Subscribe to topics and consume from them producing JSON records to a stream.
50+
#[derive(Args, Debug)]
51+
struct SubscribeCommand {
52+
/// The amount of time to indicate that no more events are immediately
53+
/// available from the Commit Log endpoint. If unspecified then the
54+
/// CLI will wait indefinitely for records to appear.
55+
#[clap(env, long)]
56+
pub idle_timeout: Option<humantime::Duration>,
57+
58+
/// In the case that an offset is supplied, it is
59+
/// associated with their respective topics such that any
60+
/// subsequent subscription will source from the offset.
61+
/// The fields are topic name, partition and offset which
62+
/// are separated by commas with no spaces e.g. "--offset=my-topic,0,1000".
63+
#[clap(env, long)]
64+
#[arg(value_parser = parse_offset)]
65+
pub offset: Vec<Offset>,
66+
67+
/// By default, records of the topic are consumed and output to STDOUT.
68+
/// This option can be used to write to a file. Records are output as JSON.
69+
#[clap(env, short, long)]
70+
pub output: Option<PathBuf>,
71+
72+
/// In the case where a subscription topic names are supplied, the consumer
73+
/// instance will subscribe and reply with a stream of records
74+
/// ending only when the connection to the topic is severed.
75+
/// Topics may be namespaced by prefixing with characters followed by
76+
/// a `:`. For example, "my-ns:my-topic".
77+
#[clap(env, long, required = true)]
78+
pub subscription: Vec<String>,
79+
}
80+
81+
// A parsed `--offset` argument (see `parse_offset`): a consumer position
// for a single topic/partition pair, later converted into streambed's
// `ConsumerOffset`.
#[derive(Clone, Debug)]
struct Offset {
    pub topic: String,
    pub partition: u32,
    pub offset: u64,
}
87+
88+
impl From<Offset> for ConsumerOffset {
89+
fn from(value: Offset) -> Self {
90+
ConsumerOffset {
91+
topic: value.topic.into(),
92+
partition: value.partition,
93+
offset: value.offset,
94+
}
95+
}
96+
}
97+
98+
// The ways an `--offset` argument can fail to parse; see `parse_offset`.
#[derive(Debug)]
enum OffsetParseError {
    MissingTopic,
    MissingPartition,
    InvalidPartition,
    MissingOffset,
    InvalidOffset,
}

impl fmt::Display for OffsetParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every variant renders as a fixed message, so select the text
        // first and write it once.
        let message = match self {
            Self::MissingTopic => "Missing the topic as the first part of the argument",
            Self::MissingPartition => {
                "Missing the partition number as the second part to the argument"
            }
            Self::InvalidPartition => "An invalid partition number was provided",
            Self::MissingOffset => "Missing the offset as the third part to the argument",
            Self::InvalidOffset => "An invalid offset number was provided",
        };
        f.write_str(message)
    }
}

impl Error for OffsetParseError {}
128+
129+
fn parse_offset(arg: &str) -> Result<Offset, OffsetParseError> {
130+
let mut iter = arg.split(',');
131+
let Some(topic) = iter.next().map(|s| s.to_string()) else {
132+
return Err(OffsetParseError::MissingTopic);
133+
};
134+
let Some(partition) = iter.next() else {
135+
return Err(OffsetParseError::MissingPartition);
136+
};
137+
let Ok(partition) = partition.parse() else {
138+
return Err(OffsetParseError::InvalidPartition);
139+
};
140+
let Some(offset) = iter.next() else {
141+
return Err(OffsetParseError::MissingOffset);
142+
};
143+
let Ok(offset) = offset.parse() else {
144+
return Err(OffsetParseError::InvalidOffset);
145+
};
146+
Ok(Offset {
147+
topic,
148+
partition,
149+
offset,
150+
})
151+
}
152+
153+
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let args = ProgramArgs::parse();

    env_logger::builder().format_timestamp_millis().init();

    // The file-based commit log rooted at --root-path, shared by both
    // sub-commands.
    let cl = FileLog::new(args.root_path);

    // Run the selected sub-command on a spawned task; the task body
    // yields Result<(), Errors>.
    let task = tokio::spawn(async move {
        match args.command {
            Command::Produce(command) => {
                // "-" selects STDIN; anything else is opened as a
                // buffered file.
                let input: Box<dyn Read + Send> = if command.file.as_os_str() == "-" {
                    Box::new(io::stdin())
                } else {
                    Box::new(BufReader::new(
                        fs::File::open(command.file).map_err(Errors::from)?,
                    ))
                };
                producer::produce(cl, input).await
            }
            Command::Subscribe(command) => {
                // --output selects a buffered file; the default is STDOUT.
                let output: Box<dyn Write + Send> = if let Some(output) = command.output {
                    Box::new(BufWriter::new(
                        fs::File::create(output).map_err(Errors::from)?,
                    ))
                } else {
                    Box::new(io::stdout())
                };
                subscriber::subscribe(
                    cl,
                    command.idle_timeout.map(|d| d.into()),
                    command.offset.into_iter().map(|o| o.into()).collect(),
                    output,
                    command
                        .subscription
                        .into_iter()
                        .map(|s| Subscription { topic: s.into() })
                        .collect(),
                )
                .await
            }
        }
    });

    // Surface a join failure (e.g. task panic) first, then the
    // sub-command's own Errors, both boxed as dyn Error.
    task.await
        .map_err(|e| e.into())
        .and_then(|r: Result<(), Errors>| r.map_err(|e| e.into()))
}

streambed-logged-cli/src/producer.rs

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use std::io::Read;
2+
3+
use streambed::commit_log::{CommitLog, ProducerRecord};
4+
5+
use crate::errors::Errors;
6+
7+
pub async fn produce(cl: impl CommitLog, input: impl Read) -> Result<(), Errors> {
8+
let deserialiser = serde_json::from_reader::<_, ProducerRecord>(input);
9+
for record in deserialiser.into_iter() {
10+
let record = ProducerRecord {
11+
topic: record.topic,
12+
headers: record.headers,
13+
timestamp: record.timestamp,
14+
key: record.key,
15+
value: record.value,
16+
partition: record.partition,
17+
};
18+
cl.produce(record).await.map_err(Errors::Producer)?;
19+
}
20+
Ok(())
21+
}

0 commit comments

Comments
 (0)