|
| 1 | +use std::{ |
| 2 | + collections::HashMap, |
| 3 | + io::{self, Write}, |
| 4 | + time::Duration, |
| 5 | +}; |
| 6 | + |
| 7 | +use rand::RngCore; |
| 8 | +use serde_json::Value; |
| 9 | +use streambed::{ |
| 10 | + crypto::{self, SALT_SIZE}, |
| 11 | + get_secret_value, |
| 12 | + secret_store::{SecretData, SecretStore}, |
| 13 | +}; |
| 14 | +use tokio::{sync::mpsc::channel, time}; |
| 15 | + |
| 16 | +use crate::errors::Errors; |
| 17 | + |
| 18 | +const FLUSH_DELAY: Duration = Duration::from_millis(100); |
| 19 | +const OUTPUT_QUEUE_SIZE: usize = 10; |
| 20 | + |
| 21 | +pub async fn process_records( |
| 22 | + ss: impl SecretStore, |
| 23 | + mut line_reader: impl FnMut() -> Result<Option<String>, io::Error>, |
| 24 | + mut output: impl Write, |
| 25 | + path: &str, |
| 26 | + process: fn(Vec<u8>, Vec<u8>) -> Option<Vec<u8>>, |
| 27 | + select: &str, |
| 28 | +) -> Result<(), Errors> { |
| 29 | + let (output_tx, mut output_rx) = channel(OUTPUT_QUEUE_SIZE); |
| 30 | + |
| 31 | + let processor = async move { |
| 32 | + while let Some(line) = line_reader()? { |
| 33 | + let mut record = serde_json::from_str::<Value>(&line).map_err(Errors::from)?; |
| 34 | + let Some(value) = record.get_mut(select) else { |
| 35 | + return Err(Errors::CannotSelectValue); |
| 36 | + }; |
| 37 | + let Some(str_value) = value.as_str() else { |
| 38 | + return Err(Errors::CannotGetValue); |
| 39 | + }; |
| 40 | + let Ok(bytes) = base64::decode(str_value) else { |
| 41 | + return Err(Errors::CannotDecodeValue); |
| 42 | + }; |
| 43 | + let decrypted_bytes = get_secret_value(&ss, path) |
| 44 | + .await |
| 45 | + .and_then(|secret_value| { |
| 46 | + let key = hex::decode(secret_value).ok()?; |
| 47 | + process(key, bytes) |
| 48 | + }) |
| 49 | + .ok_or(Errors::CannotDecryptValue)?; |
| 50 | + let encoded_decrypted_str = base64::encode(decrypted_bytes); |
| 51 | + *value = Value::String(encoded_decrypted_str); |
| 52 | + let _ = output_tx.send(record).await; |
| 53 | + } |
| 54 | + |
| 55 | + Ok(()) |
| 56 | + }; |
| 57 | + |
| 58 | + let outputter = async move { |
| 59 | + let mut timeout = FLUSH_DELAY; |
| 60 | + loop { |
| 61 | + tokio::select! { |
| 62 | + record = output_rx.recv() => { |
| 63 | + if let Some(record) = record { |
| 64 | + let buf = serde_json::to_string(&record)?; |
| 65 | + output.write_all(buf.as_bytes())?; |
| 66 | + output.write_all(b"\n")?; |
| 67 | + timeout = FLUSH_DELAY; |
| 68 | + } else { |
| 69 | + break; |
| 70 | + } |
| 71 | + } |
| 72 | + _ = time::sleep(timeout) => { |
| 73 | + output.flush()?; |
| 74 | + timeout = Duration::MAX; |
| 75 | + } |
| 76 | + } |
| 77 | + } |
| 78 | + Ok(()) |
| 79 | + }; |
| 80 | + |
| 81 | + tokio::try_join!(processor, outputter).map(|_| ()) |
| 82 | +} |
| 83 | + |
| 84 | +pub async fn decrypt( |
| 85 | + ss: impl SecretStore, |
| 86 | + line_reader: impl FnMut() -> Result<Option<String>, io::Error>, |
| 87 | + output: impl Write, |
| 88 | + path: &str, |
| 89 | + select: &str, |
| 90 | +) -> Result<(), Errors> { |
| 91 | + fn process(key: Vec<u8>, mut bytes: Vec<u8>) -> Option<Vec<u8>> { |
| 92 | + let (salt, data_bytes) = bytes.split_at_mut(crypto::SALT_SIZE); |
| 93 | + crypto::decrypt(data_bytes, &key.try_into().ok()?, &salt.try_into().ok()?); |
| 94 | + Some(data_bytes.to_vec()) |
| 95 | + } |
| 96 | + process_records(ss, line_reader, output, path, process, select).await |
| 97 | +} |
| 98 | + |
| 99 | +pub async fn encrypt( |
| 100 | + ss: impl SecretStore, |
| 101 | + line_reader: impl FnMut() -> Result<Option<String>, io::Error>, |
| 102 | + output: impl Write, |
| 103 | + path: &str, |
| 104 | + select: &str, |
| 105 | +) -> Result<(), Errors> { |
| 106 | + // As a convenience, we create the secret when encrypting if there |
| 107 | + // isn't one. |
| 108 | + if get_secret_value(&ss, path).await.is_none() { |
| 109 | + let mut key = vec![0; 16]; |
| 110 | + rand::thread_rng().fill_bytes(&mut key); |
| 111 | + let data = HashMap::from([("value".to_string(), hex::encode(key))]); |
| 112 | + ss.create_secret(path, SecretData { data }) |
| 113 | + .await |
| 114 | + .map_err(Errors::SecretStore)?; |
| 115 | + } |
| 116 | + |
| 117 | + fn process(key: Vec<u8>, mut data_bytes: Vec<u8>) -> Option<Vec<u8>> { |
| 118 | + let salt = crypto::salt(&mut rand::thread_rng()); |
| 119 | + crypto::encrypt(&mut data_bytes, &key.try_into().ok()?, &salt); |
| 120 | + let mut buf = Vec::with_capacity(SALT_SIZE + data_bytes.len()); |
| 121 | + buf.extend(salt); |
| 122 | + buf.extend(data_bytes); |
| 123 | + Some(buf) |
| 124 | + } |
| 125 | + process_records(ss, line_reader, output, path, process, select).await |
| 126 | +} |
0 commit comments