From 2b5a80245d336e17ff149eca33e666fae71c04bf Mon Sep 17 00:00:00 2001 From: Sarah Gilmore <74676073+sgilmore10@users.noreply.github.com> Date: Wed, 4 Dec 2024 11:08:57 -0500 Subject: [PATCH] GH-44922: [MATLAB] Add IPC `RecordBatchStreamWriter` MATLAB class (#44925) ### Rationale for this change To enable support for the IPC Streaming format in the MATLAB interface, we should add a `RecordBatchStreamWriter` class. ### What changes are included in this PR? Added `arrow.io.ipc.RecordBatchStreamWriter` class. **Example Usage:** ```matlab >> city = ["Boston" "Seattle" "Denver" "Juno" "Anchorage" "Chicago"]'; >> daylength = duration(["15:17:01" "15:59:16" "14:59:14" "19:21:23" "14:18:24" "15:13:39"])'; >> matlabTable = table(city, daylength, VariableNames=["City", "DayLength"]); >> recordBatch1 = arrow.recordBatch(matlabTable(1:4, :)); >> recordBatch2 = arrow.recordBatch(matlabTable(5:end, :)); >> writer = arrow.io.ipc.RecordBatchStreamWriter("daylight.arrow", recordBatch1.Schema); >> writer.writeRecordBatch(recordBatch1); >> writer.writeRecordBatch(recordBatch2); >> writer.close(); ``` ### Are these changes tested? Yes. I parameterized the test cases in `test/arrow/io/ipc/tRecordBatchWriter.m` to test the behavior of both `arrow.io.ipc.RecordBatchFileWriter` and `arrow.io.ipc.RecordBatchStreamWriter`. ### Are there any user-facing changes? Yes. Users can now use `arrow.io.ipc.RecordBatchStreamWriter` to serialize `RecordBatch`es/`Table`s to the Arrow IPC Streaming format. ### Future Directions 1. 
#44923 * GitHub Issue: #44922 Lead-authored-by: Sarah Gilmore Co-authored-by: Sarah Gilmore <74676073+sgilmore10@users.noreply.github.com> Co-authored-by: Kevin Gurney Co-authored-by: Sutou Kouhei Signed-off-by: Sarah Gilmore --- .../io/ipc/proxy/record_batch_file_writer.cc | 49 +----------- .../io/ipc/proxy/record_batch_file_writer.h | 17 ++-- .../ipc/proxy/record_batch_stream_writer.cc | 64 +++++++++++++++ .../io/ipc/proxy/record_batch_stream_writer.h | 35 ++++++++ .../io/ipc/proxy/record_batch_writer.cc | 75 +++++++++++++++++ .../matlab/io/ipc/proxy/record_batch_writer.h | 41 ++++++++++ matlab/src/cpp/arrow/matlab/proxy/factory.cc | 3 + .../+arrow/+io/+ipc/RecordBatchFileWriter.m | 56 ++----------- .../+arrow/+io/+ipc/RecordBatchStreamWriter.m | 34 ++++++++ .../+arrow/+io/+ipc/RecordBatchWriter.m | 74 +++++++++++++++++ ...BatchFileWriter.m => tRecordBatchWriter.m} | 80 +++++++++++-------- .../cmake/BuildMatlabArrowInterface.cmake | 4 +- 12 files changed, 387 insertions(+), 145 deletions(-) create mode 100644 matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.cc create mode 100644 matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h create mode 100644 matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.cc create mode 100644 matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.h create mode 100644 matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamWriter.m create mode 100644 matlab/src/matlab/+arrow/+io/+ipc/RecordBatchWriter.m rename matlab/test/arrow/io/ipc/{tRecordBatchFileWriter.m => tRecordBatchWriter.m} (77%) diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc index ed1052e0a8076..69ba734bd0ef9 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc @@ -18,9 +18,7 @@ #include 
"arrow/matlab/io/ipc/proxy/record_batch_file_writer.h" #include "arrow/io/file.h" #include "arrow/matlab/error/error.h" -#include "arrow/matlab/tabular/proxy/record_batch.h" #include "arrow/matlab/tabular/proxy/schema.h" -#include "arrow/matlab/tabular/proxy/table.h" #include "arrow/util/utf8.h" #include "libmexclass/proxy/ProxyManager.h" @@ -29,11 +27,7 @@ namespace arrow::matlab::io::ipc::proxy { RecordBatchFileWriter::RecordBatchFileWriter( const std::shared_ptr writer) - : writer{std::move(writer)} { - REGISTER_METHOD(RecordBatchFileWriter, close); - REGISTER_METHOD(RecordBatchFileWriter, writeRecordBatch); - REGISTER_METHOD(RecordBatchFileWriter, writeTable); -} + : RecordBatchWriter(std::move(writer)) {} libmexclass::proxy::MakeResult RecordBatchFileWriter::make( const libmexclass::proxy::FunctionArguments& constructor_arguments) { @@ -65,43 +59,4 @@ libmexclass::proxy::MakeResult RecordBatchFileWriter::make( return std::make_shared(std::move(writer)); } -void RecordBatchFileWriter::writeRecordBatch( - libmexclass::proxy::method::Context& context) { - namespace mda = ::matlab::data; - using RecordBatchProxy = ::arrow::matlab::tabular::proxy::RecordBatch; - - mda::StructArray opts = context.inputs[0]; - const mda::TypedArray record_batch_proxy_id_mda = - opts[0]["RecordBatchProxyID"]; - const uint64_t record_batch_proxy_id = record_batch_proxy_id_mda[0]; - - auto proxy = libmexclass::proxy::ProxyManager::getProxy(record_batch_proxy_id); - auto record_batch_proxy = std::static_pointer_cast(proxy); - auto record_batch = record_batch_proxy->unwrap(); - - MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteRecordBatch(*record_batch), context, - error::IPC_RECORD_BATCH_WRITE_FAILED); -} - -void RecordBatchFileWriter::writeTable(libmexclass::proxy::method::Context& context) { - namespace mda = ::matlab::data; - using TableProxy = ::arrow::matlab::tabular::proxy::Table; - - mda::StructArray opts = context.inputs[0]; - const mda::TypedArray table_proxy_id_mda = 
opts[0]["TableProxyID"]; - const uint64_t table_proxy_id = table_proxy_id_mda[0]; - - auto proxy = libmexclass::proxy::ProxyManager::getProxy(table_proxy_id); - auto table_proxy = std::static_pointer_cast(proxy); - auto table = table_proxy->unwrap(); - - MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteTable(*table), context, - error::IPC_RECORD_BATCH_WRITE_FAILED); -} - -void RecordBatchFileWriter::close(libmexclass::proxy::method::Context& context) { - MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->Close(), context, - error::IPC_RECORD_BATCH_WRITE_CLOSE_FAILED); -} - -} // namespace arrow::matlab::io::ipc::proxy \ No newline at end of file +} // namespace arrow::matlab::io::ipc::proxy diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.h b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.h index bfd83504f190a..ac76afaf23957 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.h +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.h @@ -16,27 +16,20 @@ // under the License. 
#include "arrow/ipc/writer.h" +#include "arrow/matlab/io/ipc/proxy/record_batch_writer.h" + #include "libmexclass/proxy/Proxy.h" namespace arrow::matlab::io::ipc::proxy { -class RecordBatchFileWriter : public libmexclass::proxy::Proxy { +class RecordBatchFileWriter : public RecordBatchWriter { public: RecordBatchFileWriter(std::shared_ptr writer); - ~RecordBatchFileWriter() = default; + virtual ~RecordBatchFileWriter() = default; static libmexclass::proxy::MakeResult make( const libmexclass::proxy::FunctionArguments& constructor_arguments); - - protected: - std::shared_ptr writer; - - void writeRecordBatch(libmexclass::proxy::method::Context& context); - - void writeTable(libmexclass::proxy::method::Context& context); - - void close(libmexclass::proxy::method::Context& context); }; -} // namespace arrow::matlab::io::ipc::proxy \ No newline at end of file +} // namespace arrow::matlab::io::ipc::proxy diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.cc new file mode 100644 index 0000000000000..4640a54819b83 --- /dev/null +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.cc @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h" +#include "arrow/io/file.h" +#include "arrow/ipc/writer.h" +#include "arrow/matlab/error/error.h" +#include "arrow/matlab/tabular/proxy/schema.h" +#include "arrow/util/utf8.h" + +#include "libmexclass/proxy/ProxyManager.h" + +namespace arrow::matlab::io::ipc::proxy { + +RecordBatchStreamWriter::RecordBatchStreamWriter( + const std::shared_ptr writer) + : RecordBatchWriter(std::move(writer)) {} + +libmexclass::proxy::MakeResult RecordBatchStreamWriter::make( + const libmexclass::proxy::FunctionArguments& constructor_arguments) { + namespace mda = ::matlab::data; + using RecordBatchStreamWriterProxy = + arrow::matlab::io::ipc::proxy::RecordBatchStreamWriter; + using SchemaProxy = arrow::matlab::tabular::proxy::Schema; + + const mda::StructArray opts = constructor_arguments[0]; + + const mda::StringArray filename_mda = opts[0]["Filename"]; + const auto filename_utf16 = std::u16string(filename_mda[0]); + MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, + arrow::util::UTF16StringToUTF8(filename_utf16), + error::UNICODE_CONVERSION_ERROR_ID); + + const mda::TypedArray arrow_schema_proxy_id_mda = opts[0]["SchemaProxyID"]; + auto proxy = libmexclass::proxy::ProxyManager::getProxy(arrow_schema_proxy_id_mda[0]); + auto arrow_schema_proxy = std::static_pointer_cast(proxy); + auto arrow_schema = arrow_schema_proxy->unwrap(); + + MATLAB_ASSIGN_OR_ERROR(auto output_stream, + arrow::io::FileOutputStream::Open(filename_utf8), + error::FAILED_TO_OPEN_FILE_FOR_WRITE); + + MATLAB_ASSIGN_OR_ERROR(auto writer, + arrow::ipc::MakeStreamWriter(output_stream, arrow_schema), + "arrow:matlab:MakeFailed"); + + return std::make_shared(std::move(writer)); +} + +} // namespace arrow::matlab::io::ipc::proxy diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h 
b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h new file mode 100644 index 0000000000000..484d1aa252c57 --- /dev/null +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/ipc/writer.h" +#include "arrow/matlab/io/ipc/proxy/record_batch_writer.h" + +#include "libmexclass/proxy/Proxy.h" + +namespace arrow::matlab::io::ipc::proxy { + +class RecordBatchStreamWriter : public RecordBatchWriter { + public: + RecordBatchStreamWriter(std::shared_ptr writer); + + virtual ~RecordBatchStreamWriter() = default; + + static libmexclass::proxy::MakeResult make( + const libmexclass::proxy::FunctionArguments& constructor_arguments); +}; + +} // namespace arrow::matlab::io::ipc::proxy diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.cc new file mode 100644 index 0000000000000..beffcca0245f0 --- /dev/null +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.cc @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/matlab/io/ipc/proxy/record_batch_writer.h" +#include "arrow/io/file.h" +#include "arrow/matlab/error/error.h" +#include "arrow/matlab/tabular/proxy/record_batch.h" +#include "arrow/matlab/tabular/proxy/schema.h" +#include "arrow/matlab/tabular/proxy/table.h" + +#include "libmexclass/proxy/ProxyManager.h" + +namespace arrow::matlab::io::ipc::proxy { + +RecordBatchWriter::RecordBatchWriter( + const std::shared_ptr writer) + : writer{std::move(writer)} { + REGISTER_METHOD(RecordBatchWriter, close); + REGISTER_METHOD(RecordBatchWriter, writeRecordBatch); + REGISTER_METHOD(RecordBatchWriter, writeTable); +} + +void RecordBatchWriter::writeRecordBatch(libmexclass::proxy::method::Context& context) { + namespace mda = ::matlab::data; + using RecordBatchProxy = ::arrow::matlab::tabular::proxy::RecordBatch; + + mda::StructArray opts = context.inputs[0]; + const mda::TypedArray record_batch_proxy_id_mda = + opts[0]["RecordBatchProxyID"]; + const uint64_t record_batch_proxy_id = record_batch_proxy_id_mda[0]; + + auto proxy = libmexclass::proxy::ProxyManager::getProxy(record_batch_proxy_id); + auto record_batch_proxy = std::static_pointer_cast(proxy); + auto record_batch = record_batch_proxy->unwrap(); + + 
MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteRecordBatch(*record_batch), context, + error::IPC_RECORD_BATCH_WRITE_FAILED); +} + +void RecordBatchWriter::writeTable(libmexclass::proxy::method::Context& context) { + namespace mda = ::matlab::data; + using TableProxy = ::arrow::matlab::tabular::proxy::Table; + + mda::StructArray opts = context.inputs[0]; + const mda::TypedArray table_proxy_id_mda = opts[0]["TableProxyID"]; + const uint64_t table_proxy_id = table_proxy_id_mda[0]; + + auto proxy = libmexclass::proxy::ProxyManager::getProxy(table_proxy_id); + auto table_proxy = std::static_pointer_cast(proxy); + auto table = table_proxy->unwrap(); + + MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteTable(*table), context, + error::IPC_RECORD_BATCH_WRITE_FAILED); +} + +void RecordBatchWriter::close(libmexclass::proxy::method::Context& context) { + MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->Close(), context, + error::IPC_RECORD_BATCH_WRITE_CLOSE_FAILED); +} + +} // namespace arrow::matlab::io::ipc::proxy diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.h b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.h new file mode 100644 index 0000000000000..885a0cbf207fe --- /dev/null +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.h @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/ipc/writer.h" +#include "libmexclass/proxy/Proxy.h" + +namespace arrow::matlab::io::ipc::proxy { + +class RecordBatchWriter : public libmexclass::proxy::Proxy { + public: + RecordBatchWriter(std::shared_ptr writer); + + virtual ~RecordBatchWriter() = default; + + protected: + std::shared_ptr writer; + + void writeRecordBatch(libmexclass::proxy::method::Context& context); + + void writeTable(libmexclass::proxy::method::Context& context); + + void close(libmexclass::proxy::method::Context& context); +}; + +} // namespace arrow::matlab::io::ipc::proxy diff --git a/matlab/src/cpp/arrow/matlab/proxy/factory.cc b/matlab/src/cpp/arrow/matlab/proxy/factory.cc index 8326b4371917a..a08a7495c00c9 100644 --- a/matlab/src/cpp/arrow/matlab/proxy/factory.cc +++ b/matlab/src/cpp/arrow/matlab/proxy/factory.cc @@ -36,6 +36,7 @@ #include "arrow/matlab/io/feather/proxy/writer.h" #include "arrow/matlab/io/ipc/proxy/record_batch_file_reader.h" #include "arrow/matlab/io/ipc/proxy/record_batch_file_writer.h" +#include "arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h" #include "arrow/matlab/tabular/proxy/record_batch.h" #include "arrow/matlab/tabular/proxy/schema.h" #include "arrow/matlab/tabular/proxy/table.h" @@ -111,6 +112,8 @@ libmexclass::proxy::MakeResult Factory::make_proxy( REGISTER_PROXY(arrow.c.proxy.RecordBatchImporter , arrow::matlab::c::proxy::RecordBatchImporter); REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileReader , arrow::matlab::io::ipc::proxy::RecordBatchFileReader); 
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileWriter , arrow::matlab::io::ipc::proxy::RecordBatchFileWriter); + REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchStreamWriter , arrow::matlab::io::ipc::proxy::RecordBatchStreamWriter); + // clang-format on return libmexclass::error::Error{error::UNKNOWN_PROXY_ERROR_ID, diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchFileWriter.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchFileWriter.m index aee4acf5c16e6..ee1298c23706f 100644 --- a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchFileWriter.m +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchFileWriter.m @@ -1,5 +1,5 @@ -%RECORDBATCHFILEWRITER Class for serializing record batches to a file using -% the IPC format. +%RECORDBATCHFILEWRITER Class for serializing record batches to the Arrow IPC File +% format. % Licensed to the Apache Software Foundation (ASF) under one or more % contributor license agreements. See the NOTICE file distributed with @@ -16,11 +16,7 @@ % implied. See the License for the specific language governing % permissions and limitations under the License. 
-classdef RecordBatchFileWriter < matlab.mixin.Scalar - - properties(SetAccess=private, GetAccess=public, Hidden) - Proxy - end +classdef RecordBatchFileWriter < arrow.io.ipc.RecordBatchWriter methods function obj = RecordBatchFileWriter(filename, schema) @@ -30,48 +26,8 @@ end args = struct(Filename=filename, SchemaProxyID=schema.Proxy.ID); proxyName = "arrow.io.ipc.proxy.RecordBatchFileWriter"; - obj.Proxy = arrow.internal.proxy.create(proxyName, args); - end - - function writeRecordBatch(obj, recordBatch) - arguments - obj(1, 1) arrow.io.ipc.RecordBatchFileWriter - recordBatch(1, 1) arrow.tabular.RecordBatch - end - - args = struct(RecordBatchProxyID=recordBatch.Proxy.ID); - obj.Proxy.writeRecordBatch(args); - end - - function writeTable(obj, arrowTable) - arguments - obj(1, 1) arrow.io.ipc.RecordBatchFileWriter - arrowTable(1, 1) arrow.tabular.Table - end - - args = struct(TableProxyID=arrowTable.Proxy.ID); - obj.Proxy.writeTable(args); - end - - function write(obj, tabularObj) - arguments - obj(1, 1) arrow.io.ipc.RecordBatchFileWriter - tabularObj(1, 1) - end - if isa(tabularObj, "arrow.tabular.RecordBatch") - obj.writeRecordBatch(tabularObj); - elseif isa(tabularObj, "arrow.tabular.Table") - obj.writeTable(tabularObj); - else - id = "arrow:matlab:ipc:write:InvalidType"; - msg = "tabularObj input argument must be an instance of " + ... 
- "either arrow.tabular.RecordBatch or arrow.tabular.Table."; - error(id, msg); - end - end - - function close(obj) - obj.Proxy.close(); + proxy = arrow.internal.proxy.create(proxyName, args); + obj@arrow.io.ipc.RecordBatchWriter(proxy); end end -end +end \ No newline at end of file diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamWriter.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamWriter.m new file mode 100644 index 0000000000000..17fe7184a8df8 --- /dev/null +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamWriter.m @@ -0,0 +1,34 @@ +%RECORDBATCHSTREAMWRITER Class for serializing record batches to the Arrow +% IPC Streaming format. + +% Licensed to the Apache Software Foundation (ASF) under one or more +% contributor license agreements. See the NOTICE file distributed with +% this work for additional information regarding copyright ownership. +% The ASF licenses this file to you under the Apache License, Version +% 2.0 (the "License"); you may not use this file except in compliance +% with the License. You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +% implied. See the License for the specific language governing +% permissions and limitations under the License. 
+ +classdef RecordBatchStreamWriter < arrow.io.ipc.RecordBatchWriter + + methods + function obj = RecordBatchStreamWriter(filename, schema) + arguments + filename(1, 1) string {mustBeNonzeroLengthText} + schema(1, 1) arrow.tabular.Schema + end + args = struct(Filename=filename, SchemaProxyID=schema.Proxy.ID); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamWriter"; + proxy = arrow.internal.proxy.create(proxyName, args); + obj@arrow.io.ipc.RecordBatchWriter(proxy); + end + end +end + diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchWriter.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchWriter.m new file mode 100644 index 0000000000000..a662392cc6f47 --- /dev/null +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchWriter.m @@ -0,0 +1,74 @@ +%RECORDBATCHWRITER Class for serializing record batches to the Arrow +% IPC format. + +% Licensed to the Apache Software Foundation (ASF) under one or more +% contributor license agreements. See the NOTICE file distributed with +% this work for additional information regarding copyright ownership. +% The ASF licenses this file to you under the Apache License, Version +% 2.0 (the "License"); you may not use this file except in compliance +% with the License. You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +% implied. See the License for the specific language governing +% permissions and limitations under the License. 
+ +classdef (Abstract) RecordBatchWriter < matlab.mixin.Scalar + + properties(SetAccess=private, GetAccess=public, Hidden) + Proxy + end + + methods + function obj = RecordBatchWriter(proxy) + arguments + proxy(1, 1) libmexclass.proxy.Proxy + end + obj.Proxy = proxy; + end + + function writeRecordBatch(obj, recordBatch) + arguments + obj(1, 1) arrow.io.ipc.RecordBatchWriter + recordBatch(1, 1) arrow.tabular.RecordBatch + end + + args = struct(RecordBatchProxyID=recordBatch.Proxy.ID); + obj.Proxy.writeRecordBatch(args); + end + + function writeTable(obj, arrowTable) + arguments + obj(1, 1) arrow.io.ipc.RecordBatchWriter + arrowTable(1, 1) arrow.tabular.Table + end + + args = struct(TableProxyID=arrowTable.Proxy.ID); + obj.Proxy.writeTable(args); + end + + function write(obj, tabularObj) + arguments + obj(1, 1) arrow.io.ipc.RecordBatchWriter + tabularObj(1, 1) + end + if isa(tabularObj, "arrow.tabular.RecordBatch") + obj.writeRecordBatch(tabularObj); + elseif isa(tabularObj, "arrow.tabular.Table") + obj.writeTable(tabularObj); + else + id = "arrow:io:ipc:write:InvalidType"; + msg = "Input must be an instance of " + ... + "either arrow.tabular.RecordBatch or arrow.tabular.Table."; + error(id, msg); + end + end + + function close(obj) + obj.Proxy.close(); + end + end +end \ No newline at end of file diff --git a/matlab/test/arrow/io/ipc/tRecordBatchFileWriter.m b/matlab/test/arrow/io/ipc/tRecordBatchWriter.m similarity index 77% rename from matlab/test/arrow/io/ipc/tRecordBatchFileWriter.m rename to matlab/test/arrow/io/ipc/tRecordBatchWriter.m index 25bbf4474edd4..55802e31f885d 100644 --- a/matlab/test/arrow/io/ipc/tRecordBatchFileWriter.m +++ b/matlab/test/arrow/io/ipc/tRecordBatchWriter.m @@ -1,4 +1,5 @@ -%TRECORDBATCHFILEWRITER Unit tests for arrow.io.ipc.RecordBatchFileWriter. +%TRECORDBATCHWRITER Unit tests for arrow.io.ipc.RecordBatchFileWriter +% and arrow.io.ipc.RecordBatchStreamWriter. 
% Licensed to the Apache Software Foundation (ASF) under one or more % contributor license agreements. See the NOTICE file distributed with @@ -15,7 +16,16 @@ % implied. See the License for the specific language governing % permissions and limitations under the License. -classdef tRecordBatchFileWriter < matlab.unittest.TestCase +classdef tRecordBatchWriter < matlab.unittest.TestCase + + properties(TestParameter) + WriterConstructor = struct(... + RecordBatchFileWriter=@arrow.io.ipc.RecordBatchFileWriter,... + RecordBatchStreamWriter=@arrow.io.ipc.RecordBatchStreamWriter... + ); + end + + methods function folder = setupTemporaryFolder(testCase) @@ -26,45 +36,45 @@ end methods (Test) - function ZeroLengthFilenameError(testCase) - % Verify RecordBatchFileWriter throws an exception with the + function ZeroLengthFilenameError(testCase, WriterConstructor) + % Verify RecordBatchWriter throws an exception with the % identifier MATLAB:validators:mustBeNonzeroLengthText if the % filename input argument given is a zero length string. schema = arrow.schema(arrow.field("A", arrow.float64())); - fcn = @() arrow.io.ipc.RecordBatchFileWriter("", schema); + fcn = @() WriterConstructor("", schema); testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText"); end - function MissingStringFilenameError(testCase) - % Verify RecordBatchFileWriter throws an exception with the + function MissingStringFilenameError(testCase, WriterConstructor) + % Verify RecordBatchWriter throws an exception with the % identifier MATLAB:validators:mustBeNonzeroLengthText if the % filename input argument given is a missing string. 
schema = arrow.schema(arrow.field("A", arrow.float64())); - fcn = @() arrow.io.ipc.RecordBatchFileWriter(string(missing), schema); + fcn = @() WriterConstructor(string(missing), schema); testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText"); end - function FilenameInvalidTypeError(testCase) - % Verify RecordBatchFileWriter throws an exception with the + function FilenameInvalidTypeError(testCase, WriterConstructor) + % Verify RecordBatchWriter throws an exception with the % identifier MATLAB:validators:UnableToConvert if the filename % input argument is neither a scalar string nor a char vector. schema = arrow.schema(arrow.field("A", arrow.float64())); - fcn = @() arrow.io.ipc.RecordBatchFileWriter(table, schema); + fcn = @() WriterConstructor(table, schema); testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); end - function InvalidSchemaType(testCase) - % Verify RecordBatchFileWriter throws an exception with the + function InvalidSchemaType(testCase, WriterConstructor) + % Verify RecordBatchWriter throws an exception with the % identifier MATLAB:validators:UnableToConvert if the schema % input argument is not an arrow.tabular.Schema instance. 
folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.field("A", arrow.float64()); - fcn = @() arrow.io.ipc.RecordBatchFileWriter(fname, schema); + fcn = @() WriterConstructor(fname, schema); testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); end - function writeRecordBatchInvalidType(testCase) + function writeRecordBatchInvalidType(testCase, WriterConstructor) % Verify writeRecordBatch throws an exception with the % identifier MATLAB:validators:UnableToConvert if the % recordBatch input argument given is not an @@ -72,26 +82,26 @@ function writeRecordBatchInvalidType(testCase) folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowTable = arrow.table(table([1 2 3 4]', VariableNames="A")); fcn = @() writer.writeRecordBatch(arrowTable); testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); end - function writeTableInvalidType(testCase) + function writeTableInvalidType(testCase, WriterConstructor) % Verify writeTable throws an exception with the % identifier MATLAB:validators:UnableToConvert if the table % input argument given is not an arrow.tabular.Table instance. 
folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowRecordBatch = arrow.recordBatch(table([1 2 3 4]', VariableNames="A")); fcn = @() writer.writeTable(arrowRecordBatch); testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); end - function writeInvalidType(testCase) + function writeInvalidType(testCase, WriterConstructor) % Verify writeTable throws an exception with the % identifier arrow:matlab:ipc:write:InvalidType if the % tabularObj input argument given is neither an @@ -99,12 +109,12 @@ function writeInvalidType(testCase) folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); fcn = @() writer.write(schema); - testCase.verifyError(fcn, "arrow:matlab:ipc:write:InvalidType"); + testCase.verifyError(fcn, "arrow:io:ipc:write:InvalidType"); end - function writeRecordBatchInvalidSchema(testCase) + function writeRecordBatchInvalidSchema(testCase, WriterConstructor) % Verify writeRecordBatch throws an exception with the % identifier arrow:io:ipc:FailedToWriteRecordBatch if the % schema of the given record batch does match the expected @@ -112,28 +122,28 @@ function writeRecordBatchInvalidSchema(testCase) folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowRecordBatch = arrow.recordBatch(table([1 2 3 4]', VariableNames="B")); fcn = @() writer.writeRecordBatch(arrowRecordBatch); testCase.verifyError(fcn, "arrow:io:ipc:FailedToWriteRecordBatch"); end - function 
writeTableInvalidSchema(testCase) + function writeTableInvalidSchema(testCase, WriterConstructor) % Verify writeTable throws an exception with the % identifier arrow:io:ipc:FailedToWriteRecordBatch if the % schema of the given table does match the expected schema. folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowTable = arrow.table(table([1 2 3 4]', VariableNames="B")); fcn = @() writer.writeTable(arrowTable); testCase.verifyError(fcn, "arrow:io:ipc:FailedToWriteRecordBatch"); end - function writeInvalidSchema(testCase) + function writeInvalidSchema(testCase, WriterConstructor) % Verify write throws an exception with the % identifier arrow:io:ipc:FailedToWriteRecordBatch if the % schema of the given record batch or table does match the @@ -141,7 +151,7 @@ function writeInvalidSchema(testCase) folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowTable = arrow.table(table([1 2 3 4]', VariableNames="B")); fcn = @() writer.write(arrowTable); @@ -152,39 +162,39 @@ function writeInvalidSchema(testCase) testCase.verifyError(fcn, "arrow:io:ipc:FailedToWriteRecordBatch"); end - function writeRecordBatchSmoke(testCase) + function writeRecordBatchSmoke(testCase, WriterConstructor) % Verify writeRecordBatch does not error or issue a warning % if it successfully writes the record batch to the file. 
folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowRecordBatch = arrow.recordBatch(table([1 2 3 4]', VariableNames="A")); fcn = @() writer.writeRecordBatch(arrowRecordBatch); testCase.verifyWarningFree(fcn); end - function writeTableBatchSmoke(testCase) + function writeTableBatchSmoke(testCase, WriterConstructor) % Verify writeTable does not error or issue a warning % if it successfully writes the table to the file. folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowTable = arrow.table(table([1 2 3 4]', VariableNames="A")); fcn = @() writer.writeTable(arrowTable); testCase.verifyWarningFree(fcn); end - function writeSmoke(testCase) + function writeSmoke(testCase, WriterConstructor) % Verify write does not error or issue a warning if it % successfully writes the record batch or table to the file. folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowRecordBatch = arrow.recordBatch(table([1 2 3 4]', VariableNames="A")); fcn = @() writer.write(arrowRecordBatch); @@ -195,13 +205,13 @@ function writeSmoke(testCase) testCase.verifyWarningFree(fcn); end - function closeSmoke(testCase) + function closeSmoke(testCase, WriterConstructor) % Verify close does not error or issue a warning if it was % successful. 
folder = testCase.setupTemporaryFolder(); fname = fullfile(folder, "data.arrow"); schema = arrow.schema(arrow.field("A", arrow.float64())); - writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer = WriterConstructor(fname, schema); arrowTable = arrow.table(table([1 2 3 4]', VariableNames="A")); writer.write(arrowTable); fcn = @() writer.close(); diff --git a/matlab/tools/cmake/BuildMatlabArrowInterface.cmake b/matlab/tools/cmake/BuildMatlabArrowInterface.cmake index 8016cbf261b7c..29a737a6ecf25 100644 --- a/matlab/tools/cmake/BuildMatlabArrowInterface.cmake +++ b/matlab/tools/cmake/BuildMatlabArrowInterface.cmake @@ -81,7 +81,9 @@ set(MATLAB_ARROW_LIBMEXCLASS_CLIENT_PROXY_SOURCES "${CMAKE_SOURCE_DIR}/src/cpp/a "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/c/proxy/schema.cc" "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/c/proxy/record_batch_importer.cc" "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_reader.cc" - "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc") + "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc" + "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.cc" + "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.cc") set(MATLAB_ARROW_LIBMEXCLASS_CLIENT_PROXY_FACTORY_INCLUDE_DIR "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/proxy")