diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java index a460bb42026..19a65078909 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java @@ -224,7 +224,7 @@ public synchronized void stop() { this.stopStatus.setStopInprogress(true); - log.info("Stopping the Gobblin Cluster Manager"); + log.info("Stopping the Gobblin Temporal Cluster Manager"); stopAppLauncherAndServices(); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java index f6b6e05f104..45e2ecb9b16 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java @@ -26,10 +26,8 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -54,7 +52,7 @@ */ @lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization @lombok.RequiredArgsConstructor -@lombok.ToString(exclude = { "stateConfig", "cachedWorkItems" }) +@lombok.ToString(exclude = { "cachedWorkItems" }) @Slf4j public abstract class AbstractEagerFsDirBackedWorkload implements Workload, FileSystemApt { @@ -64,7 +62,6 @@ public abstract class AbstractEagerFsDirBackedWorkload implements Wor // Cannot construct instance of `org.apache.hadoop.fs.Path` (although at least one Creator exists): // cannot deserialize from Object value (no delegate- or property-based Creator) @NonNull private String fsDir; - @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED) private transient volatile WORK_ITEM[] cachedWorkItems = null; @Override diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java index 8eab3ef0bd7..2aa2a7e6495 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java @@ -122,8 +122,7 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event throw ApplicationFailure.newNonRetryableFailureWithCause( String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)), e.getClass().getName(), - e, - null + e ); } finally { // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid flight @@ -140,8 +139,7 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event throw ApplicationFailure.newNonRetryableFailureWithCause( String.format("Failed cleaning Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)), e.getClass().getName(), - e, - null + e ); } log.error("Failed to cleanup work dirs", e); diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java new file mode 100644 index 00000000000..6725c58b6e3 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import com.google.common.base.Charsets; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +/** + * A {@link ScalingDirectiveSource} that reads {@link ScalingDirective}s from a {@link FileSystem} directory, where each directive is the name + * of a single file inside the directory. Directives too long for one filename path component MUST use the + * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and write their {@link ProfileDerivation} overlay as the file's data/content. + * Within-length scaling directives are no-data, zero-length files. When backed by HDFS, reading such zero-length scaling directive filenames is a + * NameNode-only operation, with their metadata-only nature conserving NN object count/quota. + */ +@Slf4j +public class FsScalingDirectiveSource implements ScalingDirectiveSource { + private final FileSystem fileSystem; + private final Path dirPath; + private final Optional optErrorsPath; + private final ScalingDirectiveParser parser = new ScalingDirectiveParser(); + + /** Read from `directivesDirPath` of `fileSystem`, and optionally move invalid/rejected directives to `optErrorsDirPath` */ + public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional optErrorsDirPath) { + this.fileSystem = fileSystem; + this.dirPath = new Path(directivesDirPath); + this.optErrorsPath = optErrorsDirPath.map(Path::new); + } + + /** + * @return all valid (parseable, in-order) scaling directives currently in the directory, ordered by ascending modtime + * + * Ignore invalid directives, and, when `optErrorsDirPath` was provided to the ctor, acknowledge each by moving it to a separate "errors" directory. + * Regardless, always swallow {@link ScalingDirectiveParser.InvalidSyntaxException}. + * + * Like un-parseable directives, also invalid are out-of-order directives. 
This blocks late/out-of-order insertion and/or edits to the directives + * stream. Each directive contains its own {@link ScalingDirective#getTimestampEpochMillis()} stated in its filename. Later-modtime directives are + * rejected when directive-timestamp-order does not match {@link FileStatus} modtime order. In the case of a modtime tie, the directive with the + * alphabetically-later filename is rejected. + * + * ATTENTION: This returns ALL known directives, even those already returned by a prior invocation. When the underlying directory is unchanged + * before the next invocation, the result will be equal elements in the same order. + * + * @throws IOException when unable to read the directory (or file data, in the case of an overlay definition placeholder) + */ + @Override + public List getScalingDirectives() throws IOException { + List> directiveWithFileStatus = new ArrayList<>(); + // TODO: add caching by dir modtime to avoid re-listing the same, unchanged contents, while also avoiding repetitive parsing + // to begin, just parse w/o worrying about ordering... that comes next + for (FileStatus fileStatus : fileSystem.listStatus(dirPath)) { + if (!fileStatus.isFile()) { + log.warn("Ignoring non-file object: " + fileStatus); + optAcknowledgeError(fileStatus, "non-file (not an actual)"); + } else { + String fileName = fileStatus.getPath().getName(); + try { + try { + directiveWithFileStatus.add(new ImmutablePair<>(parser.parse(fileName), fileStatus)); + } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition) { + // directive used placeholder syntax to indicate the overlay definition resides inside its file... so open the file to load that def + log.info("Loading overlay definition for directive {{" + fileName + "}} from: " + fileStatus); + String overlayDef = slurpFileAsString(fileStatus.getPath()); + directiveWithFileStatus.add(new ImmutablePair<>(needsDefinition.retryParsingWithDefinition(overlayDef), fileStatus)); + } + } catch (ScalingDirectiveParser.InvalidSyntaxException e) { + log.warn("Ignoring unparseable scaling directive {{" + fileName + "}}: " + fileStatus + " - " + e.getClass().getName() + ": " + e.getMessage()); + optAcknowledgeError(fileStatus, "unparseable"); + } + } + } + + // verify ordering: only return directives whose stated timestamp ordering (of filename prefix) matches `FileStatus` modtime order + List directives = new ArrayList<>(); + // NOTE: for deterministic total-ordering, sort by path, rather than by timestamp, in case of modtime tie (given only millisecs granularity) + directiveWithFileStatus.sort(Comparator.comparing(p -> p.getValue().getPath())); + long latestValidModTime = -1; + for (Map.Entry entry : directiveWithFileStatus) { + long thisModTime = entry.getValue().getModificationTime(); + if (thisModTime <= latestValidModTime) { // when equal (non-increasing) modtime: reject alphabetically-later filename (path) + log.warn("Ignoring out-of-order scaling directive " + entry.getKey() + " since FS modTime " + thisModTime + " NOT later than last observed " + + latestValidModTime + ": " + entry.getValue()); + optAcknowledgeError(entry.getValue(), "out-of-order"); + } else { + directives.add(entry.getKey()); + latestValidModTime = thisModTime; + } + } + return directives; + } + + /** "acknowledge" the rejection of an invalid directive by moving it to a separate "errors" dir (when `optErrorsDirPath` was given to the ctor) */ + protected void optAcknowledgeError(FileStatus invalidDirectiveFileStatus, String desc) { + 
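+    // intentionally a no-op when no errors dir was configured: the rejected directive file is simply left in place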
this.optErrorsPath.ifPresent(errorsPath -> + moveDirectiveToDir(invalidDirectiveFileStatus, errorsPath, desc) + ); + } + + /** + * move `invalidDirectiveFileStatus` to a designated `destDirPath`, with the reason for moving (e.g. the error) described in `desc`. + * This is used to promote observability by acknowledging invalid, rejected directives + */ + protected void moveDirectiveToDir(FileStatus invalidDirectiveFileStatus, Path destDirPath, String desc) { + Path invalidDirectivePath = invalidDirectiveFileStatus.getPath(); + try { + if (!this.fileSystem.rename(invalidDirectivePath, new Path(destDirPath, invalidDirectivePath.getName()))) { + throw new RuntimeException(); // unclear how to obtain more info about such a failure + } + } catch (IOException e) { + log.warn("Failed to move " + desc + " directive {{" + invalidDirectiveFileStatus.getPath() + "}} to '" + destDirPath + "'... leaving in place", e); + } catch (RuntimeException e) { + log.warn("Failed to move " + desc + " directive {{" + invalidDirectiveFileStatus.getPath() + "}} to '" + destDirPath + + "' [unknown reason]... leaving in place", e); + } + } + + /** @return all contents of `path` as a single (UTF-8) `String` */ + protected String slurpFileAsString(Path path) throws IOException { + try (InputStream is = this.fileSystem.open(path)) { + return IOUtils.toString(is, Charsets.UTF_8); + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java new file mode 100644 index 00000000000..1001150df3e --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.Optional; +import java.util.function.Function; + +import com.typesafe.config.Config; +import lombok.Data; +import lombok.Getter; + + +/** + * Defines a new {@link WorkerProfile} by evolving from another profile, the basis. Such evolution creates a new immutable profile through + * {@link ProfileOverlay}, which either adds or removes properties from the basis profile's definition. That basis profile must already exist. 
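+ *
+ * A hedged usage sketch (the basis name, config key/value, and the `workforceProfiles` resolver variable are illustrative stand-ins):
+ * <pre>{@code
+ *   ProfileDerivation derivation = new ProfileDerivation("some_basis",
+ *       new ProfileOverlay.Adding(new ProfileOverlay.KVPair("a.b.c", "7")));
+ *   Config derived = derivation.formulateConfig(workforceProfiles); // throws UnknownBasisException when "some_basis" is undefined
+ * }</pre>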
+ */ +@Data +public class ProfileDerivation { + + /** Flags when the basis profile was NOT found */ + public static class UnknownBasisException extends Exception { + @Getter private final String name; + public UnknownBasisException(String basisName) { + super("named '" + WorkforceProfiles.renderName(basisName) + "'"); + this.name = basisName; + } + } + + private final String basisProfileName; + private final ProfileOverlay overlay; + + /** @return a new profile definition through evolution from the basis profile, which is to be obtained via `basisResolver` */ + public Config formulateConfig(Function> basisResolver) throws UnknownBasisException { + Optional optProfile = basisResolver.apply(basisProfileName); + if (!optProfile.isPresent()) { + throw new UnknownBasisException(basisProfileName); + } else { + return overlay.applyOverlay(optProfile.get().getConfig()); + } + } + + /** @return the canonical display name of {@link #getBasisProfileName()} for tracing/debugging */ + public String renderName() { + return WorkforceProfiles.renderName(this.basisProfileName); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java new file mode 100644 index 00000000000..64b5d8ec30b --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; +import lombok.Data; +import lombok.RequiredArgsConstructor; + + +/** Alt. forms of profile overlay to evolve one profile {@link Config} into another. Two overlays may be combined hierarchically into a new overlay. */ +public interface ProfileOverlay { + + /** @return a new, evolved {@link Config}, by application of this overlay */ + Config applyOverlay(Config config); + + /** @return a new overlay, by combining this overlay *over* another */ + ProfileOverlay over(ProfileOverlay other); + + + /** A key-value pair/duple */ + @Data + class KVPair { + private final String key; + private final String value; + } + + + /** An overlay to evolve any profile by adding key-value pairs */ + @Data + @RequiredArgsConstructor // explicit, due to second, variadic ctor + class Adding implements ProfileOverlay { + private final List additionPairs; + + public Adding(KVPair... 
kvPairs) { + this(Arrays.asList(kvPairs)); + } + + @Override + public Config applyOverlay(Config config) { + return additionPairs.stream().sequential().reduce(config, + (currConfig, additionPair) -> + currConfig.withValue(additionPair.getKey(), ConfigValueFactory.fromAnyRef(additionPair.getValue())), + (configA, configB) -> + configB.withFallback(configA) + ); + } + + @Override + public ProfileOverlay over(ProfileOverlay other) { + if (other instanceof Adding) { + Map base = ((Adding) other).getAdditionPairs().stream().collect(Collectors.toMap(KVPair::getKey, KVPair::getValue)); + additionPairs.stream().forEach(additionPair -> + base.put(additionPair.getKey(), additionPair.getValue())); + return new Adding(base.entrySet().stream().map(entry -> new KVPair(entry.getKey(), entry.getValue())).collect(Collectors.toList())); + } else if (other instanceof Removing) { + return Combo.normalize(this, (Removing) other); + } else if (other instanceof Combo) { + Combo otherCombo = (Combo) other; + return Combo.normalize((Adding) this.over(otherCombo.getAdding()), otherCombo.getRemoving()); + } else { // should NEVER happen! + throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other); + } + } + } + + + /** An overlay to evolve any profile by removing named keys */ + @Data + @RequiredArgsConstructor // explicit, due to second, variadic ctor + class Removing implements ProfileOverlay { + private final List removalKeys; + + public Removing(String... keys) { + this(Arrays.asList(keys)); + } + + @Override + public Config applyOverlay(Config config) { + return removalKeys.stream().sequential().reduce(config, + (currConfig, removalKey) -> + currConfig.withoutPath(removalKey), + (configA, configB) -> + configA.withFallback(configB) + ); + } + + @Override + public ProfileOverlay over(ProfileOverlay other) { + if (other instanceof Adding) { + return Combo.normalize((Adding) other, this); + } else if (other instanceof Removing) { + Set otherKeys = new HashSet(((Removing) other).getRemovalKeys()); + otherKeys.addAll(removalKeys); + return new Removing(new ArrayList<>(otherKeys)); + } else if (other instanceof Combo) { + Combo otherCombo = (Combo) other; + return Combo.normalize(otherCombo.getAdding(), (Removing) this.over(otherCombo.getRemoving())); + } else { // should NEVER happen! 
+ throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other); + } + } + } + + + /** An overlay to evolve any profile by adding key-value pairs while also removing named keys */ + @Data + class Combo implements ProfileOverlay { + private final Adding adding; + private final Removing removing; + + /** restricted-access ctor: instead use {@link Combo#normalize(Adding, Removing)} */ + private Combo(Adding adding, Removing removing) { + this.adding = adding; + this.removing = removing; + } + + @Override + public Config applyOverlay(Config config) { + return adding.applyOverlay(removing.applyOverlay(config)); + } + + @Override + public ProfileOverlay over(ProfileOverlay other) { + if (other instanceof Adding) { + return Combo.normalize((Adding) this.adding.over((Adding) other), this.removing); + } else if (other instanceof Removing) { + return Combo.normalize(this.adding, (Removing) this.removing.over((Removing) other)); + } else if (other instanceof Combo) { + Combo otherCombo = (Combo) other; + return Combo.normalize((Adding) this.adding.over(otherCombo.getAdding()), (Removing) this.removing.over(otherCombo.getRemoving())); + } else { // should NEVER happen! + throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other); + } + } + + /** @return a `Combo` overlay, by combining an `Adding` overlay with a `Removing` overlay */ + protected static Combo normalize(Adding toAdd, Removing toRemove) { + // pre-remove any in `toAdd` that are also in `toRemove`... yet still maintain all in `toRemove`, in case also in the eventual `Config` "basis" + Set removeKeysLookup = toRemove.getRemovalKeys().stream().collect(Collectors.toSet()); + List unmatchedAdditionPairs = toAdd.getAdditionPairs().stream().sequential().filter(additionPair -> + !removeKeysLookup.contains(additionPair.getKey()) + ).collect(Collectors.toList()); + return new Combo(new Adding(unmatchedAdditionPairs), new Removing(new ArrayList<>(removeKeysLookup))); + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java new file mode 100644 index 00000000000..8af9e95249a --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.Optional; +import lombok.Data; +import lombok.RequiredArgsConstructor; + + +/** + * Core abstraction to model scaling adjustment: a directive originates at a specific moment in time to provide a set point for a given worker profile. 
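+ * (In {@link ScalingDirectiveParser} syntax, one such directive might read {@code 1728435970.my_profile=24}.)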
+ * The set point is the number of instances presently desired for that profile. When naming a heretofore unknown worker profile, the directive MUST also + * define that new profile through a {@link ProfileDerivation} referencing a known profile. Once defined, a worker profile MUST NOT be redefined. + */ +@Data +@RequiredArgsConstructor +public class ScalingDirective { + private final String profileName; + private final int setPoint; + private final long timestampEpochMillis; + private final Optional optDerivedFrom; + + /** Create a set-point-only directive (for a known profile, with no {@link ProfileDerivation}) */ + public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis) { + this(profileName, setPoint, timestampEpochMillis, Optional.empty()); + } + + public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis, String basisProfileName, ProfileOverlay overlay) { + this(profileName, setPoint, timestampEpochMillis, Optional.of(new ProfileDerivation(basisProfileName, overlay))); + } + + /** @return the canonical display name (of {@link #getProfileName()}) for tracing/debugging */ + public String renderName() { + return WorkforceProfiles.renderName(this.profileName); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java new file mode 100644 index 00000000000..fa00c5630a0 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * Parser for {@link ScalingDirective} syntax of the form: + * TIMESTAMP '.' PROFILE_NAME '=' SET_POINT [ ( ',' | ';' ) PROFILE_NAME ( '+(' KV_PAIR ( KV_PAIR)* ')' | '-( KEY ( KEY)* ')' ) ] + * where: + * only ( TIMESTAMP '.' PROFILE_NAME '=' SET_POINT ) are non-optional. An optional trailing definition for that profile may name the + * "basis" profile to derive from through an "adding" or "removing" overlay. + * + * is either ',' or ';' (whichever did follow SET_POINT); choose which to minimize escaping (a KV_PAIR's VALUE, by URL-encoding). + * + * TIMESTAMP is millis-since-epoch. + * + * PROFILE_NAME is a simple [a-zA-Z0-9_]+ identifier familiar from many programming languages. 
The special name "baseline()" is reserved + * for the baseline profile, which may alternatively be spelled as the empty identifier (""). + * + * SET_POINT must be a non-negative integer ('0' indicates no instances desired). + * + * When an overlay is present, the form introduced by '+' is an "adding" (upsert) overlay and the form prefixed by '-' is a "removing" overlay. + * @see ProfileOverlay for {@link ProfileOverlay.Adding} and {@link ProfileOverlay.Removing} semantics. + * + * KV_PAIR (for "adding") is an '='-delimited (KEY '=' VALUE), where VALUE may use URL-encoding to escape characters. + * + * KEY (for "removing"; also in the "adding" KV_PAIR) is a '.'-separated sequence of alphanumeric identifier segments, as in a {@link java.util.Properties} + * or {@link com.typesafe.config.Config} name. + * + * Whitespace may appear around any tokens, though not within a KEY or a VALUE. + * + * Comments are unsupported. + * + * As an alternative to inlining a lengthy "adding" or "removing" overlay definition, {@link #OVERLAY_DEFINITION_PLACEHOLDER} may stand in to indicate that + * the definition itself will be supplied separately. Supply it and {@link OverlayPlaceholderNeedsDefinition#retryParsingWithDefinition(String)}, upon + * the same UNCHECKED exception (originally thrown by {@link #parse(String)}). + * + * Given this syntax is specifically designed for directives to appear as HDFS file names, we enforce a {@link #MAX_PROFILE_IDENTIFIER_LENGTH} (== 100), + * to ensure fit within the HDFS path segment limit (== 255), and therein avoid: + * org.apache.hadoop.hdfs.protocol.FSLimitException$PathComponentTooLongException: \ + * The maximum path component name limit of ... in directory ... is exceeded: limit=255 length=256 + * the max identifier length of 100 is chosen as follows: + * - limit == 255 + * - current millis-precision epoch timestamp requires 10 chars, yet reserve 16 for future-proofing to nanos-precision + * - reserve 30 chars for future use in syntax evolution + * - 200 = 255 [limit] - 16 [digit timestamp] - 1 ['.'] - 1 ['='] - 1 [',' / ';'] - 6 ['+(...)' / '-(...)'] - 30 [reserved... for future] + * - since a max of two profile identifiers, neither may exceed (200 / 2 == 100) chars + * + * Examples: + * - simply update the set point for the (already existing/defined) profile, `my_profile`: + * 1728435970.my_profile=24 + * + * - update the set point of the baseline profile (equiv. forms): + * 1728436821.=24 + * 1728436828.baseline()=24 + * + * - define a new profile, `new_profile`, with a set point of 16 by deriving via "adding" overlay from the existing profile, `bar` (equiv. forms): + * 1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen) + * 1728439223.new_profile=16;bar+(a.b.c=7;l.m=sixteen) + * + * - similar derivation, but demonstrating URL-encoding, to preserve ',' and/or literal space in the value (equiv. forms): + * 1728460832.new_profile=16,bar+(a.b.c=7,l.m=sixteen%2C%20again) + * 1728460832.new_profile=16;bar+(a.b.c=7;l.m=sixteen,%20again) + * + * - define a new profile, `other_profile`, with a set point of 9 by deriving via "removing" overlay from the existing profile, `my_profile` (equiv. forms): + * 1728436436.other_profile=9,my_profile-(x,y.z) + * 1728436499.other_profile=9;my_profile-(x;y.z) + * + * - define a new profile, `plus_profile`, with an initial set point, via "adding" overlay upon the baseline profile (equiv. 
forms): + * 1728441200.plus_profile=5,+(a.b.c=7,l.m=sixteen) + * 1728443640.plus_profile=5,baseline()+(a.b.c=7,l.m=sixteen) + * + * - define a new profile, `extra_profile`, with an initial set point, via "removing" overlay upon the baseline profile (equiv. forms): + * 1728448521.extra_profile=14,-(a.b, c.d) + * 1728449978.extra_profile=14,baseline()-(a.b, c.d) + * + * - define a new profile with an initial set point, using {@link #OVERLAY_DEFINITION_PLACEHOLDER} syntax instead of inlining the overlay definition: + * 1728539479.and_also=21,baaz-(...) + * 1728547230.this_too=19,quux+(...) + */ +@Slf4j +public class ScalingDirectiveParser { + + /** Announce a syntax error within {@link #getDirective()} */ + public static class InvalidSyntaxException extends Exception { + @Getter private final String directive; + + public InvalidSyntaxException(String directive, String desc) { + super("error: " + desc + ", in ==>" + directive + "<=="); + this.directive = directive; + } + } + + /** + * Report that {@link #getDirective()} used {@link #OVERLAY_DEFINITION_PLACEHOLDER} in lieu of inlining an "adding" or "removing" overlay definition. + * + * When the overlay definition is later recovered, pass it to {@link #retryParsingWithDefinition(String)} to re-attempt the parse. + */ + public static class OverlayPlaceholderNeedsDefinition extends RuntimeException { + @Getter private final String directive; + private final String overlaySep; + private final boolean isAdding; + // ATTENTION: explicitly manage a reference to `parser`, despite it being the enclosing class instance, instead of making this a non-static inner class. + // That allows `definePlaceholder` to be `static`, for simpler testability, while dodging: + // Static declarations in inner classes are not supported at language level '8' + private final ScalingDirectiveParser parser; + + private OverlayPlaceholderNeedsDefinition(String directive, String overlaySep, boolean isAdding, ScalingDirectiveParser enclosing) { + super("overlay placeholder, in ==>" + directive + "<=="); + this.directive = directive; + this.overlaySep = overlaySep; + this.isAdding = isAdding; + this.parser = enclosing; + } + + /** + * Pass the missing `overlayDefinition` and re-attempt parsing. This DOES NOT allow nested placeholding, so `overlayDefinition` may not + * itself be/contain {@link #OVERLAY_DEFINITION_PLACEHOLDER}. + * + * @return the parsed {@link ScalingDirective} or throw {@link InvalidSyntaxException} + */ + public ScalingDirective retryParsingWithDefinition(String overlayDefinition) throws InvalidSyntaxException { + try { + return this.parser.parse(definePlaceholder(this.directive, this.overlaySep, this.isAdding, overlayDefinition)); + } catch (OverlayPlaceholderNeedsDefinition e) { + throw new InvalidSyntaxException(this.directive, "overlay placeholder definition must not be itself another placeholder"); + } + } + + /** encapsulate the intricacies of splicing `overlayDefinition` into `directiveWithPlaceholder`, while performing the necessary URL-encoding */ + @VisibleForTesting + protected static String definePlaceholder(String directiveWithPlaceholder, String overlaySep, boolean isAdding, String overlayDefinition) { + // use care to selectively `urlEncode` parts (but NOT the entire string), to avoid disrupting syntactic chars, like [,;=] + String urlEncodedOverlayDef = Arrays.stream(overlayDefinition.split("\\s*" + overlaySep + "\\s*", -1)) // (neg. 
limit to disallow trailing empty strings) + .map(kvPair -> { + String[] kv = kvPair.split("\\s*=\\s*", 2); + if (isAdding && kv.length > 1) { + return kv[0] + '=' + urlEncode(kv[1]); + } else { + return kvPair; + } + }).collect(Collectors.joining(overlaySep)); + + // undo any double-encoding of '%', in case `overlayDefinition` arrived URL-encoded + return directiveWithPlaceholder.replace(OVERLAY_DEFINITION_PLACEHOLDER, urlEncodedOverlayDef.replace("%25", "%")); + } + } + + // TODO: syntax to remove an attr while ALSO "adding" (so not simply setting to the empty string) - [proposal: alt. form for KV_PAIR ::= ( KEY '|=|' )] + + // syntax (described in class-level javadoc): + private static final String DIRECTIVE_REGEX = "(?x) (?s) ^ \\s* (\\d+) \\s* \\. \\s* (\\w* | baseline\\(\\)) \\s* = \\s* (\\d+) " + + "(?: \\s* ([;,]) \\s* (\\w* | baseline\\(\\)) \\s* (?: (\\+ \\s* \\( \\s* ([^)]*?) \\s* \\) ) | (- \\s* \\( \\s* ([^)]*?) \\s* \\) ) ) )? \\s* $"; + + public static final int MAX_PROFILE_IDENTIFIER_LENGTH = 100; + public static final String URL_ENCODING_CHARSET = "UTF-8"; + public static final String OVERLAY_DEFINITION_PLACEHOLDER = "..."; + + private static final String KEY_REGEX = "(\\w+(?:\\.\\w+)*)"; + private static final String KEY_VALUE_REGEX = KEY_REGEX + "\\s*=\\s*(.*)"; + private static final Pattern directivePattern = Pattern.compile(DIRECTIVE_REGEX); + private static final Pattern keyPattern = Pattern.compile(KEY_REGEX); + private static final Pattern keyValuePattern = Pattern.compile(KEY_VALUE_REGEX); + + private static final String BASELINE_ID = "baseline()"; + + /** + * Parse `directive` into a {@link ScalingDirective} or throw {@link InvalidSyntaxException} + * + * When an overlay definition is not inlined and {@link #OVERLAY_DEFINITION_PLACEHOLDER} is used instead, throw the UNCHECKED exception + * {@link OverlayPlaceholderNeedsDefinition}, which facilitates a subsequent attempt to supply the overlay definition and + * {@link OverlayPlaceholderNeedsDefinition#retryParsingWithDefinition(String)} (a form of the Proxy pattern). 
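+ *
+ * A usage sketch (`directiveString` and `overlayDefinition` are illustrative stand-ins; {@link InvalidSyntaxException} propagates to the caller):
+ * <pre>{@code
+ *   ScalingDirectiveParser parser = new ScalingDirectiveParser();
+ *   ScalingDirective directive;
+ *   try {
+ *     directive = parser.parse(directiveString); // e.g. "1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)"
+ *   } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition) {
+ *     // the directive used the "..." placeholder: recover its overlay definition (e.g. the directive file's contents), then:
+ *     directive = needsDefinition.retryParsingWithDefinition(overlayDefinition);
+ *   }
+ * }</pre>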
+ */ + public ScalingDirective parse(String directive) throws InvalidSyntaxException { + Matcher parsed = directivePattern.matcher(directive); + if (parsed.matches()) { + long timestamp = Long.parseLong(parsed.group(1)); + String profileId = parsed.group(2); + String profileName = identifyProfileName(profileId, directive); + int setpoint = Integer.parseInt(parsed.group(3)); + Optional optDerivedFrom = Optional.empty(); + String overlayIntroSep = parsed.group(4); + if (overlayIntroSep != null) { + String basisProfileName = identifyProfileName(parsed.group(5), directive); + if (parsed.group(6) != null) { // '+' == adding + List additions = new ArrayList<>(); + String additionsStr = parsed.group(7); + if (additionsStr.equals(OVERLAY_DEFINITION_PLACEHOLDER)) { + throw new OverlayPlaceholderNeedsDefinition(directive, overlayIntroSep, true, this); + } else if (!additionsStr.equals("")) { + for (String addStr : additionsStr.split("\\s*" + overlayIntroSep + "\\s*", -1)) { // (`-1` limit to disallow trailing empty strings) + Matcher keyValueParsed = keyValuePattern.matcher(addStr); + if (keyValueParsed.matches()) { + additions.add(new ProfileOverlay.KVPair(keyValueParsed.group(1), urlDecode(directive, keyValueParsed.group(2)))); + } else { + throw new InvalidSyntaxException(directive, "unable to parse key-value pair - {{" + addStr + "}}"); + } + } + } + optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Adding(additions))); + } else { // '-' == removing + List removalKeys = new ArrayList<>(); + String removalsStr = parsed.group(9); + if (removalsStr.equals(OVERLAY_DEFINITION_PLACEHOLDER)) { + throw new OverlayPlaceholderNeedsDefinition(directive, overlayIntroSep, false, this); + } else if (!removalsStr.equals("")) { + for (String removeStr : removalsStr.split("\\s*" + overlayIntroSep + "\\s*", -1)) { // (`-1` limit to disallow trailing empty strings) + Matcher keyParsed = keyPattern.matcher(removeStr); + if (keyParsed.matches()) { + removalKeys.add(keyParsed.group(1)); + } else { + throw new InvalidSyntaxException(directive, "unable to parse key - {{" + removeStr + "}}"); + } + } + } + optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Removing(removalKeys))); + } + } + return new ScalingDirective(profileName, setpoint, timestamp, optDerivedFrom); + } else { + throw new InvalidSyntaxException(directive, "invalid syntax"); + } + } + + /** + * @return `directive` as a pretty-printed string + * + * NOTE: regardless of its length or content, the result inlines the entire overlay def, with {@link #OVERLAY_DEFINITION_PLACEHOLDER} NEVER used + * + * @see #parse(String), the (approximate) inverse operation (modulo {@link #OVERLAY_DEFINITION_PLACEHOLDER}, noted above) + */ + public static String asString(ScalingDirective directive) { + StringBuilder sb = new StringBuilder(); + sb.append(directive.getTimestampEpochMillis()).append('.').append(directive.getProfileName()).append('=').append(directive.getSetPoint()); + directive.getOptDerivedFrom().ifPresent(derivedFrom -> { + sb.append(',').append(derivedFrom.getBasisProfileName()); + sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ? 
"+(" : "-("); + ProfileOverlay overlay = derivedFrom.getOverlay(); + if (overlay instanceof ProfileOverlay.Adding) { + ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay; + for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) { + sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(", "); + } + if (adding.getAdditionPairs().size() > 0) { + sb.setLength(sb.length() - 2); // remove trailing ", " + } + } else { + ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay; + for (String key : removing.getRemovalKeys()) { + sb.append(key).append(", "); + } + if (removing.getRemovalKeys().size() > 0) { + sb.setLength(sb.length() - 2); // remove trailing ", " + } + } + sb.append(')'); + }); + return sb.toString(); + } + + /** handle special naming of {@link #BASELINE_ID} and enforce {@link #MAX_PROFILE_IDENTIFIER_LENGTH} */ + private static String identifyProfileName(String profileId, String directive) throws InvalidSyntaxException { + if (profileId.length() > MAX_PROFILE_IDENTIFIER_LENGTH) { + throw new InvalidSyntaxException(directive, "profile ID exceeds length limit (of " + MAX_PROFILE_IDENTIFIER_LENGTH + "): '" + profileId + "'"); + } + return profileId.equals(BASELINE_ID) ? WorkforceProfiles.BASELINE_NAME : profileId; + } + + /** @return `s`, URL-decoded as UTF-8 or throw {@link InvalidSyntaxException} */ + private static String urlDecode(String directive, String s) throws InvalidSyntaxException { + try { + return java.net.URLDecoder.decode(s, URL_ENCODING_CHARSET); + } catch (java.io.UnsupportedEncodingException e) { + throw new InvalidSyntaxException(directive, "unable to URL-decode - {{" + s + "}}"); + } + } + + /** @return `s`, URL-encoded as UTF-8 and wrap any {@link java.io.UnsupportedEncodingException}--which SHOULD NEVER HAPPEN!--as an unchecked exception */ + private static String urlEncode(String s) { + try { + return URLEncoder.encode(s, URL_ENCODING_CHARSET); + } catch (java.io.UnsupportedEncodingException e) { + throw new RuntimeException("THIS SHOULD BE IMPOSSIBLE, given we used '" + URL_ENCODING_CHARSET + "' with {{" + s + "}}", e); + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java new file mode 100644 index 00000000000..1b0f79e78d6 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.dynamic; + +import java.io.IOException; +import java.util.List; + + +/** An opaque source of {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s */ +public interface ScalingDirectiveSource extends Cloneable { + /** @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer directives than previously returned */ + List getScalingDirectives() throws IOException; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java new file mode 100644 index 00000000000..47b92dde40e --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.List; +import lombok.Data; + + +/** Staffing set point {@link ProfileDelta}s for multiple {@link WorkerProfile}s */ +@Data +public class StaffingDeltas { + /** + * Difference for a {@link WorkerProfile}'s staffing set point (e.g. between desired and current levels). Positive `delta` reflects increase, + * while negative, a decrease. + */ + @Data + public static class ProfileDelta { + private final WorkerProfile profile; + private final int delta; + private final long setPointProvenanceEpochMillis; + + /** @return whether {@link #getDelta()} is non-zero */ + public boolean isChange() { + return delta != 0; + } + } + + + private final List perProfileDeltas; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java new file mode 100644 index 00000000000..bf1f1d2e099 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.dynamic; + +import com.typesafe.config.Config; +import lombok.Data; + + +/** A named worker {@link Config} */ +@Data +public class WorkerProfile { + private final String name; + private final Config config; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java new file mode 100644 index 00000000000..dde55556442 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import javax.annotation.concurrent.ThreadSafe; + +import com.google.common.annotations.VisibleForTesting; +import com.typesafe.config.Config; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * Stateful class to maintain the dynamically scalable workforce plan for {@link WorkerProfile}s with a {@link WorkforceStaffing} set point + * for each. The plan evolves through incremental revision by {@link ScalingDirective}s, while {@link #calcStaffingDeltas(WorkforceStaffing)} + * reports {@link StaffingDeltas} between the current plan and another alternative (e.g. current level of) {@link WorkforceStaffing}. 
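+ *
+ * A hedged end-to-end sketch (the baseline config, initial set point of 3, and the source/staffing variables are illustrative; IO handling elided):
+ * <pre>{@code
+ *   WorkforcePlan plan = new WorkforcePlan(baselineConfig, 3);
+ *   plan.reviseWhenNewer(scalingDirectiveSource.getScalingDirectives()); // illegal revisions are logged, then skipped
+ *   StaffingDeltas deltas = plan.calcStaffingDeltas(currentActualStaffing);
+ * }</pre>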
+ */ +@Slf4j +@ThreadSafe +public class WorkforcePlan { + + /** Common baseclass for illegal plan revision */ + public static class IllegalRevisionException extends Exception { + @Getter private final ScalingDirective directive; + private IllegalRevisionException(ScalingDirective directive, String msg) { + super(msg); + this.directive = directive; + } + + /** Illegal revision: directive arrived out of {@link ScalingDirective#getTimestampEpochMillis()} order */ + public static class OutOfOrderDirective extends IllegalRevisionException { + protected OutOfOrderDirective(ScalingDirective directive, long lastRevisionEpochMillis) { + super(directive, "directive for profile '" + directive.renderName() + "' precedes last revision at " + + lastRevisionEpochMillis + ": " + directive); + } + } + + /** Illegal revision: redefinition of a known worker profile */ + public static class Redefinition extends IllegalRevisionException { + protected Redefinition(ScalingDirective directive, ProfileDerivation proposedDerivation) { + super(directive, "profile '" + directive.renderName() + "' already exists, so may not be redefined on the basis of '" + + proposedDerivation.renderName() + "': " + directive); + } + } + + /** Illegal revision: worker profile evolution from an unknown basis profile */ + public static class UnknownBasis extends IllegalRevisionException { + protected UnknownBasis(ScalingDirective directive, ProfileDerivation.UnknownBasisException ube) { + super(directive, "profile '" + directive.renderName() + "' may not be defined on the basis of an unknown profile '" + + WorkforceProfiles.renderName(ube.getName()) + "': " + directive); + } + } + + /** Illegal revision: set point for an unknown worker profile */ + public static class UnrecognizedProfile extends IllegalRevisionException { + protected UnrecognizedProfile(ScalingDirective directive) { + super(directive, "unrecognized profile reference '" + directive.renderName() + "': " + directive); + } + } + } + + private final WorkforceProfiles profiles; + private final WorkforceStaffing staffing; + @Getter private volatile long lastRevisionEpochMillis; + + /** create new plan with the initial, baseline worker profile using `baselineConfig` at `initialSetPoint` */ + public WorkforcePlan(Config baselineConfig, int initialSetPoint) { + this.profiles = WorkforceProfiles.withBaseline(baselineConfig); + this.staffing = WorkforceStaffing.initialize(initialSetPoint); + this.lastRevisionEpochMillis = 0; + } + + /** @return how many worker profiles known to the plan, including the baseline */ + public int getNumProfiles() { + return profiles.size(); + } + + /** revise the plan with a new {@link ScalingDirective} or throw {@link IllegalRevisionException} */ + public synchronized void revise(ScalingDirective directive) throws IllegalRevisionException { + String name = directive.getProfileName(); + if (this.lastRevisionEpochMillis >= directive.getTimestampEpochMillis()) { + throw new IllegalRevisionException.OutOfOrderDirective(directive, this.lastRevisionEpochMillis); + }; + Optional optExistingProfile = profiles.apply(name); + Optional optDerivation = directive.getOptDerivedFrom(); + if (optExistingProfile.isPresent() && optDerivation.isPresent()) { + throw new IllegalRevisionException.Redefinition(directive, optDerivation.get()); + } else if (!optExistingProfile.isPresent() && !optDerivation.isPresent()) { + throw new IllegalRevisionException.UnrecognizedProfile(directive); + } else { // [exclusive-or: either, but not both present] + if 
(optDerivation.isPresent()) { // define a new profile on the basis of another + try { + this.profiles.addProfile(new WorkerProfile(name, optDerivation.get().formulateConfig(this.profiles))); + } catch (ProfileDerivation.UnknownBasisException ube) { + throw new IllegalRevisionException.UnknownBasis(directive, ube); + } + } + // TODO - make idempotent, since any retry attempt following failure between `addProfile` and `reviseStaffing` would thereafter fail with + // `IllegalRevisionException.Redefinition`, despite us wishing to adjust the set point for that new profile defined just before the failure. + // - how to ensure the profile def is the same / unchanged? (e.g. compare full profile `Config` equality)? + // NOTE: the current outcome would be a profile defined in `WorkforceProfiles` with no set point in `WorkforceStaffing`. fortunately, + // that would NOT lead to `calcStaffingDeltas` throwing {@link WorkforceProfiles.UnknownProfileException}! The out-of-band (manual) + // workaround/repair would be revision by another, later directive that provides the set point for that profile (WITHOUT providing the definition) + + this.staffing.reviseStaffing(name, directive.getSetPoint(), directive.getTimestampEpochMillis()); + this.lastRevisionEpochMillis = directive.getTimestampEpochMillis(); + } + } + + /** + * Performs atomic bulk revision while enforcing `directives` ordering in accord with {@link ScalingDirective#getTimestampEpochMillis()} + * + * This version catches {@link IllegalRevisionException}, to log a warning message before continuing to process subsequent directives. + */ + public synchronized void reviseWhenNewer(List directives) { + reviseWhenNewer(directives, ire -> { log.warn("Failure: ", ire); }); + } + + /** + * Performs atomic bulk revision while enforcing `directives` ordering in accord with {@link ScalingDirective#getTimestampEpochMillis()} + * + * This version catches {@link IllegalRevisionException}, to call `illegalRevisionHandler` before continuing to process subsequent directives. + */ + public synchronized void reviseWhenNewer(List directives, Consumer illegalRevisionHandler) { + directives.stream().sequential() + .forEach(directive -> { + try { + revise(directive); + } catch (IllegalRevisionException ire) { + illegalRevisionHandler.accept(ire); + } + }); + } + + /** @return diff of {@link StaffingDeltas} between this, current {@link WorkforcePlan} and some `altStaffing` (e.g. 
current) {@link WorkforceStaffing} */ + public synchronized StaffingDeltas calcStaffingDeltas(WorkforceStaffing altStaffing) { + return staffing.calcDeltas(altStaffing, profiles); + } + + /** @return [for testing/debugging] the current staffing set point for the {@link WorkerProfile} named `profileName`, when it exists */ + @VisibleForTesting + Optional peepStaffing(String profileName) { + return staffing.getStaffing(profileName); + } + + /** @return [for testing/debugging] the {@link WorkerProfile} named `profileName` or throws {@link WorkforceProfiles.UnknownProfileException} */ + @VisibleForTesting + WorkerProfile peepProfile(String profileName) throws WorkforceProfiles.UnknownProfileException { + return profiles.getOrThrow(profileName); + } + + /** @return [for testing/debugging] the baseline {@link WorkerProfile} - it should NEVER {@link WorkforceProfiles.UnknownProfileException} */ + @VisibleForTesting + WorkerProfile peepBaselineProfile() throws WorkforceProfiles.UnknownProfileException { + return profiles.getOrThrow(WorkforceProfiles.BASELINE_NAME); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java new file mode 100644 index 00000000000..da19c1c98dd --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import javax.annotation.concurrent.ThreadSafe; + +import com.typesafe.config.Config; +import lombok.Getter; + + +/** A collection of known {@link WorkerProfile}s, also offering name -> profile resolution for {@link ProfileDerivation} */ +@ThreadSafe +public class WorkforceProfiles implements Function> { + + /** Indicates {@link #getProfileName()} NOT found */ + public static class UnknownProfileException extends RuntimeException { + @Getter private final String profileName; + + public UnknownProfileException(String profileName) { + super("named '" + WorkforceProfiles.renderName(profileName) + "'"); + this.profileName = profileName; + } + } + + + public static final String BASELINE_NAME = ""; + public static final String BASELINE_NAME_RENDERING = "<>"; + + /** @return the canonical display name for tracing/debugging, with special handling for {@link #BASELINE_NAME} */ + public static String renderName(String name) { + return name.equals(BASELINE_NAME) ? 
BASELINE_NAME_RENDERING : name; + } + + + private final ConcurrentHashMap profileByName; + + /** restricted-access ctor: instead use {@link #withBaseline(Config)} */ + private WorkforceProfiles() { + this.profileByName = new ConcurrentHashMap<>(); + } + + /** @return a new instance with `baselineConfig` as the "baseline profile" */ + public static WorkforceProfiles withBaseline(Config baselineConfig) { + WorkforceProfiles profiles = new WorkforceProfiles(); + profiles.addProfile(new WorkerProfile(BASELINE_NAME, baselineConfig)); + return profiles; + } + + /** Add a new, previously unknown {@link WorkerProfile} or throw `RuntimeException` on any attempt to add/redefine a previously known profile */ + public void addProfile(WorkerProfile profile) { + if (profileByName.putIfAbsent(profile.getName(), profile) != null) { + throw new RuntimeException("profile '" + WorkforceProfiles.renderName(profile.getName()) + "' already exists!"); + } + } + + /** @return the {@link WorkerProfile} named `profileName`, when it exists */ + @Override + public Optional apply(String profileName) { + return Optional.ofNullable(profileByName.get(profileName)); + } + + /** + * @return the {@link WorkerProfile} named `profileName` or throw {@link UnknownProfileException} when it does not exist + * @throws UnknownProfileException when `profileName` is unknown + */ + public WorkerProfile getOrThrow(String profileName) { + WorkerProfile profile = profileByName.get(profileName); + if (profile != null) { + return profile; + } + throw new UnknownProfileException(profileName); + } + + /** @return how many known profiles, including the baseline */ + public int size() { + return profileByName.size(); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java new file mode 100644 index 00000000000..2503e922a77 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import javax.annotation.concurrent.ThreadSafe; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import lombok.Data; + + +/** + * Collection to map {@link WorkerProfile} names, each to a given set point. 
+ * + * An instance might be "managed" by a {@link WorkforcePlan}, to reflect desired staffing, or else "unmanaged", where it might represent the + * current, actual per-worker scaling level. Those two could be compared via {@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)}, to + * calculate the {@link StaffingDeltas} between the two (i.e. between the staffing for the "managed" workforce plan of record vs. the independently + * maintained, "unmanaged" staffing levels). + * + * TIP: for encapsulation simplicity, invoke the "managed" form through {@link WorkforcePlan#calcStaffingDeltas(WorkforceStaffing)} + */ +@ThreadSafe +public class WorkforceStaffing { + public static final long INITIALIZATION_PROVENANCE_EPOCH_MILLIS = 0L; + // CAUTION: sentinel value only for use with `StaffingDeltas.ProfileDelta` - NOT for use with `WorkforceStaffing::reviseStaffing`! + public static final long UNKNOWN_PROVENANCE_EPOCH_MILLIS = -1L; + + /** + * internal rep. for a set point, with associated provenance timestamp, to inform debugging, when returned by + * {@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)} + */ + @Data + private static class SetPoint { + private final int numWorkers; + private final long provenanceEpochMillis; // for debuggability + } + + + private final Map setPointsByName; + + /** restricted-access ctor: instead use {@link #initialize(int)} */ + private WorkforceStaffing() { + this.setPointsByName = new ConcurrentHashMap<>(); + } + + /** @return a new instance with `initialBaselineSetPoint` for the "baseline profile's" set point */ + public static WorkforceStaffing initialize(int initialBaselineSetPoint) { + WorkforceStaffing staffing = new WorkforceStaffing(); + staffing.reviseStaffing(WorkforceProfiles.BASELINE_NAME, initialBaselineSetPoint, INITIALIZATION_PROVENANCE_EPOCH_MILLIS); + return staffing; + } + + /** @return [for test init. 
brevity] a new instance with `initialBaselineSetPoint` for the "baseline profile" set point, plus multiple other set points */ + @VisibleForTesting + public static WorkforceStaffing initializeStaffing(int initialBaselineSetPoint, Map initialSetPointsByProfileName) { + WorkforceStaffing staffing = initialize(initialBaselineSetPoint); + initialSetPointsByProfileName.forEach((profileName, setPoint) -> + staffing.reviseStaffing(profileName, setPoint, INITIALIZATION_PROVENANCE_EPOCH_MILLIS) + ); + return staffing; + } + + /** @return the staffing set point for the {@link WorkerProfile} named `profileName`, when it exists */ + public Optional getStaffing(String profileName) { + return Optional.ofNullable(setPointsByName.get(profileName)).map(SetPoint::getNumWorkers); + } + + /** update the staffing set point for the {@link WorkerProfile} named `profileName`, recording `provenanceEpochMillis` as the revision timestamp */ + public void reviseStaffing(String profileName, int setPoint, long provenanceEpochMillis) { + Preconditions.checkArgument(setPoint >= 0, "set points must be non-negative: '" + profileName + "' had " + setPoint); + Preconditions.checkArgument(provenanceEpochMillis >= INITIALIZATION_PROVENANCE_EPOCH_MILLIS, + "provenanceEpochMillis must be non-negative: '" + profileName + "' had " + provenanceEpochMillis); + setPointsByName.put(profileName, new SetPoint(setPoint, provenanceEpochMillis)); + } + + /** update the staffing set point for the {@link WorkerProfile} named `profileName`, without recording any specific provenance timestamp */ + @VisibleForTesting + public void updateSetPoint(String profileName, int setPoint) { + reviseStaffing(profileName, setPoint, INITIALIZATION_PROVENANCE_EPOCH_MILLIS); + } + + /** + * @return the {@link StaffingDeltas} between `this` (as "the reference") and `altStaffing`, by using `profiles` to obtain {@link WorkerProfile}s. + * (A positive {@link StaffingDeltas.ProfileDelta#getDelta()} reflects an increase, while a negative, a decrease.) + * + * NOTE: when the same {@link WorkforcePlan} manages both this {@link WorkforceStaffing} and {@link WorkforceProfiles}, then + * {@link WorkforceProfiles.UnknownProfileException} should NEVER occur. + */ + public synchronized StaffingDeltas calcDeltas(WorkforceStaffing altStaffing, WorkforceProfiles profiles) { + Map frozenAltSetPointsByName = new HashMap<>(); // freeze entries for consistency amidst multiple traversals + altStaffing.setPointsByName.entrySet().forEach(entry -> frozenAltSetPointsByName.put(entry.getKey(), entry.getValue())); + // not expecting any profile earlier in `reference` to no longer be set... 
(but defensive coding nonetheless) + List profileDeltas = frozenAltSetPointsByName.entrySet().stream() + .filter(entry -> !this.setPointsByName.containsKey(entry.getKey())) + .map(entry -> new StaffingDeltas.ProfileDelta(profiles.getOrThrow(entry.getKey()), 0 - entry.getValue().getNumWorkers(), UNKNOWN_PROVENANCE_EPOCH_MILLIS)) + .collect(Collectors.toList()); + profileDeltas.addAll(this.setPointsByName.entrySet().stream().map(entry -> { + Optional optEquivAltSetPoint = Optional.ofNullable(frozenAltSetPointsByName.get(entry.getKey())).map(SetPoint::getNumWorkers); + return new StaffingDeltas.ProfileDelta( + profiles.getOrThrow(entry.getKey()), + entry.getValue().getNumWorkers() - optEquivAltSetPoint.orElse(0), + entry.getValue().getProvenanceEpochMillis()); + } + ).filter(delta -> delta.isChange()) + .collect(Collectors.toList())); + return new StaffingDeltas(profileDeltas); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java new file mode 100644 index 00000000000..522cc491c55 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.dynamic; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import com.google.common.collect.Streams; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +import org.apache.gobblin.util.io.SeekableFSInputStream; + + +/** Test {@link FsScalingDirectiveSource} */ +public class FsScalingDirectiveSourceTest { + + private static final String DIRECTIVES_DIR = "/test/dynamic/directives"; + private static final String ERRORS_DIR = "/test/dynamic/errors"; + @Mock private FileSystem fileSystem; + private FsScalingDirectiveSource source; + private static final ScalingDirectiveParser parser = new ScalingDirectiveParser(); + + @BeforeMethod + public void setUp() { + MockitoAnnotations.openMocks(this); + source = new FsScalingDirectiveSource(fileSystem, DIRECTIVES_DIR, Optional.of(ERRORS_DIR)); + } + + @Test + public void getScalingDirectivesWhenAllValidSyntax() throws IOException, ScalingDirectiveParser.InvalidSyntaxException { + String[] fileNames = { + "1700010000.=4", + "1700020000.new_profile=7,+(a.b.c=7,x.y=five)", + "1700030000.another_profile=3,+(a.b.c=8,x.y=six)", + "1700040000.new_profile=2" + }; + FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) -> + createFileStatus(fileName, 1000 * (i + 1)) + ).toArray(FileStatus[]::new); + Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses); + List directives = source.getScalingDirectives(); + + Assert.assertEquals(directives.size(), 4); + for (int i = 0; i < directives.size(); i++) { + Assert.assertEquals(directives.get(i), parseDirective(fileNames[i]), "fileNames[" + i + "] = " + fileNames[i]); + } + } + + @Test + public void getScalingDirectivesWhileRejectingEachInvalidEntry() throws IOException { + String[] fileNames = { + "1700010000.=4", + // still returned... 
although it would later be rejected as `WorkforcePlan.IllegalRevisionException.UnrecognizedProfile` + "1700020000.new_profile=2", + "1700030000.new_profile=7,+(a.b.c=7,x.y=five)", + // rejected: illegal syntax will fail to parse + "completely invalid", + "1700040000.another_profile=3,+(a.b.c=8,x.y=six)", + // rejected: because we later mock this as a dir, while a directive MUST be a file + "1700046000.acutally_a_dir=6,-(b.a,y.x)", + "1700050000.new_profile=9", + // rejected: because Removing must list key names, NOT key-value pairs + "1700055000.bad_directive=69,my_profile-(x=y,z=1)" + }; + FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) -> { + boolean isFile = !fileName.contains("_a_dir="); + return createFileStatus(fileName, 1000 * (i + 1), isFile); + }).toArray(FileStatus[]::new); + Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses); + List directives = source.getScalingDirectives(); + + Assert.assertEquals(directives.size(), 5); + try { + Assert.assertEquals(directives.get(0), parseDirective(fileNames[0]), "fileNames[" + 0 + "] = " + fileNames[0]); + Assert.assertEquals(directives.get(1), parseDirective(fileNames[1]), "fileNames[" + 1 + "] = " + fileNames[1]); + Assert.assertEquals(directives.get(2), parseDirective(fileNames[2]), "fileNames[" + 2 + "] = " + fileNames[2]); + Assert.assertEquals(directives.get(3), parseDirective(fileNames[4]), "fileNames[" + 4 + "] = " + fileNames[4]); + Assert.assertEquals(directives.get(4), parseDirective(fileNames[6]), "fileNames[" + 6 + "] = " + fileNames[6]); + } catch (ScalingDirectiveParser.InvalidSyntaxException e) { + Assert.fail("Unexpected parsing error (with test directive)!", e); + } + + // lastly, verify `ERRORS_DIR` acknowledgements (i.e. 
FS object rename) work as expected: + ArgumentCaptor sourcePathCaptor = ArgumentCaptor.forClass(Path.class); + ArgumentCaptor destPathCaptor = ArgumentCaptor.forClass(Path.class); + Mockito.verify(fileSystem, Mockito.times(fileNames.length - directives.size())) + .rename(sourcePathCaptor.capture(), destPathCaptor.capture()); + + List expectedErrorFileNames = Lists.newArrayList(fileNames[3], fileNames[5], fileNames[7]); + List expectedErrorDirectivePaths = expectedErrorFileNames.stream() + .map(fileName -> new Path(DIRECTIVES_DIR, fileName)) + .collect(Collectors.toList()); + List expectedErrorPostRenamePaths = expectedErrorFileNames.stream() + .map(fileName -> new Path(ERRORS_DIR, fileName)) + .collect(Collectors.toList()); + + Assert.assertEquals(sourcePathCaptor.getAllValues(), expectedErrorDirectivePaths); + Assert.assertEquals(destPathCaptor.getAllValues(), expectedErrorPostRenamePaths); + } + + @Test + public void getScalingDirectivesWhileRejectingOutOfOrderEntries() throws IOException { + String[] fileNames = { + "1700010000.=4", + "1700030000.new_profile=7,+(a.b.c=7,x.y=five)", + "1700040000.another_profile=3,+(a.b.c=8,x.y=six)", + "1700050000.new_profile=9" + }; + FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) -> + // NOTE: elements [1] and [3] modtime will be 0, making them out of order against their directive timestamp (in their filename, like `1700030000.`) + createFileStatus(fileName, 1000 * (i + 1) * ((i + 1) % 2)) + ).toArray(FileStatus[]::new); + Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses); + List directives = source.getScalingDirectives(); + + Assert.assertEquals(directives.size(), 2); + try { + Assert.assertEquals(directives.get(0), parseDirective(fileNames[0]), "fileNames[" + 0 + "] = " + fileNames[0]); + Assert.assertEquals(directives.get(1), parseDirective(fileNames[2]), "fileNames[" + 2 + "] = " + fileNames[2]); + } catch (ScalingDirectiveParser.InvalidSyntaxException e) { + Assert.fail("Unexpected parsing error (with test directive)!", e); + } + + // lastly, verify `ERRORS_DIR` acknowledgements (i.e. 
FS object rename) work as expected: + ArgumentCaptor sourcePathCaptor = ArgumentCaptor.forClass(Path.class); + ArgumentCaptor destPathCaptor = ArgumentCaptor.forClass(Path.class); + Mockito.verify(fileSystem, Mockito.times(fileNames.length - directives.size())) + .rename(sourcePathCaptor.capture(), destPathCaptor.capture()); + + List expectedErrorFileNames = Lists.newArrayList(fileNames[1], fileNames[3]); + List expectedErrorDirectivePaths = expectedErrorFileNames.stream() + .map(fileName -> new Path(DIRECTIVES_DIR, fileName)) + .collect(Collectors.toList()); + List expectedErrorPostRenamePaths = expectedErrorFileNames.stream() + .map(fileName -> new Path(ERRORS_DIR, fileName)) + .collect(Collectors.toList()); + + Assert.assertEquals(sourcePathCaptor.getAllValues(), expectedErrorDirectivePaths); + Assert.assertEquals(destPathCaptor.getAllValues(), expectedErrorPostRenamePaths); + } + + @Test + public void getScalingDirectivesWithOverlayPlaceholders() throws IOException { + String[] fileNames = { + "1700010000.=4", + "1700020000.some_profile=9,+(...)", + "1700030000.other_profile=2,-(...)", + "1700040000.some_profile=3", + "1700050000.other_profile=10" + }; + FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) -> + createFileStatus(fileName, 1000 * (i + 1)) + ).toArray(FileStatus[]::new); + Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses); + + String addingOverlayDef = "a.b.c=7,x.y=five"; // for [1] + String removingOverlayDef = "b.c,y.z.a"; // for [2] + Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR, fileNames[1]))).thenReturn(createInputStreamFromString(addingOverlayDef)); + Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR, fileNames[2]))).thenReturn(createInputStreamFromString(removingOverlayDef)); + List directives = source.getScalingDirectives(); + + Assert.assertEquals(directives.size(), fileNames.length); + try { + Assert.assertEquals(directives.get(0), parseDirective(fileNames[0]), "fileNames[" + 0 + "] = " + fileNames[0]); + Assert.assertEquals(directives.get(1), parseDirective(fileNames[1].replace("...", addingOverlayDef)), "fileNames[" + 1 + "] = " + fileNames[1]); + Assert.assertEquals(directives.get(2), parseDirective(fileNames[2].replace("...", removingOverlayDef)), "fileNames[" + 2 + "] = " + fileNames[2]); + Assert.assertEquals(directives.get(3), parseDirective(fileNames[3]), "fileNames[" + 3 + "] = " + fileNames[3]); + Assert.assertEquals(directives.get(4), parseDirective(fileNames[4]), "fileNames[" + 4 + "] = " + fileNames[4]); + } catch (ScalingDirectiveParser.InvalidSyntaxException e) { + Assert.fail("Unexpected parsing error (with test directive)!", e); + } + + Mockito.verify(fileSystem, Mockito.never()).rename(Mockito.any(), Mockito.any()); + } + + @Test + public void getScalingDirectivesWithOverlayPlaceholdersButInvalidDefinitions() throws IOException { + String[] fileNames = { + "1700020000.some_profile=9,+(...)", + "1700030000.other_profile=2,-(...)", + "1700070000.=10" + }; + FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) -> + createFileStatus(fileName, 1000 * (i + 1)) + ).toArray(FileStatus[]::new); + Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses); + + // NOTE: switch these, so the overlay defs are invalid: `addingOverlayDef` with Removing and `removingOverlayDef` with Adding + String addingOverlayDef = "a.b.c=7,x.y=five"; // for [1] + String removingOverlayDef = "b.c,y.z.a"; // for [0] + 
Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR, fileNames[0]))).thenReturn(createInputStreamFromString(removingOverlayDef)); + Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR, fileNames[1]))).thenReturn(createInputStreamFromString(addingOverlayDef)); + List directives = source.getScalingDirectives(); + + Assert.assertEquals(directives.size(), 1); + try { + Assert.assertEquals(directives.get(0), parseDirective(fileNames[2]), "fileNames[" + 2 + "] = " + fileNames[2]); + } catch (ScalingDirectiveParser.InvalidSyntaxException e) { + Assert.fail("Unexpected parsing error (with test directive)!", e); + } + + // lastly, verify `ERRORS_DIR` acknowledgements (i.e. FS object rename) work as expected: + ArgumentCaptor sourcePathCaptor = ArgumentCaptor.forClass(Path.class); + ArgumentCaptor destPathCaptor = ArgumentCaptor.forClass(Path.class); + Mockito.verify(fileSystem, Mockito.times(fileNames.length - directives.size())) + .rename(sourcePathCaptor.capture(), destPathCaptor.capture()); + + List expectedErrorFileNames = Lists.newArrayList(fileNames[0], fileNames[1]); + List expectedErrorDirectivePaths = expectedErrorFileNames.stream() + .map(fileName -> new Path(DIRECTIVES_DIR, fileName)) + .collect(Collectors.toList()); + List expectedErrorPostRenamePaths = expectedErrorFileNames.stream() + .map(fileName -> new Path(ERRORS_DIR, fileName)) + .collect(Collectors.toList()); + + Assert.assertEquals(sourcePathCaptor.getAllValues(), expectedErrorDirectivePaths); + Assert.assertEquals(destPathCaptor.getAllValues(), expectedErrorPostRenamePaths); + } + + @Test + public void getScalingDirectivesWithNoFilesReturnsEmpty() throws IOException { + FileStatus[] fileStatuses = {}; + Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses); + Assert.assertTrue(source.getScalingDirectives().isEmpty()); + } + + @Test(expectedExceptions = IOException.class) + public void getScalingDirectivesWithIOExceptionPassesThrough() throws IOException { + Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenThrow(new IOException()); + source.getScalingDirectives(); + } + + protected static ScalingDirective parseDirective(String s) throws ScalingDirectiveParser.InvalidSyntaxException { + return parser.parse(s); + } + + protected static FileStatus createFileStatus(String fileName, long modTime) { + return createFileStatus(fileName, modTime, true); + } + + protected static FileStatus createFileStatus(String fileName, long modTime, boolean isFile) { + return new FileStatus(0, !isFile, 0, 0, modTime, new Path(DIRECTIVES_DIR, fileName)); + } + + public static FSDataInputStream createInputStreamFromString(String input) { + return new FSDataInputStream(new SeekableFSInputStream(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)))); + } +} \ No newline at end of file diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java new file mode 100644 index 00000000000..e953298c66f --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.Optional; +import java.util.function.Function; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.testng.annotations.Test; +import org.testng.Assert; + + +/** Test {@link ProfileDerivation} */ +public class ProfileDerivationTest { + + @Test + public void testFormulateConfigWithSuccessfulBasisResolution() throws ProfileDerivation.UnknownBasisException { + String basisProfileName = "testProfile"; + ProfileOverlay overlay = new ProfileOverlay.Adding(new ProfileOverlay.KVPair("key1", "value1B")); + ProfileDerivation profileDerivation = new ProfileDerivation(basisProfileName, overlay); + + Function> basisResolver = name -> { + if (basisProfileName.equals(name)) { + Config config = ConfigFactory.parseString("key1=value1A, key2=value2"); + WorkerProfile profile = new WorkerProfile(basisProfileName, config); + return Optional.of(profile); + } + return Optional.empty(); + }; + + Config resultConfig = profileDerivation.formulateConfig(basisResolver); + Assert.assertEquals(resultConfig.getString("key1"), "value1B"); + Assert.assertEquals(resultConfig.getString("key2"), "value2"); + } + + @Test + public void testFormulateConfigUnknownBasis() { + String basisProfileName = "foo"; + try { + ProfileDerivation derivation = new ProfileDerivation(basisProfileName, null); + derivation.formulateConfig(ignore -> Optional.empty()); + Assert.fail("Expected instead: UnknownBasisException"); + } catch (ProfileDerivation.UnknownBasisException ube) { + Assert.assertEquals(ube.getName(), basisProfileName); + } + } + + @Test + public void testRenderNameNonBaseline() { + String name = "testProfile"; + ProfileDerivation profileDerivation = new ProfileDerivation(name, null); + String renderedName = profileDerivation.renderName(); + Assert.assertEquals(renderedName, name); + } + + @Test + public void testRenderNameBaseline() { + ProfileDerivation profileDerivation = new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, null); + String renderedName = profileDerivation.renderName(); + Assert.assertEquals(renderedName, WorkforceProfiles.BASELINE_NAME_RENDERING); + } +} \ No newline at end of file diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java new file mode 100644 index 00000000000..bca2dee0acd --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.testng.annotations.Test; +import org.testng.Assert; + + +/** Test {@link ProfileOverlay} */ +public class ProfileOverlayTest { + + @Test + public void testAddingApplyOverlay() { + Config config = ConfigFactory.parseString("key1=value1A, key4=value4"); + ProfileOverlay.Adding adding = new ProfileOverlay.Adding(new ProfileOverlay.KVPair("key1", "value1B"), new ProfileOverlay.KVPair("key2", "value2")); + Config updatedConfig = adding.applyOverlay(config); + Assert.assertEquals(updatedConfig.getString("key1"), "value1B"); + Assert.assertEquals(updatedConfig.getString("key2"), "value2"); + Assert.assertEquals(updatedConfig.getString("key4"), "value4"); + } + + @Test + public void testRemovingApplyOverlay() { + Config config = ConfigFactory.parseString("key1=value1, key2=value2"); + ProfileOverlay.Removing removing = new ProfileOverlay.Removing("key1"); + Config updatedConfig = removing.applyOverlay(config); + Assert.assertFalse(updatedConfig.hasPath("key1")); + Assert.assertEquals(updatedConfig.getString("key2"), "value2"); + } + + @Test + public void testComboApplyOverlay() { + Config config = ConfigFactory.parseString("key1=value1, key2=value2, key3=value3"); + ProfileOverlay.Adding adding = new ProfileOverlay.Adding(new ProfileOverlay.KVPair("key4", "value4"), new ProfileOverlay.KVPair("key5", "value5")); + ProfileOverlay.Removing removing = new ProfileOverlay.Removing("key2", "key4"); + ProfileOverlay.Combo combo = ProfileOverlay.Combo.normalize(adding, removing); + Config updatedConfig = combo.applyOverlay(config); + Assert.assertEquals(updatedConfig.getString("key1"), "value1"); + Assert.assertEquals(updatedConfig.hasPath("key2"), false); + Assert.assertEquals(updatedConfig.getString("key3"), "value3"); + Assert.assertEquals(updatedConfig.hasPath("key4"), false); + Assert.assertEquals(updatedConfig.getString("key5"), "value5"); + + // validate `Combo::normalize` works too: + Assert.assertEquals(combo.getAdding().getAdditionPairs().size(), 1); + Assert.assertEquals(combo.getAdding().getAdditionPairs().get(0), new ProfileOverlay.KVPair("key5", "value5")); + Assert.assertEquals(combo.getRemoving().getRemovalKeys().size(), 2); + Assert.assertEqualsNoOrder(combo.getRemoving().getRemovalKeys().toArray(), removing.getRemovalKeys().toArray()); + } + + @Test + public void testAddingOver() { + ProfileOverlay.Adding adding1 = new ProfileOverlay.Adding(new ProfileOverlay.KVPair("key1", "value1"), new ProfileOverlay.KVPair("key2", "value2A")); + ProfileOverlay.Adding adding2 = new ProfileOverlay.Adding(new ProfileOverlay.KVPair("key2", "value2B"), new ProfileOverlay.KVPair("key3", "value3")); + ProfileOverlay result = adding1.over(adding2); + Config config = result.applyOverlay(ConfigFactory.empty()); + Assert.assertEquals(config.getString("key1"), "value1"); + 
Assert.assertEquals(config.getString("key2"), "value2A"); + Assert.assertEquals(config.getString("key3"), "value3"); + } + + @Test + public void testRemovingOver() { + ProfileOverlay.Removing removing1 = new ProfileOverlay.Removing("key1", "key2"); + ProfileOverlay.Removing removing2 = new ProfileOverlay.Removing("key2", "key3"); + ProfileOverlay result = removing1.over(removing2); + Assert.assertTrue(result instanceof ProfileOverlay.Removing); + ProfileOverlay.Removing removingResult = (ProfileOverlay.Removing) result; + Assert.assertEqualsNoOrder(removingResult.getRemovalKeys().toArray(), new String[]{"key1", "key2", "key3"}); + + Config config = result.applyOverlay(ConfigFactory.parseString("key1=value1, key2=value2, key3=value3, key4=value4")); + Assert.assertFalse(config.hasPath("key1")); + Assert.assertFalse(config.hasPath("key2")); + Assert.assertFalse(config.hasPath("key3")); + Assert.assertTrue(config.hasPath("key4")); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java new file mode 100644 index 00000000000..890b3a01308 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.Arrays; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.stream.IntStream; + +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import org.testng.Assert; + + +/** Test {@link ScalingDirectiveParser} */ +public class ScalingDirectiveParserTest { + + private final ScalingDirectiveParser parser = new ScalingDirectiveParser(); + + @Test + public void parseSimpleDirective() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728435970.my_profile=24"); + Assert.assertEquals(sd.getTimestampEpochMillis(), 1728435970L); + Assert.assertEquals(sd.getProfileName(), "my_profile"); + Assert.assertEquals(sd.getSetPoint(), 24); + Assert.assertFalse(sd.getOptDerivedFrom().isPresent()); + } + + @Test + public void parseUnnamedBaselineProfile() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728436821.=12"); + Assert.assertEquals(sd.getTimestampEpochMillis(), 1728436821L); + Assert.assertEquals(sd.getProfileName(), WorkforceProfiles.BASELINE_NAME); + Assert.assertEquals(sd.getSetPoint(), 12); + Assert.assertFalse(sd.getOptDerivedFrom().isPresent()); + } + + @Test + public void parseBaselineProfilePseudoIdentifier() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728436828.baseline()=6"); + Assert.assertEquals(sd, new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 6, 1728436828L, Optional.empty())); + } + + @Test + public void parseAddingOverlayWithCommaSep() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)"); + Assert.assertEquals(sd.getTimestampEpochMillis(), 1728439210L); + Assert.assertEquals(sd.getProfileName(), "new_profile"); + Assert.assertEquals(sd.getSetPoint(), 16); + Assert.assertTrue(sd.getOptDerivedFrom().isPresent()); + ProfileDerivation derivation = sd.getOptDerivedFrom().get(); + Assert.assertEquals(derivation.getBasisProfileName(), "bar"); + Assert.assertEquals(derivation.getOverlay(), + new ProfileOverlay.Adding(new ProfileOverlay.KVPair("a.b.c", "7"), new ProfileOverlay.KVPair("l.m", "sixteen"))); + } + + @Test + public void parseAddingOverlayWithSemicolonSep() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728439223.new_profile=32;baz+( a.b.c=7 ; l.m.n.o=sixteen )"); + Assert.assertEquals(sd, new ScalingDirective("new_profile", 32, 1728439223L, "baz", + new ProfileOverlay.Adding(new ProfileOverlay.KVPair("a.b.c", "7"), new ProfileOverlay.KVPair("l.m.n.o", "sixteen")))); + } + + @Test + public void parseAddingOverlayWithCommaSepUrlEncoded() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728460832.new_profile=16,baa+(a.b.c=7,l.m=sixteen%2C%20again)"); + Assert.assertEquals(sd, new ScalingDirective("new_profile", 16, 1728460832L, "baa", + new ProfileOverlay.Adding(new ProfileOverlay.KVPair("a.b.c", "7"), new ProfileOverlay.KVPair("l.m", "sixteen, again")))); + } + + @Test + public void parseRemovingOverlayWithCommaSep() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728436436.other_profile=9,my_profile-( x , y.z )"); + Assert.assertEquals(sd.getTimestampEpochMillis(), 1728436436L); + Assert.assertEquals(sd.getProfileName(), "other_profile"); + Assert.assertEquals(sd.getSetPoint(), 9); + 
Assert.assertTrue(sd.getOptDerivedFrom().isPresent()); + ProfileDerivation derivation = sd.getOptDerivedFrom().get(); + Assert.assertEquals(derivation.getBasisProfileName(), "my_profile"); + Assert.assertEquals(derivation.getOverlay(), new ProfileOverlay.Removing("x", "y.z")); + } + + @Test + public void parseRemovingOverlayWithSemicolonSep() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728436499.other_profile=9;my_profile-(x.y;z.z)"); + Assert.assertEquals(sd, new ScalingDirective("other_profile", 9, 1728436499L, "my_profile", + new ProfileOverlay.Removing("x.y", "z.z"))); + } + + @Test + public void parseAddingOverlayWithWhitespace() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse(" 1728998877 . another = 999 ; wow + ( t.r = jump%20 ; cb.az = foo%20#%20111 ) "); + Assert.assertEquals(sd, new ScalingDirective("another", 999, 1728998877L, "wow", + new ProfileOverlay.Adding(new ProfileOverlay.KVPair("t.r", "jump "), new ProfileOverlay.KVPair("cb.az", "foo # 111")))); + } + + @Test + public void parseRemovingOverlayWithWhitespace() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse(" 1728334455 . also = 77 , really - ( t.r , cb.az ) "); + Assert.assertEquals(sd, new ScalingDirective("also", 77, 1728334455L, "really", + new ProfileOverlay.Removing("t.r", "cb.az"))); + } + + @Test + public void parseAddingOverlayUponUnnamedBaselineProfile() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728441200.plus_profile=16,+(q.r.s=four,l.m=16)"); + Assert.assertEquals(sd, new ScalingDirective("plus_profile", 16, 1728441200L, WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding(new ProfileOverlay.KVPair("q.r.s", "four"), new ProfileOverlay.KVPair("l.m", "16")))); + } + + @Test + public void parseAddingOverlayUponBaselineProfilePseudoIdentifier() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728443640.plus_profile=16,baseline()+(q.r=five,l.m=12)"); + Assert.assertEquals(sd, new ScalingDirective("plus_profile", 16, 1728443640L, WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding(new ProfileOverlay.KVPair("q.r", "five"), new ProfileOverlay.KVPair("l.m", "12")))); + } + + @Test + public void parseRemovingOverlayUponUnnamedBaselineProfile() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("1728448521.extra_profile=0,-(a.b, c.d)"); + Assert.assertEquals(sd, new ScalingDirective("extra_profile", 0, 1728448521L, WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Removing("a.b", "c.d"))); + } + + @Test + public void parseRemovingOverlayUponBaselineProfilePseudoIdentifier() throws ScalingDirectiveParser.InvalidSyntaxException { + ScalingDirective sd = parser.parse("4.extra_profile=9,baseline()-(a.b, c.d)"); + Assert.assertEquals(sd, new ScalingDirective("extra_profile", 9, 4L, WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Removing("a.b", "c.d"))); + } + + @Test + public void parseProfileIdTooLongThrows() throws ScalingDirectiveParser.InvalidSyntaxException { + BiFunction fmtRemovingOverlaySyntax = (profileId, basisProfileId) -> { + return "1728449000." 
+ profileId + "=99," + basisProfileId + "-(foo,bar,baz)"; + }; + String alphabet = IntStream.rangeClosed('a', 'z').collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString(); + String tooLongId = alphabet + alphabet.toUpperCase() + alphabet + alphabet.toUpperCase(); + Assert.assertTrue(tooLongId.length() > ScalingDirectiveParser.MAX_PROFILE_IDENTIFIER_LENGTH); + + final int atMaxLen = ScalingDirectiveParser.MAX_PROFILE_IDENTIFIER_LENGTH; + final int beyondMaxLen = ScalingDirectiveParser.MAX_PROFILE_IDENTIFIER_LENGTH + 1; + String notTooLongDirective1 = fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, atMaxLen), "some_profile"); + String notTooLongDirective2 = fmtRemovingOverlaySyntax.apply("new_profile", tooLongId.substring(0, atMaxLen)); + String notTooLongDirective3 = fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, atMaxLen), tooLongId.substring(1, atMaxLen + 1)); + + for (String directiveStr : new String[] { notTooLongDirective1, notTooLongDirective2, notTooLongDirective3 }) { + Assert.assertNotNull(parser.parse(directiveStr)); + } + + String tooLongDirective1 = fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, beyondMaxLen), "some_profile"); + String tooLongDirective2 = fmtRemovingOverlaySyntax.apply("new_profile", tooLongId.substring(0, beyondMaxLen)); + String tooLongDirective3 = fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, beyondMaxLen), tooLongId.substring(1, beyondMaxLen + 1)); + + Arrays.stream(new String[] { tooLongDirective1, tooLongDirective2, tooLongDirective3 }).forEach(directiveStr -> { + Assert.assertThrows(ScalingDirectiveParser.InvalidSyntaxException.class, () -> parser.parse(directiveStr)); + }); + } + + @DataProvider(name = "funkyButValidDirectives") + public String[][] validDirectives() { + return new String[][]{ + // null overlay upon unnamed baseline profile (null overlay functions as the 'identity element'): + { "1728435970.my_profile=24,+()" }, + { "1728435970.my_profile=24,-()" }, + { "1728435970.my_profile=24;+()" }, + { "1728435970.my_profile=24;-()" }, + + // null overlay upon named profile: + { "1728435970.my_profile=24,foo+()" }, + { "1728435970.my_profile=24,foo-()" }, + { "1728435970.my_profile=24;foo+()" }, + { "1728435970.my_profile=24;foo-()" }, + + // seemingly separator mismatch, but in fact the NOT-separator is part of the value (e.g. 
a="7;m=sixteen"): + { "1728439210.new_profile=16,bar+(a=7;m=sixteen)" }, + { "1728439210.new_profile=16;bar+(a=7,m=sixteen)" }, + { "1728439210.new_profile=16,bar+(a=7;)" }, + { "1728439210.new_profile=16;bar+(a=7,)" } + + // NOTE: unlike Adding, separator mismatch causes failure with the Removing overlay, because the NOT-separator is illegal in a key + }; + } + + @Test( + expectedExceptions = {}, + dataProvider = "funkyButValidDirectives" + ) + public void parseValidDirectives(String directive) throws ScalingDirectiveParser.InvalidSyntaxException { + Assert.assertNotNull(parser.parse(directive)); + } + + @DataProvider(name = "validDirectivesToRoundTripWithAsString") + public String[][] validDirectivesToRoundTripWithAsString() { + return new String[][]{ + { "2.some_profile=15" }, + { "6.extra_profile=9,the_basis+(a.b=foo, c.d=bar)" }, + { "6.extra_profile=9,the_basis-(a.b, c.d)" }, + // funky ones: + { "1728435970.my_profile=24,+()" }, + { "1728435970.my_profile=24,-()" }, + { "1728435970.my_profile=24,foo+()" }, + { "1728435970.my_profile=24,foo-()" } + }; + } + + @Test( + expectedExceptions = {}, + dataProvider = "validDirectivesToRoundTripWithAsString" + ) + public void roundTripAsStringFollowingSuccessfulParse(String directive) throws ScalingDirectiveParser.InvalidSyntaxException { + Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)), directive); + } + + @DataProvider(name = "validDirectivesWithOverlayPlaceholder") + public String[][] validDirectivesWithOverlayPlaceholder() { + return new String[][]{ + { "6.extra_profile=9,the_basis+(...)" }, + { "6.extra_profile=9;the_basis+(...)" }, + { "6.extra_profile=9,the_basis-(...)" }, + { "6.extra_profile=9;the_basis-(...)" } + }; + } + + @Test( + expectedExceptions = ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition.class, + dataProvider = "validDirectivesWithOverlayPlaceholder" + ) + public void parseDirectivesWithPlaceholderThrowsOverlayPlaceholderNeedsDefinition(String directive) throws ScalingDirectiveParser.InvalidSyntaxException { + Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)), directive); + } + + @DataProvider(name = "overlayPlaceholderDirectivesWithCompletionDefAndEquivalent") + public String[][] overlayPlaceholderDirectivesWithCompletionDefAndEquivalent() { + return new String[][]{ + { "6.extra_profile=9,the_basis+(...)", "a=7,m=sixteen", "6.extra_profile=9,the_basis+(a=7,m=sixteen)" }, + { "6.extra_profile=9,the_basis+(...)", "a=7;m=sixteen", "6.extra_profile=9,the_basis+(a=7%3Bm%3Dsixteen)" }, // sep mismatch, so val == "7;m=sixteen" + { "6.extra_profile=9,the_basis+(...)", "a.b.c=7,l.m=sixteen%2C%20again", "6.extra_profile=9,the_basis+(a.b.c=7,l.m=sixteen%2C%20again)" }, + { "6.extra_profile=9;the_basis+(...)", "a=7,m=sixteen", "6.extra_profile=9;the_basis+(a=7%2Cm%3Dsixteen)" }, // sep mismatch, so val == "7,m=sixteen" + { "6.extra_profile=9;the_basis+(...)", "a=7;m=sixteen", "6.extra_profile=9;the_basis+(a=7;m=sixteen)" }, + { "6.extra_profile=9;the_basis+(...)", "a.b.c=7;l.m=sixteen%2C%20again", "6.extra_profile=9;the_basis+(a.b.c=7;l.m=sixteen%2C%20again)" }, + { "6.extra_profile=9,the_basis-(...)", "a.b,x.y.z", "6.extra_profile=9,the_basis-(a.b,x.y.z)" }, + { "6.extra_profile=9,the_basis-(...)", "x,y.z", "6.extra_profile=9,the_basis-(x,y.z)" }, + { "6.extra_profile=9;the_basis-(...)", "x;y.z", "6.extra_profile=9;the_basis-(x;y.z)" }, + { "6.extra_profile=9;the_basis-(...)", "a.b;x.y.z", "6.extra_profile=9;the_basis-(a.b;x.y.z)" } + }; + } + + @Test( + 
expectedExceptions = {}, + dataProvider = "overlayPlaceholderDirectivesWithCompletionDefAndEquivalent" + ) + public void verifyOverlayPlaceholderEquivalence(String directiveWithPlaceholder, String overlayDefinition, String equivDirective) + throws ScalingDirectiveParser.InvalidSyntaxException { + try { + parser.parse(directiveWithPlaceholder); + Assert.fail("Expected `ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition` due to the placeholder in the directive"); + } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition) { + ScalingDirective directive = needsDefinition.retryParsingWithDefinition(overlayDefinition); + Assert.assertEquals(directive, parser.parse(equivDirective)); + } + } + + @DataProvider(name = "invalidDirectives") + public String[][] invalidDirectives() { + return new String[][] { + // invalid values: + { "invalid_timestamp.my_profile=24" }, + { "1728435970.my_profile=invalid_setpoint" }, + { "1728435970.my_profile=-15" }, + + // incomplete/fragments: + { "1728435970.my_profile=24," }, + { "1728435970.my_profile=24;" }, + { "1728435970.my_profile=24,+" }, + { "1728435970.my_profile=24,-" }, + { "1728435970.my_profile=24,foo+" }, + { "1728435970.my_profile=24,foo-" }, + { "1728435970.my_profile=24,foo+a=7" }, + { "1728435970.my_profile=24,foo-x" }, + + // adding: invalid set-point + missing token examples: + { "1728439210.new_profile=-6,bar+(a=7,m=sixteen)" }, + { "1728439210.new_profile=16,bar+(a=7,m=sixteen" }, + { "1728439210.new_profile=16,bar+a=7,m=sixteen)" }, + + // adding: key, instead of key-value pair: + { "1728439210.new_profile=16,bar+(a=7,m)" }, + { "1728439210.new_profile=16,bar+(a,m)" }, + + // adding: superfluous separator or used incorrectly as a terminator: + { "1728439210.new_profile=16,bar+(,)" }, + { "1728439210.new_profile=16;bar+(;)" }, + { "1728439210.new_profile=16,bar+(,,)" }, + { "1728439210.new_profile=16;bar+(;;)" }, + { "1728439210.new_profile=16,bar+(a=7,)" }, + { "1728439210.new_profile=16;bar+(a=7;)" }, + + // adding: overlay placeholder may not be used with key-value pairs: + { "1728439210.new_profile=16,bar+(a=7,...)" }, + { "1728439210.new_profile=16,bar+(...,b=4)" }, + { "1728439210.new_profile=16,bar+(a=7,...,b=4)" }, + { "1728439210.new_profile=16;bar+(a=7;...)" }, + { "1728439210.new_profile=16;bar+(...;b=4)" }, + { "1728439210.new_profile=16;bar+(a=7;...;b=4)" }, + + // removing: invalid set-point + missing token examples: + { "1728436436.other_profile=-9,my_profile-(x)" }, + { "1728436436.other_profile=69,my_profile-(x" }, + { "1728436436.other_profile=69,my_profile-x)" }, + + // removing: key-value pair instead of key: + { "1728436436.other_profile=69,my_profile-(x=y,z)" }, + { "1728436436.other_profile=69,my_profile-(x=y,z=1)" }, + + // removing: superfluous separator or used incorrectly as a terminator: + { "1728436436.other_profile=69,my_profile-(,)" }, + { "1728436436.other_profile=69;my_profile-(;)" }, + { "1728436436.other_profile=69,my_profile-(,,)" }, + { "1728436436.other_profile=69;my_profile-(;;)" }, + { "1728436436.other_profile=69,my_profile-(x,)" }, + { "1728436436.other_profile=69;my_profile-(x;)" }, + + // removing: overlay placeholder may not be used with keys: + { "1728436436.other_profile=69,my_profile-(x,...)" }, + { "1728436436.other_profile=69,my_profile-(...,z)" }, + { "1728436436.other_profile=69,my_profile-(x,...,z)" }, + { "1728436436.other_profile=69;my_profile-(x;...)" }, + { "1728436436.other_profile=69;my_profile-(...;z)" }, + { 
"1728436436.other_profile=69;my_profile-(x;...;z)" }, + + // removing: seemingly separator mismatch, but in fact the NOT-separator is illegal in a key (e.g. "x;y"): + { "1728436436.other_profile=69,my_profile-(x;y)" }, + { "1728436436.other_profile=69;my_profile-(x,y)" }, + { "1728436436.other_profile=69,my_profile-(x;)" }, + { "1728436436.other_profile=69;my_profile-(x,)" } + }; + } + + @Test( + expectedExceptions = ScalingDirectiveParser.InvalidSyntaxException.class, + dataProvider = "invalidDirectives" + ) + public void parseInvalidDirectivesThrows(String directive) throws ScalingDirectiveParser.InvalidSyntaxException { + parser.parse(directive); + } + + @DataProvider(name = "overlayPlaceholderDirectivesWithInvalidCompletionDef") + public String[][] overlayPlaceholderDirectivesWithInvalidCompletionDef() { + return new String[][]{ + { "6.extra_profile=9,the_basis+(...)", "..." }, + { "6.extra_profile=9;the_basis+(...)", "..." }, + { "6.extra_profile=9,the_basis+(...)", "a=7," }, + { "6.extra_profile=9;the_basis+(...)", "a=7;" }, + { "6.extra_profile=9,the_basis+(...)", "a.b.c,l.m.n" }, + { "6.extra_profile=9;the_basis+(...)", "a.b.c;l.m.n" }, + { "6.extra_profile=9,the_basis-(...)", "..." }, + { "6.extra_profile=9;the_basis-(...)", "..." }, + { "6.extra_profile=9,the_basis-(...)", "a.b," }, + { "6.extra_profile=9;the_basis-(...)", "a.b;" }, + { "6.extra_profile=9,the_basis-(...)", "x=foo,y.z=bar" }, + { "6.extra_profile=9;the_basis-(...)", "x=foo;y.z=bar" } + }; + } + + @Test( + expectedExceptions = ScalingDirectiveParser.InvalidSyntaxException.class, + dataProvider = "overlayPlaceholderDirectivesWithInvalidCompletionDef" + ) + public void parsePlaceholderDefWithInvalidPlaceholderThrows(String directiveWithPlaceholder, String invalidOverlayDefinition) + throws ScalingDirectiveParser.InvalidSyntaxException { + try { + parser.parse(directiveWithPlaceholder); + Assert.fail("Expected `ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition` due to the placeholder in the directive"); + } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition) { + Assert.assertNotNull(needsDefinition.retryParsingWithDefinition(invalidOverlayDefinition)); + } + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java new file mode 100644 index 00000000000..fc99bd9f94f --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.testng.Assert; + + +/** Test {@link WorkforcePlan} */ +public class WorkforcePlanTest { + private Config baselineConfig = ConfigFactory.parseString("key1=value1, key2=value2"); + private final int initialBaselineSetPoint = 10; + private WorkforcePlan plan; + + @BeforeMethod + public void setUp() { + plan = new WorkforcePlan(baselineConfig, initialBaselineSetPoint); + } + + @Test + public void reviseWithValidReSetPoint() throws WorkforcePlan.IllegalRevisionException { + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 10000L)); + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1, 20000L)); + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 20000L); + Assert.assertEquals(plan.getNumProfiles(), 1); + Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(1), WorkforceProfiles.BASELINE_NAME_RENDERING); + } + + @Test + public void reviseWithValidDerivation() throws WorkforcePlan.IllegalRevisionException { + Assert.assertEquals(plan.getLastRevisionEpochMillis(), WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS); + Assert.assertEquals(plan.getNumProfiles(), 1); + ScalingDirective directive = createNewProfileDirective("new_profile", 5, 10000L, WorkforceProfiles.BASELINE_NAME); + plan.revise(directive); + + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 10000L); + Assert.assertEquals(plan.getNumProfiles(), 2); + Config expectedConfig = ConfigFactory.parseString("key1=new_value, key2=value2, key4=value4"); + Assert.assertEquals(plan.peepProfile("new_profile").getConfig(), expectedConfig); + + Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(initialBaselineSetPoint), WorkforceProfiles.BASELINE_NAME_RENDERING); + Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(5), "new_profile"); + } + + @Test + public void reviseWhenNewerRejectsOutOfOrderDirectivesAndContinues() { + AtomicInteger numErrors = new AtomicInteger(0); + Assert.assertEquals(plan.getLastRevisionEpochMillis(), WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS); + Assert.assertEquals(plan.getNumProfiles(), 1); + plan.reviseWhenNewer(Lists.newArrayList( + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 2, 100L), + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 3, 500L), + // (1) error: `OutdatedDirective` + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 4, 250L), + // (2) error: `OutdatedDirective` + createNewProfileDirective("new_profile", 5, 450L, WorkforceProfiles.BASELINE_NAME), + // NOTE: the second attempt at derivation is NOT judged a duplicate, as the outdated timestamp of first attempt (above) meant it was ignored! 
+ createNewProfileDirective("new_profile", 6, 600L, WorkforceProfiles.BASELINE_NAME), + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 800L), + // (3) error: `OutdatedDirective` + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 750L) + ), failure -> numErrors.incrementAndGet()); + + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 800L); + Assert.assertEquals(plan.getNumProfiles(), 2); + Assert.assertEquals(numErrors.get(), 3); + Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(7), WorkforceProfiles.BASELINE_NAME_RENDERING); + Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(6), "new_profile"); + } + + @Test + public void reviseWhenNewerRejectsErrorsAndContinues() { + AtomicInteger numErrors = new AtomicInteger(0); + plan.reviseWhenNewer(Lists.newArrayList( + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1, 100L), + // (1) error: `UnrecognizedProfile` + new ScalingDirective("UNKNOWN_PROFILE", 2, 250L), + createNewProfileDirective("new_profile", 3, 200L, WorkforceProfiles.BASELINE_NAME), + // (2) error: `Redefinition` + createNewProfileDirective("new_profile", 4, 450L, WorkforceProfiles.BASELINE_NAME), + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 5, 300L), + // (3) error: `UnknownBasis` + createNewProfileDirective("other_profile", 6, 550L, "NEVER_DEFINED"), + new ScalingDirective("new_profile", 7, 400L), + // (4) error: `OutdatedDirective` + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 350L), + createNewProfileDirective("another", 9, 500L, "new_profile") + ), failure -> numErrors.incrementAndGet()); + + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 500L); + Assert.assertEquals(plan.getNumProfiles(), 3); + Assert.assertEquals(numErrors.get(), 4); + Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(5), WorkforceProfiles.BASELINE_NAME_RENDERING); + Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(7), "new_profile"); + Assert.assertEquals(plan.peepStaffing("another"), Optional.of(9), "another"); + } + + @Test + public void calcStaffingDeltas() throws WorkforcePlan.IllegalRevisionException { + plan.revise(createNewProfileDirective("new_profile", 3, 10L, WorkforceProfiles.BASELINE_NAME)); + plan.revise(createNewProfileDirective("other_profile", 8, 20L, "new_profile")); + plan.revise(createNewProfileDirective("another", 7, 30L, "new_profile")); + plan.revise(new ScalingDirective("new_profile", 5, 40L)); + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 6, 50L)); + plan.revise(new ScalingDirective("another", 4, 60L)); + + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 60L); + Assert.assertEquals(plan.getNumProfiles(), 4); + Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(6), WorkforceProfiles.BASELINE_NAME_RENDERING); + Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(5), "new_profile"); + Assert.assertEquals(plan.peepStaffing("another"), Optional.of(4), "another"); + Assert.assertEquals(plan.peepStaffing("other_profile"), Optional.of(8), "other_profile"); + + WorkforceStaffing referenceStaffing = WorkforceStaffing.initializeStaffing(100, ImmutableMap.of( + WorkforceProfiles.BASELINE_NAME, 100, + "new_profile", 1, + // not initialized - "another" + "other_profile", 8 + )); + StaffingDeltas deltas = plan.calcStaffingDeltas(referenceStaffing); + Assert.assertEquals(deltas.getPerProfileDeltas().size(), 3); + deltas.getPerProfileDeltas().forEach(delta -> { + 
switch (delta.getProfile().getName()) { + case WorkforceProfiles.BASELINE_NAME: + Assert.assertEquals(delta.getDelta(), 6 - 100); + Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 50L); + break; + case "new_profile": + Assert.assertEquals(delta.getDelta(), 5 - 1); + Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 40L); + break; + case "another": + Assert.assertEquals(delta.getDelta(), 4 - 0); + Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 60L); + break; + case "other_profile": // NOTE: should NOT be present (since delta == 0)! + default: + Assert.fail("Unexpected profile: " + delta.getProfile().getName()); + } + }); + } + + @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.OutOfOrderDirective.class) + public void reviseWithOutOfOrderDirective() throws WorkforcePlan.IllegalRevisionException { + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 30000L)); + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 12, 8000L)); + } + + @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.Redefinition.class) + public void reviseWithRedefinitionDirective() throws WorkforcePlan.IllegalRevisionException { + plan.revise(createNewProfileDirective("new_profile", 5, 10000L, WorkforceProfiles.BASELINE_NAME)); + plan.revise(createNewProfileDirective("new_profile", 9, 20000L, WorkforceProfiles.BASELINE_NAME)); + } + + @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.UnknownBasis.class) + public void reviseWithUnknownBasisDirective() throws WorkforcePlan.IllegalRevisionException { + plan.revise(createNewProfileDirective("new_profile", 5, 10000L, "NEVER_DEFINED")); + } + + @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.UnrecognizedProfile.class) + public void reviseWithUnrecognizedProfileDirective() throws WorkforcePlan.IllegalRevisionException { + plan.revise(new ScalingDirective("UNKNOWN_PROFILE", 7, 10000L)); + } + + private static ScalingDirective createNewProfileDirective(String profileName, int setPoint, long epochMillis, String basisProfileName) { + return new ScalingDirective(profileName, setPoint, epochMillis, Optional.of( + new ProfileDerivation(basisProfileName, new ProfileOverlay.Adding( + new ProfileOverlay.KVPair("key1", "new_value"), + new ProfileOverlay.KVPair("key4", "value4"))))); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java new file mode 100644 index 00000000000..baed5f27788 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+import static org.mockito.ArgumentMatchers.anyString;
+
+
+/** Test {@link WorkforceStaffing} */
+public class WorkforceStaffingTest {
+
+ @Mock private WorkforceProfiles profiles;
+
+ @BeforeMethod
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ Mockito.when(profiles.getOrThrow(anyString())).thenAnswer(invocation ->
+ new WorkerProfile(invocation.getArgument(0), null));
+ }
+
+ @Test
+ public void initializeShouldSetInitialBaselineSetPoint() {
+ int initialBaselineSetPoint = 5;
+ WorkforceStaffing staffing = WorkforceStaffing.initialize(initialBaselineSetPoint);
+ Assert.assertEquals(staffing.getStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(initialBaselineSetPoint));
+ }
+
+ @Test
+ public void reviseStaffingShouldUpdateSetPoint() {
+ String profileName = "testProfile";
+ WorkforceStaffing staffing = WorkforceStaffing.initialize(0);
+ staffing.reviseStaffing(profileName, 10, 5000L);
+ Assert.assertEquals(staffing.getStaffing(profileName), Optional.of(10));
+
+ // NOTE: verify that `provenanceEpochMillis` is merely assoc. metadata, w/ no requirement for monotonic increase
+ staffing.reviseStaffing(profileName, 17, 2000L);
+ Assert.assertEquals(staffing.getStaffing(profileName), Optional.of(17));
+ }
+
+ @Test
+ public void calcDeltasShouldReturnCorrectDeltas() {
+ String subsequentlyUnreferencedProfileName = "unreferenced";
+ String newlyAddedProfileName = "added";
+ String heldSteadyProfileName = "steady";
+ WorkforceStaffing currentStaffing = WorkforceStaffing.initialize(5);
+ currentStaffing.reviseStaffing(subsequentlyUnreferencedProfileName, 3, 1000L);
+ currentStaffing.reviseStaffing(heldSteadyProfileName, 9, 2000L);
+
+ WorkforceStaffing improvedStaffing = WorkforceStaffing.initialize(7);
+ improvedStaffing.updateSetPoint(newlyAddedProfileName, 10);
+ improvedStaffing.updateSetPoint(heldSteadyProfileName, 9);
+
+ StaffingDeltas deltas = improvedStaffing.calcDeltas(currentStaffing, profiles);
+ Assert.assertEquals(deltas.getPerProfileDeltas().size(), 3);
+ // validate every delta
+ Map<String, Integer> deltaByProfileName = deltas.getPerProfileDeltas().stream()
+ .collect(Collectors.toMap(delta -> delta.getProfile().getName(), StaffingDeltas.ProfileDelta::getDelta));
+ ImmutableMap<String, Integer> expectedDeltaByProfileName = ImmutableMap.of(
+ WorkforceProfiles.BASELINE_NAME, 2,
+ subsequentlyUnreferencedProfileName, -3,
+ // NOTE: NOT present (since delta == 0)!
+ // heldSteadyProfileName, 0,
+ newlyAddedProfileName, 10
+ );
+ Assert.assertEqualsNoOrder(deltaByProfileName.keySet().toArray(), expectedDeltaByProfileName.keySet().toArray());
+ Assert.assertEquals(deltaByProfileName.get(WorkforceProfiles.BASELINE_NAME), expectedDeltaByProfileName.get(WorkforceProfiles.BASELINE_NAME));
+ Assert.assertEquals(deltaByProfileName.get(subsequentlyUnreferencedProfileName), expectedDeltaByProfileName.get(subsequentlyUnreferencedProfileName));
+ Assert.assertEquals(deltaByProfileName.get(newlyAddedProfileName), expectedDeltaByProfileName.get(newlyAddedProfileName));
+ }
+}
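For orientation beyond the unit tests, below is a minimal, illustrative sketch of driving the WorkforcePlan API that these tests exercise end to end. It is not part of this patch: the profile name "gpu_profile", the overlay keys, and the timestamps are made up, and only constructors and methods already shown in the tests above are used.

package org.apache.gobblin.temporal.dynamic;

import java.util.Optional;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/** Illustrative (non-shipping) driver for the WorkforcePlan API exercised by the tests above */
public class WorkforcePlanSketch {
  public static void main(String[] args) throws WorkforcePlan.IllegalRevisionException {
    Config baselineConfig = ConfigFactory.parseString("key1=value1, key2=value2");
    WorkforcePlan plan = new WorkforcePlan(baselineConfig, 10);  // baseline set point = 10

    // derive a hypothetical "gpu_profile" from the baseline, overlaying two config keys
    plan.revise(new ScalingDirective("gpu_profile", 5, 100L, Optional.of(
        new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, new ProfileOverlay.Adding(
            new ProfileOverlay.KVPair("key1", "overridden_value"),
            new ProfileOverlay.KVPair("key3", "value3"))))));

    // later directives re-scale existing profiles; each revision must carry a newer timestamp,
    // or it is rejected as out of order
    plan.revise(new ScalingDirective("gpu_profile", 8, 200L));
    plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 12, 300L));

    // compare the plan against what is currently provisioned to learn what to scale up or down
    WorkforceStaffing currentStaffing = WorkforceStaffing.initialize(10);  // only the baseline exists so far
    StaffingDeltas deltas = plan.calcStaffingDeltas(currentStaffing);
    deltas.getPerProfileDeltas().forEach(delta ->
        System.out.println(delta.getProfile().getName() + " delta: " + delta.getDelta()));
    // per the delta semantics asserted in the tests: baseline => +2, gpu_profile => +8
  }
}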