Skip to content

Commit

Permalink
Do not serialize/deserialize task data (fixes #116)
Browse files Browse the repository at this point in the history
  • Loading branch information
geirsagberg committed Jul 9, 2024
1 parent 2a7328f commit d5a72e1
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
*/
package no.bekk.dbscheduler.ui.model;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
public class TaskModel {

private String taskName;
private List<String> taskInstance;
private List<Object> taskData;
Expand All @@ -31,127 +35,4 @@ public class TaskModel {
private List<Integer> consecutiveFailures;
private Instant lastHeartbeat;
private int version;

private static final ObjectMapper objectMapper = new ObjectMapper();

public TaskModel(
String taskName,
List<String> taskInstance,
List<Object> inputTaskData,
List<Instant> executionTime,
List<Boolean> picked,
List<String> pickedBy,
List<Instant> lastSuccess,
Instant lastFailure,
List<Integer> consecutiveFailures,
Instant lastHeartbeat,
int version) {
this.taskName = taskName;
this.taskInstance = taskInstance;
this.taskData = serializeTaskData(inputTaskData);
this.executionTime = executionTime;
this.picked = picked;
this.pickedBy = pickedBy;
this.lastSuccess = lastSuccess;
this.lastFailure = lastFailure;
this.consecutiveFailures = consecutiveFailures;
this.lastHeartbeat = lastHeartbeat;
this.version = version;
}

public TaskModel() {}

private List<Object> serializeTaskData(List<Object> inputTaskDataList) {
return inputTaskDataList.stream()
.map(
data -> {
try {
String serializedData = objectMapper.writeValueAsString(data);
return objectMapper.readValue(serializedData, Object.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
}

public String getTaskName() {
return taskName;
}

public List<String> getTaskInstance() {
return taskInstance;
}

public void setTaskInstance(List<String> taskInstance) {
this.taskInstance = taskInstance;
}

public List<Object> getTaskData() {
return taskData;
}

public void setTaskData(List<Object> inputTaskData) {
this.taskData = serializeTaskData(inputTaskData);
}

public void setExecutionTime(List<Instant> executionTime) {
this.executionTime = executionTime;
}

public List<Instant> getExecutionTime() {
return executionTime;
}

public List<Boolean> isPicked() {
return picked;
}

public void setPicked(List<Boolean> picked) {
this.picked = picked;
}

public List<String> getPickedBy() {
return pickedBy;
}

public void setPickedBy(List<String> pickedBy) {
this.pickedBy = pickedBy;
}

public List<Instant> getLastSuccess() {
return lastSuccess;
}

public void setLastSuccess(List<Instant> lastSuccess) {
this.lastSuccess = lastSuccess;
}

public Instant getLastFailure() {
return lastFailure;
}

public void setLastFailure(Instant lastFailure) {
this.lastFailure = lastFailure;
}

public List<Integer> getConsecutiveFailures() {
return consecutiveFailures;
}

public Instant getLastHeartbeat() {
return lastHeartbeat;
}

public void setLastHeartbeat(Instant lastHeartbeat) {
this.lastHeartbeat = lastHeartbeat;
}

public int getVersion() {
return version;
}

public void setConsecutiveFailures(List<Integer> consecutiveFailures) {
this.consecutiveFailures = consecutiveFailures;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class LogLogic {

private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
private static final int DEFAULT_LIMIT = 500;
private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
private final Caching caching;
private final LogModelRowMapper logModelRowMapper;

Expand Down Expand Up @@ -124,7 +122,23 @@ public List<LogModel> getLogsDirectlyFromDB(TaskDetailsRequestParams requestPara
queryBuilder.getQuery(), queryBuilder.getParameters(), logModelRowMapper);
}

private enum Operators {
GREATER_THAN_OR_EQUALS(">="),
LESS_THAN_OR_EQUALS("<=");

private final String operator;

Operators(String operator) {
this.operator = operator;
}

public String getOperator() {
return operator;
}
}

private static class TimeCondition implements AndCondition {

private final String varName;
private final String operator;
private final Instant value;
Expand All @@ -147,6 +161,7 @@ public void setParameters(MapSqlParameterSource p) {
}

private static class SearchCondition implements AndCondition {

private final String searchTerm;
private final Map<String, Object> params;

Expand Down Expand Up @@ -176,6 +191,7 @@ public void setParameters(MapSqlParameterSource p) {
}

public static class FilterCondition implements AndCondition {

private final TaskRequestParams.TaskFilter filterCondition;

public FilterCondition(TaskRequestParams.TaskFilter filterCondition) {
Expand All @@ -197,6 +213,7 @@ public void setParameters(MapSqlParameterSource p) {

@RequiredArgsConstructor
public static class LogModelRowMapper implements RowMapper<LogModel> {

private final boolean showData;
private final Serializer serializer;

Expand Down Expand Up @@ -224,19 +241,4 @@ public LogModel mapRow(ResultSet rs, int rowNum) throws SQLException {
rs.getString("exception_stacktrace"));
}
}

private enum Operators {
GREATER_THAN_OR_EQUALS(">="),
LESS_THAN_OR_EQUALS("<=");

private final String operator;

Operators(String operator) {
this.operator = operator;
}

public String getOperator() {
return operator;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,29 @@
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import no.bekk.dbscheduler.ui.model.*;
import no.bekk.dbscheduler.ui.model.GetTasksResponse;
import no.bekk.dbscheduler.ui.model.PollResponse;
import no.bekk.dbscheduler.ui.model.TaskDetailsRequestParams;
import no.bekk.dbscheduler.ui.model.TaskModel;
import no.bekk.dbscheduler.ui.model.TaskRequestParams;
import no.bekk.dbscheduler.ui.util.Caching;
import no.bekk.dbscheduler.ui.util.QueryUtils;
import no.bekk.dbscheduler.ui.util.mapper.TaskMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.server.ResponseStatusException;

@Service
public class TaskLogic {

private final Scheduler scheduler;
private final Caching caching;
private final boolean showData;

@Autowired
public TaskLogic(Scheduler scheduler, Caching caching, boolean showData) {
this.scheduler = scheduler;
this.scheduler.start();
Expand Down Expand Up @@ -76,7 +80,6 @@ public void runTaskGroupNow(String taskName, boolean onlyFailed) {
System.out.println("Failed to run task: " + e.getMessage());
}
}
;
});
}

Expand Down Expand Up @@ -222,11 +225,17 @@ private void handleNewTask(
Set<String> newRunningTaskNames,
String taskName,
String status) {
if (newTaskNames.contains(taskName) && params.getTaskName() == null) return;
if (newTaskNames.contains(taskName) && params.getTaskName() == null) {
return;
}

newTaskNames.add(taskName);
if (status.charAt(0) == '1') newFailureTaskNames.add(taskName);
if (status.charAt(1) == '1') newRunningTaskNames.add(taskName);
if (status.charAt(0) == '1') {
newFailureTaskNames.add(taskName);
}
if (status.charAt(1) == '1') {
newRunningTaskNames.add(taskName);
}
}

private void handleStatusChange(
Expand All @@ -243,14 +252,18 @@ private void handleStatusChange(
&& (!newFailureTaskNames.contains(taskName) || params.getTaskName() != null)) {
newFailureTaskNames.add(taskName);
}
if (cachedStatus.charAt(0) == '1' && status.charAt(0) == '0') stoppedFailing++;
if (cachedStatus.charAt(0) == '1' && status.charAt(0) == '0') {
stoppedFailing++;
}

if (cachedStatus.charAt(1) == '0'
&& status.charAt(1) == '1'
&& (!newRunningTaskNames.contains(taskName) || params.getTaskName() != null)) {
newRunningTaskNames.add(taskName);
}
if (cachedStatus.charAt(1) == '1' && status.charAt(1) == '0') finishedRunning++;
if (cachedStatus.charAt(1) == '1' && status.charAt(1) == '0') {
finishedRunning++;
}
}

private String getStatus(ScheduledExecution<Object> task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ public static List<TaskModel> filterTasks(
case FAILED:
return task.getConsecutiveFailures().stream().anyMatch(failures -> failures != 0);
case RUNNING:
return task.isPicked().stream().anyMatch(Boolean::booleanValue);
return task.getPicked().stream().anyMatch(Boolean::booleanValue);
case SCHEDULED:
return IntStream.range(0, task.isPicked().size())
return IntStream.range(0, task.getPicked().size())
.anyMatch(
i ->
!task.isPicked().get(i) && task.getConsecutiveFailures().get(i) == 0);
!task.getPicked().get(i)
&& task.getConsecutiveFailures().get(i) == 0);
default:
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import no.bekk.dbscheduler.ui.model.TaskModel;

public class TaskMapper {

public static List<TaskModel> mapScheduledExecutionsToTaskModel(
List<ScheduledExecution<Object>> scheduledExecutions) {
return scheduledExecutions.stream()
Expand Down Expand Up @@ -63,7 +64,7 @@ public static List<TaskModel> groupTasks(List<TaskModel> tasks) {
.collect(Collectors.toList()));
taskModel.setPicked(
taskModels.stream()
.map(TaskModel::isPicked)
.map(TaskModel::getPicked)
.flatMap(List::stream)
.collect(Collectors.toList()));
taskModel.setPickedBy(
Expand Down Expand Up @@ -92,11 +93,6 @@ public static List<TaskModel> groupTasks(List<TaskModel> tasks) {
.collect(Collectors.toList());
}

public static List<TaskModel> mapAllExecutionsToTaskModel(
List<ScheduledExecution<Object>> scheduledExecutions) {
return groupTasks(mapScheduledExecutionsToTaskModel(scheduledExecutions));
}

public static List<TaskModel> mapAllExecutionsToTaskModelUngrouped(
List<ScheduledExecution<Object>> scheduledExecutions) {
return mapScheduledExecutionsToTaskModel(scheduledExecutions);
Expand Down
Loading

0 comments on commit d5a72e1

Please sign in to comment.