From 513d182f9df3864da565563270180ccc785f278e Mon Sep 17 00:00:00 2001
From: Erik Schultink
Date: Wed, 17 Jul 2024 13:31:06 -0700
Subject: [PATCH] fix places using job entity name, instead of full key

---
 .../appengine/tools/mapreduce/MapJob.java     |   5 +-
 .../tools/mapreduce/MapReduceJob.java         | 122 +++++++++++--------
 .../tools/mapreduce/MapSettings.java          |  10 +-
 .../impl/GoogleCloudStorageMapOutput.java     |   5 +-
 .../impl/GoogleCloudStorageSortOutput.java    |   6 +-
 .../mapreduce/impl/WorkerController.java      |   1 +
 .../impl/handlers/StatusHandler.java          |   6 +-
 .../tools/mapreduce/MapSettingsTest.java      |   8 +-
 .../shardedjob/ShardedJobStorageTest.java     |   9 +-
 9 files changed, 103 insertions(+), 69 deletions(-)
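Note for reviewers: this change replaces bare Datastore entity names with
url-safe encodings of the full key. A minimal sketch of the round-trip the
new code relies on; the kind and name below are arbitrary examples, not
values the library itself uses:

    import com.google.cloud.datastore.*;

    Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
    Key jobKey = datastore.newKeyFactory().setKind("MRJob").newKey("job1");
    String encoded = jobKey.toUrlSafe();     // encodes project, namespace, kind, name
    Key decoded = Key.fromUrlSafe(encoded);  // lossless inverse
    assert decoded.equals(jobKey);

Unlike Key.getName(), the encoded form is unambiguous across kinds and
namespaces.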
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java
index 61b77590..326082d8 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java
@@ -51,9 +51,6 @@ public MapJob(MapSpecification specification, MapSettings settings) {
     this.settings = settings;
   }
 
-
-
-
   /**
    * Starts a {@link MapJob} with the given parameters in a new Pipeline.
    * Returns the pipeline id.
@@ -103,7 +100,7 @@ public Value> run() {
       mapTasks.add(new MapOnlyShardTask<>(jobId, i, readers.size(), readers.get(i),
           specification.getMapper(), writers.get(i), settings.getMillisPerSlice()));
     }
-    ShardedJobSettings shardedJobSettings = settings.toShardedJobSettings(jobId, getPipelineKey());
+    ShardedJobSettings shardedJobSettings = settings.toShardedJobSettings(getJobKey(), getPipelineKey());
     PromisedValue> resultAndStatus = newPromise();
     WorkerController> workerController = new WorkerController<>(
         jobId, new CountersImpl(), output, resultAndStatus.getHandle());
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java
index ddfd6290..1d8c5885 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java
@@ -31,11 +31,13 @@
 import com.google.appengine.tools.pipeline.*;
 import com.google.cloud.datastore.Datastore;
 import com.google.cloud.datastore.DatastoreOptions;
+import com.google.cloud.datastore.Key;
 import com.google.cloud.storage.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import lombok.Getter;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
@@ -83,6 +85,22 @@ protected Datastore getDatastore() {
     return datastore;
   }
 
+  interface MRStage {
+
+    /**
+     * @return key of MR job, of which this is one stage
+     */
+    Key getMRJobKey();
+
+
+    /**
+     * @return MRJobKey, as a string-encoded ID
+     */
+    default String getMRJobId() {
+      return getMRJobKey().toUrlSafe();
+    }
+  }
+
   /**
    * Starts a {@link MapReduceJob} with the given parameters in a new Pipeline.
    * Returns the pipeline id.
@@ -114,16 +132,17 @@ private static void verifyBucketIsWritable(MapReduceSettings settings) {
    * The pipeline job to execute the Map stage of the MapReduce. (For all shards)
    */
   @RequiredArgsConstructor
-  static class MapJob extends Job0> {
+  static class MapJob extends Job0> implements MRStage {
 
     private static final long serialVersionUID = 1L;
 
-    @NonNull private final String mrJobId;
+    @Getter
+    @NonNull private final Key MRJobKey;
     @NonNull private final MapReduceSpecification mrSpec;
     @NonNull private final MapReduceSettings settings;
 
     private String getShardedJobId() {
-      return "map-" + mrJobId;
+      return "map-" + getMRJobId();
     }
 
     @Setter(onMethod = @__(@VisibleForTesting))
@@ -139,7 +158,7 @@ protected Datastore getDatastore() {
 
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "(" + mrJobId + ")";
+      return getClass().getSimpleName() + "(" + getShardedJobId() + ")";
     }
 
     /**
@@ -150,7 +169,7 @@
      */
     @Override
     public Value> run() {
-      Context context = new BaseContext(mrJobId);
+      Context context = new BaseContext(getMRJobId());
       Input input = mrSpec.getInput();
       input.setContext(context);
       List> readers;
@@ -161,7 +180,7 @@
       }
       Output, FilesByShard> output = new GoogleCloudStorageMapOutput<>(
           settings.getBucketName(),
-          mrJobId,
+          getMRJobId(),
           mrSpec.getKeyMarshaller(),
           mrSpec.getValueMarshaller(),
           new HashingSharder(getNumOutputFiles(readers.size())),
@@ -177,15 +196,15 @@
       ImmutableList.Builder, MapperContext>> mapTasks =
           ImmutableList.builder();
       for (int i = 0; i < readers.size(); i++) {
-        mapTasks.add(new MapShardTask<>(mrJobId, i, readers.size(), readers.get(i),
+        mapTasks.add(new MapShardTask<>(getMRJobId(), i, readers.size(), readers.get(i),
             mrSpec.getMapper(), writers.get(i), settings.getMillisPerSlice()));
       }
       ShardedJobSettings shardedJobSettings =
-          settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());
+          settings.toShardedJobSettings(getMRJobKey(), getPipelineKey());
       PromisedValue> resultAndStatus = newPromise();
       WorkerController, FilesByShard, MapperContext> workerController =
-          new WorkerController<>(mrJobId, new CountersImpl(), output, resultAndStatus.getHandle());
+          new WorkerController<>(getMRJobId(), new CountersImpl(), output, resultAndStatus.getHandle());
 
       DatastoreOptions datastoreOptions = settings.getDatastoreOptions();
       ShardedJob shardedJob =
@@ -213,11 +232,15 @@ public Value> handleException(CancellationExceptio
   @RequiredArgsConstructor
   static class SortJob extends Job1<
       MapReduceResult,
-      MapReduceResult> {
+      MapReduceResult> implements MRStage {
     private static final long serialVersionUID = 1L;
     // We don't need the CountersImpl part of the MapResult input here but we
     // accept it to avoid needing an adapter job to connect this job to MapJob's result.
-    @NonNull private final String mrJobId;
+
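+    // Field name is deliberately capitalized: Lombok's @Getter then generates
+    // getMRJobKey(), matching the method the MRStage interface declares.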
+    @Getter
+    @NonNull private final Key MRJobKey;
     @NonNull private final MapReduceSpecification mrSpec;
     @NonNull private final MapReduceSettings settings;
 
@@ -234,12 +257,12 @@ protected Datastore getDatastore() {
     }
 
     private String getShardedJobId() {
-      return "sort-" + mrJobId;
+      return "sort-" + getMRJobId();
     }
 
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "(" + mrJobId + ")";
+      return getClass().getSimpleName() + "(" + getMRJobId() + ")";
     }
 
     /**
@@ -250,7 +273,7 @@
      */
     @Override
     public Value> run(MapReduceResult mapResult) {
-      Context context = new BaseContext(mrJobId);
+      Context context = new BaseContext(getMRJobId());
       int mapShards = findMaxFilesPerShard(mapResult.getOutputResult());
       int reduceShards = mrSpec.getNumReducers();
       FilesByShard filesByShard = mapResult.getOutputResult();
@@ -265,7 +288,7 @@
       ((Input) input).setContext(context);
       List>> readers = input.createReaders();
       Output>, FilesByShard> output =
-          new GoogleCloudStorageSortOutput(settings.getBucketName(), mrJobId,
+          new GoogleCloudStorageSortOutput(settings.getBucketName(), getMRJobId(),
               new HashingSharder(reduceShards), outputOptions);
 
       output.setContext(context);
@@ -277,7 +300,7 @@
               KeyValue>, SortContext>> sortTasks =
           ImmutableList.builder();
       for (int i = 0; i < readers.size(); i++) {
-        sortTasks.add(new SortShardTask(mrJobId,
+        sortTasks.add(new SortShardTask(getMRJobId(),
             i,
             readers.size(),
             readers.get(i),
@@ -286,11 +309,11 @@
             settings.getSortReadTimeMillis()));
       }
       ShardedJobSettings shardedJobSettings =
-          settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());
+          settings.toShardedJobSettings(getMRJobKey(), getPipelineKey());
 
       PromisedValue> resultAndStatus = newPromise();
       WorkerController, KeyValue>,
-          FilesByShard, SortContext> workerController = new WorkerController<>(mrJobId,
+          FilesByShard, SortContext> workerController = new WorkerController<>(getMRJobId(),
           mapResult.getCounters(), output, resultAndStatus.getHandle());
 
       ShardedJob shardedJob =
@@ -314,15 +337,16 @@ public Value handleException(CancellationException ex) {
    */
   @RequiredArgsConstructor
   static class MergeJob extends
-      Job1, MapReduceResult> {
-
-
-    private static final long serialVersionUID = 1L;
+      Job1, MapReduceResult> implements MRStage {
 
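+    // Bumped from 1L: replacing the String mrJobId field with a Datastore Key
+    // changes this job's serialized form.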
+    private static final long serialVersionUID = 2L;
     // We don't need the CountersImpl part of the MapResult input here but we
     // accept it to avoid needing an adapter job to connect this job to MapJob's result.
-    @NonNull private final String mrJobId;
+    @Getter
+    @NonNull private final Key MRJobKey;
     @NonNull private final MapReduceSpecification mrSpec;
     @NonNull private final MapReduceSettings settings;
     @NonNull private final Integer tier;
@@ -340,12 +364,12 @@ protected Datastore getDatastore() {
     }
 
     private String getShardedJobId() {
-      return "merge-" + mrJobId + "-" + tier;
+      return "merge-" + getMRJobId() + "-" + tier;
     }
 
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "(" + mrJobId + ")";
+      return getClass().getSimpleName() + "(" + getMRJobId() + ")";
     }
 
     /**
@@ -362,7 +386,7 @@ public Value> run(MapReduceResult pr
         throw new Error("Prior result passed to " + getShardedJobId() + " was null");
       }
 
-      Context context = new BaseContext(mrJobId);
+      Context context = new BaseContext(getMRJobId());
       FilesByShard sortFiles = priorResult.getOutputResult();
       int maxFilesPerShard = findMaxFilesPerShard(sortFiles);
       if (maxFilesPerShard <= settings.getMergeFanin()) {
@@ -384,7 +408,7 @@
           input.createReaders();
 
       Output>, FilesByShard> output =
-          new GoogleCloudStorageMergeOutput(settings.getBucketName(), mrJobId, tier, outputOptions);
+          new GoogleCloudStorageMergeOutput(settings.getBucketName(), getMRJobId(), tier, outputOptions);
 
       output.setContext(context);
       List>>> writers =
@@ -395,7 +419,7 @@
               KeyValue>, MergeContext>> mergeTasks =
           ImmutableList.builder();
       for (int i = 0; i < readers.size(); i++) {
-        mergeTasks.add(new MergeShardTask(mrJobId,
+        mergeTasks.add(new MergeShardTask(getMRJobId(),
             i,
             readers.size(),
             readers.get(i),
@@ -403,13 +427,13 @@
             settings.getSortReadTimeMillis()));
       }
       ShardedJobSettings shardedJobSettings =
-          settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());
+          settings.toShardedJobSettings(getMRJobKey(), getPipelineKey());
 
       PromisedValue> resultAndStatus = newPromise();
       WorkerController>,
           KeyValue>, FilesByShard, MergeContext> workerController =
-          new WorkerController<>(mrJobId, priorResult.getCounters(), output, resultAndStatus.getHandle());
-      DatastoreOptions datastoreOptions = settings.getDatastoreOptions();
+          new WorkerController<>(getMRJobId(), priorResult.getCounters(), output, resultAndStatus.getHandle());
+
       ShardedJob shardedJob =
           new ShardedJob<>(getShardedJobId(), mergeTasks.build(), workerController, shardedJobSettings);
       FutureValue shardedJobResult = futureCall(shardedJob, settings.toJobSettings());
@@ -419,7 +443,9 @@ public Value> run(MapReduceResult pr
           resultAndStatus, settings.toJobSettings(waitFor(shardedJobResult),
               statusConsoleUrl(shardedJobSettings.getMapReduceStatusUrl())));
       futureCall(new MapReduceJob.Cleanup(settings), immediate(priorResult), waitFor(finished));
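+      // Tiered merge: each pass reduces the files per shard, re-queuing itself
+      // with tier + 1 until the count falls within settings.getMergeFanin().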
-      return futureCall(new MergeJob(mrJobId, mrSpec, settings, tier + 1), finished,
+      return futureCall(new MergeJob(getMRJobKey(), mrSpec, settings, tier + 1), finished,
           settings.toJobSettings(maxAttempts(1)));
     }
 
@@ -443,11 +469,11 @@ private static int findMaxFilesPerShard(FilesByShard byShard) {
    */
   @RequiredArgsConstructor
   static class ReduceJob extends Job1,
-      MapReduceResult> {
-
-    private static final long serialVersionUID = 590237832617368335L;
+      MapReduceResult> implements MRStage {
 
-    @NonNull private final String mrJobId;
+    private static final long serialVersionUID = 1L;
+    @Getter
+    @NonNull private final Key MRJobKey;
     @NonNull private final MapReduceSpecification mrSpec;
     @NonNull private final MapReduceSettings settings;
 
@@ -464,12 +490,12 @@ protected Datastore getDatastore() {
     }
 
     private String getShardedJobId() {
-      return "reduce-" + mrJobId;
+      return "reduce-" + getMRJobId();
     }
 
     @Override
     public String toString() {
-      return getClass().getSimpleName() + "(" + mrJobId + ")";
+      return getClass().getSimpleName() + "(" + getMRJobId() + ")";
     }
 
     /**
@@ -479,7 +505,7 @@
      */
     @Override
     public Value> run(MapReduceResult mergeResult) {
-      Context context = new BaseContext(mrJobId);
+      Context context = new BaseContext(getMRJobId());
       Output output = mrSpec.getOutput();
       output.setContext(context);
       GoogleCloudStorageReduceInput input = new GoogleCloudStorageReduceInput<>(
@@ -493,14 +519,14 @@
       ImmutableList.Builder>, O, ReducerContext>> reduceTasks =
           ImmutableList.builder();
       for (int i = 0; i < readers.size(); i++) {
-        reduceTasks.add(new ReduceShardTask<>(mrJobId, i, readers.size(), readers.get(i),
+        reduceTasks.add(new ReduceShardTask<>(getMRJobId(), i, readers.size(), readers.get(i),
             mrSpec.getReducer(), writers.get(i), settings.getMillisPerSlice()));
       }
       ShardedJobSettings shardedJobSettings =
-          settings.toShardedJobSettings(getShardedJobId(), getPipelineKey());
+          settings.toShardedJobSettings(getMRJobKey(), getPipelineKey());
       PromisedValue> resultAndStatus = newPromise();
       WorkerController>, O, R, ReducerContext> workerController =
-          new WorkerController<>(mrJobId, mergeResult.getCounters(), output,
+          new WorkerController<>(getMRJobId(), mergeResult.getCounters(), output,
               resultAndStatus.getHandle());
       ShardedJob shardedJob =
           new ShardedJob<>(getShardedJobId(), reduceTasks.build(), workerController, shardedJobSettings);
@@ -556,16 +582,16 @@ public Value> run() {
         }
         settings = new MapReduceSettings.Builder(settings).setWorkerQueueName(queue).build();
       }
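+    // A bare key name is ambiguous across namespaces and ancestor paths; pass the
+    // full Key to each stage and derive the string form via toUrlSafe() as needed.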
-    String mrJobId = getJobKey().getName();
     FutureValue> mapResult = futureCall(
-        new MapJob<>(mrJobId, specification, settings), settings.toJobSettings(maxAttempts(1)));
-
+        new MapJob<>(getJobKey(), specification, settings), settings.toJobSettings(maxAttempts(1)));
     FutureValue> sortResult = futureCall(
-        new SortJob(mrJobId, specification, settings), mapResult, settings.toJobSettings(maxAttempts(1)));
+        new SortJob(getJobKey(), specification, settings), mapResult, settings.toJobSettings(maxAttempts(1)));
     FutureValue> mergeResult = futureCall(
-        new MergeJob(mrJobId, specification, settings, 1), sortResult, settings.toJobSettings(maxAttempts(1)));
+        new MergeJob(getJobKey(), specification, settings, 1), sortResult, settings.toJobSettings(maxAttempts(1)));
     FutureValue> reduceResult = futureCall(
-        new ReduceJob<>(mrJobId, specification, settings), mergeResult, settings.toJobSettings(maxAttempts(1)));
+        new ReduceJob<>(getJobKey(), specification, settings), mergeResult, settings.toJobSettings(maxAttempts(1)));
     futureCall(new Cleanup(settings), mapResult, waitFor(sortResult));
     futureCall(new Cleanup(settings), mergeResult, waitFor(reduceResult));
     return reduceResult;
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapSettings.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapSettings.java
index 68eb384d..d20ad864 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapSettings.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapSettings.java
@@ -239,7 +239,7 @@ public JobSetting[] toJobSettings(JobSetting... extra) {
     return settings;
   }
 
-  ShardedJobSettings toShardedJobSettings(String shardedJobId, Key pipelineKey) {
+  ShardedJobSettings toShardedJobSettings(Key shardedJobKey, Key pipelineKey) {
 
     String module = getModule();
     String version = null;
@@ -260,10 +260,10 @@ ShardedJobSettings toShardedJobSettings(String shardedJobId, Key pipelineKey) {
     }
     final ShardedJobSettings.Builder builder = new ShardedJobSettings.Builder()
-        .setControllerPath(baseUrl + CONTROLLER_PATH + "/" + shardedJobId)
-        .setWorkerPath(baseUrl + WORKER_PATH + "/" + shardedJobId)
-        .setMapReduceStatusUrl(baseUrl + "detail?mapreduce_id=" + shardedJobId)
-        .setPipelineStatusUrl(PipelineServlet.makeViewerUrl(pipelineKey, pipelineKey))
+        .setControllerPath(baseUrl + CONTROLLER_PATH + "/" + shardedJobKey.toUrlSafe())
+        .setWorkerPath(baseUrl + WORKER_PATH + "/" + shardedJobKey.toUrlSafe())
+        .setMapReduceStatusUrl(baseUrl + "detail?mapreduce_id=" + shardedJobKey.toUrlSafe())
+        .setPipelineStatusUrl(PipelineServlet.makeViewerUrl(pipelineKey, shardedJobKey))
         .setModule(module)
         .setVersion(version)
         .setQueueName(workerQueueName)
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/GoogleCloudStorageMapOutput.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/GoogleCloudStorageMapOutput.java
index 62f9f297..f1a86585 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/GoogleCloudStorageMapOutput.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/GoogleCloudStorageMapOutput.java
@@ -12,6 +12,7 @@
 import com.google.appengine.tools.mapreduce.Sharder;
 import com.google.appengine.tools.mapreduce.impl.GoogleCloudStorageMapOutputWriter.MapOutputWriter;
 import com.google.appengine.tools.mapreduce.outputs.GoogleCloudStorageFileOutput;
+import org.apache.commons.codec.digest.DigestUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -52,8 +53,10 @@ public GoogleCloudStorageMapOutput(String bucket, String mrJobId, Marshaller
   @Override
   public List>> createWriters(int shards) {
     List>> result = new ArrayList<>(shards);
+
+    String mrJobIdHash = DigestUtils.sha1Hex(mrJobId);
     for (int i = 0; i < shards; i++) {
-      String fileNamePattern = String.format(MAP_OUTPUT_DIR_FORMAT, mrJobId, i);
+      String fileNamePattern = String.format(MAP_OUTPUT_DIR_FORMAT, mrJobIdHash, i);
       OutputWriter> writer = new GoogleCloudStorageMapOutputWriter<>(
           bucket, fileNamePattern, keyMarshaller, valueMarshaller, sharder, this.options);
       result.add(writer);
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/GoogleCloudStorageSortOutput.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/GoogleCloudStorageSortOutput.java
index 7fde3419..025e06c6 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/GoogleCloudStorageSortOutput.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/GoogleCloudStorageSortOutput.java
@@ -5,6 +5,7 @@
 
 import com.google.appengine.tools.mapreduce.*;
 import com.google.appengine.tools.mapreduce.outputs.*;
+import org.apache.commons.codec.digest.DigestUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -50,8 +51,11 @@ private static class ShardingOutputWriterImpl extends
 
     @Override
     public SlicingOutputWriterImpl createWriter(int number) {
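+      // The url-safe key encoding is long; hash it so the GCS object-name
+      // segment stays short, stable, and filename-safe.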
+      String mrJobIdHash = DigestUtils.sha1Hex(mrJobId);
       String formatStringForShard =
-          String.format(MapReduceConstants.SORT_OUTPUT_DIR_FORMAT, mrJobId, shard, number);
+          String.format(MapReduceConstants.SORT_OUTPUT_DIR_FORMAT, mrJobIdHash, shard, number);
       return new SlicingOutputWriterImpl(bucket, formatStringForShard, options);
     }
 
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/WorkerController.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/WorkerController.java
index a10ddf67..df1f96c9 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/WorkerController.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/WorkerController.java
@@ -11,6 +11,7 @@
 import com.google.appengine.tools.pipeline.NoSuchObjectException;
 import com.google.appengine.tools.pipeline.OrphanedObjectException;
 import com.google.appengine.tools.pipeline.PipelineService;
+import com.google.cloud.datastore.Key;
 import com.google.common.collect.ImmutableList;
 import lombok.Getter;
 import lombok.NonNull;
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/handlers/StatusHandler.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/handlers/StatusHandler.java
index d5257693..a52208c0 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/handlers/StatusHandler.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/handlers/StatusHandler.java
@@ -181,7 +181,7 @@ JSONObject handleGetJobDetail(PipelineRunner pipelineRunner, String jobId) {
     jobObject.put("name", jobId); // For display
     jobObject.put("mapreduce_id", jobId); // This is the sharedJobId but it needs be be called
                                           // mapreduce_id for python compatibility.
-    jobObject.put("start_timestamp_ms", state.getStartTime());
+    jobObject.put("start_timestamp_ms", state.getStartTime().toEpochMilli());
 
     if (state.getStatus().isActive()) {
       jobObject.put("active", true);
@@ -189,7 +189,7 @@ JSONObject handleGetJobDetail(PipelineRunner pipelineRunner, String jobId) {
     } else {
       jobObject.put("active", false);
       jobObject.put("result_status", String.valueOf(state.getStatus().getStatusCode()));
-      jobObject.put("updated_timestamp_ms", state.getMostRecentUpdateTime());
+      jobObject.put("updated_timestamp_ms", state.getMostRecentUpdateTime().toEpochMilli());
     }
     jobObject.put("shards", state.getTotalTaskCount());
     jobObject.put("active_shards", state.getActiveTaskCount());
@@ -213,7 +213,7 @@ JSONObject handleGetJobDetail(PipelineRunner pipelineRunner, String jobId) {
       JSONObject shardObject = new JSONObject();
       shardObject.put("shard_number", i);
       shardObject.put("shard_description", taskState.getTaskId());
-      shardObject.put("updated_timestamp_ms", taskState.getMostRecentUpdateTime());
+      shardObject.put("updated_timestamp_ms", taskState.getMostRecentUpdateTime().toEpochMilli());
       if (taskState.getStatus().isActive()) {
         shardObject.put("active", true);
       } else {
diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/MapSettingsTest.java b/java/src/test/java/com/google/appengine/tools/mapreduce/MapSettingsTest.java
index a3e38fdb..f91cf39e 100644
--- a/java/src/test/java/com/google/appengine/tools/mapreduce/MapSettingsTest.java
+++ b/java/src/test/java/com/google/appengine/tools/mapreduce/MapSettingsTest.java
@@ -163,7 +163,9 @@ public void testBuilderWithSettings() {
   public void testMakeShardedJobSettings() {
     Key key = datastore.newKeyFactory().setKind("Kind1").newKey("value1");
     MapSettings settings = new MapSettings.Builder().setWorkerQueueName("good-queue").build();
-    ShardedJobSettings sjSettings = settings.toShardedJobSettings("job1", key);
+    Key shardedJobKey = datastore.newKeyFactory().setKind("Kind2").newKey("job1");
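+    // Kind2/job1 are arbitrary test values; only the key's url-safe encoding matters.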
+    ShardedJobSettings sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
     assertEquals("default", sjSettings.getModule());
     assertEquals("1", sjSettings.getVersion());
     assertEquals("1.default.test.localhost", sjSettings.getTaskQueueTarget());
@@ -176,7 +178,7 @@
 
     settings = new MapSettings.Builder(settings).setModule("module1").build();
 
-    sjSettings = settings.toShardedJobSettings("job1", key);
+    sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
     assertEquals("v1.module1.test.localhost", sjSettings.getTaskQueueTarget());
     assertEquals("module1", sjSettings.getModule());
     assertEquals("v1", sjSettings.getVersion());
@@ -191,7 +193,7 @@
     ApiProxy.setEnvironmentForCurrentThread(mockEnv);
     // Test when current module is the same as requested module
     try {
-      sjSettings = settings.toShardedJobSettings("job1", key);
+      sjSettings = settings.toShardedJobSettings(shardedJobKey, key);
       assertEquals("default", sjSettings.getModule());
       assertEquals("2", sjSettings.getVersion());
     } finally {
diff --git a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobStorageTest.java b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobStorageTest.java
index 9e068d84..a63e042c 100644
--- a/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobStorageTest.java
+++ b/java/src/test/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobStorageTest.java
@@ -10,6 +10,7 @@
 import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Iterator;
 
 /**
@@ -34,8 +35,8 @@ public void testRoundTripJob() {
         ShardedJobStateImpl.ShardedJobSerializer.fromEntity(readTx, readEntity);
     assertEquals(job.getJobId(), fromEntity.getJobId());
     assertEquals(job.getActiveTaskCount(), fromEntity.getActiveTaskCount());
-    assertEquals(job.getMostRecentUpdateTime(), fromEntity.getMostRecentUpdateTime());
-    assertEquals(job.getStartTime(), fromEntity.getStartTime());
+    assertEquals(job.getMostRecentUpdateTime().truncatedTo(ChronoUnit.MILLIS), fromEntity.getMostRecentUpdateTime().truncatedTo(ChronoUnit.MILLIS));
+    assertEquals(job.getStartTime().truncatedTo(ChronoUnit.MILLIS), fromEntity.getStartTime().truncatedTo(ChronoUnit.MILLIS));
     assertEquals(job.getTotalTaskCount(), fromEntity.getTotalTaskCount());
     assertEquals(job.getSettings().toString(), fromEntity.getSettings().toString());
     assertEquals(job.getStatus(), fromEntity.getStatus());
@@ -50,9 +51,9 @@ public void testExpectedFields() {
     assertEquals(10, entity.getLong("taskCount"));
     assertTrue(entity.contains("activeShards"));
     assertTrue(entity.contains("status"));
-    assertTrue(entity.contains("startTimeMillis"));
+    assertTrue(entity.contains("startTime"));
     assertTrue(entity.contains("settings"));
-    assertTrue(entity.contains("mostRecentUpdateTimeMillis"));
+    assertTrue(entity.contains("mostRecentUpdateTime"));
   }
 
   @Test