From 85028e24319b46c66289ac02488da90111795dbc Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 5 Dec 2024 11:56:59 +0800 Subject: [PATCH] [Fix](job)Fix CAS competition failure leading to message publishing failure. tryPublish returns false in case of CAS competition failure or when the queue is full. Therefore, we need to first check if there is enough capacity and then use the publish method for serial publishing. --- .../apache/doris/job/disruptor/TaskDisruptor.java | 12 ++++++++++-- .../org/apache/doris/job/scheduler/JobScheduler.java | 4 +++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java index 6ca2924c593bc1..b04355385a5119 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java @@ -74,12 +74,20 @@ public void start() { public boolean publishEvent(Object... args) { try { RingBuffer ringBuffer = disruptor.getRingBuffer(); - return ringBuffer.tryPublishEvent(eventTranslator, args); + // Check if the RingBuffer has enough capacity to reserve 10 slots for tasks + // If there is insufficient capacity (less than 10 slots available) + // log a warning and drop the current task + if (!ringBuffer.hasAvailableCapacity(10)) { + LOG.warn("ring buffer has no available capacity,task will be dropped," + + "please check the task queue size."); + return false; + } + ringBuffer.publishEvent(eventTranslator, args); } catch (Exception e) { LOG.warn("Failed to publish event", e); // Handle the exception, e.g., retry or alert } - return false; + return true; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index ea0c263a5ee1c3..7f8b39f1e66dc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -168,7 +168,9 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobE for (AbstractTask task : tasks) { if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), job.getJobConfig())) { - throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId())); + String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId()); + task.onFail(errorMsg); + throw new JobException(errorMsg); } log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),