Skip to content

Commit

Permalink
[Fix](job)Fix CAS competition failure leading to message publishing f…
Browse files Browse the repository at this point in the history
…ailure.

 tryPublish returns false in case of CAS competition failure or when the queue is full. Therefore, we need to first check if there is enough capacity and then use the publish method for serial publishing.
  • Loading branch information
CalvinKirs committed Dec 5, 2024
1 parent 4c55b53 commit 85028e2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,20 @@ public void start() {
public boolean publishEvent(Object... args) {
try {
RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
return ringBuffer.tryPublishEvent(eventTranslator, args);
// Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
// If there is insufficient capacity (less than 10 slots available)
// log a warning and drop the current task
if (!ringBuffer.hasAvailableCapacity(10)) {
LOG.warn("ring buffer has no available capacity,task will be dropped,"
+ "please check the task queue size.");
return false;
}
ringBuffer.publishEvent(eventTranslator, args);
} catch (Exception e) {
LOG.warn("Failed to publish event", e);
// Handle the exception, e.g., retry or alert
}
return false;
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobE
for (AbstractTask task : tasks) {
if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig())) {
throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId());
task.onFail(errorMsg);
throw new JobException(errorMsg);

}
log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),
Expand Down

0 comments on commit 85028e2

Please sign in to comment.