Skip to content

Commit

Permalink
[Fix](pipeline) close sink when fragment context destructs (apache#21668
Browse files Browse the repository at this point in the history
)

Co-authored-by: airborne12 <[email protected]>
  • Loading branch information
airborne12 and airborne12 authored Jul 13, 2023
1 parent 14253b6 commit e167394
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 0 deletions.
10 changes: 10 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,16 @@ Status PipelineFragmentContext::submit() {
}
}

void PipelineFragmentContext::close_sink() {
if (_sink) {
if (_prepared) {
_sink->close(_runtime_state.get(), Status::RuntimeError("prepare failed"));
} else {
_sink->close(_runtime_state.get(), Status::OK());
}
}
}

void PipelineFragmentContext::close_if_prepare_failed() {
if (_tasks.empty()) {
if (_root_plan) {
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
Status submit();

void close_if_prepare_failed();
void close_sink();

void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }

Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ FragmentMgr::~FragmentMgr() {
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.clear();
_query_ctx_map.clear();
for (auto& pipeline : _pipeline_map) {
pipeline.second->close_sink();
}
_pipeline_map.clear();
}
}

Expand Down

0 comments on commit e167394

Please sign in to comment.