Skip to content

Commit

Permalink
fix MapSettings tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eschultink committed Sep 12, 2024
1 parent 268dabd commit 91fb3f4
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Value<MapReduceResult<R>> run() {
mapTasks.add(new MapOnlyShardTask<>(jobId, i, readers.size(), readers.get(i),
specification.getMapper(), writers.get(i), settings.getMillisPerSlice()));
}
ShardedJobSettings shardedJobSettings = settings.toShardedJobSettings(getJobKey(), getPipelineKey());
ShardedJobSettings shardedJobSettings = settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());
PromisedValue<ResultAndStatus<R>> resultAndStatus = newPromise();
WorkerController<I, O, R, MapOnlyMapperContext<O>> workerController = new WorkerController<>(
jobId, new CountersImpl(), output, resultAndStatus.getHandle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public Value<MapReduceResult<FilesByShard>> run() {
mrSpec.getMapper(), writers.get(i), settings.getMillisPerSlice()));
}
ShardedJobSettings shardedJobSettings =
settings.toShardedJobSettings(getMRJobKey(), getPipelineKey());
settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());

PromisedValue<ResultAndStatus<FilesByShard>> resultAndStatus = newPromise();
WorkerController<I, KeyValue<K, V>, FilesByShard, MapperContext<K, V>> workerController =
Expand Down Expand Up @@ -330,7 +330,7 @@ public Value<MapReduceResult<FilesByShard>> run(MapReduceResult<FilesByShard> ma
settings.getSortReadTimeMillis()));
}
ShardedJobSettings shardedJobSettings =
settings.toShardedJobSettings(getMRJobKey(), getPipelineKey());
settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());

PromisedValue<ResultAndStatus<FilesByShard>> resultAndStatus = newPromise();
WorkerController<KeyValue<ByteBuffer, ByteBuffer>, KeyValue<ByteBuffer, List<ByteBuffer>>,
Expand Down Expand Up @@ -444,7 +444,7 @@ public Value<MapReduceResult<FilesByShard>> run(MapReduceResult<FilesByShard> pr
settings.getSortReadTimeMillis()));
}
ShardedJobSettings shardedJobSettings =
settings.toShardedJobSettings(getJobKey(), getPipelineKey());
settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());

PromisedValue<ResultAndStatus<FilesByShard>> resultAndStatus = newPromise();
WorkerController<KeyValue<ByteBuffer, Iterator<ByteBuffer>>,
Expand Down Expand Up @@ -537,7 +537,7 @@ public Value<MapReduceResult<R>> run(MapReduceResult<FilesByShard> mergeResult)
mrSpec.getReducer(), writers.get(i), settings.getMillisPerSlice()));
}
ShardedJobSettings shardedJobSettings =
settings.toShardedJobSettings(getJobKey(), getPipelineKey());
settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());
PromisedValue<ResultAndStatus<R>> resultAndStatus = newPromise();
WorkerController<KeyValue<K, Iterator<V>>, O, R, ReducerContext<O>> workerController =
new WorkerController<>(getShardedJobId(), mergeResult.getCounters(), output,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TransientFailureException;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobId;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings;
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet;
Expand Down Expand Up @@ -239,7 +240,7 @@ public JobSetting[] toJobSettings(JobSetting... extra) {
return settings;
}

ShardedJobSettings toShardedJobSettings(Key shardedJobKey, Key pipelineKey) {
ShardedJobSettings toShardedJobSettings(ShardedJobId shardedJobId, Key pipelineKey) {

String module = getModule();
String version = null;
Expand All @@ -260,10 +261,10 @@ ShardedJobSettings toShardedJobSettings(Key shardedJobKey, Key pipelineKey) {
}

final ShardedJobSettings.Builder builder = new ShardedJobSettings.Builder()
.setControllerPath(baseUrl + CONTROLLER_PATH + "/" + shardedJobKey.toUrlSafe())
.setWorkerPath(baseUrl + WORKER_PATH + "/" + shardedJobKey.toUrlSafe())
.setMapReduceStatusUrl(baseUrl + "detail?mapreduce_id=" + shardedJobKey.toUrlSafe())
.setPipelineStatusUrl(PipelineServlet.makeViewerUrl(pipelineKey, shardedJobKey))
.setControllerPath(baseUrl + CONTROLLER_PATH + "/" + shardedJobId.asEncodedString())
.setWorkerPath(baseUrl + WORKER_PATH + "/" + shardedJobId.asEncodedString())
.setMapReduceStatusUrl(baseUrl + "detail?mapreduce_id=" + shardedJobId.asEncodedString())
.setPipelineStatusUrl(PipelineServlet.makeViewerUrl(pipelineKey, shardedJobId))
.setModule(module)
.setVersion(version)
.setQueueName(workerQueueName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.appengine.tools.pipeline.impl.servlets;

import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobId;
import com.google.appengine.tools.pipeline.di.DaggerJobRunServiceComponent;
import com.google.appengine.tools.pipeline.di.JobRunServiceComponent;
import com.google.cloud.datastore.Key;
Expand Down Expand Up @@ -107,6 +108,11 @@ public static String makeViewerUrl(Key rootJobKey, Key jobKey) {
return baseUrl() + "status.html?root=" + rootJobKey.toUrlSafe() + "#pipeline-" + jobKey.toUrlSafe();
}

public static String makeViewerUrl(Key rootJobKey, ShardedJobId shardedJobId) {
//TODO: revisit this;
return baseUrl() + "status.html?root=" + rootJobKey.toUrlSafe() + "#pipeline-" + shardedJobId.asEncodedString();
}

private enum RequestType {

HANDLE_TASK(TaskHandler.PATH_COMPONENT),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.appengine.tools.development.testing.LocalModulesServiceTestConfig;
import com.google.appengine.tools.development.testing.LocalServiceTestHelper;
import com.google.appengine.tools.development.testing.LocalTaskQueueTestConfig;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobId;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings;
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.JobSetting.OnBackend;
Expand Down Expand Up @@ -163,21 +164,21 @@ public void testBuilderWithSettings() {
public void testMakeShardedJobSettings() {
Key key = datastore.newKeyFactory().setKind("Kind1").newKey("value1");
MapSettings settings = new MapSettings.Builder().setWorkerQueueName("good-queue").build();
Key shardedJobKey = datastore.newKeyFactory().setKind("Kind2").newKey("job1");
ShardedJobSettings sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
ShardedJobId shardedJobId = ShardedJobId.of(datastore.getOptions().getProjectId(), datastore.getOptions().getNamespace(), "job1");
ShardedJobSettings sjSettings = settings.toShardedJobSettings(shardedJobId, key);
assertEquals("default", sjSettings.getModule());
assertEquals("1", sjSettings.getVersion());
assertEquals("1.default.test.localhost", sjSettings.getTaskQueueTarget());
assertEquals(settings.getWorkerQueueName(), sjSettings.getQueueName());
assertEquals(getPath(settings, "job1", CONTROLLER_PATH), sjSettings.getControllerPath());
assertEquals(getPath(settings, "job1", WORKER_PATH), sjSettings.getWorkerPath());
assertEquals(makeViewerUrl(key, key), sjSettings.getPipelineStatusUrl());
assertEquals(getPath(settings, shardedJobId.asEncodedString(), CONTROLLER_PATH), sjSettings.getControllerPath());
assertEquals(getPath(settings, shardedJobId.asEncodedString(), WORKER_PATH), sjSettings.getWorkerPath());
assertEquals(makeViewerUrl(key, shardedJobId), sjSettings.getPipelineStatusUrl());
assertEquals(settings.getMaxShardRetries(), sjSettings.getMaxShardRetries());
assertEquals(settings.getMaxSliceRetries(), sjSettings.getMaxSliceRetries());


settings = new MapSettings.Builder(settings).setModule("module1").build();
sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
sjSettings = settings.toShardedJobSettings(shardedJobId, key);
assertEquals("v1.module1.test.localhost", sjSettings.getTaskQueueTarget());
assertEquals("module1", sjSettings.getModule());
assertEquals("v1", sjSettings.getVersion());
Expand All @@ -192,7 +193,7 @@ public void testMakeShardedJobSettings() {
ApiProxy.setEnvironmentForCurrentThread(mockEnv);
// Test when current module is the same as requested module
try {
sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
sjSettings = settings.toShardedJobSettings(shardedJobId, key);
assertEquals("default", sjSettings.getModule());
assertEquals("2", sjSettings.getVersion());
} finally {
Expand Down

0 comments on commit 91fb3f4

Please sign in to comment.