diff --git a/Cargo.toml b/Cargo.toml index 97a8845..5da457b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ resolver = "2" members = [ "streambed", "streambed-confidant", + "streambed-confidant-cli", "streambed-kafka", "streambed-logged", "streambed-logged-cli", @@ -59,5 +60,6 @@ tokio-util = "0.7.4" warp = "0.3" streambed = { path = "streambed", version = "0.10.2" } +streambed-confidant = { path = "streambed-confidant", version = "0.10.2" } streambed-logged = { path = "streambed-logged", version = "0.10.2" } streambed-test = { path = "streambed-test", version = "0.10.2" } diff --git a/streambed-confidant-cli/Cargo.toml b/streambed-confidant-cli/Cargo.toml new file mode 100644 index 0000000..27e24f3 --- /dev/null +++ b/streambed-confidant-cli/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "streambed-confidant-cli" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +repository.workspace = true +description = "A CLI for a file-system-based secret store that applies streambed-crypto to data" + +[dependencies] +base64 = { workspace = true } +clap = { workspace = true } +env_logger = { workspace = true } +hex = { workspace = true } +humantime = { workspace = true } +rand = { workspace = true } +serde_json = { workspace = true } +streambed = { workspace = true } +streambed-confidant = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-stream = { workspace = true } + +[[bin]] +name = "confidant" +path = "src/main.rs" diff --git a/streambed-confidant-cli/README.md b/streambed-confidant-cli/README.md new file mode 100644 index 0000000..4c45ecf --- /dev/null +++ b/streambed-confidant-cli/README.md @@ -0,0 +1,75 @@ +confidant +=== + +The `confidant` command provides a utility for conveniently operating on a file-based secret store. It is +often used in conjunction with `logged` to encrypt payloads as a pre-stage, or decrypt as a post-stage. 
No +assumption is made regarding the nature of a payload beyond it being encrypted using streambed crypto +functions. + +Running with an example encrypt followed by a decrypt +--- + +First get the executable: + +``` +cargo install streambed-confidant-cli +``` + +...or build it from this repo: + +``` +cargo build --bin confidant --release +``` + +...and make a build available on the PATH (only for `cargo build` and just once per shell session): + +``` +export PATH="$PWD/target/release":$PATH +``` + +Before you can use `confidant`, you must provide it with a root secret and a "secret id" (password) +to authenticate the session. Here's an example with some dummy data: + +``` +echo "1800af9b273e4b9ea71ec723426933a4" > /tmp/root-secret +echo "unused-id" > /tmp/ss-secret-id +``` + +We also need to create a directory for `confidant` to read and write its secrets. A security feature +of the `confidant` library is that the directory must have a permission of `700` for the owner user. +ACLs should then be used to control access for individual processes. Here's how the directory can be +created: + +``` +mkdir /tmp/confidant +chmod 700 /tmp/confidant +``` + +You would normally source the above secrets from your production system, preferably without +them leaving your production system. + +Given the root secret, encrypt some data: + +``` +echo '{"value":"SGkgdGhlcmU="}' | \ + confidant --root-path=/tmp/confidant encrypt --file - --path="default/my-secret-path" +``` + +...which would output something like: + +``` +{"value":"EZy4HLnFC4c/W63Qtp288WWFj8U="} +``` + +That value is now encrypted with a salt. + +We can also decrypt in a similar fashion: + +``` +echo '{"value":"EZy4HLnFC4c/W63Qtp288WWFj8U="}' | \ + confidant --root-path=/tmp/confidant decrypt --file - --path="default/my-secret-path" +``` + +...which will yield the original BASE64 value that we encrypted. + +Use `--help` to discover all of the options. 
diff --git a/streambed-confidant-cli/src/cryptor.rs b/streambed-confidant-cli/src/cryptor.rs new file mode 100644 index 0000000..fadf446 --- /dev/null +++ b/streambed-confidant-cli/src/cryptor.rs @@ -0,0 +1,126 @@ +use std::{ + collections::HashMap, + io::{self, Write}, + time::Duration, +}; + +use rand::RngCore; +use serde_json::Value; +use streambed::{ + crypto::{self, SALT_SIZE}, + get_secret_value, + secret_store::{SecretData, SecretStore}, +}; +use tokio::{sync::mpsc::channel, time}; + +use crate::errors::Errors; + +const FLUSH_DELAY: Duration = Duration::from_millis(100); +const OUTPUT_QUEUE_SIZE: usize = 10; + +pub async fn process_records( + ss: impl SecretStore, + mut line_reader: impl FnMut() -> Result, io::Error>, + mut output: impl Write, + path: &str, + process: fn(Vec, Vec) -> Option>, + select: &str, +) -> Result<(), Errors> { + let (output_tx, mut output_rx) = channel(OUTPUT_QUEUE_SIZE); + + let processor = async move { + while let Some(line) = line_reader()? { + let mut record = serde_json::from_str::(&line).map_err(Errors::from)?; + let Some(value) = record.get_mut(select) else { + return Err(Errors::CannotSelectValue); + }; + let Some(str_value) = value.as_str() else { + return Err(Errors::CannotGetValue); + }; + let Ok(bytes) = base64::decode(str_value) else { + return Err(Errors::CannotDecodeValue); + }; + let decrypted_bytes = get_secret_value(&ss, path) + .await + .and_then(|secret_value| { + let key = hex::decode(secret_value).ok()?; + process(key, bytes) + }) + .ok_or(Errors::CannotDecryptValue)?; + let encoded_decrypted_str = base64::encode(decrypted_bytes); + *value = Value::String(encoded_decrypted_str); + let _ = output_tx.send(record).await; + } + + Ok(()) + }; + + let outputter = async move { + let mut timeout = FLUSH_DELAY; + loop { + tokio::select! 
{ + record = output_rx.recv() => { + if let Some(record) = record { + let buf = serde_json::to_string(&record)?; + output.write_all(buf.as_bytes())?; + output.write_all(b"\n")?; + timeout = FLUSH_DELAY; + } else { + break; + } + } + _ = time::sleep(timeout) => { + output.flush()?; + timeout = Duration::MAX; + } + } + } + // The input is exhausted: flush explicitly so that a failed final write is + // reported here rather than being lost when the writer is dropped. + output.flush()?; + Ok(()) + }; + + tokio::try_join!(processor, outputter).map(|_| ()) +} + +/// Decrypts the `select` field of each JSON line obtained from `line_reader`, using the +/// hex-encoded key held by the secret at `path`, and writes each record to `output`. +/// The decoded field must be a salt of `SALT_SIZE` bytes followed by the encrypted bytes. +pub async fn decrypt( + ss: impl SecretStore, + line_reader: impl FnMut() -> Result, io::Error>, + output: impl Write, + path: &str, + select: &str, +) -> Result<(), Errors> { + fn process(key: Vec, mut bytes: Vec) -> Option> { + // A value shorter than the salt cannot be split; report it as + // undecryptable instead of panicking inside `split_at_mut`. + if bytes.len() < crypto::SALT_SIZE { + return None; + } + let (salt, data_bytes) = bytes.split_at_mut(crypto::SALT_SIZE); + crypto::decrypt(data_bytes, &key.try_into().ok()?, &salt.try_into().ok()?); + Some(data_bytes.to_vec()) + } + process_records(ss, line_reader, output, path, process, select).await +} + +/// Encrypts the `select` field of each JSON line obtained from `line_reader` and writes each +/// record to `output`. The encoded result is a fresh salt followed by the encrypted bytes. +pub async fn encrypt( + ss: impl SecretStore, + line_reader: impl FnMut() -> Result, io::Error>, + output: impl Write, + path: &str, + select: &str, +) -> Result<(), Errors> { + // As a convenience, we create the secret when encrypting if there + // isn't one. 
+ if get_secret_value(&ss, path).await.is_none() { + let mut key = vec![0; 16]; + rand::thread_rng().fill_bytes(&mut key); + let data = HashMap::from([("value".to_string(), hex::encode(key))]); + ss.create_secret(path, SecretData { data }) + .await + .map_err(Errors::SecretStore)?; + } + + fn process(key: Vec, mut data_bytes: Vec) -> Option> { + let salt = crypto::salt(&mut rand::thread_rng()); + crypto::encrypt(&mut data_bytes, &key.try_into().ok()?, &salt); + let mut buf = Vec::with_capacity(SALT_SIZE + data_bytes.len()); + buf.extend(salt); + buf.extend(data_bytes); + Some(buf) + } + process_records(ss, line_reader, output, path, process, select).await +} diff --git a/streambed-confidant-cli/src/errors.rs b/streambed-confidant-cli/src/errors.rs new file mode 100644 index 0000000..0a5adf2 --- /dev/null +++ b/streambed-confidant-cli/src/errors.rs @@ -0,0 +1,96 @@ +use std::{ + error::Error, + fmt::{self, Debug}, + io, +}; + +use streambed::secret_store; + +#[derive(Debug)] +pub enum Errors { + CannotDecodeRootSecretAsHex, + CannotDecodeValue, + CannotDecryptValue, + CannotEncryptValue, + CannotGetValue, + CannotSelectValue, + EmptyRootSecretFile, + EmptySecretIdFile, + InvalidRootSecret, + InvalidSaltLen, + Io(io::Error), + RootSecretFileIo(io::Error), + SecretIdFileIo(io::Error), + SecretStore(secret_store::Error), + Serde(serde_json::Error), +} + +impl From for Errors { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +impl From for Errors { + fn from(value: serde_json::Error) -> Self { + Self::Serde(value) + } +} + +impl fmt::Display for Errors { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::CannotDecodeRootSecretAsHex => { + f.write_str("Cannot decode the root-secret as hex") + } + Self::CannotDecodeValue => { + f.write_str("Cannot decode the selected value of the JSON object") + } + Self::CannotDecryptValue => { + f.write_str("Cannot decrypt the selected value of the JSON object") + } + 
Self::CannotEncryptValue => { + f.write_str("Cannot encrypt the selected value of the JSON object") + } + Self::CannotGetValue => { + f.write_str("Cannot get the selected value of the selected field") + } + Self::CannotSelectValue => { + f.write_str("Cannot select the value within the JSON object") + } + Self::EmptyRootSecretFile => f.write_str("Empty root-secret file"), + Self::EmptySecretIdFile => f.write_str("Empty secret-id file"), + Self::InvalidRootSecret => { + f.write_str("Invalid root secret - must be a hex string of 32 characters") + } + Self::InvalidSaltLen => f.write_str("Invalid salt length - must be 12 bytes"), + Self::Io(e) => fmt::Display::fmt(&e, f), + Self::SecretStore(_) => f.write_str("Unauthorised access"), + Self::RootSecretFileIo(_) => f.write_str("root-secret file problem"), + Self::SecretIdFileIo(_) => f.write_str("secret-id file problem"), + Self::Serde(e) => fmt::Display::fmt(&e, f), + } + } +} + +impl Error for Errors { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::CannotDecodeValue + | Self::CannotDecodeRootSecretAsHex + | Self::CannotDecryptValue + | Self::CannotEncryptValue + | Self::CannotGetValue + | Self::CannotSelectValue + | Self::EmptyRootSecretFile + | Self::EmptySecretIdFile + | Self::InvalidRootSecret + | Self::InvalidSaltLen + | Self::SecretStore(_) => None, + Self::Io(e) => e.source(), + Self::RootSecretFileIo(e) => e.source(), + Self::SecretIdFileIo(e) => e.source(), + Self::Serde(e) => e.source(), + } + } +} diff --git a/streambed-confidant-cli/src/main.rs b/streambed-confidant-cli/src/main.rs new file mode 100644 index 0000000..cfeea56 --- /dev/null +++ b/streambed-confidant-cli/src/main.rs @@ -0,0 +1,264 @@ +use std::{ + error::Error, + fs::{self, File}, + io::{self, BufRead, BufReader, BufWriter, Read, Write}, + path::PathBuf, + time::Duration, +}; + +use clap::{Args, Parser, Subcommand}; +use errors::Errors; +use streambed::secret_store::SecretStore; +use 
streambed_confidant::FileSecretStore; + +pub mod cryptor; +pub mod errors; + +/// The `confidant` command provides a utility for conveniently operating on file-based secret store. It is +/// often used in conjunction `logged` to encrypt payloads as a pre-stage, or decrypt as a post-stage. JSON objects +/// followed by a newline are expected to be streamed in and out. The value to be encrypted/decypted is defaulted to a +/// field named "value", which can be altered via the `select` option. This value is expected to be encoded as BASE64 and +/// will output as encoded BASE64. No assumption is made regarding the nature of a value passed in beyond it to be +/// encrypted/decrypted using streambed crypto functions. +#[derive(Parser, Debug)] +#[clap(author, about, long_about = None, version)] +struct ProgramArgs { + /// In order to initialise the secret store, a root secret is also required. A credentials-directory path can be provided + /// where a `root-secret`` file is expected. This argument corresponds conveniently with systemd's CREDENTIALS_DIRECTORY + /// environment variable and is used by various services we have written. + /// Also associated with this argument is the `secret_id` file` for role-based authentication with the secret store. + /// This secret is expected to be found in a ss-secret-id file of the directory. + #[clap(env, long, default_value = "/tmp")] + pub credentials_directory: PathBuf, + + /// The max number of Vault Secret Store secrets to retain by our cache at any time. + /// Least Recently Used (LRU) secrets will be evicted from our cache once this value + /// is exceeded. + #[clap(env, long, default_value_t = 10_000)] + pub max_secrets: usize, + + /// The Secret Store role_id to use for approle authentication. + #[clap(env, long, default_value = "streambed-confidant-cli")] + pub role_id: String, + + /// The location of all secrets belonging to confidant. 
The recommendation is to + /// create a user for confidant and a requirement is to remove group and world permissions. + /// Then, use ACLs to express further access conditions. + #[clap(env, long, default_value = "/var/lib/confidant")] + pub root_path: PathBuf, + + /// A data field to be used in place of Vault's lease_duration field. Time + /// will be interpreted as a humantime string e.g. "1m", "1s" etc. Note + /// that v2 of the Vault server does not appear to populate the lease_duration + /// field for the KV secret store any longer. Instead, we can use a "ttl" field + /// from the data. + #[clap(env, long)] + pub ttl_field: Option, + + /// How long we wait until re-requesting the Secret Store for an + /// unauthorized secret again. + #[clap(env, long, default_value = "1m")] + pub unauthorized_timeout: humantime::Duration, + + #[command(subcommand)] + pub command: Command, +} + +#[derive(Subcommand, Debug)] +enum Command { + Decrypt(DecryptCommand), + Encrypt(EncryptCommand), +} + +/// Consume a JSON object followed by a newline character, from a stream until EOF and decrypt it. +#[derive(Args, Debug)] +struct DecryptCommand { + /// The file to consume JSON objects with newlines from, or `-` to indicate STDIN. + #[clap(short, long)] + pub file: PathBuf, + + /// The max number utf-8 characters to read from the file. + #[clap(long, default_value_t = 8 * 1024)] + pub max_line_len: u64, + + /// By default, records are output to STDOUT as JSON followed newlines. + /// This option can be used to write to a file. + #[clap(short, long)] + pub output: Option, + + /// The JSON field to select that holds the BASE64 value for decryption. + /// The first 12 bytes of a decoded BASE64 value is expected to be a salt + /// value. + #[clap(long, default_value = "value")] + pub select: String, + + /// The path to the secret e.g. 
"default/secrets.configurator-events.key" + #[clap(long)] + pub path: String, +} + +/// Consume a JSON object followed by a newline character, from a stream until EOF and encrypt it. +#[derive(Args, Debug)] +struct EncryptCommand { + /// The file to consume JSON objects with newlines from, or `-` to indicate STDIN. + #[clap(short, long)] + pub file: PathBuf, + + /// The max number utf-8 characters to read from the file. + #[clap(long, default_value_t = 8 * 1024)] + pub max_line_len: u64, + + /// By default, records are output to STDOUT as a JSON followed newlines. + /// This option can be used to write to a file. + #[clap(short, long)] + pub output: Option, + + /// The JSON field to select that holds the BASE64 value for encryption. + /// On output, the first 12 bytes of the decoded BASE64 value are the salt + /// that was used. + #[clap(long, default_value = "value")] + pub select: String, + + /// The path to the secret e.g. "default/secrets.configurator-events.key". + /// NOTE: as a convenience, in the case where there is no secret at + /// this path, then one will be attempted to be created. + #[clap(long)] + pub path: String, +} + +/// Creates the file secret store rooted at `root_path` and authenticates against it. +/// The first line of the `root-secret` file in `credentials_directory` is the hex-encoded +/// root secret, and the first line of the `ss-secret-id` file is the secret id used with `role_id`. +async fn secret_store( + credentials_directory: PathBuf, + max_secrets: usize, + root_path: PathBuf, + role_id: String, + ttl_field: Option<&str>, + unauthorized_timeout: Duration, +) -> Result { + let ss = { + let (root_secret, ss_secret_id) = { + let f = File::open(credentials_directory.join("root-secret")) + .map_err(Errors::RootSecretFileIo)?; + let f = BufReader::new(f); + let root_secret = f + .lines() + .next() + .ok_or(Errors::EmptyRootSecretFile)? 
+ .map_err(Errors::RootSecretFileIo)?; + + let f = File::open(credentials_directory.join("ss-secret-id")) + .map_err(Errors::SecretIdFileIo)?; + let f = BufReader::new(f); + let ss_secret_id = f + .lines() + .next() + .ok_or(Errors::EmptySecretIdFile)? 
+ .map_err(Errors::SecretIdFileIo)?; + + (root_secret, ss_secret_id) + }; + + let root_secret = + hex::decode(root_secret).map_err(|_| Errors::CannotDecodeRootSecretAsHex)?; + + let ss = FileSecretStore::new( + root_path, + &root_secret + .try_into() + .map_err(|_| Errors::InvalidRootSecret)?, + unauthorized_timeout, + max_secrets, + ttl_field, + ); + + ss.approle_auth(&role_id, &ss_secret_id) + .await + .map_err(Errors::SecretStore)?; + + ss + }; + Ok(ss) +} + +type LineReaderFn = Box Result, io::Error> + Send>; +type OutputFn = Box; + +/// Builds the line reader for `file` (`-` meaning STDIN) and the writer for `output` +/// (STDOUT when no output file is given). Each read of a line is limited to +/// `max_line_len` bytes. +fn pipeline( + file: PathBuf, + output: Option, + max_line_len: u64, +) -> Result<(LineReaderFn, OutputFn), Errors> { + // A line reader function is provided rather than something based on the + // Read trait as reading a line from stdin involves locking it via its + // own read_line function. A Read trait will cause a lock to occur for + // each access of the Read methods, and we cannot be guaranteed which of those will + // be used - serde, for example, may only read a byte at a time. This would cause + // many locks. + let line_reader: LineReaderFn = if file.as_os_str() == "-" { + let stdin = io::stdin(); + let mut line = String::new(); + Box::new(move || { + line.clear(); + if stdin.lock().take(max_line_len).read_line(&mut line)? > 0 { + Ok(Some(line.clone())) + } else { + Ok(None) + } + }) + } else { + // The limit is applied to each line read, as for stdin. Wrapping the reader itself + // in `take` would cap the whole file at `max_line_len` bytes and silently drop the rest. + let mut buf = BufReader::new(fs::File::open(file).map_err(Errors::from)?); + let mut line = String::new(); + Box::new(move || { + line.clear(); + if buf.by_ref().take(max_line_len).read_line(&mut line)? 
> 0 { + Ok(Some(line.clone())) + } else { + Ok(None) + } + }) + }; + let output: OutputFn = if let Some(output) = output { + Box::new(BufWriter::new( + fs::File::create(output).map_err(Errors::from)?, + )) + } else { + Box::new(io::stdout()) + }; + Ok((line_reader, output)) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = ProgramArgs::parse(); + + env_logger::builder().format_timestamp_millis().init(); + + let ss = secret_store( + args.credentials_directory, + args.max_secrets, + args.root_path, + args.role_id, + args.ttl_field.as_deref(), + args.unauthorized_timeout.into(), + ) + .await?; + + let task = tokio::spawn(async move { + match args.command { + Command::Decrypt(command) => { + let (line_reader, output) = + pipeline(command.file, command.output, command.max_line_len)?; + cryptor::decrypt(ss, line_reader, output, &command.path, &command.select).await + } + Command::Encrypt(command) => { + let (line_reader, output) = + pipeline(command.file, command.output, command.max_line_len)?; + cryptor::encrypt(ss, line_reader, output, &command.path, &command.select).await + } + } + }); + + task.await + .map_err(|e| e.into()) + .and_then(|r: Result<(), Errors>| r.map_err(|e| e.into())) +} diff --git a/streambed-logged-cli/README.md b/streambed-logged-cli/README.md index bd97489..30830b7 100644 --- a/streambed-logged-cli/README.md +++ b/streambed-logged-cli/README.md @@ -6,13 +6,19 @@ The `logged` command provides a utility for conveniently operating on file-based Running with an example write followed by a read --- -First build the executable: +First get the executable: + +``` +cargo install streambed-logged-cli +``` + +...or build it from this repo: ``` cargo build --bin logged --release ``` -...make it available on the PATH: +...and make a build available on the PATH (only for `cargo build` and just once per shell session): ``` export PATH="$PWD/target/release":$PATH @@ -28,7 +34,30 @@ echo 
'{"topic":"my-topic","headers":[],"key":0,"value":"SGkgdGhlcmU=","partition ...then read it back: ``` -logged --root-path=/tmp subscribe --subscription my-topic --idle-timeout=100ms +logged --root-path=/tmp subscribe --subscription my-topic --idle-timeout=0ms +``` + +...which would output: + +``` +{"topic":"my-topic","headers":[],"timestamp":null,"key":0,"value":"SGkgdGhlcmU=","partition":0,"offset":0} +``` + +If you're using [nushell](https://www.nushell.sh/) then you can do nice things like base64 decode payload +values along the way: + +``` +logged --root-path=/tmp subscribe --subscription my-topic --idle-timeout=0m | from json --objects | update value {decode base64} +``` + +...which would output: + +``` +╭────┬──────────┬────────────────┬───────────┬─────┬──────────┬───────────┬────────╮ +│ # │ topic │ headers │ timestamp │ key │ value │ partition │ offset │ +├────┼──────────┼────────────────┼───────────┼─────┼──────────┼───────────┼────────┤ +│ 0 │ my-topic │ [list 0 items] │ │ 0 │ Hi there │ 0 │ 0 │ +╰────┴──────────┴────────────────┴───────────┴─────┴──────────┴───────────┴────────╯ ``` Use `--help` to discover all of the options. 
diff --git a/streambed-logged-cli/src/errors.rs b/streambed-logged-cli/src/errors.rs index cfe67b4..9895524 100644 --- a/streambed-logged-cli/src/errors.rs +++ b/streambed-logged-cli/src/errors.rs @@ -6,6 +6,7 @@ use streambed::commit_log::ProducerError; pub enum Errors { Io(io::Error), Producer(ProducerError), + Serde(serde_json::Error), } impl From for Errors { @@ -14,11 +15,18 @@ impl From for Errors { } } +impl From for Errors { + fn from(value: serde_json::Error) -> Self { + Self::Serde(value) + } +} + impl fmt::Display for Errors { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Io(e) => e.fmt(f), Self::Producer(_) => f.write_str("CannotProduce"), + Self::Serde(e) => e.fmt(f), } } } @@ -28,6 +36,7 @@ impl Error for Errors { match self { Self::Io(e) => e.source(), Self::Producer(_) => None, + Self::Serde(e) => e.source(), } } } diff --git a/streambed-logged-cli/src/main.rs b/streambed-logged-cli/src/main.rs index fb16b31..25ea81d 100644 --- a/streambed-logged-cli/src/main.rs +++ b/streambed-logged-cli/src/main.rs @@ -1,7 +1,7 @@ use std::{ error::Error, fmt, fs, - io::{self, BufReader, BufWriter, Read, Write}, + io::{self, BufRead, BufReader, BufWriter, Read, Write}, path::PathBuf, }; @@ -38,20 +38,27 @@ enum Command { } /// Consume JSON records from a stream until EOF and append them to the log. +/// Records may be represented using JSON-line, or not. +/// JSON-line is JSON with a newline character between each object streamed. #[derive(Args, Debug)] struct ProduceCommand { /// The file to consume records from, or `-` to indicate STDIN. - #[clap(env, short, long)] + #[clap(short, long)] pub file: PathBuf, + + /// The max number utf-8 characters to read from the file. + #[clap(long, default_value_t = 8 * 1024)] + pub max_line_len: u64, } -/// Subscribe to topics and consume from them producing JSON records to a stream. +/// Subscribe to topics and consume from them producing JSON-line records to a stream. 
+/// JSON-line is JSON with a newline character between each object streamed. #[derive(Args, Debug)] struct SubscribeCommand { /// The amount of time to indicate that no more events are immediately /// available from the Commit Log endpoint. If unspecified then the /// CLI will wait indefinitely for records to appear. - #[clap(env, long)] + #[clap(long)] pub idle_timeout: Option, /// In the case that an offset is supplied, it is @@ -59,13 +66,13 @@ struct SubscribeCommand { /// subsequent subscription will source from the offset. /// The fields are topic name, partition and offset which /// are separated by commas with no spaces e.g. "--offset=my-topic,0,1000". - #[clap(env, long)] + #[clap(long)] #[arg(value_parser = parse_offset)] pub offset: Vec, /// By default, records of the topic are consumed and output to STDOUT. /// This option can be used to write to a file. Records are output as JSON. - #[clap(env, short, long)] + #[clap(short, long)] pub output: Option, /// In the case where a subscription topic names are supplied, the consumer @@ -73,7 +80,7 @@ struct SubscribeCommand { /// ending only when the connection to the topic is severed. /// Topics may be namespaced by prefixing with characters followed by /// a `:`. For example, "my-ns:my-topic". - #[clap(env, long, required = true)] + #[clap(long, required = true)] pub subscription: Vec, } @@ -149,6 +156,9 @@ fn parse_offset(arg: &str) -> Result { }) } +type LineReaderFn = Box Result, io::Error> + Send>; +type OutputFn = Box; + #[tokio::main] async fn main() -> Result<(), Box> { let args = ProgramArgs::parse(); @@ -160,17 +170,47 @@ async fn main() -> Result<(), Box> { let task = tokio::spawn(async move { match args.command { Command::Produce(command) => { - let input: Box = if command.file.as_os_str() == "-" { - Box::new(io::stdin()) + // A line reader function is provided rather than something based on the + // Read trait as reading a line from stdin involves locking it via its + // own read_line function. 
A Read trait will cause a lock to occur for + // each access of the Read methods, and we cannot be guaranteed which of those will + // be used - serde, for example, may only read a byte at a time. This would cause + // many locks. + let line_reader: LineReaderFn = if command.file.as_os_str() == "-" { + let stdin = io::stdin(); + let mut line = String::new(); + Box::new(move || { + line.clear(); + if stdin + .lock() + .take(command.max_line_len) + .read_line(&mut line)? + > 0 + { + Ok(Some(line.clone())) + } else { + Ok(None) + } + }) } else { - Box::new(BufReader::new( - fs::File::open(command.file).map_err(Errors::from)?, - )) + // The limit is applied to each line read, as for stdin. Wrapping the reader + // itself in `take` would cap the whole file at `max_line_len` bytes and + // silently drop the rest. + let max_line_len = command.max_line_len; + let mut buf = + BufReader::new(fs::File::open(command.file).map_err(Errors::from)?); + let mut line = String::new(); + Box::new(move || { + line.clear(); + if buf.by_ref().take(max_line_len).read_line(&mut line)? > 0 { + Ok(Some(line.clone())) + } else { + Ok(None) + } + }) }; - producer::produce(cl, input).await + + producer::produce(cl, line_reader).await } Command::Subscribe(command) => { - let output: Box = if let Some(output) = command.output { + let output: OutputFn = if let Some(output) = command.output { Box::new(BufWriter::new( fs::File::create(output).map_err(Errors::from)?, )) diff --git a/streambed-logged-cli/src/producer.rs b/streambed-logged-cli/src/producer.rs index da5c3c8..acec628 100644 --- a/streambed-logged-cli/src/producer.rs +++ b/streambed-logged-cli/src/producer.rs @@ -1,12 +1,15 @@ -use std::io::Read; +use std::io::{self}; use streambed::commit_log::{CommitLog, ProducerRecord}; use crate::errors::Errors; -pub async fn produce(cl: impl CommitLog, input: impl Read) -> Result<(), Errors> { - let deserialiser = serde_json::from_reader::<_, ProducerRecord>(input); - for record in deserialiser.into_iter() { +pub async fn produce( + cl: impl CommitLog, + mut line_reader: impl FnMut() -> Result, io::Error>, +) -> Result<(), Errors> { + while let Some(line) = line_reader()? 
{ + let record = serde_json::from_str::(&line)?; let record = ProducerRecord { topic: record.topic, headers: record.headers,