Skip to content

Commit

Permalink
use a strongly typed IncrementalTaskId
Browse files Browse the repository at this point in the history
  • Loading branch information
eschultink committed Sep 20, 2024
1 parent a514c6a commit b9b2ff8
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.google.appengine.tools.mapreduce.MapReduceJob;
import com.google.appengine.tools.mapreduce.MapReduceServlet;
import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskId;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobId;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobRunner;
import com.google.appengine.tools.mapreduce.impl.util.RequestUtils;
Expand Down Expand Up @@ -117,15 +118,16 @@ public void doPost(HttpServletRequest request, HttpServletResponse response)
return;
}
ShardedJobRunner shardedJobRunner = stepExecutionComponent.shardedJobRunner();
shardedJobRunner.completeShard(getJobId(request), request.getParameter(TASK_ID_PARAM));
shardedJobRunner.completeShard(getJobId(request), IncrementalTaskId.parse(request.getParameter(TASK_ID_PARAM)));
} else if (handler.startsWith(WORKER_PATH)) {
if (!checkForTaskQueue(request, response)) {
return;
}
ShardedJobRunner shardedJobRunner = stepExecutionComponent.shardedJobRunner();
shardedJobRunner.runTask(
getJobId(request),
checkNotNull(request.getParameter(TASK_ID_PARAM), "Null task id"), Integer.parseInt(request.getParameter(SEQUENCE_NUMBER_PARAM)));
IncrementalTaskId.parse(checkNotNull(request.getParameter(TASK_ID_PARAM), "Null task id")),
Integer.parseInt(request.getParameter(SEQUENCE_NUMBER_PARAM)));
} else if (handler.startsWith(COMMAND_PATH)) {
if (!checkForAjax(request, response)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,51 @@
import lombok.NonNull;
import lombok.Value;

//represents a shared job task (eg, one of N shards of a sharded task)
/**
* identifies a task that will executed incrementally (slices)
* represents a sharded job task (eg, one of N shards of a sharded task)
*
* arguably it's just a ShardId ... sliced/incremental executions is an implementation detail
*/
@Value
@AllArgsConstructor(staticName = "of")
class IncrementalTaskId {
public class IncrementalTaskId {

/**
* the sharded job this task belongs to
*/
@NonNull
ShardedJobId shardedJobId;

/**
* which shard this represents, eg, 0-39 for 40 shards
*/
int number;

@Override
public String toString() {
return asEncodedString();
}

/**
* @return a string representation of this task id that can be used in URLs
*/
public String asEncodedString() {
return prefix(shardedJobId) + number;
}

@Deprecated
static IncrementalTaskId parse(ShardedJobId shardedJobId, String taskId) {
return IncrementalTaskId.of(shardedJobId, Integer.parseInt(taskId.substring(prefix(shardedJobId).length())));
}

public static IncrementalTaskId parse(String taskId) {
String numberPart = taskId.substring(taskId.lastIndexOf("-"), taskId.length());
String suffix = "-task-" + numberPart;
String encodedJobId = taskId.replace(suffix, "").replace("-", "/");
return IncrementalTaskId.of(ShardedJobId.fromEncodedString(encodedJobId), Integer.parseInt(numberPart));
}

private static String prefix(ShardedJobId shardedJobId) {
return shardedJobId.asEncodedString().replace("/", "-") + "-task-";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,35 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;

import java.time.Instant;
import java.util.Date;

/**
* Information about execution of an {@link IncrementalTask}.
*
* really, shardExecutionState ...
*
* @author [email protected] (Christian Ohler)
*
* @param <T> type of task
*/
@ToString
@Getter
public class IncrementalTaskState<T extends IncrementalTask> {

private final ShardedJobId jobId;

private final String taskId; //q: why is this a string?
private final IncrementalTaskId taskId;

private final Integer shardNumber;

@Setter
private Instant mostRecentUpdateTime;

/**
* incremented each (successful) run of task.
* incremented each (successful) run of task. eg, count of slices.
*/
@Setter
private int sequenceNumber;
Expand All @@ -52,6 +56,7 @@ public class IncrementalTaskState<T extends IncrementalTask> {
@Setter
private Status status;

@ToString.Exclude
private LockInfo lockInfo;

public static class LockInfo {
Expand Down Expand Up @@ -96,20 +101,20 @@ public String toString() {
* Returns a new running IncrementalTaskState.
*/
static <T extends IncrementalTask> IncrementalTaskState<T> create(
String taskId, ShardedJobId jobId, Instant createTime, T initialTask) {
IncrementalTaskId taskId, ShardedJobId jobId, Instant createTime, T initialTask) {
return new IncrementalTaskState<>(taskId, jobId, createTime, new LockInfo(null, null),
checkNotNull(initialTask), new Status(StatusCode.RUNNING));
}

private IncrementalTaskState(@NonNull String taskId,
private IncrementalTaskState(@NonNull IncrementalTaskId taskId,
@NonNull ShardedJobId jobId,
Instant mostRecentUpdateTime,
LockInfo lockInfo,
T task,
Status status) {
this.taskId = taskId;
this.jobId = jobId;
this.shardNumber = IncrementalTaskId.parse(jobId, taskId).getNumber();
this.shardNumber = taskId.getNumber();
this.mostRecentUpdateTime = mostRecentUpdateTime;
this.lockInfo = lockInfo;
this.task = task;
Expand All @@ -124,19 +129,6 @@ void clearRetryCount() {
retryCount = 0;
}

@Override
public String toString() {
return getClass().getSimpleName() + "("
+ taskId + ", "
+ jobId + ", "
+ mostRecentUpdateTime + ", "
+ sequenceNumber + ", "
+ retryCount + ", "
+ task + ", "
+ status + ", "
+ ")";
}

/**
* Utility class to serialize/deserialize IncrementalTaskState.
*/
Expand All @@ -153,8 +145,8 @@ public static class Serializer {
private static final String NEXT_TASK_PROPERTY = "nextTask";
private static final String STATUS_PROPERTY = "status";

public static Key makeKey(Datastore datastore, String taskId) {
return datastore.newKeyFactory().setKind(ENTITY_KIND).newKey(taskId);
public static Key makeKey(Datastore datastore, IncrementalTaskId taskId) {
return datastore.newKeyFactory().setKind(ENTITY_KIND).newKey(taskId.asEncodedString());
}

public static Entity toEntity(Transaction tx, IncrementalTaskState<?> in) {
Expand Down Expand Up @@ -196,8 +188,12 @@ public static <T extends IncrementalTask> IncrementalTaskState<T> fromEntity(
lockInfo = new LockInfo(null, null);
}

IncrementalTaskState<T> state = new IncrementalTaskState<>(in.getKey().getName(),
ShardedJobId.fromEncodedString(in.getString(JOB_ID_PROPERTY)),
ShardedJobId jobId = ShardedJobId.fromEncodedString(in.getString(JOB_ID_PROPERTY));


IncrementalTaskState<T> state = new IncrementalTaskState<>(
IncrementalTaskId.parse(jobId, in.getKey().getName()),
jobId,
in.getTimestamp(MOST_RECENT_UPDATE_TIME_PROPERTY).toDate().toInstant(),
lockInfo,
in.contains(NEXT_TASK_PROPERTY) ? SerializationUtil.deserializeFromDatastoreProperty(tx, in, NEXT_TASK_PROPERTY, lenient) : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,27 @@

import com.google.cloud.datastore.*;
import com.google.common.primitives.Ints;
import lombok.Getter;

/**
* Retry information for a shard.
*
*
* @param <T> type of task
*/
@Getter
public final class ShardRetryState<T extends IncrementalTask> {

private final String taskId;
private final IncrementalTaskId taskId;
private final T initialTask;
private int retryCount;

private ShardRetryState(String taskId, T initialTask, int retryCount) {
private ShardRetryState(IncrementalTaskId taskId, T initialTask, int retryCount) {
this.taskId = checkNotNull(taskId);
this.initialTask = checkNotNull(initialTask);
this.retryCount = retryCount;
}

public String getTaskId() {
return taskId;
}

public T getInitialTask() {
return initialTask;
}

public int getRetryCount() {
return retryCount;
}

public int incrementAndGet() {
return ++retryCount;
}
Expand All @@ -61,8 +51,9 @@ public static class Serializer {
private static final String ENTITY_KIND = "MR-ShardRetryState";
private static final String INITIAL_TASK_PROPERTY = "initialTask";
private static final String RETRY_COUNT_PROPERTY = "retryCount";
private static final String TASK_ID_PROPERTY = "taskId";

public static Key makeKey(Datastore datastore, String taskId) {
public static Key makeKey(Datastore datastore, IncrementalTaskId taskId) {

Key parent = IncrementalTaskState.Serializer.makeKey(datastore, taskId);
return Key.newBuilder(parent, ENTITY_KIND, 1L).build();
Expand All @@ -72,13 +63,15 @@ static Entity toEntity(Transaction tx, ShardRetryState<?> in) {
Entity.Builder shardInfo = Entity.newBuilder(makeKey(tx.getDatastore(), in.getTaskId()));
serializeToDatastoreProperty(tx, shardInfo, INITIAL_TASK_PROPERTY, in.initialTask);
shardInfo.set(RETRY_COUNT_PROPERTY, LongValue.newBuilder(in.retryCount).setExcludeFromIndexes(true).build());
shardInfo.set(TASK_ID_PROPERTY, in.taskId.asEncodedString());
return shardInfo.build();
}

static <T extends IncrementalTask> ShardRetryState<T> fromEntity(Transaction tx, Entity in) {
T initialTask = deserializeFromDatastoreProperty(tx, in, INITIAL_TASK_PROPERTY);
int retryCount = Ints.checkedCast(in.getLong(RETRY_COUNT_PROPERTY));
return new ShardRetryState<>(in.getKey().getParent().getName(), initialTask, retryCount);
IncrementalTaskId taskId = IncrementalTaskId.parse(in.getString(TASK_ID_PROPERTY));
return new ShardRetryState<>(taskId, initialTask, retryCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ public interface ShardedJobHandler {
* Is invoked by the servlet that handles
* {@link ShardedJobSettings#getControllerPath} when a shard has completed.
*/
void completeShard(final ShardedJobId jobId, final String taskId);
void completeShard(final ShardedJobId jobId, final IncrementalTaskId taskId);

/**
* Is invoked by the servlet that handles
* {@link ShardedJobSettings#getWorkerPath} to run a task.
*/
void runTask(final ShardedJobId jobId, final String taskId, final int sequenceNumber);
void runTask(final ShardedJobId jobId, final IncrementalTaskId taskId, final int sequenceNumber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Optional;

/**
* identifies a job that has been sharded (split into parallel tasks)
Expand Down Expand Up @@ -39,7 +40,7 @@ public class ShardedJobId implements Serializable {


public String asEncodedString() {
return project + "/" + namespace + "/" + jobId;
return project + "/" + Optional.ofNullable(namespace).orElse("") + "/" + jobId;
}

public static ShardedJobId fromEncodedString(@NonNull String encoded) {
Expand Down
Loading

0 comments on commit b9b2ff8

Please sign in to comment.