Skip to content

Commit

Permalink
[improvement](spill) avoid occupying too much memory while spill build…
Browse files Browse the repository at this point in the history
… block during the hash join build phase (apache#33747)
  • Loading branch information
mrhhsg authored Apr 17, 2024
1 parent bb1bbb2 commit 73c336a
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
}

std::vector<uint32_t> partition_indexes[_partition_count];
auto* channel_ids = reinterpret_cast<uint64_t*>(local_state._partitioner->get_channel_ids());
auto* channel_ids = reinterpret_cast<uint32_t*>(local_state._partitioner->get_channel_ids());
for (uint32_t i = 0; i != rows; ++i) {
partition_indexes[channel_ids[i]].emplace_back(i);
}
Expand Down Expand Up @@ -862,6 +862,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
RETURN_IF_ERROR(
_inner_probe_operator->pull(local_state._runtime_state.get(), block, eos));
if (*eos) {
_update_profile_from_internal_states(local_state);
local_state._runtime_state.reset();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RuntimeState;

namespace pipeline {

using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;

class PartitionedHashJoinProbeOperatorX;

Expand Down
166 changes: 129 additions & 37 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,21 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,

/// Opens the base spill sink state, then registers and prepares one spill stream per
/// partition (named "hash_build_sink_<partition>") before opening the partitioner.
///
/// Each stream is stored in `_shared_state->spilled_streams` and wired to this state's
/// spill write counters. Any failing step aborts the open and returns its error status.
Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
    RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));

    auto& sink_operator = _parent->cast<PartitionedHashJoinSinkOperatorX>();
    for (uint32_t partition = 0; partition != sink_operator._partition_count; ++partition) {
        auto& stream = _shared_state->spilled_streams[partition];

        // One stream per partition; the partition index is part of the stream name.
        RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
                state, stream, print_id(state->query_id()),
                fmt::format("hash_build_sink_{}", partition), _parent->id(),
                std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(),
                _profile));

        RETURN_IF_ERROR(stream->prepare_spill());

        stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
                                   _spill_data_size, _spill_write_disk_timer,
                                   _spill_write_wait_io_timer);
    }

    return _partitioner->open(state);
}

Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter());
SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer);
Expand Down Expand Up @@ -87,39 +100,127 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
return mem_size;
}

/// Spills the build-side data that has not been partitioned yet.
///
/// Marks the shared state as spilling, resets `hash_table_variants`, takes over the build
/// block (or, when there is none, converts the inner sink's `_build_side_mutable_block`),
/// computes the partition of every row with `_partitioner`, and submits an asynchronous task
/// that copies the rows into the per-partition blocks. A partition block is written to its
/// spill stream once it reaches `SpillStream::MIN_SPILL_WRITE_BATCH_MEM`.
///
/// `_dependency` is blocked before the task is submitted and is set ready exactly once when
/// the task finishes, whether it succeeded or failed. A spill failure is reported through
/// `_spill_status` / `_spill_status_ok` and stops all further partitioning work.
///
/// @param state runtime state of the current fragment instance.
/// @return OK when there is nothing to spill or the task was submitted; otherwise the error
///         returned by the partitioner or by the thread pool.
Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
    _shared_state->need_to_spill = true;
    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
    _shared_state->inner_shared_state->hash_table_variants.reset();
    auto row_desc = p._child_x->row_desc();
    auto build_block = std::move(_shared_state->inner_shared_state->build_block);
    if (!build_block) {
        // No finished build block: fall back to the rows still held by the inner sink.
        build_block = vectorized::Block::create_shared();
        auto inner_sink_state = _shared_state->inner_runtime_state->get_sink_local_state();
        if (inner_sink_state) {
            auto& mutable_block = reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
                                          ->_build_side_mutable_block;
            *build_block = mutable_block.to_block();
            LOG(INFO) << "hash join sink will revoke build mutable block: "
                      << build_block->allocated_bytes();
        }
    }

    /// Here need to skip the first row in build block.
    /// The first row in build block is generated by `HashJoinBuildSinkOperatorX::sink`.
    if (build_block->rows() <= 1) {
        return Status::OK();
    }

    // Drop the columns beyond the child's row descriptor before partitioning.
    if (build_block->columns() > row_desc.num_slots()) {
        build_block->erase(row_desc.num_slots());
    }

    {
        /// TODO: DO NOT execute build exprs twice(when partition and building hash table)
        SCOPED_TIMER(_partition_timer);
        RETURN_IF_ERROR(
                _partitioner->do_partitioning(state, build_block.get(), _mem_tracker.get()));
    }

    auto execution_context = state->get_task_execution_context();
    _dependency->block();
    auto spill_func = [execution_context, build_block, state, this]() {
        auto execution_context_lock = execution_context.lock();
        if (!execution_context_lock) {
            LOG(INFO) << "execution_context released, maybe query was cancelled.";
            return;
        }
        auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
        SCOPED_TIMER(_partition_shuffle_timer);
        auto* channel_ids = reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());

        auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
        // Row indices of the current partition, flushed in batches of `reserved_size`.
        std::vector<uint32_t> partition_indices;
        constexpr size_t reserved_size = 4096;
        partition_indices.reserve(reserved_size);

        // Appends the collected rows to `partition_block` and spills the block once it is
        // large enough. Returns false after recording the error when the spill fails; the
        // caller is responsible for stopping and for releasing `_dependency`.
        auto flush_rows = [&partition_indices, &build_block, &state, this](
                                  std::unique_ptr<vectorized::MutableBlock>& partition_block,
                                  vectorized::SpillStreamSPtr& spilling_stream) -> bool {
            auto* begin = partition_indices.data();
            const auto count = partition_indices.size();
            if (!partition_block) {
                partition_block =
                        vectorized::MutableBlock::create_unique(build_block->clone_empty());
            }
            partition_block->add_rows(build_block.get(), begin, begin + count);
            partition_indices.clear();

            if (partition_block->allocated_bytes() >=
                vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
                auto block = partition_block->to_block();
                partition_block =
                        vectorized::MutableBlock::create_unique(build_block->clone_empty());
                auto status = spilling_stream->spill_block(state, block, false);

                if (!status.ok()) {
                    std::unique_lock<std::mutex> lock(_spill_lock);
                    _spill_status = status;
                    _spill_status_ok = false;
                    return false;
                }
            }
            return true;
        };

        // Cleared on the first spill failure so that both loops terminate immediately instead
        // of continuing with the remaining rows and partitions.
        bool spill_ok = true;
        const auto rows = build_block->rows();
        for (uint32_t i = 0; spill_ok && i != p._partition_count; ++i) {
            vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
            DCHECK(spilling_stream != nullptr);

            // Row 0 is the placeholder row mentioned above, so start from row 1.
            for (size_t idx = 1; spill_ok && idx != rows; ++idx) {
                if (channel_ids[idx] != i) {
                    continue;
                }
                partition_indices.emplace_back(idx);

                if (UNLIKELY(partition_indices.size() >= reserved_size)) {
                    spill_ok = flush_rows(partitioned_blocks[i], spilling_stream);
                }
            }

            // Flush the tail of this partition that did not fill a whole batch.
            if (spill_ok && !partition_indices.empty()) {
                spill_ok = flush_rows(partitioned_blocks[i], spilling_stream);
            }
        }

        // Single release point for both the success and the failure path.
        _dependency->set_ready();
    };
    auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
    auto submit_status = thread_pool->submit_func(spill_func);
    if (!submit_status.ok()) {
        // The task will never run, so nobody else would release the dependency.
        _dependency->set_ready();
    }
    return submit_status;
}

Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory"
<< ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);

if (!_shared_state->need_to_spill) {
profile()->add_info_string("Spilled", "true");
_shared_state->need_to_spill = true;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
auto build_block = std::move(_shared_state->inner_shared_state->build_block);
if (!build_block) {
build_block = vectorized::Block::create_shared();
auto inner_sink_state = _shared_state->inner_runtime_state->get_sink_local_state();
if (inner_sink_state) {
auto& mutable_block =
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
->_build_side_mutable_block;
*build_block = mutable_block.to_block();
LOG(INFO) << "hash join sink will revoke build mutable block: "
<< build_block->allocated_bytes();
}
}

/// Here need to skip the first row in build block.
/// The first row in build block is generated by `HashJoinBuildSinkOperatorX::sink`.
if (build_block->rows() > 1) {
if (build_block->columns() > row_desc.num_slots()) {
build_block->erase(row_desc.num_slots());
}
RETURN_IF_ERROR(_partition_block(state, build_block.get(), 1, build_block->rows()));
}
return _revoke_unpartitioned_block(state);
}

_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
Expand All @@ -133,16 +234,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
continue;
}

if (!spilling_stream) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, spilling_stream, print_id(state->query_id()), "hash_build_sink",
_parent->id(), std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _profile));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
_spill_data_size, _spill_write_disk_timer,
_spill_write_wait_io_timer);
}
DCHECK(spilling_stream != nullptr);

auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
Expand Down Expand Up @@ -201,7 +293,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,

auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
SCOPED_TIMER(_partition_shuffle_timer);
auto* channel_ids = reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids());
auto* channel_ids = reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());
std::vector<uint32_t> partition_indexes[p._partition_count];
DCHECK_LT(begin, end);
for (size_t i = begin; i != end; ++i) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class RuntimeState;

namespace pipeline {

using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;

class PartitionedHashJoinSinkOperatorX;

Expand All @@ -60,6 +60,8 @@ class PartitionedHashJoinSinkLocalState
Status _partition_block(RuntimeState* state, vectorized::Block* in_block, size_t begin,
size_t end);

Status _revoke_unpartitioned_block(RuntimeState* state);

friend class PartitionedHashJoinSinkOperatorX;

std::atomic_int _spilling_streams_count {0};
Expand Down

0 comments on commit 73c336a

Please sign in to comment.