Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-45263: [MATLAB] Add ability to construct RecordBatchStreamReader from uint8 array #45274

Merged
merged 10 commits into from
Jan 17, 2025
2 changes: 2 additions & 0 deletions matlab/src/cpp/arrow/matlab/error/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ static const char* IPC_RECORD_BATCH_WRITE_FAILED =
static const char* IPC_RECORD_BATCH_WRITE_CLOSE_FAILED = "arrow:io:ipc:CloseFailed";
static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED =
"arrow:io:ipc:FailedToOpenRecordBatchReader";
static const char* IPC_RECORD_BATCH_READER_INVALID_CONSTRUCTION_TYPE =
"arrow:io:ipc:InvalidConstructionType";
static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex";
static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed";
static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h"
#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/matlab/buffer/matlab_buffer.h"
#include "arrow/matlab/error/error.h"
#include "arrow/matlab/tabular/proxy/record_batch.h"
#include "arrow/matlab/tabular/proxy/schema.h"
Expand All @@ -36,14 +38,13 @@ RecordBatchStreamReader::RecordBatchStreamReader(
REGISTER_METHOD(RecordBatchStreamReader, readTable);
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
namespace mda = ::matlab::data;
using RecordBatchStreamReaderProxy =
arrow::matlab::io::ipc::proxy::RecordBatchStreamReader;

const mda::StructArray opts = constructor_arguments[0];

const mda::StringArray filename_mda = opts[0]["Filename"];
const auto filename_utf16 = std::u16string(filename_mda[0]);
MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8,
Expand All @@ -60,6 +61,43 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
namespace mda = ::matlab::data;
using RecordBatchStreamReaderProxy =
arrow::matlab::io::ipc::proxy::RecordBatchStreamReader;

const mda::StructArray opts = constructor_arguments[0];
const ::matlab::data::TypedArray<uint8_t> bytes_mda = opts[0]["Bytes"];
const auto matlab_buffer =
std::make_shared<arrow::matlab::buffer::MatlabBuffer>(bytes_mda);
auto buffer_reader = std::make_shared<arrow::io::BufferReader>(matlab_buffer);
MATLAB_ASSIGN_OR_ERROR(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(buffer_reader),
error::IPC_RECORD_BATCH_READER_OPEN_FAILED);
return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
namespace mda = ::matlab::data;
const mda::StructArray opts = constructor_arguments[0];

// Dispatch to the appropriate static "make" method depending
// on the input type.
const mda::StringArray type_mda = opts[0]["Type"];
const auto type_utf16 = std::u16string(type_mda[0]);
if (type_utf16 == u"Bytes") {
return RecordBatchStreamReader::fromBytes(constructor_arguments);
} else if (type_utf16 == u"File") {
return RecordBatchStreamReader::fromFile(constructor_arguments);
} else {
return libmexclass::error::Error{
"arrow:io:ipc:InvalidConstructionType",
"Invalid construction type for RecordBatchStreamReader."};
}
}

void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) {
namespace mda = ::matlab::data;
using SchemaProxy = arrow::matlab::tabular::proxy::Schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class RecordBatchStreamReader : public libmexclass::proxy::Proxy {

static libmexclass::proxy::MakeResult make(
const libmexclass::proxy::FunctionArguments& constructor_arguments);
static libmexclass::proxy::MakeResult fromFile(
const libmexclass::proxy::FunctionArguments& constructor_arguments);
static libmexclass::proxy::MakeResult fromBytes(
const libmexclass::proxy::FunctionArguments& constructor_arguments);

protected:
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader;
Expand Down
28 changes: 24 additions & 4 deletions matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,34 @@
Schema
end

methods
function obj = RecordBatchStreamReader(filename)
kevingurney marked this conversation as resolved.
Show resolved Hide resolved
methods (Static)
function obj = fromBytes(bytes)
arguments
bytes(:, 1) uint8
end
args = struct(Bytes=bytes, Type="Bytes");
proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
proxy = arrow.internal.proxy.create(proxyName, args);
obj = arrow.io.ipc.RecordBatchStreamReader(proxy);
end

function obj = fromFile(filename)
arguments
filename(1, 1) string {mustBeNonzeroLengthText}
end
args = struct(Filename=filename);
args = struct(Filename=filename, Type="File");
proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
obj.Proxy = arrow.internal.proxy.create(proxyName, args);
proxy = arrow.internal.proxy.create(proxyName, args);
obj = arrow.io.ipc.RecordBatchStreamReader(proxy);
end
end

methods
function obj = RecordBatchStreamReader(proxy)
arguments
proxy(1, 1) libmexclass.proxy.Proxy
end
obj.Proxy = proxy;
end

function schema = get.Schema(obj)
Expand Down
Loading
Loading