Skip to content

Commit

Permalink
UTF-8 printable payloads (#59)
Browse files Browse the repository at this point in the history
UTF-8 printable payloads (#59)
  • Loading branch information
gr211 authored Apr 28, 2024
1 parent 0ba803c commit 4dc980c
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 25 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ Arch Linux.
-o, --output-file <OUTPUT_FILE> Output file to write to
-c, --concurrent <CONCURRENT> Concurrent number of shards to tail
-v, --verbose Display additional information
-b, --base64-encoding Base64 encode the payload (for binary payloads)
--base64 Base64 encode payloads (eg. for binary data)
--utf8 Forces UTF-8 printable payloads
-h, --help Print help
-V, --version Print version

Expand Down
25 changes: 22 additions & 3 deletions src/cli_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::sink::PayloadEnc;
use anyhow::{anyhow, Result};
use aws_sdk_kinesis::meta::PKG_VERSION;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -88,9 +89,27 @@ pub struct Opt {
#[structopt(short, long)]
pub verbose: bool,

/// Base64 encode the payload (eg. for binary payloads)
#[structopt(short, long)]
pub base64_encoding: bool,
/// Base64 encode payloads (eg. for binary data)
#[structopt(long)]
#[arg(group = "encoding")]
pub base64: bool,

/// Forces UTF-8 printable payloads
#[structopt(long)]
#[arg(group = "encoding")]
pub utf8: bool,
}

impl Opt {
    /// Resolves the payload encoding selected by the `base64` / `utf8` flags.
    ///
    /// `base64` takes precedence when set; otherwise `utf8` selects UTF-8
    /// output; with neither flag set the payload is left raw.
    pub fn encoding(&self) -> PayloadEnc {
        match (self.base64, self.utf8) {
            (true, _) => PayloadEnc::Base64,
            (false, true) => PayloadEnc::Utf8,
            (false, false) => PayloadEnc::Raw,
        }
    }
}

pub(crate) fn selected_shards(
Expand Down
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ async fn main() -> Result<()> {

let handle = tokio::spawn({
let tx_records = tx_records.clone();
let encoding = opt.encoding().clone();

async move {
match opt.output_file {
Some(file) => {
Expand All @@ -67,7 +69,7 @@ async fn main() -> Result<()> {
opt.print_shard_id,
opt.print_timestamp,
opt.print_delimiter,
opt.base64_encoding,
encoding,
shard_count,
file,
)
Expand All @@ -83,7 +85,7 @@ async fn main() -> Result<()> {
opt.print_shard_id,
opt.print_timestamp,
opt.print_delimiter,
opt.base64_encoding,
encoding,
shard_count,
)
.run(tx_records, rx_records)
Expand Down
30 changes: 19 additions & 11 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ use crate::kinesis::models::{ProcessError, RecordResult, ShardProcessorADT};
pub mod console;
pub mod file;

/// How a record payload is rendered before it is written to a sink.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum PayloadEnc {
    /// Encode the payload bytes as standard Base64 text.
    Base64,
    /// Render the payload as UTF-8, substituting invalid sequences (lossy).
    Utf8,
    /// Emit the payload bytes unchanged. This is the default.
    #[default]
    Raw,
}

#[derive(Clone, Default)]
pub struct SinkConfig {
max_messages: Option<u32>,
Expand All @@ -27,7 +35,7 @@ pub struct SinkConfig {
print_timestamp: bool,
print_delimiter: bool,
exit_after_termination: bool,
base64_encoding: bool,
encoding: PayloadEnc,
}

pub trait Configurable {
Expand Down Expand Up @@ -248,17 +256,17 @@ where
fn format_record(&self, record_result: &RecordResult) -> Vec<u8> {
let line_feed = vec![b'\n'];

let payload = match &record_result.data {
payload if self.get_config().base64_encoding => {
let payload: Cow<[u8]> = match self.get_config().encoding {
PayloadEnc::Base64 => {
use base64::{engine::general_purpose, Engine as _};
Cow::Owned(
general_purpose::STANDARD
.encode(payload)
.as_bytes()
.to_vec(),
)
let base64_str = general_purpose::STANDARD.encode(&record_result.data);
Cow::Owned(Vec::from(base64_str.as_bytes()))
}
PayloadEnc::Utf8 => {
    // Invalid UTF-8 sequences are replaced with U+FFFD by `from_utf8_lossy`.
    // Convert the resulting `Cow<str>` into `Cow<[u8]>` variant by variant
    // instead of transmuting: the two `Cow` instantiations are distinct
    // `repr(Rust)` enums whose layouts are not guaranteed to match, and both
    // conversions below are free (no copy, no allocation).
    match String::from_utf8_lossy(&record_result.data) {
        Cow::Borrowed(valid) => Cow::Borrowed(valid.as_bytes()),
        Cow::Owned(repaired) => Cow::Owned(repaired.into_bytes()),
    }
}
payload => Cow::Borrowed(payload),
PayloadEnc::Raw => Cow::Borrowed(&record_result.data),
};

let partition_key = if self.get_config().print_key {
Expand Down Expand Up @@ -306,7 +314,7 @@ where
sequence_number.as_bytes(),
shard_id.as_bytes(),
date.as_bytes(),
payload.as_slice(),
payload.as_ref(),
line_feed.as_slice(),
]
.concat()
Expand Down
6 changes: 3 additions & 3 deletions src/sink/console.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::sink::{Configurable, SinkConfig, SinkOutput};
use crate::sink::{Configurable, PayloadEnc, SinkConfig, SinkOutput};
use anyhow::Result;
use colored::Colorize;
use std::io;
Expand All @@ -21,7 +21,7 @@ impl ConsoleSink {
print_shard_id: bool,
print_timestamp: bool,
print_delimiter: bool,
no_base64: bool,
encoding: PayloadEnc,
shard_count: usize,
) -> Self {
ConsoleSink {
Expand All @@ -33,7 +33,7 @@ impl ConsoleSink {
print_shard_id,
print_timestamp,
print_delimiter,
base64_encoding: no_base64,
encoding,
exit_after_termination: true,
},
shard_count,
Expand Down
30 changes: 28 additions & 2 deletions src/sink/console_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn format_outputs() {
fn format_outputs_base64() {
let console = ConsoleSink {
config: SinkConfig {
base64_encoding: true,
encoding: PayloadEnc::Base64,
no_color: true,
..Default::default()
},
Expand All @@ -75,11 +75,37 @@ fn format_outputs_base64() {
assert_eq!(result, "SGVsbG8g8JCAV29ybGQ=\n");
}

/// With `PayloadEnc::Utf8`, an invalid byte sequence in the payload must be
/// rendered as the U+FFFD replacement character.
#[test]
fn format_outputs_utf8() {
    // `\xF0\x90\x80` is a truncated 4-byte UTF-8 sequence.
    let payload: &[u8] = b"Hello \xF0\x90\x80World";

    let sink = ConsoleSink {
        shard_count: 1,
        config: SinkConfig {
            no_color: true,
            encoding: PayloadEnc::Utf8,
            ..Default::default()
        },
    };

    let formatted = sink.format_record(&RecordResult {
        shard_id: Arc::new("".to_string()),
        sequence_id: "sequence_id".to_string(),
        partition_key: "partition_key".to_string(),
        datetime: DateTime::from_secs(1_000_000_i64),
        data: payload.to_vec(),
    });

    assert_eq!(String::from_utf8_lossy(&formatted), "Hello �World\n");
}

#[test]
fn format_outputs_raw() {
let console = ConsoleSink {
config: SinkConfig {
base64_encoding: false,
encoding: PayloadEnc::Raw,
no_color: true,
..Default::default()
},
Expand Down
6 changes: 3 additions & 3 deletions src/sink/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io;
use std::io::BufWriter;
use std::path::PathBuf;

use crate::sink::{Configurable, SinkConfig, SinkOutput};
use crate::sink::{Configurable, PayloadEnc, SinkConfig, SinkOutput};

pub const FILE_BUF_SIZE: usize = 512 * 1024; // 512 KiB (524,288 bytes)

Expand All @@ -24,7 +24,7 @@ impl FileSink {
print_shard_id: bool,
print_timestamp: bool,
print_delimiter: bool,
no_base64: bool,
encoding: PayloadEnc,
shard_count: usize,
file: P,
) -> Self {
Expand All @@ -37,7 +37,7 @@ impl FileSink {
print_shard_id,
print_timestamp,
print_delimiter,
base64_encoding: no_base64,
encoding,
exit_after_termination: true,
},
file: file.into(),
Expand Down

0 comments on commit 4dc980c

Please sign in to comment.