diff --git a/r/R/parquet.R b/r/R/parquet.R index 623d8ee908a92..be0c718f1a555 100644 --- a/r/R/parquet.R +++ b/r/R/parquet.R @@ -431,6 +431,11 @@ ParquetFileWriter <- R6Class("ParquetFileWriter", assert_is(table, "Table") parquet___arrow___FileWriter__WriteTable(self, table, chunk_size) }, + WriteBatch = function(batch, chunk_size) { + assert_is(batch, "RecordBatch") + table <- Table$create(batch) + parquet___arrow___FileWriter__WriteTable(self, table, chunk_size) + }, Close = function() parquet___arrow___FileWriter__Close(self) ) ) diff --git a/r/tests/testthat/test-parquet.R b/r/tests/testthat/test-parquet.R index f2359116fdaf1..cc57022600f8d 100644 --- a/r/tests/testthat/test-parquet.R +++ b/r/tests/testthat/test-parquet.R @@ -530,3 +530,31 @@ test_that("thrift string and container size can be specified when reading Parque data <- reader_container$ReadTable() expect_identical(collect.ArrowTabular(data), example_data) }) + +test_that("We can use WriteBatch on ParquetFileWriter", { + tf <- tempfile() + on.exit(unlink(tf)) + sink <- FileOutputStream$create(tf) + sch <- schema(a = int32()) + props <- ParquetWriterProperties$create(column_names = names(sch)) + writer <- ParquetFileWriter$create(schema = sch, sink = sink, properties = props) + + batch <- RecordBatch$create(data.frame(a = 1:10)) + writer$WriteBatch(batch, chunk_size = 10) + writer$WriteBatch(batch, chunk_size = 10) + writer$WriteBatch(batch, chunk_size = 10) + writer$Close() + + tbl <- read_parquet(tf) + expect_equal(nrow(tbl), 30) +}) + +test_that("WriteBatch on ParquetFileWriter errors when called on closed sink", { + sink <- FileOutputStream$create(tempfile()) + sch <- schema(a = int32()) + props <- ParquetWriterProperties$create(column_names = names(sch)) + writer <- ParquetFileWriter$create(schema = sch, sink = sink, properties = props) + writer$Close() + batch <- RecordBatch$create(data.frame(a = 1:10)) + expect_error(writer$WriteBatch(batch, chunk_size = 10), "Operation on closed file") +})