Skip to content

Commit

Permalink
apacheGH-44036: [C++] IPC: ipc reader/writer code enhancement (apache…
Browse files Browse the repository at this point in the history
…#44019)

### Rationale for this change

So minor ipc code enhancement when I read the code

### What changes are included in this PR?

Avoid copying shared_ptr in some naive space

### Are these changes tested?

Covered by existence

### Are there any user-facing changes?

no

* GitHub Issue: apache#44036

Authored-by: mwish <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
mapleFU authored Sep 10, 2024
1 parent f3dd298 commit b6316c0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 18 deletions.
12 changes: 6 additions & 6 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class ArrayLoader {
RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
} else {
buffer_index_++;
out_->buffers[1].reset(new Buffer(nullptr, 0));
out_->buffers[1] = std::make_shared<Buffer>(nullptr, 0);
}
return Status::OK();
}
Expand Down Expand Up @@ -644,11 +644,11 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(
const flatbuf::RecordBatch* metadata, const std::shared_ptr<Schema>& schema,
const std::vector<bool>& inclusion_mask, const IpcReadContext& context,
io::RandomAccessFile* file) {
if (inclusion_mask.size() > 0) {
return LoadRecordBatchSubset(metadata, schema, &inclusion_mask, context, file);
} else {
if (inclusion_mask.empty()) {
return LoadRecordBatchSubset(metadata, schema, /*inclusion_mask=*/nullptr, context,
file);
} else {
return LoadRecordBatchSubset(metadata, schema, &inclusion_mask, context, file);
}
}

Expand Down Expand Up @@ -1447,7 +1447,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
// Prebuffering's read patterns are also slightly worse than the alternative
// when doing whole-file reads because the logic is not in place to recognize
// we can just read the entire file up-front
if (options_.included_fields.size() != 0 &&
if (!options_.included_fields.empty() &&
options_.included_fields.size() != schema_->fields().size() &&
!file_->supports_zero_copy()) {
RETURN_NOT_OK(state->PreBufferMetadata({}));
Expand Down Expand Up @@ -1907,7 +1907,7 @@ Result<std::shared_ptr<RecordBatchFileReader>> RecordBatchFileReader::Open(
Future<std::shared_ptr<RecordBatchFileReader>> RecordBatchFileReader::OpenAsync(
const std::shared_ptr<io::RandomAccessFile>& file, const IpcReadOptions& options) {
ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
return OpenAsync(std::move(file), footer_offset, options);
return OpenAsync(file, footer_offset, options);
}

Future<std::shared_ptr<RecordBatchFileReader>> RecordBatchFileReader::OpenAsync(
Expand Down
21 changes: 9 additions & 12 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ bool HasNestedDict(const ArrayData& data) {
}

Status GetTruncatedBitmap(int64_t offset, int64_t length,
const std::shared_ptr<Buffer> input, MemoryPool* pool,
const std::shared_ptr<Buffer>& input, MemoryPool* pool,
std::shared_ptr<Buffer>* buffer) {
if (!input) {
*buffer = input;
Expand All @@ -103,7 +103,7 @@ Status GetTruncatedBitmap(int64_t offset, int64_t length,
}

Status GetTruncatedBuffer(int64_t offset, int64_t length, int32_t byte_width,
const std::shared_ptr<Buffer> input, MemoryPool* pool,
const std::shared_ptr<Buffer>& input, MemoryPool* pool,
std::shared_ptr<Buffer>* buffer) {
if (!input) {
*buffer = input;
Expand Down Expand Up @@ -252,7 +252,7 @@ class RecordBatchSerializer {
}

Status Assemble(const RecordBatch& batch) {
if (field_nodes_.size() > 0) {
if (!field_nodes_.empty()) {
field_nodes_.clear();
buffer_meta_.clear();
out_->body_buffers.clear();
Expand Down Expand Up @@ -335,8 +335,7 @@ class RecordBatchSerializer {
ARROW_ASSIGN_OR_RAISE(auto shifted_offsets,
AllocateBuffer(required_bytes, options_.memory_pool));

offset_type* dest_offsets =
reinterpret_cast<offset_type*>(shifted_offsets->mutable_data());
auto dest_offsets = shifted_offsets->mutable_span_as<offset_type>();
const offset_type start_offset = array.value_offset(0);

for (int i = 0; i < array.length(); ++i) {
Expand All @@ -362,7 +361,6 @@ class RecordBatchSerializer {
offset_type* out_min_offset,
offset_type* out_max_end) {
auto offsets = array.value_offsets();
auto sizes = array.value_sizes();

const int64_t required_bytes = sizeof(offset_type) * array.length();
if (array.offset() != 0) {
Expand Down Expand Up @@ -572,7 +570,7 @@ class RecordBatchSerializer {
Status Visit(const StructArray& array) {
--max_recursion_depth_;
for (int i = 0; i < array.num_fields(); ++i) {
std::shared_ptr<Array> field = array.field(i);
const auto& field = array.field(i);
RETURN_NOT_OK(VisitArray(*field));
}
++max_recursion_depth_;
Expand Down Expand Up @@ -641,8 +639,7 @@ class RecordBatchSerializer {
ARROW_ASSIGN_OR_RAISE(
auto shifted_offsets_buffer,
AllocateBuffer(length * sizeof(int32_t), options_.memory_pool));
int32_t* shifted_offsets =
reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data());
auto shifted_offsets = shifted_offsets_buffer->mutable_span_as<int32_t>();

// Offsets are guaranteed to be increasing according to the spec, so
// the first offset we find for a child is the initial offset and
Expand Down Expand Up @@ -899,7 +896,7 @@ Status GetContiguousTensor(const Tensor& tensor, MemoryPool* pool,
RETURN_NOT_OK(WriteStridedTensorData(0, 0, elem_size, tensor,
scratch_space->mutable_data(), &stream));

out->reset(new Tensor(tensor.type(), contiguous_data, tensor.shape()));
*out = std::make_unique<Tensor>(tensor.type(), contiguous_data, tensor.shape());

return Status::OK();
}
Expand Down Expand Up @@ -1005,7 +1002,7 @@ class SparseTensorSerializer {
}

Status Assemble(const SparseTensor& sparse_tensor) {
if (buffer_meta_.size() > 0) {
if (!buffer_meta_.empty()) {
buffer_meta_.clear();
out_->body_buffers.clear();
}
Expand Down Expand Up @@ -1169,7 +1166,7 @@ Status RecordBatchWriter::WriteTable(const Table& table) { return WriteTable(tab

namespace internal {

IpcPayloadWriter::~IpcPayloadWriter() {}
IpcPayloadWriter::~IpcPayloadWriter() = default;

Status IpcPayloadWriter::Start() { return Status::OK(); }

Expand Down

0 comments on commit b6316c0

Please sign in to comment.