Skip to content

Commit

Permalink
Customization
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Meng committed Apr 13, 2024
1 parent e195ef6 commit 9700c4d
Show file tree
Hide file tree
Showing 26 changed files with 784 additions and 101 deletions.
15 changes: 12 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ allprojects {
}

// all client and their related modules are published with Java 17 compatibility
["annotations", "common", "client", "client-spring", "grpc", "grpc-client"].each {
// calix
// ["annotations", "common", "client", "client-spring", "grpc", "grpc-client"].each {
["annotations", "common", "client", "client-spring"].each {
// end calix
project(":conductor-$it") {
compileJava {
options.release = 17
Expand All @@ -110,7 +113,10 @@ task server {
dependsOn ':conductor-server:bootRun'
}

configure(allprojects - project(':conductor-grpc')) {
// calix
// configure(allprojects - project(':conductor-grpc')) {
configure(allprojects) {
// end calix
apply plugin: 'com.diffplug.spotless'

spotless {
Expand All @@ -123,7 +129,10 @@ configure(allprojects - project(':conductor-grpc')) {
}
}

['cassandra-persistence', 'core', 'redis-concurrency-limit', 'test-harness', 'client'].each {
// calix
// ['cassandra-persistence', 'core', 'redis-concurrency-limit', 'test-harness', 'client'].each {
['core', 'redis-concurrency-limit', 'test-harness', 'client'].each {
// end calix
configure(project(":conductor-$it")) {
spotless {
groovy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,4 +541,45 @@ public Map<String, Object> getAll() {
props.forEach((key, value) -> map.put(key.toString(), value));
return map;
}

// calix
private int asyncDaoThreadCount = Runtime.getRuntime().availableProcessors() * 2;
private int asyncListenerThreadCount = Runtime.getRuntime().availableProcessors() * 2;
private int asyncStartThreadCount = Runtime.getRuntime().availableProcessors() * 2;

@DurationUnit(ChronoUnit.MILLIS)
private Duration workflowUnAckLockTimeout = Duration.ofMillis(500);

public int getAsyncDaoThreadCount() {
return this.asyncDaoThreadCount;
}

public void setAsyncDaoThreadCount(int asyncDaoThreadCount) {
this.asyncDaoThreadCount = asyncDaoThreadCount;
}

public int getAsyncListenerThreadCount() {
return this.asyncListenerThreadCount;
}

public void setAsyncListenerThreadCount(int asyncListenerThreadCount) {
this.asyncListenerThreadCount = asyncListenerThreadCount;
}

public int getAsyncStartThreadCount() {
return this.asyncStartThreadCount;
}

public void setAsyncStartThreadCount(int asyncStartThreadCount) {
this.asyncStartThreadCount = asyncStartThreadCount;
}

public Duration getWorkflowUnAckLockTimeout() {
return workflowUnAckLockTimeout;
}

public void setWorkflowUnAckLockTimeout(Duration workflowUnAckLockTimeout) {
this.workflowUnAckLockTimeout = workflowUnAckLockTimeout;
}
// end calix
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ public long getInProgressTaskCount(String taskDefName) {
* payload fails.
*/
public void updateTask(TaskModel taskModel) {
// calix
long s = Monitors.now();
// end calix
if (taskModel.getStatus() != null) {
if (!taskModel.getStatus().isTerminal()
|| (taskModel.getStatus().isTerminal() && taskModel.getUpdateTime() == 0)) {
Expand Down Expand Up @@ -526,6 +529,10 @@ public void updateTask(TaskModel taskModel) {
taskModel.getTaskId(), taskModel.getWorkflowInstanceId());
LOGGER.error(errorMsg, e);
throw new TransientException(errorMsg, e);
} finally {
// calix
Monitors.recordTaskUpdateDuration(taskModel.getTaskType(), s);
// end calix
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public AsyncSystemTaskExecutor(
* @param taskId The id of the {@link TaskModel} object.
*/
public void execute(WorkflowSystemTask systemTask, String taskId) {
// calix
long s = Monitors.now();
// end calix
TaskModel task = loadTaskQuietly(taskId);
if (task == null) {
LOGGER.error("TaskId: {} could not be found while executing {}", taskId, systemTask);
Expand Down Expand Up @@ -88,6 +91,9 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
queueDAO.remove(queueName, task.getTaskId());
return;
}
// calix
Monitors.recordWorkflowTaskSys(queueName, "get_tasks", s);
// end calix

if (task.getStatus().equals(TaskModel.Status.SCHEDULED)) {
if (executionDAOFacade.exceedsInProgressLimit(task)) {
Expand Down Expand Up @@ -115,9 +121,15 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
// if we are here the Task object is updated and needs to be persisted regardless of an
// exception
try {
// calix
s = Monitors.now();
// end calix
WorkflowModel workflow =
executionDAOFacade.getWorkflowModel(
workflowId, systemTask.isTaskRetrievalRequired());
// calix
Monitors.recordWorkflowTaskSys(queueName, "get_wf", s);
// end calix

if (workflow.getStatus().isTerminal()) {
LOGGER.info(
Expand Down Expand Up @@ -146,12 +158,21 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
task.incrementPollCount();
}

// calix
s = Monitors.now();
// end calix
if (task.getStatus() == TaskModel.Status.SCHEDULED) {
task.setStartTime(System.currentTimeMillis());
Monitors.recordQueueWaitTime(task.getTaskType(), task.getQueueWaitTime());
systemTask.start(workflow, task, workflowExecutor);
// calix
Monitors.recordWorkflowTaskSys(queueName, "task_start", s);
// end calix
} else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) {
systemTask.execute(workflow, task, workflowExecutor);
// calix
Monitors.recordWorkflowTaskSys(queueName, "task_exec", s);
// end calix
}

// Update message in Task queue based on Task status
Expand Down Expand Up @@ -187,14 +208,25 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
Monitors.error(AsyncSystemTaskExecutor.class.getSimpleName(), "executeSystemTask");
LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e);
} finally {
// calix
s = Monitors.now();
executionDAOFacade.updateTask(task);
s = Monitors.recordWorkflowTaskSys(queueName, "update_task", s);
// end calix
if (shouldRemoveTaskFromQueue) {
queueDAO.remove(queueName, task.getTaskId());
LOGGER.debug("{} removed from queue: {}", task, queueName);
// calix
s = Monitors.recordWorkflowTaskSys(queueName, "queue_rem", s);
// end calix
}
// if the current task execution has completed, then the workflow needs to be evaluated
if (hasTaskExecutionCompleted) {
workflowExecutor.decide(workflowId);
// calix
// workflowExecutor.decide(workflowId);
workflowExecutor.locked(workflowExecutor.decide(workflowId), workflowId);
Monitors.recordWorkflowTaskSys(queueName, "decide", s);
// end calix
}
}
}
Expand Down
Loading

0 comments on commit 9700c4d

Please sign in to comment.