Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Aug 5, 2024
1 parent d2af912 commit 7fd41a9
Show file tree
Hide file tree
Showing 14 changed files with 414 additions and 411 deletions.
2 changes: 1 addition & 1 deletion schemas/internal/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ message Command {
RescueThreadRunRequest rescue_thread_run = 22;
DeleteTaskWorkerGroupRequest delete_task_worker_group = 23;
ScheduleWfRun schedule_wf_run = 24;
ScheduleWfRequest schedule_wf_run_request = 25;
// TODO: Add SaveUserTask
}
}
Expand Down Expand Up @@ -81,6 +80,7 @@ message MetadataCommand {
DeletePrincipalRequest delete_principal = 13;
PutTenantRequest put_tenant = 14;
PutWorkflowEventDefRequest workflow_event_def = 15;
ScheduleWfRequest schedule_wf_run_request = 16;
// TODO: DeleteTenant
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.littlehorse.common.model.corecommand.subcommand.RescueThreadRunRequestModel;
import io.littlehorse.common.model.corecommand.subcommand.ResumeWfRunRequestModel;
import io.littlehorse.common.model.corecommand.subcommand.RunWfRequestModel;
import io.littlehorse.common.model.corecommand.subcommand.ScheduleWfRequestModel;
import io.littlehorse.common.model.corecommand.subcommand.SleepNodeMaturedModel;
import io.littlehorse.common.model.corecommand.subcommand.StopWfRunRequestModel;
import io.littlehorse.common.model.corecommand.subcommand.TaskAttemptRetryReadyModel;
Expand Down Expand Up @@ -65,7 +64,6 @@ public class CommandModel extends AbstractCommand<Command> {
private RescueThreadRunRequestModel rescueThreadRun;
private DeleteTaskWorkerGroupRequestModel deleteTaskWorkerGroup;
private ScheduleWfRunCommandModel scheduleWfRun;
private ScheduleWfRequestModel scheduleWfRunRequest;

public Class<Command> getProtoBaseClass() {
return Command.class;
Expand Down Expand Up @@ -155,9 +153,6 @@ public Command.Builder toProto() {
case SCHEDULE_WF_RUN:
out.setScheduleWfRun(scheduleWfRun.toProto());
break;
case SCHEDULE_WF_RUN_REQUEST:
out.setScheduleWfRunRequest(scheduleWfRunRequest.toProto());
break;
case COMMAND_NOT_SET:
throw new RuntimeException("Not possible");
}
Expand Down Expand Up @@ -246,10 +241,6 @@ public void initFrom(Message proto, ExecutionContext context) {
scheduleWfRun =
LHSerializable.fromProto(p.getScheduleWfRun(), ScheduleWfRunCommandModel.class, context);
break;
case SCHEDULE_WF_RUN_REQUEST:
scheduleWfRunRequest =
LHSerializable.fromProto(p.getScheduleWfRunRequest(), ScheduleWfRequestModel.class, context);
break;
case COMMAND_NOT_SET:
throw new RuntimeException("Not possible");
}
Expand Down Expand Up @@ -300,8 +291,6 @@ public CoreSubCommand<?> getSubCommand() {
return deleteTaskWorkerGroup;
case SCHEDULE_WF_RUN:
return scheduleWfRun;
case SCHEDULE_WF_RUN_REQUEST:
return scheduleWfRunRequest;
case COMMAND_NOT_SET:
}
throw new IllegalStateException("Not possible to have missing subcommand.");
Expand Down Expand Up @@ -372,9 +361,6 @@ public void setSubCommand(CoreSubCommand<?> cmd) {
} else if (cls.equals(ScheduleWfRunCommandModel.class)) {
type = CommandCase.SCHEDULE_WF_RUN;
scheduleWfRun = (ScheduleWfRunCommandModel) cmd;
} else if (cls.equals(ScheduleWfRequestModel.class)) {
type = CommandCase.SCHEDULE_WF_RUN_REQUEST;
scheduleWfRunRequest = (ScheduleWfRequestModel) cmd;
} else {
throw new IllegalArgumentException("Unrecognized SubCommand class: " + cls.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,29 @@
import com.google.protobuf.Message;
import io.grpc.Status;
import io.littlehorse.common.LHSerializable;
import io.littlehorse.common.LHServerConfig;
import io.littlehorse.common.exceptions.LHApiException;
import io.littlehorse.common.model.LHTimer;
import io.littlehorse.common.model.corecommand.CommandModel;
import io.littlehorse.common.model.corecommand.CoreSubCommand;
import io.littlehorse.common.model.getable.core.variable.VariableValueModel;
import io.littlehorse.common.model.getable.core.wfrun.ScheduledWfRunModel;
import io.littlehorse.common.model.getable.global.wfspec.WfSpecModel;
import io.littlehorse.common.model.getable.objectId.ScheduledWfRunIdModel;
import io.littlehorse.common.model.getable.objectId.WfRunIdModel;
import io.littlehorse.common.model.getable.objectId.WfSpecIdModel;
import io.littlehorse.common.model.metadatacommand.MetadataSubCommand;
import io.littlehorse.common.model.metadatacommand.subcommand.ScheduleWfRunCommandModel;
import io.littlehorse.common.util.LHUtil;
import io.littlehorse.sdk.common.exception.LHSerdeError;
import io.littlehorse.sdk.common.proto.ScheduleWfRequest;
import io.littlehorse.sdk.common.proto.VariableValue;
import io.littlehorse.server.streams.topology.core.ExecutionContext;
import io.littlehorse.server.streams.topology.core.ProcessorExecutionContext;
import io.littlehorse.server.streams.topology.core.MetadataCommandExecution;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ScheduleWfRequestModel extends CoreSubCommand<ScheduleWfRequest> {
public class ScheduleWfRequestModel extends MetadataSubCommand<ScheduleWfRequest> {
private String id;
private String wfSpecName;
private Integer majorVersion;
Expand Down Expand Up @@ -81,9 +80,12 @@ public boolean hasResponse() {
}

@Override
public Message process(ProcessorExecutionContext executionContext, LHServerConfig config) {
public Message process(MetadataCommandExecution executionContext) {
Date eventTime = executionContext.currentCommand().getTime();
Optional<Date> scheduledTime = LHUtil.nextDate(cronExpression, eventTime);
if (id == null) {
id = LHUtil.generateGuid();
}
if (scheduledTime.isPresent()) {
WfRunIdModel wfRunId = new WfRunIdModel(LHUtil.generateGuid(), parentWfRunId);
WfSpecModel spec = executionContext.service().getWfSpec(wfSpecName, majorVersion, revision);
Expand All @@ -95,27 +97,17 @@ public Message process(ProcessorExecutionContext executionContext, LHServerConfi
WfSpecIdModel wfSpecId = spec.getId();
ScheduleWfRunCommandModel scheduledCommand =
new ScheduleWfRunCommandModel(id, wfRunId, wfSpecId, variables, cronExpression);
scheduledCommand.getPartitionKey();
LHTimer timer = new LHTimer(new CommandModel(scheduledCommand));
timer.maturationTime = scheduledTime.get();
executionContext.getTaskManager().scheduleTimer(timer);
executionContext.forward(timer);
ScheduledWfRunIdModel scheduledId = new ScheduledWfRunIdModel(id);
ScheduledWfRunModel scheduledWfRun =
new ScheduledWfRunModel(scheduledId, wfSpecId, variables, parentWfRunId, cronExpression);
executionContext.getableManager().put(scheduledWfRun);
executionContext.metadataManager().put(scheduledWfRun);
return scheduledWfRun.toProto().build();
} else {
throw new LHApiException(Status.INVALID_ARGUMENT, "Invalid next date");
}
}

@Override
public String getPartitionKey() {
if (id == null) id = LHUtil.generateGuid();

// Child wfrun needs access to state of parent, so it needs to be on the same partition
if (parentWfRunId != null) {
return parentWfRunId.getPartitionKey().get();
}
return id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.google.protobuf.Message;
import io.littlehorse.common.LHSerializable;
import io.littlehorse.common.model.AbstractGetable;
import io.littlehorse.common.model.CoreGetable;
import io.littlehorse.common.model.MetadataGetable;
import io.littlehorse.common.model.getable.core.variable.VariableValueModel;
import io.littlehorse.common.model.getable.objectId.ScheduledWfRunIdModel;
import io.littlehorse.common.model.getable.objectId.WfRunIdModel;
Expand All @@ -22,7 +22,7 @@
import java.util.Map;
import java.util.Optional;

public class ScheduledWfRunModel extends CoreGetable<ScheduledWfRun> {
public class ScheduledWfRunModel extends MetadataGetable<ScheduledWfRun> {
private ScheduledWfRunIdModel id;
private WfSpecIdModel wfSPecId;
private Map<String, VariableValueModel> variables = new HashMap<>();
Expand Down Expand Up @@ -50,7 +50,7 @@ public ScheduledWfRunModel(
public void initFrom(Message proto, ExecutionContext context) throws LHSerdeError {
ScheduledWfRun p = (ScheduledWfRun) proto;
id = ScheduledWfRunIdModel.fromProto(p.getId(), ScheduledWfRunIdModel.class, context);
wfSPecId = WfSpecIdModel.fromProto(proto, WfSpecIdModel.class, context);
wfSPecId = WfSpecIdModel.fromProto(p.getWfSpecId(), WfSpecIdModel.class, context);
cronExpression = p.getCronExpression();
createdAt = LHUtil.fromProtoTs(p.getCreatedAt());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.littlehorse.common.model.getable.objectId;

import com.google.protobuf.Message;
import io.littlehorse.common.model.getable.CoreObjectId;
import io.littlehorse.common.model.getable.MetadataId;
import io.littlehorse.common.model.getable.core.wfrun.ScheduledWfRunModel;
import io.littlehorse.common.proto.GetableClassEnum;
import io.littlehorse.sdk.common.exception.LHSerdeError;
Expand All @@ -10,7 +10,7 @@
import io.littlehorse.server.streams.topology.core.ExecutionContext;
import java.util.Optional;

public class ScheduledWfRunIdModel extends CoreObjectId<ScheduledWfRunId, ScheduledWfRun, ScheduledWfRunModel> {
public class ScheduledWfRunIdModel extends MetadataId<ScheduledWfRunId, ScheduledWfRun, ScheduledWfRunModel> {

private String id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.littlehorse.common.LHSerializable;
import io.littlehorse.common.LHServerConfig;
import io.littlehorse.common.model.AbstractCommand;
import io.littlehorse.common.model.corecommand.subcommand.ScheduleWfRequestModel;
import io.littlehorse.common.model.metadatacommand.subcommand.DeleteExternalEventDefRequestModel;
import io.littlehorse.common.model.metadatacommand.subcommand.DeletePrincipalRequestModel;
import io.littlehorse.common.model.metadatacommand.subcommand.DeleteTaskDefRequestModel;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class MetadataCommandModel extends AbstractCommand<MetadataCommand> {
private DeletePrincipalRequestModel deletePrincipal;
private PutTenantRequestModel putTenant;
private PutWorkflowEventDefRequestModel putWorkflowEventDef;
private ScheduleWfRequestModel scheduleWfRun;

public MetadataCommandModel() {
super();
Expand Down Expand Up @@ -111,6 +113,9 @@ public MetadataCommand.Builder toProto() {
case WORKFLOW_EVENT_DEF:
out.setWorkflowEventDef(putWorkflowEventDef.toProto());
break;
case SCHEDULE_WF_RUN_REQUEST:
out.setScheduleWfRunRequest(scheduleWfRun.toProto());
break;
case METADATACOMMAND_NOT_SET:
log.warn("Metadata command was empty! Will throw LHSerdeError in future.");
}
Expand Down Expand Up @@ -170,6 +175,10 @@ public void initFrom(Message proto, ExecutionContext context) {
putWorkflowEventDef = LHSerializable.fromProto(
p.getWorkflowEventDef(), PutWorkflowEventDefRequestModel.class, context);
break;
case SCHEDULE_WF_RUN_REQUEST:
scheduleWfRun =
LHSerializable.fromProto(p.getScheduleWfRunRequest(), ScheduleWfRequestModel.class, context);
break;
case METADATACOMMAND_NOT_SET:
log.warn("Metadata command was empty! Will throw LHSerdeError in future.");
}
Expand Down Expand Up @@ -202,6 +211,8 @@ public MetadataSubCommand<?> getSubCommand() {
return putTenant;
case WORKFLOW_EVENT_DEF:
return putWorkflowEventDef;
case SCHEDULE_WF_RUN_REQUEST:
return scheduleWfRun;
case METADATACOMMAND_NOT_SET:
}
throw new IllegalStateException("Not possible to have missing subcommand.");
Expand Down Expand Up @@ -245,6 +256,9 @@ public void setSubCommand(MetadataSubCommand<?> cmd) {
} else if (cls.equals(PutWorkflowEventDefRequestModel.class)) {
type = MetadataCommandCase.WORKFLOW_EVENT_DEF;
putWorkflowEventDef = (PutWorkflowEventDefRequestModel) cmd;
} else if (cls.equals(ScheduleWfRequestModel.class)) {
type = MetadataCommandCase.SCHEDULE_WF_RUN_REQUEST;
scheduleWfRun = (ScheduleWfRequestModel) cmd;
} else {
throw new IllegalArgumentException("Unrecognized SubCommand class: " + cls.getName());
}
Expand Down
Loading

0 comments on commit 7fd41a9

Please sign in to comment.