Skip to content

Commit

Permalink
fix places using job entity name, instead of full key
Browse files Browse the repository at this point in the history
  • Loading branch information
eschultink committed Jul 17, 2024
1 parent 6d6346f commit 513d182
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ public MapJob(MapSpecification<I, O, R> specification, MapSettings settings) {
this.settings = settings;
}




/**
* Starts a {@link MapJob} with the given parameters in a new Pipeline.
* Returns the pipeline id.
Expand Down Expand Up @@ -103,7 +100,7 @@ public Value<MapReduceResult<R>> run() {
mapTasks.add(new MapOnlyShardTask<>(jobId, i, readers.size(), readers.get(i),
specification.getMapper(), writers.get(i), settings.getMillisPerSlice()));
}
ShardedJobSettings shardedJobSettings = settings.toShardedJobSettings(jobId, getPipelineKey());
ShardedJobSettings shardedJobSettings = settings.toShardedJobSettings(getJobKey(), getPipelineKey());
PromisedValue<ResultAndStatus<R>> resultAndStatus = newPromise();
WorkerController<I, O, R, MapOnlyMapperContext<O>> workerController = new WorkerController<>(
jobId, new CountersImpl(), output, resultAndStatus.getHandle());
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public JobSetting[] toJobSettings(JobSetting... extra) {
return settings;
}

ShardedJobSettings toShardedJobSettings(String shardedJobId, Key pipelineKey) {
ShardedJobSettings toShardedJobSettings(Key shardedJobKey, Key pipelineKey) {

String module = getModule();
String version = null;
Expand All @@ -260,10 +260,10 @@ ShardedJobSettings toShardedJobSettings(String shardedJobId, Key pipelineKey) {
}

final ShardedJobSettings.Builder builder = new ShardedJobSettings.Builder()
.setControllerPath(baseUrl + CONTROLLER_PATH + "/" + shardedJobId)
.setWorkerPath(baseUrl + WORKER_PATH + "/" + shardedJobId)
.setMapReduceStatusUrl(baseUrl + "detail?mapreduce_id=" + shardedJobId)
.setPipelineStatusUrl(PipelineServlet.makeViewerUrl(pipelineKey, pipelineKey))
.setControllerPath(baseUrl + CONTROLLER_PATH + "/" + shardedJobKey.toUrlSafe())
.setWorkerPath(baseUrl + WORKER_PATH + "/" + shardedJobKey.toUrlSafe())
.setMapReduceStatusUrl(baseUrl + "detail?mapreduce_id=" + shardedJobKey.toUrlSafe())
.setPipelineStatusUrl(PipelineServlet.makeViewerUrl(pipelineKey, shardedJobKey))
.setModule(module)
.setVersion(version)
.setQueueName(workerQueueName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.appengine.tools.mapreduce.Sharder;
import com.google.appengine.tools.mapreduce.impl.GoogleCloudStorageMapOutputWriter.MapOutputWriter;
import com.google.appengine.tools.mapreduce.outputs.GoogleCloudStorageFileOutput;
import org.apache.commons.codec.digest.DigestUtils;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -52,8 +53,10 @@ public GoogleCloudStorageMapOutput(String bucket, String mrJobId, Marshaller<K>
@Override
public List<? extends OutputWriter<KeyValue<K, V>>> createWriters(int shards) {
List<OutputWriter<KeyValue<K, V>>> result = new ArrayList<>(shards);

String mrJobIdHash = DigestUtils.sha1Hex(mrJobId);
for (int i = 0; i < shards; i++) {
String fileNamePattern = String.format(MAP_OUTPUT_DIR_FORMAT, mrJobId, i);
String fileNamePattern = String.format(MAP_OUTPUT_DIR_FORMAT, mrJobIdHash, i);
OutputWriter<KeyValue<K, V>> writer = new GoogleCloudStorageMapOutputWriter<>(
bucket, fileNamePattern, keyMarshaller, valueMarshaller, sharder, this.options);
result.add(writer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.google.appengine.tools.mapreduce.*;
import com.google.appengine.tools.mapreduce.outputs.*;
import org.apache.commons.codec.digest.DigestUtils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -50,8 +51,9 @@ private static class ShardingOutputWriterImpl extends

@Override
public SlicingOutputWriterImpl createWriter(int number) {
String mrJobIdHash = DigestUtils.sha1Hex(mrJobId);
String formatStringForShard =
String.format(MapReduceConstants.SORT_OUTPUT_DIR_FORMAT, mrJobId, shard, number);
String.format(MapReduceConstants.SORT_OUTPUT_DIR_FORMAT, mrJobIdHash, shard, number);
return new SlicingOutputWriterImpl(bucket, formatStringForShard, options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.OrphanedObjectException;
import com.google.appengine.tools.pipeline.PipelineService;
import com.google.cloud.datastore.Key;
import com.google.common.collect.ImmutableList;
import lombok.Getter;
import lombok.NonNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,15 @@ JSONObject handleGetJobDetail(PipelineRunner pipelineRunner, String jobId) {
jobObject.put("name", jobId); // For display
jobObject.put("mapreduce_id", jobId); // This is the shardedJobId but it needs to be called
// mapreduce_id for python compatibility.
jobObject.put("start_timestamp_ms", state.getStartTime());
jobObject.put("start_timestamp_ms", state.getStartTime().toEpochMilli());

if (state.getStatus().isActive()) {
jobObject.put("active", true);
jobObject.put("updated_timestamp_ms", System.currentTimeMillis());
} else {
jobObject.put("active", false);
jobObject.put("result_status", String.valueOf(state.getStatus().getStatusCode()));
jobObject.put("updated_timestamp_ms", state.getMostRecentUpdateTime());
jobObject.put("updated_timestamp_ms", state.getMostRecentUpdateTime().toEpochMilli());
}
jobObject.put("shards", state.getTotalTaskCount());
jobObject.put("active_shards", state.getActiveTaskCount());
Expand All @@ -213,7 +213,7 @@ JSONObject handleGetJobDetail(PipelineRunner pipelineRunner, String jobId) {
JSONObject shardObject = new JSONObject();
shardObject.put("shard_number", i);
shardObject.put("shard_description", taskState.getTaskId());
shardObject.put("updated_timestamp_ms", taskState.getMostRecentUpdateTime());
shardObject.put("updated_timestamp_ms", taskState.getMostRecentUpdateTime().toEpochMilli());
if (taskState.getStatus().isActive()) {
shardObject.put("active", true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public void testBuilderWithSettings() {
public void testMakeShardedJobSettings() {
Key key = datastore.newKeyFactory().setKind("Kind1").newKey("value1");
MapSettings settings = new MapSettings.Builder().setWorkerQueueName("good-queue").build();
ShardedJobSettings sjSettings = settings.toShardedJobSettings("job1", key);
Key shardedJobKey = datastore.newKeyFactory().setKind("Kind2").newKey("job1");
ShardedJobSettings sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
assertEquals("default", sjSettings.getModule());
assertEquals("1", sjSettings.getVersion());
assertEquals("1.default.test.localhost", sjSettings.getTaskQueueTarget());
Expand All @@ -176,7 +177,7 @@ public void testMakeShardedJobSettings() {


settings = new MapSettings.Builder(settings).setModule("module1").build();
sjSettings = settings.toShardedJobSettings("job1", key);
sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
assertEquals("v1.module1.test.localhost", sjSettings.getTaskQueueTarget());
assertEquals("module1", sjSettings.getModule());
assertEquals("v1", sjSettings.getVersion());
Expand All @@ -191,7 +192,7 @@ public void testMakeShardedJobSettings() {
ApiProxy.setEnvironmentForCurrentThread(mockEnv);
// Test when current module is the same as requested module
try {
sjSettings = settings.toShardedJobSettings("job1", key);
sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
assertEquals("default", sjSettings.getModule());
assertEquals("2", sjSettings.getVersion());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Iterator;

/**
Expand All @@ -34,8 +36,8 @@ public void testRoundTripJob() {
ShardedJobStateImpl.ShardedJobSerializer.fromEntity(readTx, readEntity);
assertEquals(job.getJobId(), fromEntity.getJobId());
assertEquals(job.getActiveTaskCount(), fromEntity.getActiveTaskCount());
assertEquals(job.getMostRecentUpdateTime(), fromEntity.getMostRecentUpdateTime());
assertEquals(job.getStartTime(), fromEntity.getStartTime());
assertEquals(job.getMostRecentUpdateTime().truncatedTo(ChronoUnit.MILLIS), fromEntity.getMostRecentUpdateTime().truncatedTo(ChronoUnit.MILLIS));
assertEquals(job.getStartTime().truncatedTo(ChronoUnit.MILLIS), fromEntity.getStartTime().truncatedTo(ChronoUnit.MILLIS));
assertEquals(job.getTotalTaskCount(), fromEntity.getTotalTaskCount());
assertEquals(job.getSettings().toString(), fromEntity.getSettings().toString());
assertEquals(job.getStatus(), fromEntity.getStatus());
Expand All @@ -50,9 +52,9 @@ public void testExpectedFields() {
assertEquals(10, entity.getLong("taskCount"));
assertTrue(entity.contains("activeShards"));
assertTrue(entity.contains("status"));
assertTrue(entity.contains("startTimeMillis"));
assertTrue(entity.contains("startTime"));
assertTrue(entity.contains("settings"));
assertTrue(entity.contains("mostRecentUpdateTimeMillis"));
assertTrue(entity.contains("mostRecentUpdateTime"));
}

@Test
Expand Down

0 comments on commit 513d182

Please sign in to comment.