From e167394dc16e52e4044a32d52226559ff923464c Mon Sep 17 00:00:00 2001 From: airborne12 Date: Thu, 13 Jul 2023 11:52:24 +0800 Subject: [PATCH] [Fix](pipeline) close sink when fragment context destructs (#21668) Co-authored-by: airborne12 --- be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++++++++ be/src/pipeline/pipeline_fragment_context.h | 1 + be/src/runtime/fragment_mgr.cpp | 4 ++++ 3 files changed, 15 insertions(+) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 8eef37d9315469..7e85b19205d355 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -705,6 +705,16 @@ Status PipelineFragmentContext::submit() { } } +void PipelineFragmentContext::close_sink() { + if (_sink) { + if (_prepared) { + _sink->close(_runtime_state.get(), Status::RuntimeError("prepare failed")); + } else { + _sink->close(_runtime_state.get(), Status::OK()); + } + } +} + void PipelineFragmentContext::close_if_prepare_failed() { if (_tasks.empty()) { if (_root_plan) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 262794154bcbf8..cda6206d9b9e77 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -83,6 +83,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this lock(_lock); _fragment_map.clear(); _query_ctx_map.clear(); + for (auto& pipeline : _pipeline_map) { + pipeline.second->close_sink(); + } + _pipeline_map.clear(); } }