diff --git a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java index ee5b4817112..b020f01b426 100644 --- a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java @@ -674,4 +674,10 @@ public class ConfigurationKeys { public static final Charset DEFAULT_CHARSET_ENCODING = Charsets.UTF_8; public static final String TEST_HARNESS_LAUNCHER_IMPL = "gobblin.testharness.launcher.impl"; public static final int PERMISSION_PARSING_RADIX = 8; + + /** + * Configuration properties related to continuous / streaming mode + */ + public static final String TASK_EXECUTION_MODE = "gobblin.task.executionMode"; + public static final String DEFAULT_TASK_EXECUTION_MODE = "BATCH"; } diff --git a/gobblin-api/src/main/java/gobblin/source/extractor/CheckpointableWatermark.java b/gobblin-api/src/main/java/gobblin/source/extractor/CheckpointableWatermark.java new file mode 100644 index 00000000000..831c6df810b --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/extractor/CheckpointableWatermark.java @@ -0,0 +1,39 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.source.extractor; + +/** + * {@link Watermark} that is {@link Comparable} and Checkpointable + */ +public interface CheckpointableWatermark extends Watermark, Comparable { + + /** + * + * @return the unique id of the source that generated this watermark. + * Watermarks generated by different sources are not comparable and therefore need to be checkpoint-ed independently + */ + String getSource(); + + /** + * + * @return the source-local watermark. + */ + ComparableWatermark getWatermark(); +} diff --git a/gobblin-api/src/main/java/gobblin/source/extractor/DefaultCheckpointableWatermark.java b/gobblin-api/src/main/java/gobblin/source/extractor/DefaultCheckpointableWatermark.java new file mode 100644 index 00000000000..9458cf9ed15 --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/extractor/DefaultCheckpointableWatermark.java @@ -0,0 +1,75 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.source.extractor; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; + +import lombok.EqualsAndHashCode; + + +/** + * A {@link CheckpointableWatermark} that wraps a {@link ComparableWatermark} specific to a source. + */ +@EqualsAndHashCode +public class DefaultCheckpointableWatermark implements CheckpointableWatermark { + private static final Gson GSON = new Gson(); + + private final String source; + private final ComparableWatermark comparable; + + public DefaultCheckpointableWatermark(String source, ComparableWatermark comparableWatermark) { + this.source = source; + this.comparable = comparableWatermark; + } + + public String getSource() { + return this.source; + } + + @Override + public ComparableWatermark getWatermark() { + return this.comparable; + } + + @Override + public int compareTo(CheckpointableWatermark o) { + if (!(this.source.equals(o.getSource()))) { + throw new RuntimeException("Could not compare two checkpointable watermarks because they have different sources " + + this.source + ":" + o.getSource()); + } + return this.comparable.compareTo(o.getWatermark()); + } + + @Override + public JsonElement toJson() { + return comparable.toJson(); + } + + @Override + public short calculatePercentCompletion(Watermark lowWatermark, Watermark highWatermark) { + return comparable.calculatePercentCompletion(lowWatermark, highWatermark); + } + + @Override + public String toString() { + return String.format("%s : %s ", getSource(), GSON.toJson(this.comparable.toJson())); + } +} diff --git a/gobblin-api/src/main/java/gobblin/source/extractor/RecordEnvelope.java b/gobblin-api/src/main/java/gobblin/source/extractor/RecordEnvelope.java new file mode 100644 index 00000000000..5c141b7e7ea --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/extractor/RecordEnvelope.java @@ -0,0 +1,48 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.source.extractor; + +public class RecordEnvelope { + + private D _record; + private final CheckpointableWatermark _watermark; + + public RecordEnvelope(D record, CheckpointableWatermark watermark) { + _record = record; + _watermark = watermark; + } + + public D getRecord() { + return _record; + } + + /** + * Swap the record attached to this watermark, useful when transmitting envelopes down a + * processing chain to save on object creation + */ + public RecordEnvelope setRecord(D record) { + _record = record; + return this; + } + + public CheckpointableWatermark getWatermark() { + return _watermark; + } +} diff --git a/gobblin-api/src/main/java/gobblin/source/extractor/StreamingExtractor.java b/gobblin-api/src/main/java/gobblin/source/extractor/StreamingExtractor.java new file mode 100644 index 00000000000..92baefac33f --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/extractor/StreamingExtractor.java @@ -0,0 +1,26 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.source.extractor; + +/** + * An interface for implementing streaming / continuous extractors + */ +public interface StreamingExtractor extends Extractor> { +} diff --git a/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java b/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java index fd18d1788ec..00a7f6819ac 100644 --- a/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java +++ b/gobblin-core-base/src/main/java/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java @@ -18,31 +18,42 @@ package gobblin.instrumented.writer; import java.io.IOException; +import java.util.Map; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import gobblin.configuration.State; import gobblin.instrumented.Instrumented; import gobblin.metrics.MetricContext; +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.source.extractor.RecordEnvelope; import gobblin.util.Decorator; import gobblin.util.DecoratorUtils; import gobblin.util.FinalState; import gobblin.writer.DataWriter; +import gobblin.writer.WatermarkAwareWriter; /** * Decorator that automatically instruments {@link gobblin.writer.DataWriter}. Handles already instrumented * {@link gobblin.instrumented.writer.InstrumentedDataWriter} appropriately to avoid double metric reporting. */ -public class InstrumentedDataWriterDecorator extends InstrumentedDataWriterBase implements Decorator { +public class InstrumentedDataWriterDecorator extends InstrumentedDataWriterBase implements Decorator, WatermarkAwareWriter { private DataWriter embeddedWriter; private boolean isEmbeddedInstrumented; + private Optional watermarkAwareWriter; public InstrumentedDataWriterDecorator(DataWriter writer, State state) { super(state, Optional.> of(DecoratorUtils.resolveUnderlyingObject(writer).getClass())); this.embeddedWriter = this.closer.register(writer); this.isEmbeddedInstrumented = Instrumented.isLineageInstrumented(writer); + if (this.embeddedWriter instanceof WatermarkAwareWriter) { + this.watermarkAwareWriter = Optional.of((WatermarkAwareWriter) this.embeddedWriter); + } else { + this.watermarkAwareWriter = Optional.absent(); + } } @Override @@ -98,4 +109,28 @@ public State getFinalState() { public Object getDecoratedObject() { return this.embeddedWriter; } + + @Override + public boolean isWatermarkCapable() { + return watermarkAwareWriter.isPresent() && watermarkAwareWriter.get().isWatermarkCapable(); + } + + @Override + public void writeEnvelope(RecordEnvelope recordEnvelope) + throws IOException { + Preconditions.checkState(isWatermarkCapable()); + watermarkAwareWriter.get().writeEnvelope(recordEnvelope); + } + + @Override + public Map getCommittableWatermark() { + Preconditions.checkState(isWatermarkCapable()); + return watermarkAwareWriter.get().getCommittableWatermark(); + } + + @Override + public Map getUnacknowledgedWatermark() { + Preconditions.checkState(isWatermarkCapable()); + return watermarkAwareWriter.get().getUnacknowledgedWatermark(); + } } diff --git a/gobblin-core-base/src/main/java/gobblin/writer/WatermarkAwareWriter.java b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkAwareWriter.java new file mode 100644 index 00000000000..909c61c9970 --- /dev/null +++ b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkAwareWriter.java @@ -0,0 +1,54 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.writer; + +import java.io.IOException; +import java.util.Map; + +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.source.extractor.RecordEnvelope; + + +/** + * A DataWriter that is WatermarkAware. Required for implementing writers that + * can operate in streaming mode. + */ +public interface WatermarkAwareWriter extends DataWriter { + + /** + * + * @return true if the writer can support watermark-bearing record envelopes + */ + boolean isWatermarkCapable(); + + void writeEnvelope(final RecordEnvelope recordEnvelope) throws IOException; + /** + * @return A Watermark per source that can safely be committed because all records associated with it + * and earlier watermarks have been committed to the destination. Return empty if no such watermark exists. + * + */ + Map getCommittableWatermark(); + + /** + * + * @return The lowest watermark out of all pending write requests + */ + Map getUnacknowledgedWatermark(); +} diff --git a/gobblin-core-base/src/main/java/gobblin/writer/WatermarkAwareWriterWrapper.java b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkAwareWriterWrapper.java new file mode 100644 index 00000000000..f8bf990a7e6 --- /dev/null +++ b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkAwareWriterWrapper.java @@ -0,0 +1,57 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.writer; + +import java.io.IOException; +import java.util.Map; + +import com.google.common.base.Optional; + +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.source.extractor.RecordEnvelope; + + +/** + * A convenience wrapper class for WatermarkAware writers. + */ +public abstract class WatermarkAwareWriterWrapper implements WatermarkAwareWriter { + private Optional watermarkAwareWriter = Optional.absent(); + + public final void setWatermarkAwareWriter(WatermarkAwareWriter watermarkAwareWriter) { + this.watermarkAwareWriter = Optional.of(watermarkAwareWriter); + } + + public final boolean isWatermarkCapable() { + return watermarkAwareWriter.get().isWatermarkCapable(); + } + + public final void writeEnvelope(final RecordEnvelope recordEnvelope) throws IOException { + watermarkAwareWriter.get().writeEnvelope(recordEnvelope); + } + + public final Map getCommittableWatermark() { + return watermarkAwareWriter.get().getCommittableWatermark(); + } + + public Map getUnacknowledgedWatermark() { + return watermarkAwareWriter.get().getUnacknowledgedWatermark(); + } + +} diff --git a/gobblin-core-base/src/main/java/gobblin/writer/WatermarkManager.java b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkManager.java new file mode 100644 index 00000000000..c7c5fd7ccf6 --- /dev/null +++ b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkManager.java @@ -0,0 +1,209 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.writer; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import lombok.Getter; +import lombok.ToString; + +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.util.ExecutorsUtils; + + +/** + * Responsible for managing continuous commit of watermarks. + * Periodically fetches watermarks from WatermarkAwareWriters and commits them to WatermarkStorage. + * TODO: Add metrics monitoring + */ +public class WatermarkManager implements Closeable { + + @Getter + @ToString + public static class RetrievalStatus { + private long lastWatermarkRetrievalAttemptTimestampMillis = 0; + private long lastWatermarkRetrievalSuccessTimestampMillis = 0; + private long lastWatermarkRetrievalFailureTimestampMillis = 0; + private Map lastRetrievedWatermarks = Collections.EMPTY_MAP; + private Exception lastRetrievalException = null; + + synchronized void onAttempt() { + this.lastWatermarkRetrievalAttemptTimestampMillis = System.currentTimeMillis(); + } + + synchronized void onSuccess(Map retrievedWatermarks) { + this.lastWatermarkRetrievalSuccessTimestampMillis = System.currentTimeMillis(); + this.lastRetrievedWatermarks = retrievedWatermarks; + } + + synchronized void onFailure(Exception retrievalException) { + this.lastWatermarkRetrievalFailureTimestampMillis = System.currentTimeMillis(); + this.lastRetrievalException = retrievalException; + } + + } + + @Getter + @ToString + public static class CommitStatus { + private long lastWatermarkCommitAttemptTimestampMillis = 0; + private long lastWatermarkCommitSuccessTimestampMillis = 0; + private long lastWatermarkCommitFailureTimestampMillis = 0; + private Map lastCommittedWatermarks = Collections.EMPTY_MAP; + private Exception lastCommitException = null; + private Map lastFailedWatermarks = Collections.EMPTY_MAP; + + synchronized void onAttempt() { + lastWatermarkCommitAttemptTimestampMillis = System.currentTimeMillis(); + } + + synchronized void onSuccess(Map watermarksToCommit) { + lastWatermarkCommitSuccessTimestampMillis = System.currentTimeMillis(); + lastCommittedWatermarks = watermarksToCommit; + } + + synchronized void onFailure(Exception commitException, Map watermarksToCommit) { + lastWatermarkCommitFailureTimestampMillis = System.currentTimeMillis(); + lastCommitException = commitException; + lastFailedWatermarks = watermarksToCommit; + } + + } + + private final List _watermarkAwareWriters; + private final WatermarkStorage _watermarkStorage; + private final long _commitIntervalMillis; + private final ScheduledExecutorService _watermarkCommitThreadPool; + private final Logger _logger; + private final RetrievalStatus _retrievalStatus; + private final CommitStatus _commitStatus; + + @VisibleForTesting + final Runnable _watermarkCommitter = new + + Runnable() { + @Override + public void run() { + long startTime = System.nanoTime(); + Map watermarksToCommit = null; + try { + _retrievalStatus.onAttempt(); + WatermarkTracker watermarkTracker = new WatermarkTracker(); + for (WatermarkAwareWriter writer : _watermarkAwareWriters) { + Map writerWatermarks = writer.getCommittableWatermark(); + _logger.debug("Retrieved from writer {} : watermark {} ", writer.getClass().getName(), writerWatermarks); + watermarkTracker.committedWatermarks(writerWatermarks); + } + watermarksToCommit = watermarkTracker.getAllCommitableWatermarks(); + _retrievalStatus.onSuccess(watermarksToCommit); + } + catch (Exception e) { + _retrievalStatus.onFailure(e); + _logger.error("Failed to get watermark", e); + } + // Prevent multiple commits concurrently + synchronized (this) { + if (watermarksToCommit != null && !watermarksToCommit.isEmpty()) { + try { + _commitStatus.onAttempt(); + _logger.info("Will commit watermark {}", watermarksToCommit.toString()); + //TODO: Not checking if this watermark has already been committed successfully. + _watermarkStorage.commitWatermarks(watermarksToCommit.values()); + _commitStatus.onSuccess(watermarksToCommit); + } catch (Exception e) { + _commitStatus.onFailure(e, watermarksToCommit); + _logger.error("Failed to write watermark", e); + } + } else { + _logger.info("Nothing to commit"); + } + } + long duration = (System.nanoTime() - startTime)/1000000; + _logger.info("Duration of run {} milliseconds", duration); + } + }; + + + public WatermarkManager(WatermarkStorage storage, long commitIntervalMillis, Optional logger) { + Preconditions.checkArgument(storage != null, "WatermarkStorage cannot be null"); + _watermarkAwareWriters = new ArrayList<>(1); + _watermarkStorage = storage; + _commitIntervalMillis = commitIntervalMillis; + _logger = logger.or(LoggerFactory.getLogger(WatermarkManager.class)); + _watermarkCommitThreadPool = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(logger, + Optional.of("WatermarkManager-%d"))); + _retrievalStatus = new RetrievalStatus(); + _commitStatus = new CommitStatus(); + } + + public void registerWriter(WatermarkAwareWriter dataWriter) { + _watermarkAwareWriters.add(dataWriter); + _logger.info("Registered a watermark aware writer {}", dataWriter.getClass().getName()); + } + + public void start() { + _watermarkCommitThreadPool + .scheduleWithFixedDelay(_watermarkCommitter, 0, _commitIntervalMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void close() + throws IOException { + _logger.info("Watermark committer closing"); + _watermarkCommitThreadPool.shutdown(); + try { + long startTime = System.nanoTime(); + _watermarkCommitThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS); + long duration = (System.nanoTime() - startTime)/ 1000000; + _logger.info("Duration of termination wait was {} milliseconds", duration); + } + catch (InterruptedException ie) { + throw new IOException("Interrupted while waiting for committer to shutdown", ie); + } + finally { + // final watermark commit + _logger.info("Watermark committer: one last commit before shutting down"); + _watermarkCommitter.run(); + } + } + + public CommitStatus getCommitStatus() { + return _commitStatus; + } + + public RetrievalStatus getRetrievalStatus() { + return _retrievalStatus; + } +} diff --git a/gobblin-core-base/src/main/java/gobblin/writer/WatermarkStorage.java b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkStorage.java new file mode 100644 index 00000000000..37a86abfcd3 --- /dev/null +++ b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkStorage.java @@ -0,0 +1,31 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.writer; + +import java.io.IOException; + +import gobblin.source.extractor.CheckpointableWatermark; + + +public interface WatermarkStorage { + + void commitWatermarks(Iterable watermarks) throws IOException; + +} diff --git a/gobblin-core-base/src/main/java/gobblin/writer/WatermarkTracker.java b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkTracker.java new file mode 100644 index 00000000000..5c6a876f3e5 --- /dev/null +++ b/gobblin-core-base/src/main/java/gobblin/writer/WatermarkTracker.java @@ -0,0 +1,147 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.writer; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.base.Optional; + +import gobblin.source.extractor.CheckpointableWatermark; + + +/** + * A helper class that tracks committed and uncommitted watermarks. + * Useful for building {@link WatermarkAwareWriter}s. + * + * Note: The current implementation is not meant to be used in a high-throughput scenario + * (e.g. in the path of a write or a callback) + */ +public class WatermarkTracker { + + + private final ConcurrentHashMap> candidateCommittables = new ConcurrentHashMap<>(); + //TreeSet<>(); + private final ConcurrentHashMap> unacknowledgedWatermarks = new ConcurrentHashMap<>(); + // + /** + * Reset current state + */ + public synchronized void reset() { + candidateCommittables.clear(); + unacknowledgedWatermarks.clear(); + } + + private synchronized Set getOrCreate(Map> map, String key) { + if (map.containsKey(key)) { + return map.get(key); + } else { + Set set = new TreeSet<>(); + map.put(key, set); + return set; + } + } + + public void committedWatermarks(Map committedMap) { + committedWatermarks(committedMap.values()); + } + + public void committedWatermarks(Iterable committedStream) { + for (CheckpointableWatermark committed: committedStream) { + committedWatermark(committed); + } + } + + + public void committedWatermark(CheckpointableWatermark committed) { + getOrCreate(candidateCommittables, committed.getSource()).add(committed); + } + + public void unacknowledgedWatermark(CheckpointableWatermark unacked) { + getOrCreate(unacknowledgedWatermarks, unacked.getSource()).add(unacked); + } + + public void unacknowledgedWatermarks(Map unackedMap) { + for (CheckpointableWatermark unacked: unackedMap.values()) { + unacknowledgedWatermark(unacked); + } + } + + + + public Map getAllCommitableWatermarks() { + Map commitables = new HashMap<>(candidateCommittables.size()); + for (String source: candidateCommittables.keySet()) { + Optional commitable = getCommittableWatermark(source); + if (commitable.isPresent()) { + commitables.put(commitable.get().getSource(), commitable.get()); + } + } + return commitables; + } + + + public Map getAllUnacknowledgedWatermarks() { + Map unackedMap = new HashMap<>(unacknowledgedWatermarks.size()); + for (String source: unacknowledgedWatermarks.keySet()) { + Optional unacked = getUnacknowledgedWatermark(source); + if (unacked.isPresent()) { + unackedMap.put(unacked.get().getSource(), unacked.get()); + } + } + return unackedMap; + } + + + public Optional getCommittableWatermark(String source) { + Set unacked = unacknowledgedWatermarks.get(source); + + + CheckpointableWatermark + minUnacknowledgedWatermark = (unacked == null || unacked.isEmpty())? null: unacked.iterator().next(); + + CheckpointableWatermark highestCommitableWatermark = null; + for (CheckpointableWatermark commitableWatermark : candidateCommittables.get(source)) { + if ((minUnacknowledgedWatermark == null) || (commitableWatermark.compareTo(minUnacknowledgedWatermark) < 0)) { + // commitableWatermark < minUnacknowledgedWatermark + highestCommitableWatermark = commitableWatermark; + } + } + if (highestCommitableWatermark == null) { + return Optional.absent(); + } else { + return Optional.of(highestCommitableWatermark); + } + + } + + public Optional getUnacknowledgedWatermark(String source) { + Set unacked = unacknowledgedWatermarks.get(source); + + if (unacked.isEmpty()) { + return Optional.absent(); + } else { + return Optional.of(unacked.iterator().next()); + } + } +} diff --git a/gobblin-core-base/src/test/java/gobblin/writer/WatermarkManagerTest.java b/gobblin-core-base/src/test/java/gobblin/writer/WatermarkManagerTest.java new file mode 100644 index 00000000000..549cbb92cf4 --- /dev/null +++ b/gobblin-core-base/src/test/java/gobblin/writer/WatermarkManagerTest.java @@ -0,0 +1,344 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.writer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; + +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.source.extractor.DefaultCheckpointableWatermark; +import gobblin.source.extractor.RecordEnvelope; +import gobblin.source.extractor.extract.LongWatermark; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + + +@Test +public class WatermarkManagerTest { + + + @Test + public void testConstructor() { + + WatermarkStorage watermarkStorage = null; + try { + WatermarkManager watermarkManager = new WatermarkManager(watermarkStorage, 0, Optional.absent()); + Assert.fail("Should have thrown an exception"); + } catch (Exception e) { + + } + } + + /** + * Test that when no sources are registered, no side effects are observed + */ + @Test + public void testNoRegisteredSource() + throws IOException, InterruptedException { + + WatermarkStorage mockWatermarkStorage = mock(WatermarkStorage.class); + WatermarkManager watermarkManager = new WatermarkManager(mockWatermarkStorage, 1000, Optional.absent()); + try { + watermarkManager.start(); + } catch (Exception e) { + Assert.fail("Should not throw exception", e); + } + + Thread.sleep(2000); + + watermarkManager.close(); + verify(mockWatermarkStorage, times(0)).commitWatermarks(any(Iterable.class)); + + WatermarkManager.CommitStatus watermarkMgrStatus = watermarkManager.getCommitStatus(); + Assert.assertTrue(watermarkMgrStatus.getLastCommittedWatermarks().isEmpty(), + "Last committed watermarks should be empty"); + Assert.assertEquals(watermarkMgrStatus.getLastWatermarkCommitSuccessTimestampMillis(), 0 , + "Last committed watermark timestamp should be 0"); + + } + + /** + * Test that when we have commits failing to watermark storage, the manager continues to try + * at every interval and keeps track of the exception it is seeing. + */ + @Test + public void testFailingWatermarkStorage() + throws IOException, InterruptedException { + + WatermarkStorage reallyBadWatermarkStorage = mock(WatermarkStorage.class); + IOException exceptionToThrow = new IOException("Failed to write coz the programmer told me to"); + + doThrow(exceptionToThrow).when(reallyBadWatermarkStorage).commitWatermarks(any(Iterable.class)); + + + long commitInterval = 1000; + + WatermarkManager watermarkManager = new WatermarkManager(reallyBadWatermarkStorage, commitInterval, Optional.absent()); + + WatermarkAwareWriter mockWriter = mock(WatermarkAwareWriter.class); + CheckpointableWatermark watermark = new DefaultCheckpointableWatermark("default", new LongWatermark(0)); + when(mockWriter.getCommittableWatermark()).thenReturn(Collections.singletonMap("default", watermark)); + watermarkManager.registerWriter(mockWriter); + try { + watermarkManager.start(); + } catch (Exception e) { + Assert.fail("Should not throw exception", e); + } + + Thread.sleep(commitInterval * 2 + (commitInterval/2)); // sleep for 2.5 iterations + watermarkManager.close(); + int expectedCalls = 3; // 2 calls from iterations, 1 additional attempt due to close + verify(reallyBadWatermarkStorage, atLeast(expectedCalls)).commitWatermarks(any(Iterable.class)); + Assert.assertEquals(watermarkManager.getCommitStatus().getLastCommitException(), exceptionToThrow, + "Testing tracking of failed exceptions"); + + } + + + private WatermarkAwareWriter getFlakyWatermarkWriter(final long failEvery) { + WatermarkAwareWriter mockWatermarkWriter = new WatermarkAwareWriter() { + + private long watermark = 0; + + @Override + public boolean isWatermarkCapable() { + return true; + } + + @Override + public void writeEnvelope(RecordEnvelope recordEnvelope) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Map getCommittableWatermark() { + watermark++; + if (watermark % failEvery == 0) { + throw new RuntimeException("Failed because you asked me to"); + } + return Collections.singletonMap("default", + (CheckpointableWatermark) new DefaultCheckpointableWatermark("default", new LongWatermark(watermark))); + } + + @Override + public Map getUnacknowledgedWatermark() { + return null; + } + + @Override + public void write(Object record) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void commit() + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void cleanup() + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long recordsWritten() { + return 0; + } + + @Override + public long bytesWritten() + throws IOException { + return 0; + } + + @Override + public void close() + throws IOException { + + } + }; + + return mockWatermarkWriter; + + } + + /** + * Test that in the presence of flaky Watermark writers, we continue to log retrieval status correctly + */ + @Test + public void testRetrievalStatus() + throws InterruptedException, IOException { + + WatermarkStorage mockWatermarkStorage = mock(WatermarkStorage.class); + + WatermarkManager watermarkManager = new WatermarkManager(mockWatermarkStorage, 1000, Optional.absent()); + + watermarkManager.registerWriter(getFlakyWatermarkWriter(2)); + + + try { + watermarkManager.start(); + } catch (Exception e) { + Assert.fail("Should not throw exception", e); + } + + Thread.sleep(2000); + + watermarkManager.close(); + + WatermarkManager.RetrievalStatus retrievalStatus = watermarkManager.getRetrievalStatus(); + Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalAttemptTimestampMillis() > 0); + Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalSuccessTimestampMillis() > 0); + Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalFailureTimestampMillis() > 0); + System.out.println(retrievalStatus); + + } + + /** + * Test that in the presence of intermittent commit successes and failures, we continue to make progress + */ + @Test + public void testFlakyWatermarkStorage() + throws IOException, InterruptedException { + + final int failEvery = 2; + + WatermarkStorage mockWatermarkStorage = new WatermarkStorage() { + private int watermarkInstance = 0; + private List checkpointed = new ArrayList<>(); + @Override + public void commitWatermarks(java.lang.Iterable watermarks) + throws IOException { + ++watermarkInstance; + if (watermarkInstance % failEvery == 0) { + throw new IOException("Failed to write"); + } else { + checkpointed.clear(); + for (CheckpointableWatermark watermark: watermarks) { + checkpointed.add(watermark); + } + } + } + }; + + + WatermarkAwareWriter mockWatermarkWriter = new WatermarkAwareWriter() { + + private long watermark = 0; + + @Override + public boolean isWatermarkCapable() { + return true; + } + + @Override + public void writeEnvelope(RecordEnvelope recordEnvelope) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Map getCommittableWatermark() { + watermark++; + return Collections.singletonMap("default", + (CheckpointableWatermark) new DefaultCheckpointableWatermark("default", new LongWatermark(watermark))); + } + + @Override + public Map getUnacknowledgedWatermark() { + return null; + } + + @Override + public void write(Object record) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void commit() + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void cleanup() + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long recordsWritten() { + return 0; + } + + @Override + public long bytesWritten() + throws IOException { + return 0; + } + + @Override + public void close() + throws IOException { + + } + }; + + WatermarkManager watermarkManager = new WatermarkManager(mockWatermarkStorage, 1000, Optional.absent()); + + watermarkManager.registerWriter(mockWatermarkWriter); + + + try { + watermarkManager.start(); + } catch (Exception e) { + Assert.fail("Should not throw exception", e); + } + + Thread.sleep(2000); + + watermarkManager.close(); + + WatermarkManager.CommitStatus commitStatus = watermarkManager.getCommitStatus(); + System.out.println(commitStatus); + WatermarkManager.RetrievalStatus retrievalStatus = watermarkManager.getRetrievalStatus(); + Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalAttemptTimestampMillis() > 0); + Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalSuccessTimestampMillis() > 0); + Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalFailureTimestampMillis() == 0); + System.out.println(retrievalStatus); + + } + +} diff --git a/gobblin-core-base/src/test/java/gobblin/writer/WatermarkTrackerTest.java b/gobblin-core-base/src/test/java/gobblin/writer/WatermarkTrackerTest.java new file mode 100644 index 00000000000..c1b08cd6eb7 --- /dev/null +++ b/gobblin-core-base/src/test/java/gobblin/writer/WatermarkTrackerTest.java @@ -0,0 +1,67 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.writer; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import gobblin.source.extractor.DefaultCheckpointableWatermark; +import gobblin.source.extractor.extract.LongWatermark; + + +@Test +public class WatermarkTrackerTest { + + + private void commits(WatermarkTracker watermarkTracker, String source, int... commit) + { + for (int oneCommit: commit) { + watermarkTracker.committedWatermark(new DefaultCheckpointableWatermark(source, new LongWatermark(oneCommit))); + } + } + + @Test + public void testSingleSource() { + + WatermarkTracker watermarkTracker = new WatermarkTracker(); + commits(watermarkTracker, "default", 0, 4, 5, 6); + + Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(), "default"); + Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("default") + .get().getWatermark()).getValue(), 6L); + } + + + @Test + public void testMultiSource() { + WatermarkTracker watermarkTracker = new WatermarkTracker(); + commits(watermarkTracker, "default", 0, 4, 5, 6); + commits(watermarkTracker, "other", 1, 3, 5, 7); + + Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(), "default"); + Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("default") + .get().getWatermark()).getValue(), 6L); + Assert.assertEquals(watermarkTracker.getCommittableWatermark("other").get().getSource(), "other"); + Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("other") + .get().getWatermark()).getValue(), 7L); + + } + +} diff --git a/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java index 653024b34f4..b4aa9c59f64 100644 --- a/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java +++ b/gobblin-core/src/main/java/gobblin/writer/PartitionedDataWriter.java @@ -17,8 +17,6 @@ package gobblin.writer; -import lombok.extern.slf4j.Slf4j; - import java.io.IOException; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -35,11 +33,15 @@ import com.google.common.cache.LoadingCache; import com.google.common.io.Closer; +import lombok.extern.slf4j.Slf4j; + import gobblin.commit.SpeculativeAttemptAwareConstruct; import gobblin.configuration.ConfigurationKeys; import gobblin.configuration.State; import gobblin.instrumented.writer.InstrumentedDataWriterDecorator; import gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator; +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.source.extractor.RecordEnvelope; import gobblin.util.AvroUtils; import gobblin.util.FinalState; import gobblin.writer.partitioner.WriterPartitioner; @@ -52,7 +54,7 @@ * @param record type. */ @Slf4j -public class PartitionedDataWriter implements DataWriter, FinalState, SpeculativeAttemptAwareConstruct { +public class PartitionedDataWriter implements DataWriter, FinalState, SpeculativeAttemptAwareConstruct, WatermarkAwareWriter { private static final GenericRecord NON_PARTITIONED_WRITER_KEY = new GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord()); @@ -65,10 +67,12 @@ public class PartitionedDataWriter implements DataWriter, FinalState, S private final boolean shouldPartition; private final Closer closer; private boolean isSpeculativeAttemptSafe; + private boolean isWatermarkCapable; public PartitionedDataWriter(DataWriterBuilder builder, final State state) throws IOException { this.isSpeculativeAttemptSafe = true; + this.isWatermarkCapable = true; this.baseWriterId = builder.getWriterId(); this.closer = Closer.create(); this.partitionWriters = CacheBuilder.newBuilder().build(new CacheLoader>() { @@ -104,25 +108,35 @@ public DataWriter load(final GenericRecord key) InstrumentedDataWriterDecorator writer = this.closer.register(new InstrumentedDataWriterDecorator<>(dataWriter, state)); this.isSpeculativeAttemptSafe = this.isDataWriterForPartitionSafe(dataWriter); + this.isWatermarkCapable = this.isDataWriterWatermarkCapable(dataWriter); this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, writer); this.partitioner = Optional.absent(); this.builder = Optional.absent(); } } + private boolean isDataWriterWatermarkCapable(DataWriter dataWriter) { + return (dataWriter instanceof WatermarkAwareWriter) && (((WatermarkAwareWriter) dataWriter).isWatermarkCapable()); + } + @Override public void write(D record) throws IOException { try { - GenericRecord partition = - this.shouldPartition ? this.partitioner.get().partitionForRecord(record) : NON_PARTITIONED_WRITER_KEY; - DataWriter writer = this.partitionWriters.get(partition); + DataWriter writer = getDataWriterForRecord(record); writer.write(record); } catch (ExecutionException ee) { throw new IOException(ee); } } + private DataWriter getDataWriterForRecord(D record) + throws ExecutionException { + GenericRecord partition = + this.shouldPartition ? this.partitioner.get().partitionForRecord(record) : NON_PARTITIONED_WRITER_KEY; + return this.partitionWriters.get(partition); + } + @Override public void commit() throws IOException { @@ -190,6 +204,7 @@ private DataWriter createPartitionWriter(GenericRecord partition) DataWriter dataWriter = this.builder.get().forPartition(partition).withWriterId(this.baseWriterId + "_" + this.writerIdSuffix++) .build(); this.isSpeculativeAttemptSafe = this.isSpeculativeAttemptSafe && this.isDataWriterForPartitionSafe(dataWriter); + this.isWatermarkCapable = this.isWatermarkCapable && this.isDataWriterWatermarkCapable(dataWriter); return dataWriter; } @@ -233,4 +248,59 @@ private boolean isDataWriterForPartitionSafe(DataWriter dataWriter) { return dataWriter instanceof SpeculativeAttemptAwareConstruct && ((SpeculativeAttemptAwareConstruct) dataWriter).isSpeculativeAttemptSafe(); } + + @Override + public boolean isWatermarkCapable() { + return this.isWatermarkCapable; + } + + @Override + public void writeEnvelope(RecordEnvelope recordEnvelope) + throws IOException { + try { + DataWriter writer = getDataWriterForRecord(recordEnvelope.getRecord()); + // Unsafe cast, presumably we've checked earlier through isWatermarkCapable() + // that we are wrapping watermark aware wrappers + ((WatermarkAwareWriter) writer).writeEnvelope(recordEnvelope); + } catch (ExecutionException ee) { + throw new IOException(ee); + } + } + + @Override + public Map getCommittableWatermark() { + // The committable watermark from a collection of commitable and unacknowledged watermarks is the highest + // committable watermark that is less than the lowest unacknowledged watermark + + WatermarkTracker watermarkTracker = new WatermarkTracker(); + for (Map.Entry> entry : this.partitionWriters.asMap().entrySet()) { + if (entry.getValue() instanceof WatermarkAwareWriter) { + Map commitableWatermarks = + ((WatermarkAwareWriter) entry.getValue()).getCommittableWatermark(); + if (!commitableWatermarks.isEmpty()) { + watermarkTracker.committedWatermarks(commitableWatermarks); + } + + Map unacknowledgedWatermark = + ((WatermarkAwareWriter) entry.getValue()).getUnacknowledgedWatermark(); + if (!unacknowledgedWatermark.isEmpty()) { + watermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark); + } + } + } + return watermarkTracker.getAllCommitableWatermarks(); //TODO: Change this to use List of committables instead + } + + @Override + public Map getUnacknowledgedWatermark() { + WatermarkTracker watermarkTracker = new WatermarkTracker(); + for (Map.Entry> entry : this.partitionWriters.asMap().entrySet()) { + Map unacknowledgedWatermark = + ((WatermarkAwareWriter) entry.getValue()).getUnacknowledgedWatermark(); + if (!unacknowledgedWatermark.isEmpty()) { + watermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark); + } + } + return watermarkTracker.getAllUnacknowledgedWatermarks(); + } } diff --git a/gobblin-core/src/main/java/gobblin/writer/RetryWriter.java b/gobblin-core/src/main/java/gobblin/writer/RetryWriter.java index 29ff9c8332e..02951111243 100644 --- a/gobblin-core/src/main/java/gobblin/writer/RetryWriter.java +++ b/gobblin-core/src/main/java/gobblin/writer/RetryWriter.java @@ -16,13 +16,6 @@ */ package gobblin.writer; -import gobblin.commit.SpeculativeAttemptAwareConstruct; -import gobblin.configuration.State; -import gobblin.instrumented.Instrumented; -import gobblin.metrics.GobblinMetrics; -import gobblin.writer.exception.NonTransientException; -import gobblin.util.FinalState; - import java.io.IOException; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -42,11 +35,18 @@ import com.google.common.base.Optional; import com.google.common.base.Predicate; +import gobblin.commit.SpeculativeAttemptAwareConstruct; +import gobblin.configuration.State; +import gobblin.instrumented.Instrumented; +import gobblin.metrics.GobblinMetrics; +import gobblin.util.FinalState; +import gobblin.writer.exception.NonTransientException; + /** * Retry writer follows decorator pattern that retries on inner writer's failure. * @param */ -public class RetryWriter implements DataWriter, FinalState, SpeculativeAttemptAwareConstruct { +public class RetryWriter extends WatermarkAwareWriterWrapper implements DataWriter, FinalState, SpeculativeAttemptAwareConstruct { private static final Logger LOG = LoggerFactory.getLogger(RetryWriter.class); public static final String RETRY_CONF_PREFIX = "gobblin.writer.retry."; public static final String FAILED_RETRY_WRITES_METER = RETRY_CONF_PREFIX + "failed_writes"; @@ -62,6 +62,9 @@ public class RetryWriter implements DataWriter, FinalState, SpeculativeAtt public RetryWriter(DataWriter writer, State state) { this.writer = writer; this.retryer = buildRetryer(state); + if (this.writer instanceof WatermarkAwareWriter) { + setWatermarkAwareWriter((WatermarkAwareWriter) this.writer); + } } /** @@ -118,6 +121,7 @@ public Void call() throws Exception { callWithRetry(writeCall); } + @Override public void commit() throws IOException { Callable commitCall = new Callable() { @@ -195,4 +199,5 @@ public State getFinalState() { state.setProp(FAILED_WRITES_KEY, this.failedWrites); return state; } + } diff --git a/gobblin-core/src/test/java/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/gobblin/writer/PartitionedWriterTest.java index 6782157c357..ba513aa04ea 100644 --- a/gobblin-core/src/test/java/gobblin/writer/PartitionedWriterTest.java +++ b/gobblin-core/src/test/java/gobblin/writer/PartitionedWriterTest.java @@ -17,15 +17,27 @@ package gobblin.writer; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.testng.Assert; +import org.testng.annotations.Test; + import gobblin.configuration.ConfigurationKeys; import gobblin.configuration.State; +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.source.extractor.DefaultCheckpointableWatermark; +import gobblin.source.extractor.RecordEnvelope; +import gobblin.source.extractor.extract.LongWatermark; import gobblin.writer.test.TestPartitionAwareWriterBuilder; import gobblin.writer.test.TestPartitioner; -import java.io.IOException; - -import org.testng.Assert; -import org.testng.annotations.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** @@ -107,4 +119,48 @@ public void test() throws IOException { action = builder.actions.poll(); Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.COMMIT); } + + @Test + public void testWatermarkComputation() throws IOException { + testWatermarkComputation(0L, 1L, 0L); + testWatermarkComputation(1L, 0L, null); + testWatermarkComputation(0L, 0L, null); + testWatermarkComputation(20L, 1L, null); + } + + public void testWatermarkComputation(Long committed, Long unacknowledged, Long expected) throws IOException { + State state = new State(); + state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, TestPartitioner.class.getCanonicalName()); + + String defaultSource = "default"; + + WatermarkAwareWriter mockDataWriter = mock(WatermarkAwareWriter.class); + when(mockDataWriter.isWatermarkCapable()).thenReturn(true); + when(mockDataWriter.getCommittableWatermark()).thenReturn(Collections.singletonMap(defaultSource, + new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(committed)))); + when(mockDataWriter.getUnacknowledgedWatermark()).thenReturn(Collections.singletonMap(defaultSource, + new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(unacknowledged)))); + + PartitionAwareDataWriterBuilder builder = mock(PartitionAwareDataWriterBuilder.class); + when(builder.validatePartitionSchema(any(Schema.class))).thenReturn(true); + when(builder.forPartition(any(GenericRecord.class))).thenReturn(builder); + when(builder.withWriterId(any(String.class))).thenReturn(builder); + when(builder.build()).thenReturn(mockDataWriter); + + PartitionedDataWriter writer = new PartitionedDataWriter(builder, state); + + RecordEnvelope recordEnvelope = new RecordEnvelope<>("0", + new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(0))); + writer.writeEnvelope(recordEnvelope); + + Map watermark = writer.getCommittableWatermark(); + System.out.println(watermark.toString()); + if (expected == null) { + Assert.assertTrue(watermark.isEmpty(), "Expected watermark to be absent"); + } else { + Assert.assertTrue(watermark.size() == 1); + Assert.assertEquals((long) expected, ((LongWatermark) watermark.values().iterator().next().getWatermark()).getValue()); + } + } + } diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/ExecutionModel.java b/gobblin-runtime/src/main/java/gobblin/runtime/ExecutionModel.java new file mode 100644 index 00000000000..5d0e8196a4f --- /dev/null +++ b/gobblin-runtime/src/main/java/gobblin/runtime/ExecutionModel.java @@ -0,0 +1,28 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.runtime; + +/** + * An Enum to capture the execution model of a specific Gobblin task. + */ +public enum ExecutionModel { + BATCH, // Tasks start and stop + STREAMING // Tasks run continuously until failure / termination +} diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/Fork.java b/gobblin-runtime/src/main/java/gobblin/runtime/Fork.java index bfab384e839..662ef25067b 100644 --- a/gobblin-runtime/src/main/java/gobblin/runtime/Fork.java +++ b/gobblin-runtime/src/main/java/gobblin/runtime/Fork.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.io.Closer; @@ -45,6 +46,7 @@ import gobblin.qualitychecker.row.RowLevelPolicyChecker; import gobblin.qualitychecker.task.TaskLevelPolicyCheckResults; import gobblin.runtime.util.TaskMetrics; +import gobblin.source.extractor.RecordEnvelope; import gobblin.state.ConstructState; import gobblin.util.FinalState; import gobblin.util.ForkOperatorUtils; @@ -53,6 +55,7 @@ import gobblin.writer.DataWriterWrapperBuilder; import gobblin.writer.Destination; import gobblin.writer.PartitionedDataWriter; +import gobblin.writer.WatermarkAwareWriter; /** @@ -95,6 +98,7 @@ enum ForkState { private final int branches; private final int index; + private final ExecutionModel executionModel; private final Converter converter; private final Optional convertedSchema; @@ -120,8 +124,9 @@ enum ForkState { private final AtomicReference forkState; private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName"; + private static final Object SHUTDOWN_RECORD = new Object(); - public Fork(TaskContext taskContext, Object schema, int branches, int index) + public Fork(TaskContext taskContext, Object schema, int branches, int index, ExecutionModel executionModel) throws Exception { this.logger = LoggerFactory.getLogger(Fork.class.getName() + "-" + index); @@ -134,6 +139,7 @@ public Fork(TaskContext taskContext, Object schema, int branches, int index) this.branches = branches; this.index = index; + this.executionModel = executionModel; this.converter = this.closer.register(new MultiConverter(this.taskContext.getConverters(this.index, this.forkTaskState))); @@ -141,9 +147,10 @@ public Fork(TaskContext taskContext, Object schema, int branches, int index) this.rowLevelPolicyChecker = this.closer.register(this.taskContext.getRowLevelPolicyChecker(this.index)); this.rowLevelPolicyCheckingResult = new RowLevelPolicyCheckResults(); + // Build writer eagerly if configured, or if streaming is enabled boolean useEagerWriterInitialization = this.taskState .getPropAsBoolean(ConfigurationKeys.WRITER_EAGER_INITIALIZATION_KEY, - ConfigurationKeys.DEFAULT_WRITER_EAGER_INITIALIZATION); + ConfigurationKeys.DEFAULT_WRITER_EAGER_INITIALIZATION) || isStreamingMode(); if (useEagerWriterInitialization) { buildWriterIfNotPresent(); } @@ -171,6 +178,10 @@ public Fork(TaskContext taskContext, Object schema, int branches, int index) } } + private boolean isStreamingMode() { + return this.executionModel.equals(ExecutionModel.STREAMING); + } + @Override public void run() { compareAndSetForkState(ForkState.PENDING, ForkState.RUNNING); @@ -238,6 +249,11 @@ public boolean putRecord(Object record) */ public void markParentTaskDone() { this.parentTaskDone = true; + try { + this.putRecord(SHUTDOWN_RECORD); + } catch (InterruptedException e) { + this.logger.info("Interrupted while writing a shutdown record into the fork queue. Ignoring"); + } } /** @@ -408,18 +424,33 @@ private void processRecords() while (true) { try { Object record = this.recordQueue.get(); - if (record == null) { + if (record == null || record == SHUTDOWN_RECORD) { // The parent task has already done pulling records so no new record means this fork is done if (this.parentTaskDone) { return; + } else { + this.logger.error("Found a {} record but parent task is not done. Aborting this fork.", record); + throw new AssertionError("Found a " + record + " record but parent task is not done. Aborting this fork."); } } else { - buildWriterIfNotPresent(); - - // Convert the record, check its data quality, and finally write it out if quality checking passes. - for (Object convertedRecord : this.converter.convertRecord(this.convertedSchema, record, this.taskState)) { - if (this.rowLevelPolicyChecker.executePolicies(convertedRecord, this.rowLevelPolicyCheckingResult)) { - this.writer.get().write(convertedRecord); + if (isStreamingMode()) { + // Unpack the record from its container + RecordEnvelope recordEnvelope = (RecordEnvelope) record; + // Convert the record, check its data quality, and finally write it out if quality checking passes. + for (Object convertedRecord : this.converter.convertRecord(this.convertedSchema, recordEnvelope.getRecord(), this.taskState)) { + if (this.rowLevelPolicyChecker.executePolicies(convertedRecord, this.rowLevelPolicyCheckingResult)) { + // Preserve watermark, swap record + ((WatermarkAwareWriter) this.writer.get()).writeEnvelope(recordEnvelope.setRecord(convertedRecord)); + } + } + } else { + buildWriterIfNotPresent(); + + // Convert the record, check its data quality, and finally write it out if quality checking passes. + for (Object convertedRecord : this.converter.convertRecord(this.convertedSchema, record, this.taskState)) { + if (this.rowLevelPolicyChecker.executePolicies(convertedRecord, this.rowLevelPolicyCheckingResult)) { + this.writer.get().write(convertedRecord); + } } } } @@ -554,4 +585,9 @@ public boolean isSpeculativeExecutionSafe() { } return ((SpeculativeAttemptAwareConstruct) this.writer.get()).isSpeculativeAttemptSafe(); } + + public DataWriter getWriter() throws IOException { + Preconditions.checkState(this.writer.isPresent(), "Asked to get a writer, but writer is null"); + return this.writer.get(); + } } diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/gobblin/runtime/Task.java index 1c79c11aa98..1115c64008f 100644 --- a/gobblin-runtime/src/main/java/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/gobblin/runtime/Task.java @@ -23,7 +23,10 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +56,16 @@ import gobblin.qualitychecker.row.RowLevelPolicyCheckResults; import gobblin.qualitychecker.row.RowLevelPolicyChecker; import gobblin.runtime.util.TaskMetrics; +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.source.extractor.Extractor; import gobblin.source.extractor.JobCommitPolicy; +import gobblin.source.extractor.RecordEnvelope; +import gobblin.source.extractor.StreamingExtractor; import gobblin.state.ConstructState; +import gobblin.writer.DataWriter; +import gobblin.writer.WatermarkAwareWriter; +import gobblin.writer.WatermarkManager; +import gobblin.writer.WatermarkStorage; /** @@ -104,10 +115,19 @@ public class Task implements Runnable { private final Converter converter; private final InstrumentedExtractorBase extractor; private final RowLevelPolicyChecker rowChecker; + private final ExecutionModel taskMode; + private Optional watermarkManager; private final Closer closer; private long startTime; + private volatile long lastRecordPulledTimestampMillis; + private final AtomicLong recordsPulled; + + private final AtomicBoolean shutdownRequested; + private final CountDownLatch shutdownLatch; + private AtomicBoolean isShutdown; + /** * Instantiate a new {@link Task}. @@ -141,6 +161,74 @@ public Task(TaskContext context, TaskStateTracker taskStateTracker, TaskExecutor } throw new RuntimeException("Failed to instantiate row checker.", e); } + + this.taskMode = getTaskMode(this.taskContext); + this.recordsPulled = new AtomicLong(0); + this.lastRecordPulledTimestampMillis = 0; + this.shutdownRequested = new AtomicBoolean(false); + this.isShutdown = new AtomicBoolean(false); + this.shutdownLatch = new CountDownLatch(1); + } + + private ExecutionModel getTaskMode(TaskContext taskContext) { + String mode = taskContext.getTaskState().getProp(ConfigurationKeys.TASK_EXECUTION_MODE, + ConfigurationKeys.DEFAULT_TASK_EXECUTION_MODE); + try { + return ExecutionModel.valueOf(mode.toUpperCase()); + } catch (Exception e) { + LOG.warn("Could not find an execution model corresponding to {}, returning {}", mode, ExecutionModel.BATCH, e); + return ExecutionModel.BATCH; + } + } + + private boolean isStreamingTask() { + return this.taskMode.equals(ExecutionModel.STREAMING); + } + + + private boolean isShutdown() { + return this.isShutdown.get(); + } + + public boolean awaitShutdown(long timeoutInMillis) + throws InterruptedException { + this.shutdownLatch.await(timeoutInMillis, TimeUnit.MILLISECONDS); + return this.isShutdown.get(); + } + + private void completeShutdown() { + this.isShutdown.set(true); + this.shutdownLatch.countDown(); + } + + private boolean shutdownRequested() { + if (!this.shutdownRequested.get()) { + this.shutdownRequested.set(Thread.currentThread().isInterrupted()); + } + return this.shutdownRequested.get(); + } + + public void shutdown() { + this.shutdownRequested.set(true); + } + + public String getProgress() { + long currentTime = System.currentTimeMillis(); + long lastRecordTimeElapsed = currentTime - this.lastRecordPulledTimestampMillis; + if (isStreamingTask()) { + WatermarkManager.CommitStatus commitStatus = this.watermarkManager.get().getCommitStatus(); + long lastWatermarkCommitTimeElapsed = currentTime - commitStatus.getLastWatermarkCommitSuccessTimestampMillis(); + + String progressString = String.format("recordsPulled:%d, lastRecordExtracted: %d ms ago, " + + "lastWatermarkCommitted: %d ms ago, lastWatermarkCommitted: %s", + this.recordsPulled.get(), lastRecordTimeElapsed, lastWatermarkCommitTimeElapsed, + commitStatus.getLastCommittedWatermarks()); + return progressString; + } else { + String progressString = String.format("recordsPulled:%d, lastRecordExtracted: %d ms ago", + this.recordsPulled.get(), lastRecordTimeElapsed); + return progressString; + } } @Override @@ -174,12 +262,44 @@ public void run() { throw new CopyNotSupportedException(schema + " is not copyable"); } + if (isStreamingTask()) + { + Extractor underlyingExtractor = this.taskContext.getExtractor(); + if (!(underlyingExtractor instanceof StreamingExtractor)) { + LOG.error("Extractor {} is not an instance of StreamingExtractor but the task is configured to run in continuous mode", underlyingExtractor.getClass().getName()); + throw new TaskInstantiationException("Extraction " + underlyingExtractor.getClass().getName() + + " is not an instance of StreamingExtractor but the task is configured to run in continuous mode"); + } + if (!(underlyingExtractor instanceof WatermarkStorage)) { + LOG.error("Extractor {} is not an instance of WatermarkStorage but the task is configured to run in continuous mode", underlyingExtractor.getClass().getName()); + throw new TaskInstantiationException("Extractor " + underlyingExtractor.getClass().getName() + + " is not an instance of WatermarkStorage but the task is configured to run in continuous mode"); + } + long commitIntervalMillis = 1000; // TODO: Configure + this.watermarkManager = Optional.of(this.closer.register + (new WatermarkManager((WatermarkStorage) underlyingExtractor, commitIntervalMillis, Optional.of(this.LOG)))); + } + else { + this.watermarkManager = Optional.absent(); + } + // Create one fork for each forked branch for (int i = 0; i < branches; i++) { if (forkedSchemas.get(i)) { Fork fork = closer.register( new Fork(this.taskContext, schema instanceof Copyable ? ((Copyable) schema).copy() : schema, branches, - i)); + i, this.taskMode)); + if (isStreamingTask()) { + DataWriter forkWriter = fork.getWriter(); + if (forkWriter instanceof WatermarkAwareWriter) { + this.watermarkManager.get().registerWriter((WatermarkAwareWriter) forkWriter); + } else { + String errorMessage = String.format("The Task is configured to run in continuous mode, " + + "but the writer %s is not a WatermarkAwareWriter", forkWriter.getClass().getName()); + LOG.error(errorMessage); + throw new RuntimeException(errorMessage); + } + } // Run the Fork this.forks.put(Optional.of(fork), Optional.>of(this.taskExecutor.submit(fork))); } else { @@ -191,20 +311,35 @@ public void run() { rowChecker = closer.register(this.taskContext.getRowLevelPolicyChecker()); RowLevelPolicyCheckResults rowResults = new RowLevelPolicyCheckResults(); - long recordsPulled = 0; - Object record; - // Extract, convert, and fork one source record at a time. - while ((record = extractor.readRecord(null)) != null) { - recordsPulled++; - for (Object convertedRecord : converter.convertRecord(schema, record, this.taskState)) { - processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches); + if (isStreamingTask()) + { + this.watermarkManager.get().start(); + } + + if (isStreamingTask()) { + RecordEnvelope recordEnvelope; + // Extract, convert, and fork one source record at a time. + while (!shutdownRequested() && (recordEnvelope = (RecordEnvelope) extractor.readRecord(null)) != null) { + onRecordExtract(); + for (Object convertedRecord : converter.convertRecord(schema, recordEnvelope.getRecord(), this.taskState)) { + processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, recordEnvelope.getWatermark()); + } + } + } else { + Object record; + // Extract, convert, and fork one source record at a time. + while ((record = extractor.readRecord(null)) != null) { + onRecordExtract(); + for (Object convertedRecord : converter.convertRecord(schema, record, this.taskState)) { + processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, null); + } } } - LOG.info("Extracted " + recordsPulled + " data records"); + LOG.info("Extracted " + this.recordsPulled + " data records"); LOG.info("Row quality checker finished with results: " + rowResults.getResults()); - this.taskState.setProp(ConfigurationKeys.EXTRACTOR_ROWS_EXTRACTED, recordsPulled); + this.taskState.setProp(ConfigurationKeys.EXTRACTOR_ROWS_EXTRACTED, this.recordsPulled); this.taskState.setProp(ConfigurationKeys.EXTRACTOR_ROWS_EXPECTED, extractor.getExpectedRecordCount()); for (Optional fork : this.forks.keySet()) { @@ -217,19 +352,33 @@ public void run() { for (Optional> forkFuture : this.forks.values()) { if (forkFuture.isPresent()) { try { + long forkFutureStartTime = System.nanoTime(); forkFuture.get().get(); + long forkDuration = System.nanoTime() - forkFutureStartTime; + LOG.info("Task shutdown: Fork future reaped in {} millis", forkDuration/1000000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } + + if (watermarkManager.isPresent()) { + watermarkManager.get().close(); + } } catch (Throwable t) { failTask(t); } finally { - this.taskStateTracker.onTaskRunCompletion(this); + this.taskStateTracker.onTaskRunCompletion(this); + completeShutdown(); } } + + private void onRecordExtract() { + this.recordsPulled.incrementAndGet(); + this.lastRecordPulledTimestampMillis = System.currentTimeMillis(); + } + private void failTask(Throwable t) { LOG.error(String.format("Task %s failed", this.taskId), t); this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED); @@ -418,7 +567,7 @@ public String toString() { */ @SuppressWarnings("unchecked") private void processRecord(Object convertedRecord, ForkOperator forkOperator, RowLevelPolicyChecker rowChecker, - RowLevelPolicyCheckResults rowResults, int branches) + RowLevelPolicyCheckResults rowResults, int branches, CheckpointableWatermark watermark) throws Exception { // Skip the record if quality checking fails if (!rowChecker.executePolicies(convertedRecord, rowResults)) { @@ -455,8 +604,13 @@ private void processRecord(Object convertedRecord, ForkOperator forkOperator, Ro continue; } if (fork.isPresent() && forkedRecords.get(branch)) { - boolean succeeded = fork.get().putRecord( - convertedRecord instanceof Copyable ? ((Copyable) convertedRecord).copy() : convertedRecord); + Object recordForFork = convertedRecord instanceof Copyable ? ((Copyable) convertedRecord).copy() : convertedRecord; + if (isStreamingTask()) + { + // Send the record, watermark pair down the fork + recordForFork = new RecordEnvelope<>(recordForFork, watermark); + } + boolean succeeded = fork.get().putRecord(recordForFork); succeededPuts[branch] = succeeded; if (!succeeded) { allPutsSucceeded = false; diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/TaskInstantiationException.java b/gobblin-runtime/src/main/java/gobblin/runtime/TaskInstantiationException.java new file mode 100644 index 00000000000..929a3521177 --- /dev/null +++ b/gobblin-runtime/src/main/java/gobblin/runtime/TaskInstantiationException.java @@ -0,0 +1,30 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.runtime; + +/** + * An exception class to capture failures in Task instantiation + */ +public class TaskInstantiationException extends Exception { + + public TaskInstantiationException(String s) { + super(s); + } +} diff --git a/gobblin-runtime/src/test/java/gobblin/runtime/TaskContinuousTest.java b/gobblin-runtime/src/test/java/gobblin/runtime/TaskContinuousTest.java new file mode 100644 index 00000000000..d13d8bbf47a --- /dev/null +++ b/gobblin-runtime/src/test/java/gobblin/runtime/TaskContinuousTest.java @@ -0,0 +1,325 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package gobblin.runtime; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; + +import lombok.extern.slf4j.Slf4j; + +import gobblin.configuration.ConfigurationKeys; +import gobblin.configuration.State; +import gobblin.configuration.WorkUnitState; +import gobblin.fork.IdentityForkOperator; +import gobblin.publisher.TaskPublisher; +import gobblin.qualitychecker.row.RowLevelPolicyCheckResults; +import gobblin.qualitychecker.row.RowLevelPolicyChecker; +import gobblin.qualitychecker.task.TaskLevelPolicyCheckResults; +import gobblin.qualitychecker.task.TaskLevelPolicyChecker; +import gobblin.source.extractor.CheckpointableWatermark; +import gobblin.source.extractor.DataRecordException; +import gobblin.source.extractor.DefaultCheckpointableWatermark; +import gobblin.source.extractor.RecordEnvelope; +import gobblin.source.extractor.StreamingExtractor; +import gobblin.source.extractor.extract.LongWatermark; +import gobblin.source.workunit.Extract; +import gobblin.source.workunit.WorkUnit; +import gobblin.util.ExecutorsUtils; +import gobblin.writer.DataWriter; +import gobblin.writer.WatermarkAwareWriter; +import gobblin.writer.WatermarkStorage; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +@Slf4j +@Test +public class TaskContinuousTest { + + private static long MILLIS_TO_NANOS = 1000000; + + private static class ContinuousExtractor implements StreamingExtractor, WatermarkStorage { + + private long index = 0L; + private final long sleepTimeInMillis; + private volatile boolean closed = false; + private final Map watermarkMap; + + public ContinuousExtractor(long sleepTimeInMillis, Map watermarkMap) { + this.sleepTimeInMillis = sleepTimeInMillis; + this.watermarkMap = watermarkMap; + } + + @Override + public Object getSchema() + throws IOException { + return null; + } + + @Override + public RecordEnvelope readRecord(@Deprecated RecordEnvelope reuse) + throws DataRecordException, IOException { + if (!this.closed) { + String record = index + ""; + RecordEnvelope recordEnvelope = + new RecordEnvelope<>(record, new DefaultCheckpointableWatermark("default", new LongWatermark(index))); + index++; + return recordEnvelope; + } else { + log.info("Extractor has been closed, returning null"); + return null; + } + } + + @Override + public long getExpectedRecordCount() { + return this.index; + } + + @Override + public long getHighWatermark() { + return -1; + } + + @Override + public void close() + throws IOException { + this.closed = true; + } + + @Override + public void commitWatermarks(Iterable watermarks) + throws IOException { + for (CheckpointableWatermark watermark : watermarks) { + watermarkMap.put(watermark.getSource(), watermark); + } + } + + public boolean validateWatermarks(boolean exact) { + if (!watermarkMap.isEmpty()) { + // watermark must be <= the index + LongWatermark longWatermark = (LongWatermark) watermarkMap.values().iterator().next().getWatermark(); + if (exact) { + System.out.println(index-1 + ":" + longWatermark.getValue()); + return ((index-1) == longWatermark.getValue()); + } else { + return (index > longWatermark.getValue()); + } + } + return true; + } + } + + @Test + public void testContinuousTask() + throws Exception { + // Create a TaskState + TaskState taskState = new TaskState(new WorkUnitState(WorkUnit.create( + new Extract(Extract.TableType.SNAPSHOT_ONLY, this.getClass().getName(), this.getClass().getSimpleName())))); + taskState.setProp(ConfigurationKeys.METRICS_ENABLED_KEY, Boolean.toString(false)); + taskState.setProp(ConfigurationKeys.TASK_EXECUTION_MODE, ExecutionModel.STREAMING.name()); + taskState.setJobId("1234"); + taskState.setTaskId("testContinuousTaskId"); + + ArrayList recordCollector = new ArrayList<>(100); + long perRecordExtractLatencyMillis = 1000; // 1 second per record + + ConcurrentHashMap externalWatermarkStorage = new ConcurrentHashMap<>(); + ContinuousExtractor continuousExtractor = + new ContinuousExtractor(perRecordExtractLatencyMillis, externalWatermarkStorage); + // Create a mock RowLevelPolicyChecker + RowLevelPolicyChecker mockRowLevelPolicyChecker = mock(RowLevelPolicyChecker.class); + when(mockRowLevelPolicyChecker.executePolicies(any(Object.class), any(RowLevelPolicyCheckResults.class))).thenReturn(true); + when(mockRowLevelPolicyChecker.getFinalState()).thenReturn(new State()); + + // Create a mock TaskContext + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getExtractor()).thenReturn(continuousExtractor); + when(mockTaskContext.getForkOperator()).thenReturn(new IdentityForkOperator()); + when(mockTaskContext.getTaskState()).thenReturn(taskState); + when(mockTaskContext.getRowLevelPolicyChecker()).thenReturn(mockRowLevelPolicyChecker); + when(mockTaskContext.getRowLevelPolicyChecker(anyInt())).thenReturn(mockRowLevelPolicyChecker); + when(mockTaskContext.getTaskLevelPolicyChecker(any(TaskState.class), anyInt())).thenReturn(mock(TaskLevelPolicyChecker.class)); + when(mockTaskContext.getDataWriterBuilder(anyInt(), anyInt())).thenReturn(new TestStreamingDataWriterBuilder(recordCollector)); + + // Create a mock TaskPublisher + TaskPublisher mockTaskPublisher = mock(TaskPublisher.class); + when(mockTaskPublisher.canPublish()).thenReturn(TaskPublisher.PublisherState.SUCCESS); + when(mockTaskContext.getTaskPublisher(any(TaskState.class), any(TaskLevelPolicyCheckResults.class))).thenReturn(mockTaskPublisher); + + // Create a mock TaskStateTracker + TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class); + + // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread + TaskExecutor taskExecutor = new TaskExecutor(new Properties()); + + // Create the Task + Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.absent()); + + ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log))); + + taskRunner.execute(task); + + + // Let the task run for 10 seconds + int sleepIterations = 10; + int currentIteration = 0; + + while (currentIteration < sleepIterations) { + Thread.sleep(1000); + currentIteration++; + if (!externalWatermarkStorage.isEmpty()) { + for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) { + log.info("Observed committed watermark: {}", watermark); + } + log.info("Task progress: {}", task.getProgress()); + // Ensure that watermarks seem reasonable at each step + Assert.assertTrue(continuousExtractor.validateWatermarks(false)); + } + } + + // Let's try to shutdown the task + task.shutdown(); + log.info("Shutting down task now"); + boolean success = task.awaitShutdown(1000); + Assert.assertTrue(success, "Task should shutdown in 1 second"); + log.info("Task done waiting to shutdown {}", success); + + // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner. + Assert.assertTrue(continuousExtractor.validateWatermarks(true)); + + task.commit(); + + // Shutdown the executor + taskRunner.shutdown(); + taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS); + + + } + + private class TestStreamingDataWriterBuilder extends gobblin.writer.DataWriterBuilder { + + private final List _recordCollector; + + TestStreamingDataWriterBuilder(List recordCollector) { + _recordCollector = recordCollector; + } + + @Override + public DataWriter build() + throws IOException { + return new WatermarkAwareWriter() { + + private AtomicReference lastWatermark = new AtomicReference<>(null); + private AtomicReference source = new AtomicReference<>(null); + + @Override + public void write(Object record) + throws IOException { + throw new UnsupportedOperationException("Does not support writing non-envelope records"); + } + + @Override + public boolean isWatermarkCapable() { + return true; + } + + @Override + public void writeEnvelope(RecordEnvelope recordEnvelope) + throws IOException { + _recordCollector.add(recordEnvelope.getRecord()); + String source = recordEnvelope.getWatermark().getSource(); + if (this.source.get() != null) { + if (!source.equals(this.source.get())) { + throw new RuntimeException("This writer only supports a single source"); + } + } + this.lastWatermark.set(recordEnvelope.getWatermark()); + this.source.set(source); + } + + @Override + public Map getCommittableWatermark() { + CheckpointableWatermark committable = lastWatermark.get(); + if (committable != null) { + Map singletonMap = new HashMap<>(); + singletonMap.put(source.get(), committable); + return singletonMap; + } else { + return Collections.EMPTY_MAP; + } + } + + @Override + public Map getUnacknowledgedWatermark() { + return Collections.EMPTY_MAP; + } + + @Override + public void commit() + throws IOException { + + } + + @Override + public void cleanup() + throws IOException { + + } + + @Override + public long recordsWritten() { + return 0; + } + + @Override + public long bytesWritten() + throws IOException { + return 0; + } + + @Override + public void close() + throws IOException { + + } + }; + } + } +}