Use parallel memcopy from arrow (#633)
* use parallel memcopy from arrow

* fix linting

* remove memory.h
pcmoritz authored and robertnishihara committed Jun 3, 2017
1 parent 2694337 commit 0254efa
Showing 3 changed files with 11 additions and 195 deletions.
187 changes: 0 additions & 187 deletions src/numbuf/python/src/pynumbuf/memory.h

This file was deleted.
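(Context: the deleted memory.h evidently held numbuf's hand-rolled FixedBufferStream and MockBufferStream classes. The hunks below replace them with the stream classes that ship with Arrow itself: arrow::io::BufferReader for reading, arrow::ipc::MockOutputStream for size measurement, and arrow::MutableBuffer with arrow::io::FixedSizeBufferWriter for writing. That last substitution is where the "parallel memcopy from arrow" of the commit title comes in, since the copy into the destination buffer is now Arrow's job rather than numbuf's.)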

17 changes: 10 additions & 7 deletions src/numbuf/python/src/pynumbuf/numbuf.cc
@@ -27,12 +27,13 @@ PyObject* NumbufPlasmaObjectExistsError;
#endif

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/api.h>
#include <arrow/ipc/util.h>
#include <arrow/ipc/writer.h>
#include <arrow/python/numpy_convert.h>

#include "adapters/python.h"
#include "memory.h"

using namespace arrow;
using namespace numbuf;
@@ -81,7 +82,7 @@ Status read_batch_and_tensors(uint8_t* data, int64_t size,
std::vector<std::shared_ptr<Tensor>>& tensors_out) {
std::shared_ptr<arrow::ipc::FileReader> reader;
int64_t batch_size = *((int64_t*)data);
auto source = std::make_shared<FixedBufferStream>(
auto source = std::make_shared<arrow::io::BufferReader>(
LENGTH_PREFIX_SIZE + data, size - LENGTH_PREFIX_SIZE);
RETURN_NOT_OK(arrow::ipc::FileReader::Open(source, batch_size, &reader));
RETURN_NOT_OK(reader->GetRecordBatch(0, batch_out));
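The hunk above is the read side: instead of the removed FixedBufferStream, the raw bytes are wrapped in arrow::io::BufferReader, which presents an existing memory region as a readable source without copying it. A minimal sketch of that pattern, mirroring the Arrow API exactly as it appears in this diff (FileReader::Open and GetRecordBatch were renamed in later Arrow releases, so treat the calls as tied to the revision pinned below); the function name and status handling are illustrative, not part of the commit:

```cpp
#include <cstdint>
#include <memory>
#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/api.h>

// Sketch: read one record batch back out of a fixed memory region.
// `data`/`size` describe the serialized payload; `footer_offset` plays the
// role of `batch_size` in the diff above.
arrow::Status ReadBatchFromMemory(const uint8_t* data, int64_t size,
                                  int64_t footer_offset,
                                  std::shared_ptr<arrow::RecordBatch>* out) {
  // BufferReader exposes the bytes as a seekable input stream, zero-copy.
  auto source = std::make_shared<arrow::io::BufferReader>(data, size);
  std::shared_ptr<arrow::ipc::FileReader> reader;
  arrow::Status st = arrow::ipc::FileReader::Open(source, footer_offset, &reader);
  if (!st.ok()) { return st; }
  return reader->GetRecordBatch(0, out);
}
```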
@@ -151,7 +152,7 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
object->batch = make_batch(array);

int64_t data_size, total_size;
auto mock = std::make_shared<MockBufferStream>();
auto mock = std::make_shared<arrow::ipc::MockOutputStream>();
write_batch_and_tensors(
mock.get(), object->batch, object->arrays, &data_size, &total_size);

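The change above (and the matching one in store_list further down) is the size-measurement half of a two-pass write: serialize once into arrow::ipc::MockOutputStream, which discards the bytes and only counts them, then allocate exactly that much space and serialize again for real. A hedged sketch of the counting step; write_batch_and_tensors is numbuf's own helper with the signature visible in this diff, the header location for MockOutputStream is assumed from the include list above, and GetExtentBytesWritten() is assumed to be the accessor contemporary Arrow provides for the counted size:

```cpp
#include <cstdint>
#include <memory>
#include <arrow/ipc/util.h>  // arrow::ipc::MockOutputStream (location assumed)

// Sketch: the mock stream accepts writes but keeps only a byte counter,
// so running the real serialization against it yields the required size.
int64_t MeasureWriteSize() {
  auto mock = std::make_shared<arrow::ipc::MockOutputStream>();
  uint8_t scratch[64] = {};
  // Stand-in for write_batch_and_tensors(mock.get(), ...): any Write() call
  // just advances the counter without touching real memory.
  if (!mock->Write(scratch, sizeof(scratch)).ok()) { return -1; }
  return mock->GetExtentBytesWritten();  // 64 in this toy example
}
```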
@@ -173,9 +174,10 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
}
if (!PyMemoryView_Check(memoryview)) { return NULL; }
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
auto target = std::make_shared<FixedBufferStream>(
auto buf = std::make_shared<arrow::MutableBuffer>(
LENGTH_PREFIX_SIZE + reinterpret_cast<uint8_t*>(buffer->buf),
buffer->len - LENGTH_PREFIX_SIZE);
auto target = std::make_shared<arrow::io::FixedSizeBufferWriter>(buf);
int64_t batch_size, total_size;
ARROW_CHECK_OK(write_batch_and_tensors(
target.get(), object->batch, object->arrays, &batch_size, &total_size));
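The write side above is where the commit title comes from: the destination region is wrapped in an arrow::MutableBuffer and written through arrow::io::FixedSizeBufferWriter, so the large copy into the preallocated buffer is performed by Arrow rather than by the deleted FixedBufferStream. A sketch of that wrapping with placeholder names; the set_memcopy_threads call is an assumption about the Arrow writer of this period (it is what would enable the parallel copy) and does not appear in the diff itself:

```cpp
#include <cstdint>
#include <memory>
#include <arrow/api.h>
#include <arrow/io/memory.h>

// Sketch: expose a caller-owned region (e.g. a plasma allocation) as a
// fixed-size Arrow output stream and let Arrow perform the copy into it.
std::shared_ptr<arrow::io::FixedSizeBufferWriter> WrapDestination(
    uint8_t* dest, int64_t nbytes) {
  auto buf = std::make_shared<arrow::MutableBuffer>(dest, nbytes);
  auto target = std::make_shared<arrow::io::FixedSizeBufferWriter>(buf);
  // Assumed API: split large memcpy calls across a few threads.
  target->set_memcopy_threads(4);
  return target;
}
```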
@@ -297,7 +299,7 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
std::shared_ptr<RecordBatch> batch = make_batch(array);

int64_t data_size, total_size;
auto mock = std::make_shared<MockBufferStream>();
auto mock = std::make_shared<arrow::ipc::MockOutputStream>();
write_batch_and_tensors(mock.get(), batch, tensors, &data_size, &total_size);

uint8_t* data;
@@ -321,8 +323,9 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
}
ARROW_CHECK_OK(s);

auto target =
std::make_shared<FixedBufferStream>(LENGTH_PREFIX_SIZE + data, total_size);
auto buf =
std::make_shared<arrow::MutableBuffer>(LENGTH_PREFIX_SIZE + data, total_size);
auto target = std::make_shared<arrow::io::FixedSizeBufferWriter>(buf);
write_batch_and_tensors(target.get(), batch, tensors, &data_size, &total_size);
*((int64_t*)data) = data_size;

2 changes: 1 addition & 1 deletion src/numbuf/thirdparty/download_thirdparty.sh
@@ -13,4 +13,4 @@ fi
cd $TP_DIR/arrow
git pull origin master

git checkout 670612e6fdf699486641ed0d39d22257eb8acdb2
git checkout 8a700ccdad745c250fe5d91a9104e7c2d6364c1b
