Skip to content

Commit

Permalink
Add compress-output flag and uncompress subcommand (#33)
Browse files Browse the repository at this point in the history
* Add compress-output flag and uncompress subcommand

* added high level test for compress/uncompress

Co-authored-by: Daniel Turner <[email protected]>
  • Loading branch information
rohan-b99 and Daniel Turner authored Aug 22, 2022
1 parent fd073f0 commit 84dfabd
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 23 deletions.
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ serde = { version = "1.0", features = ["derive"] }
structopt = "0.3"
uuid = { version = "0.8", features = [ "v4"] }
mimalloc = "0.1.29"
zstd = "0.11.2"

[dev-dependencies]
pretty_assertions = "1.2.1"
6 changes: 5 additions & 1 deletion src/anonymiser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ pub fn anonymise(
input_file: String,
output_file: String,
strategy_file: String,
compress_output: bool,
transformer_overrides: TransformerOverrides,
) -> Result<(), std::io::Error> {
match strategy_file::read(&strategy_file, transformer_overrides) {
Ok(strategies) => {
file_reader::read(input_file, output_file, &strategies)?;
file_reader::read(input_file, output_file, &strategies, compress_output)?;
Ok(())
}
Err(_) => {
Expand All @@ -33,6 +34,7 @@ mod tests {
"test_files/dump_file.sql".to_string(),
"test_files/results.sql".to_string(),
"non_existing_strategy_file.json".to_string(),
false,
TransformerOverrides::none(),
)
.is_ok());
Expand All @@ -45,6 +47,7 @@ mod tests {
"non_existing_input_file.sql".to_string(),
"test_files/results.sql".to_string(),
"test_files/strategy.json".to_string(),
false,
TransformerOverrides::none(),
)
.is_ok());
Expand All @@ -56,6 +59,7 @@ mod tests {
"test_files/dump_file.sql".to_string(),
"test_files/results.sql".to_string(),
"test_files/strategy.json".to_string(),
false,
TransformerOverrides::none(),
)
.is_ok());
Expand Down
81 changes: 60 additions & 21 deletions src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ pub fn read(
input_file_path: String,
output_file_path: String,
strategies: &Strategies,
compress_output: bool,
) -> Result<(), std::io::Error> {
let output_file = File::create(output_file_path).unwrap();
let mut file_writer = BufWriter::new(output_file);
let output_file = File::create(output_file_path)?;
let mut file_writer: Box<dyn Write> = match compress_output {
true => Box::new(zstd::Encoder::new(output_file, 1)?.auto_finish()),
false => Box::new(BufWriter::new(output_file)),
};

let file_reader = File::open(&input_file_path)
.unwrap_or_else(|_| panic!("Input file '{}' does not exist", input_file_path));
Expand All @@ -26,21 +30,14 @@ pub fn read(
let mut rng = rng::get();

loop {
match reader.read_line(&mut line) {
Ok(bytes_read) => {
if bytes_read == 0 {
break;
}

let transformed_row =
row_parser::parse(&mut rng, &line, &mut row_parser_state, strategies);
file_writer.write_all(transformed_row.as_bytes())?;
line.clear();
}
Err(err) => {
return Err(err);
}
let bytes_read = reader.read_line(&mut line)?;
if bytes_read == 0 {
break;
}

let transformed_row = row_parser::parse(&mut rng, &line, &mut row_parser_state, strategies);
file_writer.write_all(transformed_row.as_bytes())?;
line.clear();
}
Ok(())
}
Expand All @@ -49,14 +46,13 @@ pub fn read(
mod tests {
use super::*;
use crate::parsers::strategy_structs::*;
use crate::uncompress::uncompress;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;

#[test]
fn can_read() {
let input_file = "test_files/dump_file.sql".to_string();
let output_file = "test_files/file_reader_test_results.sql".to_string();
fn default_strategies() -> Strategies {
let mut strategies = Strategies::new();
strategies.insert(
"public.orders".to_string(),
Expand Down Expand Up @@ -90,8 +86,17 @@ mod tests {
strategy_tuple("phone_number"),
]),
);
strategies
}

#[test]
fn can_read() {
let input_file = "test_files/dump_file.sql".to_string();
let output_file = "test_files/file_reader_test_results.sql".to_string();
let _ = fs::remove_file(&output_file).ok();
let strategies = default_strategies();

assert!(read(input_file.clone(), output_file.clone(), &strategies).is_ok());
assert!(read(input_file.clone(), output_file.clone(), &strategies, false).is_ok());

let original =
fs::read_to_string(&input_file).expect("Something went wrong reading the file");
Expand All @@ -102,6 +107,40 @@ mod tests {
assert_eq!(original, processed);
}

#[test]
fn can_read_and_output_compressed() {
    // Round trip: write the dump through `read` with compression enabled,
    // decode it again with `uncompress`, and compare against the input file.
    let source_path = "test_files/dump_file.sql";
    let zstd_path = "test_files/compressed_file_reader_test_results.sql";
    let restored_path = "test_files/uncompressed_file_reader_test_results.sql";

    // Best-effort removal of leftovers from an earlier run; the result is
    // deliberately ignored because the files may simply not exist.
    for stale in [zstd_path, restored_path] {
        let _ = fs::remove_file(stale);
    }

    let strategies = default_strategies();

    let write_result = read(
        source_path.to_string(),
        zstd_path.to_string(),
        &strategies,
        true,
    );
    assert!(write_result.is_ok());

    uncompress(PathBuf::from(zstd_path), Some(PathBuf::from(restored_path)))
        .expect("Should not fail to uncompress!");

    let original =
        fs::read_to_string(source_path).expect("Something went wrong reading the file");
    let processed =
        fs::read_to_string(restored_path).expect("Something went wrong reading the file");

    // The decoded output is expected to match the input dump byte for byte.
    assert_eq!(original, processed);
}

fn strategy_tuple(column_name: &str) -> (String, ColumnInfo) {
(
column_name.to_string(),
Expand Down
10 changes: 9 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
mod anonymiser;
mod file_reader;
use std::fmt::Write;
mod fixer;
mod opts;
mod parsers;
mod uncompress;

use crate::opts::{Anonymiser, Opts};
use crate::parsers::strategies::Strategies;
use crate::parsers::strategy_structs::{MissingColumns, SimpleColumn, TransformerOverrides};
use itertools::Itertools;
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use std::fmt::Write;

use parsers::{db_schema, strategy_file};
use structopt::StructOpt;
Expand All @@ -27,6 +29,7 @@ fn main() -> Result<(), std::io::Error> {
input_file,
output_file,
strategy_file,
compress_output,
allow_potential_pii,
allow_commercially_sensitive,
} => {
Expand All @@ -39,6 +42,7 @@ fn main() -> Result<(), std::io::Error> {
input_file,
output_file,
strategy_file,
compress_output,
transformer_overrides,
)?
}
Expand Down Expand Up @@ -84,6 +88,10 @@ fn main() -> Result<(), std::io::Error> {
}
}
}
Anonymiser::Uncompress {
input_file,
output_file,
} => uncompress::uncompress(input_file, output_file).expect("failed to uncompress"),
}
Ok(())
}
Expand Down
14 changes: 14 additions & 0 deletions src/opts.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::path::PathBuf;

use structopt::StructOpt;
#[derive(Debug, StructOpt)]
#[structopt(name = "Anonymiser", about = "Anonymise your database backups!")]
Expand All @@ -16,6 +18,9 @@ pub enum Anonymiser {
output_file: String,
#[structopt(short, long, default_value = "./strategy.json")]
strategy_file: String,
/// Compress output using zstd
#[structopt(short, long)]
compress_output: bool,
/// Does not transform PotentialPii data types
#[structopt(long)]
allow_potential_pii: bool,
Expand Down Expand Up @@ -49,4 +54,13 @@ pub enum Anonymiser {
#[structopt(short, long, env = "DATABASE_URL")]
db_url: String,
},
/// Uncompress a zstd sql dump to a file, or stdout if no file specified
Uncompress {
/// Input file (*.sql.zst)
#[structopt(short, long)]
input_file: PathBuf,
/// Output file, will write to standard output if not specified
#[structopt(short, long)]
output_file: Option<PathBuf>,
},
}
64 changes: 64 additions & 0 deletions src/uncompress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::fs::File;
use std::path::PathBuf;

/// Decodes the zstd-compressed file at `input_file`.
///
/// When `output_file` is `Some`, that file is created and receives the
/// decoded bytes; when it is `None`, the decoded bytes are written to
/// standard output instead.
///
/// # Errors
///
/// Returns the underlying `std::io::Error` if the input file cannot be
/// opened, the output file cannot be created, or decoding/copying fails.
pub fn uncompress(input_file: PathBuf, output_file: Option<PathBuf>) -> Result<(), std::io::Error> {
    // The input is opened first so a missing input never creates an output file.
    let compressed = File::open(input_file)?;

    if let Some(destination) = output_file {
        let sink = File::create(destination)?;
        zstd::stream::copy_decode(compressed, sink)
    } else {
        zstd::stream::copy_decode(compressed, std::io::stdout())
    }
}

#[cfg(test)]
mod tests {
    use super::uncompress;
    use crate::anonymiser::anonymise;
    use crate::parsers::strategy_structs::TransformerOverrides;
    use std::path::PathBuf;

    /// Runs the anonymiser over the fixture dump and strategy file, writing
    /// to `output_file`; panics if anonymisation fails.
    fn anonymise_fixture(output_file: &str, compress_output: bool) {
        anonymise(
            "test_files/dump_file.sql".to_string(),
            output_file.to_string(),
            "test_files/strategy.json".to_string(),
            compress_output,
            TransformerOverrides::none(),
        )
        .unwrap();
    }

    /// Returns the number of lines in the text file at `path`; panics if the
    /// file cannot be read as a string.
    fn count_lines(path: &str) -> usize {
        let contents = std::fs::read_to_string(path).unwrap();
        contents.lines().count()
    }

    #[test]
    fn compress_gives_correct_output() {
        let test_dir_path = PathBuf::from("test_files/compress");
        std::fs::create_dir_all(&test_dir_path).unwrap();

        // Same input twice: once written plainly, once zstd-compressed.
        anonymise_fixture("test_files/compress/results.sql", false);
        anonymise_fixture("test_files/compress/results.sql.zst", true);

        let decoded_path = test_dir_path.join("uncompressed.sql");
        uncompress(
            PathBuf::from("test_files/compress/results.sql.zst"),
            Some(decoded_path),
        )
        .unwrap();

        // Can't compare actual content because of randomization, but # of lines
        // should be the same
        let plain_lines = count_lines("test_files/compress/results.sql");
        let decoded_lines = count_lines("test_files/compress/uncompressed.sql");
        assert_eq!(plain_lines, decoded_lines);

        std::fs::remove_dir_all(test_dir_path).unwrap();
    }
}

0 comments on commit 84dfabd

Please sign in to comment.