diff --git a/cpp/arrow/FileWriter.cpp b/cpp/arrow/FileWriter.cpp
index c5f2a7ae..f2936c91 100644
--- a/cpp/arrow/FileWriter.cpp
+++ b/cpp/arrow/FileWriter.cpp
@@ -97,11 +97,34 @@ extern "C"
         )
     }
 
+    PARQUETSHARP_EXPORT ExceptionInfo* FileWriter_WriteRecordBatches(
+        FileWriter* writer, struct ArrowArrayStream* stream)
+    {
+        TRYCATCH
+        (
+            // Importing the C stream hands its ownership to the reader
+            std::shared_ptr<arrow::RecordBatchReader> reader;
+            PARQUET_ASSIGN_OR_THROW(reader, arrow::ImportRecordBatchReader(stream));
+            std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+            PARQUET_ASSIGN_OR_THROW(batches, reader->ToRecordBatches());
+            // Batches are written in buffered mode, so consecutive batches may share a row group
+            for (const auto& batch : batches)
+            {
+                PARQUET_THROW_NOT_OK(writer->WriteRecordBatch(*batch));
+            }
+        )
+    }
+
     PARQUETSHARP_EXPORT ExceptionInfo* FileWriter_NewRowGroup(FileWriter* writer, int64_t chunk_size)
     {
         TRYCATCH(PARQUET_THROW_NOT_OK(writer->NewRowGroup(chunk_size));)
     }
 
+    PARQUETSHARP_EXPORT ExceptionInfo* FileWriter_NewBufferedRowGroup(FileWriter* writer)
+    {
+        TRYCATCH(PARQUET_THROW_NOT_OK(writer->NewBufferedRowGroup());)
+    }
+
     PARQUETSHARP_EXPORT ExceptionInfo* FileWriter_WriteColumnChunk(
         FileWriter* writer, struct ArrowArray* c_array, struct ArrowSchema* c_array_type)
     {
diff --git a/csharp.test/Arrow/TestFileWriter.cs b/csharp.test/Arrow/TestFileWriter.cs
index 817539e4..3fef283c 100644
--- a/csharp.test/Arrow/TestFileWriter.cs
+++ b/csharp.test/Arrow/TestFileWriter.cs
@@ -128,6 +128,82 @@ public async Task TestWriteRecordBatch()
         await VerifyData(inStream, numRows);
     }
 
+    [Test]
+    public async Task TestWriteBufferedRecordBatches()
+    {
+        var fields = new[]
+        {
+            new Field("x", new Apache.Arrow.Types.Int32Type(), false),
+            new Field("y", new Apache.Arrow.Types.FloatType(), false),
+        };
+        var schema = new Apache.Arrow.Schema(fields, null);
+
+        RecordBatch GetBatch(int xVal, int numRows)
+        {
+            var arrays = new IArrowArray[]
+            {
+                new Int32Array.Builder()
+                    .AppendRange(Enumerable.Repeat(xVal, numRows))
+                    .Build(),
+                new FloatArray.Builder()
+                    .AppendRange(Enumerable.Range(0, numRows).Select(i => i / 100.0f))
+                    .Build(),
+            };
+            return new RecordBatch(schema, arrays, numRows);
+        }
+
+        using var buffer = new ResizableBuffer();
+        using (var outStream = new BufferOutputStream(buffer))
+        {
+            using var propertiesBuilder = new WriterPropertiesBuilder();
+            propertiesBuilder.MaxRowGroupLength(250);
+            using var writerProperties = propertiesBuilder.Build();
+            using var writer = new FileWriter(outStream, schema, writerProperties);
+
+            using var batch0 = GetBatch(0, 100);
+            writer.WriteBufferedRecordBatch(batch0);
+            using var batch1 = GetBatch(0, 100);
+            writer.WriteBufferedRecordBatch(batch1);
+
+            writer.NewBufferedRowGroup();
+
+            using var batch2 = GetBatch(1, 100);
+            writer.WriteBufferedRecordBatch(batch2);
+            using var batch3 = GetBatch(1, 100);
+            writer.WriteBufferedRecordBatch(batch3);
+
+            writer.NewBufferedRowGroup();
+
+            using var batch4 = GetBatch(2, 300);
+            writer.WriteBufferedRecordBatch(batch4);
+
+            writer.Close();
+        }
+
+        using var inStream = new BufferReader(buffer);
+        using var fileReader = new FileReader(inStream);
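+        // Expect four row groups: two flushed by the explicit NewBufferedRowGroup calls,
+        // then two more from the final 300 row batch being split at MaxRowGroupLength (250)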
+        Assert.That(fileReader.NumRowGroups, Is.EqualTo(4));
+
+        var expectedSizes = new[] {200, 200, 250, 50};
+        var expectedXValues = new[] {0, 1, 2, 2};
+
+        for (var rowGroupIdx = 0; rowGroupIdx < fileReader.NumRowGroups; ++rowGroupIdx)
+        {
+            using var batchReader = fileReader.GetRecordBatchReader(rowGroups: new[] {rowGroupIdx});
+            using var batch = await batchReader.ReadNextRecordBatchAsync();
+            Assert.That(batch, Is.Not.Null);
+            Assert.That(batch.Length, Is.EqualTo(expectedSizes[rowGroupIdx]));
+            var xVals = batch.Column("x") as Int32Array;
+            Assert.That(xVals, Is.Not.Null);
+            for (var i = 0; i < xVals!.Length; ++i)
+            {
+                Assert.That(xVals.GetValue(i), Is.EqualTo(expectedXValues[rowGroupIdx]));
+            }
+        }
+    }
+
     [Test]
     public async Task TestWriteRowGroupColumns()
     {
diff --git a/csharp/Arrow/FileWriter.cs b/csharp/Arrow/FileWriter.cs
index ca1d617a..5923fb07 100644
--- a/csharp/Arrow/FileWriter.cs
+++ b/csharp/Arrow/FileWriter.cs
@@ -15,9 +15,13 @@
     /// <summary>
     /// Writes Parquet files using Arrow format data
     ///
-    /// This may be used to write whole tables or record batches,
+    /// This may be used to write whole tables or record batches at once,
     /// using the WriteTable or WriteRecordBatch methods.
     ///
+    /// You can also buffer record batch writes so that multiple record batches
+    /// are written to a single Parquet row group, using WriteBufferedRecordBatch,
+    /// with NewBufferedRowGroup starting a new row group.
+    ///
     /// For more control over writing, you can create a new row group with NewRowGroup,
     /// then write all columns for the row group with the WriteColumn method.
     /// All required columns must be written before starting the next row group
@@ -146,9 +150,9 @@ public unsafe Apache.Arrow.Schema Schema
     /// <summary>
     /// Write an Arrow table to Parquet
     ///
-    /// The table data will be chunked into row groups that respect the maximum
-    /// chunk size specified if required.
+    /// A new row group will be started, and the table data will be chunked into
+    /// multiple row groups if required, respecting the maximum chunk size.
     /// This method requires that the columns in the table use equal chunking.
     /// </summary>
     /// <param name="table">The table to write</param>
     /// <param name="chunkSize">The maximum length of row groups to write</param>
@@ -161,8 +165,8 @@ public void WriteTable(Table table, long chunkSize = 1024 * 1024)
     /// <summary>
     /// Write a record batch to Parquet
     ///
-    /// The data will be chunked into row groups that respect the maximum
-    /// chunk size specified if required.
+    /// A new row group will be started, and the record batch data will be chunked
+    /// into multiple row groups if required, respecting the maximum chunk size.
     /// </summary>
     /// <param name="recordBatch">The record batch to write</param>
     /// <param name="chunkSize">The maximum length of row groups to write</param>
@@ -172,7 +176,34 @@ public void WriteRecordBatch(RecordBatch recordBatch, long chunkSize = 1024 * 10
     {
         var arrayStream = new RecordBatchStream(recordBatch.Schema, new[] {recordBatch});
         WriteRecordBatchStream(arrayStream, chunkSize);
     }
 
+    /// <summary>
+    /// Write a record batch to Parquet in buffered mode, allowing
+    /// multiple record batches to be written to the same row group.
+    ///
+    /// A new row group is started if the buffered data reaches the MaxRowGroupLength
+    /// configured in the WriterProperties.
+    /// </summary>
+    /// <param name="recordBatch">The record batch to write</param>
+    public void WriteBufferedRecordBatch(RecordBatch recordBatch)
+    {
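+        // Data is buffered natively and flushed when the writer is closed,
+        // NewBufferedRowGroup is called, or MaxRowGroupLength is reached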
+        var arrayStream = new RecordBatchStream(recordBatch.Schema, new[] {recordBatch});
+        WriteBufferedRecordBatches(arrayStream);
+    }
+
+    /// <summary>
+    /// Flush buffered data and start a new row group.
+    /// This can be used to force creation of a new row group when writing data
+    /// with WriteBufferedRecordBatch.
+    /// </summary>
+    public void NewBufferedRowGroup()
+    {
+        ExceptionInfo.Check(FileWriter_NewBufferedRowGroup(_handle.IntPtr));
+        GC.KeepAlive(_handle);
+    }
+
     /// <summary>
     /// Start writing a new row group to the file. After calling this method,
     /// each column required in the schema must be written using WriteColumn
@@ -265,6 +294,20 @@ private unsafe void WriteRecordBatchStream(IArrowArrayStream arrayStream, long c
         GC.KeepAlive(_handle);
     }
 
+    /// <summary>
+    /// Write record batches in buffered mode
+    /// </summary>
+    private unsafe void WriteBufferedRecordBatches(IArrowArrayStream arrayStream)
+    {
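+        // Export the stream through the Arrow C stream interface;
+        // the native writer imports it and takes ownership of the exported stream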
+        var cArrayStream = new CArrowArrayStream();
+        CArrowArrayStreamExporter.ExportArrayStream(arrayStream, &cArrayStream);
+        ExceptionInfo.Check(FileWriter_WriteRecordBatches(_handle.IntPtr, &cArrayStream));
+        GC.KeepAlive(cArrayStream);
+        GC.KeepAlive(_handle);
+    }
+
     [DllImport(ParquetDll.Name)]
     private static extern unsafe IntPtr FileWriter_OpenPath(
         [MarshalAs(UnmanagedType.LPUTF8Str)] string path, CArrowSchema* schema, IntPtr properties, IntPtr arrowProperties, out IntPtr writer);
@@ -279,9 +320,15 @@ private static extern unsafe IntPtr FileWriter_OpenStream(
     [DllImport(ParquetDll.Name)]
     private static extern unsafe IntPtr FileWriter_WriteTable(IntPtr writer, CArrowArrayStream* stream, long chunkSize);
 
+    [DllImport(ParquetDll.Name)]
+    private static extern unsafe IntPtr FileWriter_WriteRecordBatches(IntPtr writer, CArrowArrayStream* stream);
+
     [DllImport(ParquetDll.Name)]
     private static extern IntPtr FileWriter_NewRowGroup(IntPtr writer, long chunkSize);
 
+    [DllImport(ParquetDll.Name)]
+    private static extern IntPtr FileWriter_NewBufferedRowGroup(IntPtr writer);
+
     [DllImport(ParquetDll.Name)]
     private static extern unsafe IntPtr FileWriter_WriteColumnChunk(IntPtr writer, CArrowArray* array, CArrowSchema* arrayType);
diff --git a/csharp/Encryption/CryptoFactory.cs b/csharp/Encryption/CryptoFactory.cs
index 784bb392..981d79b7 100644
--- a/csharp/Encryption/CryptoFactory.cs
+++ b/csharp/Encryption/CryptoFactory.cs
@@ -35,7 +35,7 @@ public unsafe CryptoFactory(KmsClientFactory kmsClientFactory)
     /// </summary>
     /// <param name="connectionConfig">The KMS connection configuration to use</param>
     /// <param name="encryptionConfig">The encryption configuration to use</param>
-    /// <param name="filePath">The path to the Parquet file being written</param>
+    /// <param name="filePath">The path to the Parquet file being written. Can be null if internal key material is used.</param>
     /// <returns>Encryption properties for the file</returns>
     public FileEncryptionProperties GetFileEncryptionProperties(
         KmsConnectionConfig connectionConfig,
@@ -58,7 +58,7 @@ public FileEncryptionProperties GetFileEncryptionProperties(
     /// </summary>
     /// <param name="connectionConfig">The KMS connection configuration to use</param>
     /// <param name="decryptionConfig">The decryption configuration to use</param>
-    /// <param name="filePath">The path to the Parquet file being read</param>
+    /// <param name="filePath">The path to the Parquet file being read. Can be null if internal key material is used.</param>
     /// <returns>Decryption properties for the file</returns>
     public FileDecryptionProperties GetFileDecryptionProperties(
         KmsConnectionConfig connectionConfig,
diff --git a/csharp/ParquetSharp.csproj b/csharp/ParquetSharp.csproj
index 09b73680..f22af932 100644
--- a/csharp/ParquetSharp.csproj
+++ b/csharp/ParquetSharp.csproj
@@ -12,7 +12,7 @@
     <GenerateDocumentationFile>true</GenerateDocumentationFile>
    <IncludeSymbols>true</IncludeSymbols>
     <NoWarn>1591;</NoWarn>
-    <Version>15.0.0-beta2</Version>
+    <Version>15.0.0-beta3</Version>
     <Authors>G-Research</Authors>
     <Company>G-Research</Company>
     <Product>ParquetSharp</Product>
diff --git a/docs/Arrow.md b/docs/Arrow.md
index 65095885..1da0c8b6 100644
--- a/docs/Arrow.md
+++ b/docs/Arrow.md
@@ -164,6 +164,34 @@ if it contains more rows than the chunk size, which can be specified when writin
 writer.WriteRecordBatch(recordBatch, chunkSize: 1024);
 ```
 
+Calling `WriteRecordBatch` always starts a new row group. Since ParquetSharp 15.0.0,
+you can instead write buffered record batches,
+so that multiple batches may be written to the same row group:
+
+```csharp
+writer.WriteBufferedRecordBatch(recordBatch);
+```
+
+When using `WriteBufferedRecordBatch`, data will be flushed when the `FileWriter`
+is closed or `NewBufferedRowGroup` is called to start a new row group.
+A new row group will also be started if the row group size reaches the `MaxRowGroupLength`
+value configured in the `WriterProperties`.
+
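+For example, assuming `batch0`, `batch1` and `batch2` are record batches that
+match the writer's schema, the first two batches can be written into a single
+row group, and the third into a new row group:
+
+```csharp
+writer.WriteBufferedRecordBatch(batch0);
+writer.WriteBufferedRecordBatch(batch1);
+
+// Flush the first two batches into one row group and start another
+writer.NewBufferedRowGroup();
+
+writer.WriteBufferedRecordBatch(batch2);
+writer.Close();
+```
+
 ### Writing data one column at a time
 
 Rather than writing record batches, you may also explicitly start Parquet row groups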