Skip to content

Commit

Permalink
Tweaks for long-running tasks. Tests pass.
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Jan 8, 2017
1 parent b8733ec commit 9bb0e71
Show file tree
Hide file tree
Showing 21 changed files with 1,884 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -674,4 +674,10 @@ public class ConfigurationKeys {
public static final Charset DEFAULT_CHARSET_ENCODING = Charsets.UTF_8;
public static final String TEST_HARNESS_LAUNCHER_IMPL = "gobblin.testharness.launcher.impl";
public static final int PERMISSION_PARSING_RADIX = 8;

/**
* Configuration properties related to continuous / streaming mode
*/
public static final String TASK_EXECUTION_MODE = "gobblin.task.executionMode";
public static final String DEFAULT_TASK_EXECUTION_MODE = "BATCH";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package gobblin.source.extractor;

/**
* {@link Watermark} that is {@link Comparable} and Checkpointable
*/
public interface CheckpointableWatermark extends Watermark, Comparable<CheckpointableWatermark> {

/**
*
* @return the unique id of the source that generated this watermark.
* Watermarks generated by different sources are not comparable and therefore need to be checkpoint-ed independently
*/
String getSource();

/**
*
* @return the source-local watermark.
*/
ComparableWatermark getWatermark();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package gobblin.source.extractor;

import com.google.gson.Gson;
import com.google.gson.JsonElement;

import lombok.EqualsAndHashCode;


/**
* A {@link CheckpointableWatermark} that wraps a {@link ComparableWatermark} specific to a source.
*/
@EqualsAndHashCode
public class DefaultCheckpointableWatermark implements CheckpointableWatermark {
private static final Gson GSON = new Gson();

private final String source;
private final ComparableWatermark comparable;

public DefaultCheckpointableWatermark(String source, ComparableWatermark comparableWatermark) {
this.source = source;
this.comparable = comparableWatermark;
}

public String getSource() {
return this.source;
}

@Override
public ComparableWatermark getWatermark() {
return this.comparable;
}

@Override
public int compareTo(CheckpointableWatermark o) {
if (!(this.source.equals(o.getSource()))) {
throw new RuntimeException("Could not compare two checkpointable watermarks because they have different sources "
+ this.source + ":" + o.getSource());
}
return this.comparable.compareTo(o.getWatermark());
}

@Override
public JsonElement toJson() {
return comparable.toJson();
}

@Override
public short calculatePercentCompletion(Watermark lowWatermark, Watermark highWatermark) {
return comparable.calculatePercentCompletion(lowWatermark, highWatermark);
}

@Override
public String toString() {
return String.format("%s : %s ", getSource(), GSON.toJson(this.comparable.toJson()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package gobblin.source.extractor;

public class RecordEnvelope<D> {

private D _record;
private final CheckpointableWatermark _watermark;

public RecordEnvelope(D record, CheckpointableWatermark watermark) {
_record = record;
_watermark = watermark;
}

public D getRecord() {
return _record;
}

/**
* Swap the record attached to this watermark, useful when transmitting envelopes down a
* processing chain to save on object creation
*/
public RecordEnvelope setRecord(D record) {
_record = record;
return this;
}

public CheckpointableWatermark getWatermark() {
return _watermark;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package gobblin.source.extractor;

/**
* An interface for implementing streaming / continuous extractors
*/
public interface StreamingExtractor<S, D> extends Extractor<S, RecordEnvelope<D>> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,42 @@
package gobblin.instrumented.writer;

import java.io.IOException;
import java.util.Map;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;

import gobblin.configuration.State;
import gobblin.instrumented.Instrumented;
import gobblin.metrics.MetricContext;
import gobblin.source.extractor.CheckpointableWatermark;
import gobblin.source.extractor.RecordEnvelope;
import gobblin.util.Decorator;
import gobblin.util.DecoratorUtils;
import gobblin.util.FinalState;
import gobblin.writer.DataWriter;
import gobblin.writer.WatermarkAwareWriter;


/**
* Decorator that automatically instruments {@link gobblin.writer.DataWriter}. Handles already instrumented
* {@link gobblin.instrumented.writer.InstrumentedDataWriter} appropriately to avoid double metric reporting.
*/
public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBase<D> implements Decorator {
public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBase<D> implements Decorator, WatermarkAwareWriter<D> {

private DataWriter<D> embeddedWriter;
private boolean isEmbeddedInstrumented;
private Optional<WatermarkAwareWriter> watermarkAwareWriter;

public InstrumentedDataWriterDecorator(DataWriter<D> writer, State state) {
super(state, Optional.<Class<?>> of(DecoratorUtils.resolveUnderlyingObject(writer).getClass()));
this.embeddedWriter = this.closer.register(writer);
this.isEmbeddedInstrumented = Instrumented.isLineageInstrumented(writer);
if (this.embeddedWriter instanceof WatermarkAwareWriter) {
this.watermarkAwareWriter = Optional.of((WatermarkAwareWriter) this.embeddedWriter);
} else {
this.watermarkAwareWriter = Optional.absent();
}
}

@Override
Expand Down Expand Up @@ -98,4 +109,28 @@ public State getFinalState() {
public Object getDecoratedObject() {
return this.embeddedWriter;
}

@Override
public boolean isWatermarkCapable() {
return watermarkAwareWriter.isPresent() && watermarkAwareWriter.get().isWatermarkCapable();
}

@Override
public void writeEnvelope(RecordEnvelope<D> recordEnvelope)
throws IOException {
Preconditions.checkState(isWatermarkCapable());
watermarkAwareWriter.get().writeEnvelope(recordEnvelope);
}

@Override
public Map<String, CheckpointableWatermark> getCommittableWatermark() {
Preconditions.checkState(isWatermarkCapable());
return watermarkAwareWriter.get().getCommittableWatermark();
}

@Override
public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
Preconditions.checkState(isWatermarkCapable());
return watermarkAwareWriter.get().getUnacknowledgedWatermark();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package gobblin.writer;

import java.io.IOException;
import java.util.Map;

import gobblin.source.extractor.CheckpointableWatermark;
import gobblin.source.extractor.RecordEnvelope;


/**
* A DataWriter that is WatermarkAware. Required for implementing writers that
* can operate in streaming mode.
*/
public interface WatermarkAwareWriter<D> extends DataWriter<D> {

/**
*
* @return true if the writer can support watermark-bearing record envelopes
*/
boolean isWatermarkCapable();

void writeEnvelope(final RecordEnvelope<D> recordEnvelope) throws IOException;
/**
* @return A Watermark per source that can safely be committed because all records associated with it
* and earlier watermarks have been committed to the destination. Return empty if no such watermark exists.
*
*/
Map<String, CheckpointableWatermark> getCommittableWatermark();

/**
*
* @return The lowest watermark out of all pending write requests
*/
Map<String, CheckpointableWatermark> getUnacknowledgedWatermark();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package gobblin.writer;

import java.io.IOException;
import java.util.Map;

import com.google.common.base.Optional;

import gobblin.source.extractor.CheckpointableWatermark;
import gobblin.source.extractor.RecordEnvelope;


/**
* A convenience wrapper class for WatermarkAware writers.
*/
public abstract class WatermarkAwareWriterWrapper<D> implements WatermarkAwareWriter<D> {
private Optional<WatermarkAwareWriter> watermarkAwareWriter = Optional.absent();

public final void setWatermarkAwareWriter(WatermarkAwareWriter watermarkAwareWriter) {
this.watermarkAwareWriter = Optional.of(watermarkAwareWriter);
}

public final boolean isWatermarkCapable() {
return watermarkAwareWriter.get().isWatermarkCapable();
}

public final void writeEnvelope(final RecordEnvelope<D> recordEnvelope) throws IOException {
watermarkAwareWriter.get().writeEnvelope(recordEnvelope);
}

public final Map<String, CheckpointableWatermark> getCommittableWatermark() {
return watermarkAwareWriter.get().getCommittableWatermark();
}

public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
return watermarkAwareWriter.get().getUnacknowledgedWatermark();
}

}
Loading

0 comments on commit 9bb0e71

Please sign in to comment.