From d27d0ceb4886cbdbdfffa94bf71369c0a16c0579 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Tue, 11 Jun 2024 17:34:48 +0200 Subject: [PATCH] fixes --- libminifi/include/SchedulingAgent.h | 2 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 62 ++++++++++++-------- libminifi/src/SchedulingAgent.cpp | 6 +- 3 files changed, 40 insertions(+), 30 deletions(-) diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index a21cd1fe58..99ebd62c8e 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -91,7 +91,7 @@ class SchedulingAgent { nonstd::expected triggerAndCommit(core::Processor* processor, const std::shared_ptr& process_context, const std::shared_ptr& session_factory); - nonstd::expected trigger(core::Processor* processor, + nonstd::expected trigger(core::Processor* processor, const std::shared_ptr& process_context, const std::shared_ptr& process_session); diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index f27253d42e..61b338b958 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -38,38 +38,48 @@ void EventDrivenSchedulingAgent::schedule(core::Processor* processor) { utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr& process_context, const std::shared_ptr& session_factory) { - if (this->running_) { - const auto start_time = std::chrono::steady_clock::now(); - // trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice + if (!this->running_) { + return utils::TaskRescheduleInfo::Done(); + } + if (processorYields(processor)) { + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); + } + + const auto start_time = std::chrono::steady_clock::now(); + // trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice - const auto process_session = session_factory->createSession(); - process_session->setMetrics(processor->getMetrics()); + const auto process_session = session_factory->createSession(); + process_session->setMetrics(processor->getMetrics()); - try { - while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { - this->trigger(processor, process_context, process_session); - if (processor->isYield()) { - process_session->commit(); - return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); - } + + try { + const auto run_commit = gsl::finally([&]() { + process_session->commit(); + }); + while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { + const auto trigger_result = this->trigger(processor, process_context, process_session); + if (!trigger_result || !*trigger_result) { + break; } - process_session->commit(); - } catch (const std::exception& exception) { - logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})", - exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName()); - processor->yield(admin_yield_duration_); - process_session->rollback(); - throw; - } catch (...) { - logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName()); - processor->yield(admin_yield_duration_); - process_session->rollback(); - throw; } + } catch (const std::exception& exception) { + logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})", + exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName()); + processor->yield(admin_yield_duration_); + process_session->rollback(); + throw; + } catch (...) { + logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName()); + processor->yield(admin_yield_duration_); + process_session->rollback(); + throw; + } - return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available + if (processor->isYield()) { + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); } - return utils::TaskRescheduleInfo::Done(); + + return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available } } // namespace org::apache::nifi::minifi diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index c4a3ff8a1e..f42ca057f6 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -99,12 +99,12 @@ nonstd::expected SchedulingAgent::triggerAndCommit(cor return {}; } -nonstd::expected SchedulingAgent::trigger(core::Processor* processor, +nonstd::expected SchedulingAgent::trigger(core::Processor* processor, const std::shared_ptr& process_context, const std::shared_ptr& process_session) { gsl_Expects(processor); if (processorYields(processor)) { - return {}; + return false; } auto schedule_it = scheduled_processors_.end(); @@ -134,7 +134,7 @@ nonstd::expected SchedulingAgent::trigger(core::Proces processor->yield(admin_yield_duration_); return nonstd::make_unexpected(std::current_exception()); } - return {}; + return true; } void SchedulingAgent::watchDogFunc() {