From 91fb3f448e24ad87d2297fad5f75d5f2cac92c82 Mon Sep 17 00:00:00 2001 From: Erik Schultink Date: Thu, 12 Sep 2024 15:36:23 -0700 Subject: [PATCH] fix MapSettings tests --- .../google/appengine/tools/mapreduce/MapJob.java | 2 +- .../appengine/tools/mapreduce/MapReduceJob.java | 8 ++++---- .../appengine/tools/mapreduce/MapSettings.java | 11 ++++++----- .../pipeline/impl/servlets/PipelineServlet.java | 6 ++++++ .../tools/mapreduce/MapSettingsTest.java | 15 ++++++++------- 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java index 877af04d..398ccfef 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java @@ -101,7 +101,7 @@ public Value> run() { mapTasks.add(new MapOnlyShardTask<>(jobId, i, readers.size(), readers.get(i), specification.getMapper(), writers.get(i), settings.getMillisPerSlice())); } - ShardedJobSettings shardedJobSettings = settings.toShardedJobSettings(getJobKey(), getPipelineKey()); + ShardedJobSettings shardedJobSettings = settings.toShardedJobSettings(getShardedJobId(), getPipelineKey()); PromisedValue> resultAndStatus = newPromise(); WorkerController> workerController = new WorkerController<>( jobId, new CountersImpl(), output, resultAndStatus.getHandle()); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java index b976f936..332fae64 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java @@ -225,7 +225,7 @@ public Value> run() { mrSpec.getMapper(), writers.get(i), settings.getMillisPerSlice())); } ShardedJobSettings shardedJobSettings = - settings.toShardedJobSettings(getMRJobKey(), getPipelineKey()); + settings.toShardedJobSettings(getShardedJobId(), getPipelineKey()); PromisedValue> resultAndStatus = newPromise(); WorkerController, FilesByShard, MapperContext> workerController = @@ -330,7 +330,7 @@ public Value> run(MapReduceResult ma settings.getSortReadTimeMillis())); } ShardedJobSettings shardedJobSettings = - settings.toShardedJobSettings(getMRJobKey(), getPipelineKey()); + settings.toShardedJobSettings(getShardedJobId(), getPipelineKey()); PromisedValue> resultAndStatus = newPromise(); WorkerController, KeyValue>, @@ -444,7 +444,7 @@ public Value> run(MapReduceResult pr settings.getSortReadTimeMillis())); } ShardedJobSettings shardedJobSettings = - settings.toShardedJobSettings(getJobKey(), getPipelineKey()); + settings.toShardedJobSettings(getShardedJobId(), getPipelineKey()); PromisedValue> resultAndStatus = newPromise(); WorkerController>, @@ -537,7 +537,7 @@ public Value> run(MapReduceResult mergeResult) mrSpec.getReducer(), writers.get(i), settings.getMillisPerSlice())); } ShardedJobSettings shardedJobSettings = - settings.toShardedJobSettings(getJobKey(), getPipelineKey()); + settings.toShardedJobSettings(getShardedJobId(), getPipelineKey()); PromisedValue> resultAndStatus = newPromise(); WorkerController>, O, R, ReducerContext> workerController = new WorkerController<>(getShardedJobId(), mergeResult.getCounters(), output, diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapSettings.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapSettings.java index d20ad864..755fabfc 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapSettings.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapSettings.java @@ -12,6 +12,7 @@ import com.google.appengine.api.taskqueue.Queue; import com.google.appengine.api.taskqueue.QueueFactory; import com.google.appengine.api.taskqueue.TransientFailureException; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobId; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings; import com.google.appengine.tools.pipeline.JobSetting; import com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet; @@ -239,7 +240,7 @@ public JobSetting[] toJobSettings(JobSetting... extra) { return settings; } - ShardedJobSettings toShardedJobSettings(Key shardedJobKey, Key pipelineKey) { + ShardedJobSettings toShardedJobSettings(ShardedJobId shardedJobId, Key pipelineKey) { String module = getModule(); String version = null; @@ -260,10 +261,10 @@ ShardedJobSettings toShardedJobSettings(Key shardedJobKey, Key pipelineKey) { } final ShardedJobSettings.Builder builder = new ShardedJobSettings.Builder() - .setControllerPath(baseUrl + CONTROLLER_PATH + "/" + shardedJobKey.toUrlSafe()) - .setWorkerPath(baseUrl + WORKER_PATH + "/" + shardedJobKey.toUrlSafe()) - .setMapReduceStatusUrl(baseUrl + "detail?mapreduce_id=" + shardedJobKey.toUrlSafe()) - .setPipelineStatusUrl(PipelineServlet.makeViewerUrl(pipelineKey, shardedJobKey)) + .setControllerPath(baseUrl + CONTROLLER_PATH + "/" + shardedJobId.asEncodedString()) + .setWorkerPath(baseUrl + WORKER_PATH + "/" + shardedJobId.asEncodedString()) + .setMapReduceStatusUrl(baseUrl + "detail?mapreduce_id=" + shardedJobId.asEncodedString()) + .setPipelineStatusUrl(PipelineServlet.makeViewerUrl(pipelineKey, shardedJobId)) .setModule(module) .setVersion(version) .setQueueName(workerQueueName) diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java index a7bfa869..fec69824 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java @@ -14,6 +14,7 @@ package com.google.appengine.tools.pipeline.impl.servlets; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobId; import com.google.appengine.tools.pipeline.di.DaggerJobRunServiceComponent; import com.google.appengine.tools.pipeline.di.JobRunServiceComponent; import com.google.cloud.datastore.Key; @@ -107,6 +108,11 @@ public static String makeViewerUrl(Key rootJobKey, Key jobKey) { return baseUrl() + "status.html?root=" + rootJobKey.toUrlSafe() + "#pipeline-" + jobKey.toUrlSafe(); } + public static String makeViewerUrl(Key rootJobKey, ShardedJobId shardedJobId) { + //TODO: revisit this; + return baseUrl() + "status.html?root=" + rootJobKey.toUrlSafe() + "#pipeline-" + shardedJobId.asEncodedString(); + } + private enum RequestType { HANDLE_TASK(TaskHandler.PATH_COMPONENT), diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/MapSettingsTest.java b/java/src/test/java/com/google/appengine/tools/mapreduce/MapSettingsTest.java index f91cf39e..e552a5a7 100644 --- a/java/src/test/java/com/google/appengine/tools/mapreduce/MapSettingsTest.java +++ b/java/src/test/java/com/google/appengine/tools/mapreduce/MapSettingsTest.java @@ -15,6 +15,7 @@ import com.google.appengine.tools.development.testing.LocalModulesServiceTestConfig; import com.google.appengine.tools.development.testing.LocalServiceTestHelper; import com.google.appengine.tools.development.testing.LocalTaskQueueTestConfig; +import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobId; import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobSettings; import com.google.appengine.tools.pipeline.JobSetting; import com.google.appengine.tools.pipeline.JobSetting.OnBackend; @@ -163,21 +164,21 @@ public void testBuilderWithSettings() { public void testMakeShardedJobSettings() { Key key = datastore.newKeyFactory().setKind("Kind1").newKey("value1"); MapSettings settings = new MapSettings.Builder().setWorkerQueueName("good-queue").build(); - Key shardedJobKey = datastore.newKeyFactory().setKind("Kind2").newKey("job1"); - ShardedJobSettings sjSettings = settings.toShardedJobSettings(shardedJobKey, key); + ShardedJobId shardedJobId = ShardedJobId.of(datastore.getOptions().getProjectId(), datastore.getOptions().getNamespace(), "job1"); + ShardedJobSettings sjSettings = settings.toShardedJobSettings(shardedJobId, key); assertEquals("default", sjSettings.getModule()); assertEquals("1", sjSettings.getVersion()); assertEquals("1.default.test.localhost", sjSettings.getTaskQueueTarget()); assertEquals(settings.getWorkerQueueName(), sjSettings.getQueueName()); - assertEquals(getPath(settings, "job1", CONTROLLER_PATH), sjSettings.getControllerPath()); - assertEquals(getPath(settings, "job1", WORKER_PATH), sjSettings.getWorkerPath()); - assertEquals(makeViewerUrl(key, key), sjSettings.getPipelineStatusUrl()); + assertEquals(getPath(settings, shardedJobId.asEncodedString(), CONTROLLER_PATH), sjSettings.getControllerPath()); + assertEquals(getPath(settings, shardedJobId.asEncodedString(), WORKER_PATH), sjSettings.getWorkerPath()); + assertEquals(makeViewerUrl(key, shardedJobId), sjSettings.getPipelineStatusUrl()); assertEquals(settings.getMaxShardRetries(), sjSettings.getMaxShardRetries()); assertEquals(settings.getMaxSliceRetries(), sjSettings.getMaxSliceRetries()); settings = new MapSettings.Builder(settings).setModule("module1").build(); - sjSettings = settings.toShardedJobSettings(shardedJobKey, key); + sjSettings = settings.toShardedJobSettings(shardedJobId, key); assertEquals("v1.module1.test.localhost", sjSettings.getTaskQueueTarget()); assertEquals("module1", sjSettings.getModule()); assertEquals("v1", sjSettings.getVersion()); @@ -192,7 +193,7 @@ public void testMakeShardedJobSettings() { ApiProxy.setEnvironmentForCurrentThread(mockEnv); // Test when current module is the same as requested module try { - sjSettings = settings.toShardedJobSettings(shardedJobKey, key); + sjSettings = settings.toShardedJobSettings(shardedJobId, key); assertEquals("default", sjSettings.getModule()); assertEquals("2", sjSettings.getVersion()); } finally {