Skip to content

Commit

Permalink
Take care when overwriting - need to truncate file if it already exis…
Browse files Browse the repository at this point in the history
…ts. Otherwise writing will start at the beginning and overwrite and potentially leave extra data at the end of the file.
  • Loading branch information
curtisalexander committed Aug 24, 2022
1 parent e16f481 commit 4ce8fb1
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 26 deletions.
48 changes: 40 additions & 8 deletions readstat/src/rs_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,19 @@ impl ReadStatWriter {
fn write_data_to_csv(&mut self, d: &ReadStatData, rsp: &ReadStatPath) -> Result<(), Box<dyn Error + Send + Sync>> {
if let Some(p) = &rsp.out_path {
// if already started writing, then need to append to file; otherwise create file, truncating it if it already exists
let f = OpenOptions::new()
let f = if self.wrote_start {
OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open(p)?;
.open(p)?
} else {
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(p)?
};

// set message for what is being read/written
self.write_message_for_rows(d, rsp)?;
Expand Down Expand Up @@ -274,11 +282,19 @@ impl ReadStatWriter {
fn write_data_to_feather(&mut self, d: &ReadStatData, rsp: &ReadStatPath) -> Result<(), Box<dyn Error + Send + Sync>> {
if let Some(p) = &rsp.out_path {
// if already started writing, then need to append to file; otherwise create file, truncating it if it already exists
let f = OpenOptions::new()
let f = if self.wrote_start {
OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open(p)?;
.open(p)?
} else {
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(p)?
};

// set message for what is being read/written
self.write_message_for_rows(d, rsp)?;
Expand Down Expand Up @@ -332,11 +348,19 @@ impl ReadStatWriter {
fn write_data_to_ndjson(&mut self, d: &ReadStatData, rsp: &ReadStatPath) -> Result<(), Box<dyn Error + Send + Sync>> {
if let Some(p) = &rsp.out_path {
// if already started writing, then need to append to file; otherwise create file, truncating it if it already exists
let f = OpenOptions::new()
let f = if self.wrote_start {
OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open(p)?;
.open(p)?
} else {
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(p)?
};

// set message for what is being read/written
self.write_message_for_rows(d, rsp)?;
Expand Down Expand Up @@ -380,11 +404,19 @@ impl ReadStatWriter {
fn write_data_to_parquet(&mut self, d: &ReadStatData, rsp: &ReadStatPath) -> Result<(), Box<dyn Error + Send + Sync>> {
if let Some(p) = &rsp.out_path {
// if already started writing, then need to append to file; otherwise create file, truncating it if it already exists
let f = OpenOptions::new()
let f = if self.wrote_start {
OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open(p)?;
.open(p)?
} else {
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(p)?
};

// set message for what is being read/written
self.write_message_for_rows(d, rsp)?;
Expand Down
98 changes: 80 additions & 18 deletions readstat/tests/cli_data_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,60 @@ use assert_fs::NamedTempFile;
use polars::prelude::*;
use std::{fs::File, path::PathBuf, process::Command, result::Result};

/// Selects how `cli_data_to_parquet` picks its output file and whether it
/// passes `--overwrite` to the CLI.
enum OverwriteOption {
/// Write to the supplied temp file and pass `--overwrite`.
Overwrite(NamedTempFile),
/// Create a new temp file for the output and do not pass `--overwrite`.
DoNotOverwrite,
}

/// Builds the `readstat data` command that converts
/// `tests/data/<base_file_name>.sas7bdat` to parquet.
///
/// * `base_file_name` - stem shared by the input `.sas7bdat` file and the
///   `.parquet` temp file.
/// * `overwrite` - whether to reuse a caller-supplied temp file (adding
///   `--overwrite`) or to create a new one.
/// * `rows_to_stream` - when `Some`, forwarded as `--stream-rows`.
///
/// Returns the not-yet-executed command together with the temp file it is
/// configured to write to.
///
/// # Errors
///
/// Propagates failures from creating the temp file or from locating the
/// `readstat` binary.
fn cli_data_to_parquet(
base_file_name: &str,
overwrite: OverwriteOption,
rows_to_stream: Option<u32>,
) -> Result<(Command, NamedTempFile), Box<dyn std::error::Error>> {
// NOTE(review): the statements from here down to the `match` look like the
// pre-change lines of this diff hunk (diff markers are not visible in this
// view) — confirm before treating both halves as live code.
let tempfile = NamedTempFile::new(format!("{}.parquet", base_file_name))?;

let mut cmd = Command::cargo_bin("readstat")?;

if let Some(rows) = rows_to_stream {
cmd.arg("data")
.arg(format!("tests/data/{}.sas7bdat", base_file_name))
.args(["--format", "parquet"])
.args(["--output", tempfile.as_os_str().to_str().unwrap()])
.args(["--stream-rows", rows.to_string().as_str()])
.arg("--overwrite");
} else {
cmd.arg("data")
.arg(format!("tests/data/{}.sas7bdat", base_file_name))
.args(["--format", "parquet"])
.args(["--output", tempfile.as_os_str().to_str().unwrap()])
.arg("--overwrite");
}
// Choose the output file and CLI flags for each overwrite/streaming
// combination; every arm yields the temp file the command will write to.
let tempfile = match (overwrite, rows_to_stream) {
// reuse the supplied file, stream in chunks, and allow overwriting
(OverwriteOption::Overwrite(tempfile), Some(rows)) => {
cmd.arg("data")
.arg(format!("tests/data/{}.sas7bdat", base_file_name))
.args(["--format", "parquet"])
.args(["--output", tempfile.as_os_str().to_str().unwrap()])
.args(["--stream-rows", rows.to_string().as_str()])
.arg("--overwrite");

tempfile
}
// new temp file, stream in chunks, no `--overwrite`
(OverwriteOption::DoNotOverwrite, Some(rows)) => {
let tempfile = NamedTempFile::new(format!("{}.parquet", base_file_name))?;

cmd.arg("data")
.arg(format!("tests/data/{}.sas7bdat", base_file_name))
.args(["--format", "parquet"])
.args(["--output", tempfile.as_os_str().to_str().unwrap()])
.args(["--stream-rows", rows.to_string().as_str()]);

tempfile
}
// reuse the supplied file, no streaming, and allow overwriting
(OverwriteOption::Overwrite(tempfile), None) => {
cmd.arg("data")
.arg(format!("tests/data/{}.sas7bdat", base_file_name))
.args(["--format", "parquet"])
.args(["--output", tempfile.as_os_str().to_str().unwrap()])
.arg("--overwrite");

tempfile
}
// new temp file, no streaming, no `--overwrite`
(OverwriteOption::DoNotOverwrite, None) => {
let tempfile = NamedTempFile::new(format!("{}.parquet", base_file_name))?;

cmd.arg("data")
.arg(format!("tests/data/{}.sas7bdat", base_file_name))
.args(["--format", "parquet"])
.args(["--output", tempfile.as_os_str().to_str().unwrap()]);

tempfile
}
};

Ok((cmd, tempfile))
}
Expand All @@ -40,7 +72,8 @@ fn parquet_to_df(path: PathBuf) -> Result<DataFrame, Box<dyn std::error::Error>>

#[test]
fn cars_to_parquet() {
let (mut cmd, tempfile) = cli_data_to_parquet("cars", None).unwrap();
let (mut cmd, tempfile) =
cli_data_to_parquet("cars", OverwriteOption::DoNotOverwrite, None).unwrap();

cmd.assert().success().stdout(predicate::str::contains(
"In total, wrote 1,081 rows from file cars.sas7bdat into cars.parquet",
Expand All @@ -58,7 +91,36 @@ fn cars_to_parquet() {

/// Converts the cars dataset to parquet with `--stream-rows 500` and checks
/// both the CLI's reported row total and the shape of the written file.
#[test]
fn cars_to_parquet_with_streaming() {
// NOTE(review): the next statement and the one after it appear to be the
// before/after versions of the same call in this diff view — confirm which
// one is live.
let (mut cmd, tempfile) = cli_data_to_parquet("cars", Some(500)).unwrap();
let (mut cmd, tempfile) =
cli_data_to_parquet("cars", OverwriteOption::DoNotOverwrite, Some(500)).unwrap();

// the run must succeed and report the full row count on stdout
cmd.assert().success().stdout(predicate::str::contains(
"In total, wrote 1,081 rows from file cars.sas7bdat into cars.parquet",
));

// read the output back to verify what was actually written
let df = parquet_to_df(tempfile.to_path_buf()).unwrap();

let (height, width) = df.shape();

assert_eq!(height, 1081);
assert_eq!(width, 13);

// close explicitly so a cleanup failure surfaces as a test failure
tempfile.close().unwrap();
}

#[test]
fn cars_to_parquet_overwrite() {
// first stream
let (mut cmd, tempfile) =
cli_data_to_parquet("cars", OverwriteOption::DoNotOverwrite, Some(500)).unwrap();

cmd.assert().success().stdout(predicate::str::contains(
"In total, wrote 1,081 rows from file cars.sas7bdat into cars.parquet",
));

// next do not stream
let (mut cmd, tempfile) =
cli_data_to_parquet("cars", OverwriteOption::Overwrite(tempfile), None).unwrap();

cmd.assert().success().stdout(predicate::str::contains(
"In total, wrote 1,081 rows from file cars.sas7bdat into cars.parquet",
Expand Down

0 comments on commit 4ce8fb1

Please sign in to comment.