Skip to content

Commit

Permalink
[INLONG-11129][Sort] Enhanced source metric instrumentation for InLong Sort Flink Connector (#11130)
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 authored Sep 25, 2024
1 parent 5c722da commit 0122aa4
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ public final class Constants {

/** Metric name used by SourceExactlyMetric when registering the gauge backed by getEmitDelay(). */
public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag";

/** Metric name used by SourceExactlyMetric when registering the gauge backed by getDeserializeDelay(). */
public static final String DESERIALIZE_TIME_LAG = "deserializeTimeLag";

/** Metric name of the counter incremented by SourceExactlyMetric#incNumDeserializeSuccess(). */
public static final String NUM_DESERIALIZE_SUCCESS = "numDeserializeSuccess";

/** Metric name of the counter incremented by SourceExactlyMetric#incNumDeserializeError(). */
public static final String NUM_DESERIALIZE_ERROR = "numDeserializeError";

/** Metric name of the counter incremented by SourceExactlyMetric#incNumSnapshotCreate(). */
public static final String NUM_SNAPSHOT_CREATE = "numSnapshotCreate";

/** Metric name of the counter incremented by SourceExactlyMetric#incNumSnapshotError(). */
public static final String NUM_SNAPSHOT_ERROR = "numSnapshotError";

/** Metric name of the counter incremented by SourceExactlyMetric#incNumSnapshotComplete(). */
public static final String NUM_SNAPSHOT_COMPLETE = "numSnapshotComplete";

/** Metric name used by SourceExactlyMetric when registering the gauge backed by getSnapshotToCheckpointDelay(). */
public static final String SNAPSHOT_TO_CHECKPOINT_TIME_LAG = "snapshotToCheckpointTimeLag";

/**
* Timestamp when the read phase changed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@
import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
import static org.apache.inlong.sort.base.Constants.CURRENT_EMIT_EVENT_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.CURRENT_FETCH_EVENT_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.DESERIALIZE_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_ERROR;
import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_SUCCESS;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_COMPLETE;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_CREATE;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_ERROR;
import static org.apache.inlong.sort.base.Constants.SNAPSHOT_TO_CHECKPOINT_TIME_LAG;
import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;

public class SourceExactlyMetric implements MetricData, Serializable, SourceMetricsReporter {
Expand All @@ -50,6 +57,13 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr
private Counter numBytesIn;
private Counter numRecordsInForMeter;
private Counter numBytesInForMeter;
// Deserialization counters; incremented through incNumDeserializeSuccess() / incNumDeserializeError().
private Counter numDeserializeSuccess;
private Counter numDeserializeError;
// Gauge whose value is supplied by getDeserializeDelay().
private Gauge<Long> deserializeTimeLag;
// Snapshot counters; incremented through incNumSnapshotCreate() / incNumSnapshotError() /
// incNumSnapshotComplete().
private Counter numSnapshotCreate;
private Counter numSnapshotError;
private Counter numSnapshotComplete;
// Gauge whose value is supplied by getSnapshotToCheckpointDelay().
private Gauge<Long> snapshotToCheckpointTimeLag;
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
private AuditReporterImpl auditReporter;
Expand Down Expand Up @@ -80,6 +94,17 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr
*/
private volatile long emitDelay = 0L;

/**
 * Latest deserialize delay, stored by recordDeserializeDelay(long) and exposed through the
 * deserializeTimeLag gauge.
 * deserializeDelay = deserializeEndTime - deserializeStartTime, where deserializeStartTime is the time the
 * deserialize method is called and deserializeEndTime is the time the record is emitted.
 */
private volatile long deserializeDelay = 0L;

/**
 * Latest snapshot-to-checkpoint delay, stored by recordSnapshotToCheckpointDelay(long) and exposed through
 * the snapshotToCheckpointTimeLag gauge.
 * snapshotToCheckpointDelay = snapShotCompleteTime - snapShotStartTimeById, where snapShotCompleteTime is the
 * time the logic of notifyCheckpointComplete finishes.
 */
private volatile long snapshotToCheckpointDelay = 0L;

public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
this.labels = option.getLabels();
Expand All @@ -98,6 +123,13 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
registerMetricsForNumRecordsInPerSecond();
registerMetricsForCurrentFetchEventTimeLag();
registerMetricsForCurrentEmitEventTimeLag();
registerMetricsForDeserializeTimeLag();
registerMetricsForNumSnapshotComplete(new ThreadSafeCounter());
registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter());
registerMetricsForNumDeserializeError(new ThreadSafeCounter());
registerMetricsForNumSnapshotCreate(new ThreadSafeCounter());
registerMetricsForNumSnapshotError(new ThreadSafeCounter());
registerMetricsForSnapshotToCheckpointTimeLag();
break;
}

Expand Down Expand Up @@ -178,6 +210,58 @@ public void registerMetricsForCurrentFetchEventTimeLag() {
public void registerMetricsForCurrentEmitEventTimeLag() {
    // The gauge reads the current emit delay each time it is polled.
    final Gauge<Long> emitDelayGauge = this::getEmitDelay;
    this.currentEmitEventTimeLag = registerGauge(CURRENT_EMIT_EVENT_TIME_LAG, emitDelayGauge);
}
/**
 * Registers the gauge named by {@code DESERIALIZE_TIME_LAG}; its value is read from
 * {@link #getDeserializeDelay()}.
 */
public void registerMetricsForDeserializeTimeLag() {
    final Gauge<Long> deserializeDelayGauge = this::getDeserializeDelay;
    this.deserializeTimeLag = registerGauge(DESERIALIZE_TIME_LAG, deserializeDelayGauge);
}

/**
 * Registers {@code counter} under the {@code NUM_DESERIALIZE_SUCCESS} name and stores the value
 * returned by {@code registerCounter} as the deserialize-success counter.
 *
 * @param counter counter instance to register
 */
public void registerMetricsForNumDeserializeSuccess(Counter counter) {
    final Counter registered = registerCounter(NUM_DESERIALIZE_SUCCESS, counter);
    this.numDeserializeSuccess = registered;
}

/**
 * Registers {@code counter} under the {@code NUM_DESERIALIZE_ERROR} name and stores the value
 * returned by {@code registerCounter} as the deserialize-error counter.
 *
 * @param counter counter instance to register
 */
public void registerMetricsForNumDeserializeError(Counter counter) {
    final Counter registered = registerCounter(NUM_DESERIALIZE_ERROR, counter);
    this.numDeserializeError = registered;
}

/**
 * Registers {@code counter} under the {@code NUM_SNAPSHOT_CREATE} name and stores the value
 * returned by {@code registerCounter} as the snapshot-create counter.
 *
 * @param counter counter instance to register
 */
public void registerMetricsForNumSnapshotCreate(Counter counter) {
    final Counter registered = registerCounter(NUM_SNAPSHOT_CREATE, counter);
    this.numSnapshotCreate = registered;
}

/**
 * Registers {@code counter} under the {@code NUM_SNAPSHOT_ERROR} name and stores the value
 * returned by {@code registerCounter} as the snapshot-error counter.
 *
 * @param counter counter instance to register
 */
public void registerMetricsForNumSnapshotError(Counter counter) {
    final Counter registered = registerCounter(NUM_SNAPSHOT_ERROR, counter);
    this.numSnapshotError = registered;
}

/**
 * Registers {@code counter} under the {@code NUM_SNAPSHOT_COMPLETE} name and stores the value
 * returned by {@code registerCounter} as the snapshot-complete counter.
 *
 * @param counter counter instance to register
 */
public void registerMetricsForNumSnapshotComplete(Counter counter) {
    final Counter registered = registerCounter(NUM_SNAPSHOT_COMPLETE, counter);
    this.numSnapshotComplete = registered;
}

/**
 * Registers the gauge named by {@code SNAPSHOT_TO_CHECKPOINT_TIME_LAG}; its value is read from
 * {@link #getSnapshotToCheckpointDelay()}.
 */
public void registerMetricsForSnapshotToCheckpointTimeLag() {
    final Gauge<Long> snapshotDelayGauge = this::getSnapshotToCheckpointDelay;
    this.snapshotToCheckpointTimeLag = registerGauge(SNAPSHOT_TO_CHECKPOINT_TIME_LAG, snapshotDelayGauge);
}

/**
 * Returns the gauge stored by {@link #registerMetricsForDeserializeTimeLag()}.
 *
 * <p>The return type is parameterized to match the {@code Gauge<Long>} field, so callers read a
 * {@code Long} without an unchecked conversion.
 *
 * @return the deserialize time lag gauge; presumably {@code null} if it was never registered
 */
public Gauge<Long> getDeserializeTimeLag() {
    return deserializeTimeLag;
}

/**
 * Returns the gauge stored by {@link #registerMetricsForSnapshotToCheckpointTimeLag()}.
 *
 * <p>The return type is parameterized to match the {@code Gauge<Long>} field, so callers read a
 * {@code Long} without an unchecked conversion.
 *
 * @return the snapshot-to-checkpoint time lag gauge; presumably {@code null} if it was never registered
 */
public Gauge<Long> getSnapshotToCheckpointTimeLag() {
    return snapshotToCheckpointTimeLag;
}

/**
 * @return the counter stored by {@link #registerMetricsForNumDeserializeSuccess(Counter)}
 */
public Counter getNumDeserializeSuccess() {
    return this.numDeserializeSuccess;
}

/**
 * @return the counter stored by {@link #registerMetricsForNumDeserializeError(Counter)}
 */
public Counter getNumDeserializeError() {
    return this.numDeserializeError;
}

/**
 * @return the counter stored by {@link #registerMetricsForNumSnapshotCreate(Counter)}
 */
public Counter getNumSnapshotCreate() {
    return this.numSnapshotCreate;
}

/**
 * @return the counter stored by {@link #registerMetricsForNumSnapshotError(Counter)}
 */
public Counter getNumSnapshotError() {
    return this.numSnapshotError;
}

public Counter getNumRecordsIn() {
return numRecordsIn;
Expand Down Expand Up @@ -211,6 +295,26 @@ public long getEmitDelay() {
return emitDelay;
}

/**
 * @return the value most recently passed to {@link #recordDeserializeDelay(long)}, or the field's
 *         initial value if none was recorded
 */
public long getDeserializeDelay() {
    return this.deserializeDelay;
}

/**
 * @return the value most recently passed to {@link #recordSnapshotToCheckpointDelay(long)}, or the
 *         field's initial value if none was recorded
 */
public long getSnapshotToCheckpointDelay() {
    return this.snapshotToCheckpointDelay;
}

/**
 * @return the counter stored by {@link #registerMetricsForNumSnapshotComplete(Counter)}
 */
public Counter getNumSnapshotComplete() {
    return this.numSnapshotComplete;
}

/**
 * Stores the latest deserialize delay; the value is what {@link #getDeserializeDelay()} returns
 * afterwards.
 *
 * @param deserializeDelay delay value to store
 */
public void recordDeserializeDelay(final long deserializeDelay) {
    this.deserializeDelay = deserializeDelay;
}

/**
 * Stores the latest snapshot-to-checkpoint delay; the value is what
 * {@link #getSnapshotToCheckpointDelay()} returns afterwards.
 *
 * @param snapshotToCheckpointDelay delay value to store
 */
public void recordSnapshotToCheckpointDelay(final long snapshotToCheckpointDelay) {
    this.snapshotToCheckpointDelay = snapshotToCheckpointDelay;
}

@Override
public MetricGroup getMetricGroup() {
return metricGroup;
Expand Down Expand Up @@ -262,6 +366,36 @@ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
}
}

/** Increments the deserialize-success counter; does nothing when that counter is {@code null}. */
public void incNumDeserializeSuccess() {
    if (numDeserializeSuccess == null) {
        return;
    }
    numDeserializeSuccess.inc();
}

/** Increments the deserialize-error counter; does nothing when that counter is {@code null}. */
public void incNumDeserializeError() {
    if (numDeserializeError == null) {
        return;
    }
    numDeserializeError.inc();
}

/** Increments the snapshot-create counter; does nothing when that counter is {@code null}. */
public void incNumSnapshotCreate() {
    if (numSnapshotCreate == null) {
        return;
    }
    numSnapshotCreate.inc();
}

/** Increments the snapshot-error counter; does nothing when that counter is {@code null}. */
public void incNumSnapshotError() {
    if (numSnapshotError == null) {
        return;
    }
    numSnapshotError.inc();
}

/** Increments the snapshot-complete counter; does nothing when that counter is {@code null}. */
public void incNumSnapshotComplete() {
    if (numSnapshotComplete == null) {
        return;
    }
    numSnapshotComplete.inc();
}

/**
* flush audit data
* usually call this method in close method or when checkpointing
Expand Down Expand Up @@ -292,6 +426,15 @@ public String toString() {
+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+ ", currentFetchEventTimeLag=" + currentFetchEventTimeLag.getValue()
+ ", currentEmitEventTimeLag=" + currentEmitEventTimeLag.getValue()
+ ", deserializeTimeLag=" + deserializeTimeLag.getValue()
+ ", numDeserializeSuccess=" + numDeserializeSuccess.getCount()
+ ", numDeserializeError=" + numDeserializeError.getCount()
+ ", numSnapshotCreate=" + numSnapshotCreate.getCount()
+ ", numSnapshotError=" + numSnapshotError.getCount()
+ ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue()
+ ", numSnapshotComplete=" + numSnapshotComplete.getCount()
+ ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+ ", auditReporter=" + auditReporter
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
Expand Down Expand Up @@ -62,6 +65,8 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -198,17 +203,27 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

/**
 * Self-defined Flink metrics. Created in open() only when metricOption is non-null, so it may be
 * null and is null-checked before use.
 */
private transient SourceExactlyMetric sourceExactlyMetric;

/** Option used in open() to build sourceExactlyMetric; may be null. */
private final MetricOption metricOption;

/**
 * The map to store the start time of each checkpoint: checkpoint id -> System.currentTimeMillis()
 * recorded while snapshotting that checkpoint. The entry for an id is removed in
 * notifyCheckpointComplete.
 */
private transient Map<Long, Long> checkpointStartTimeMap;

// ---------------------------------------------------------------------------------------

/**
 * Creates the source function and stores its collaborators; no resources are opened here.
 *
 * @param deserializer schema stored for later use; when it is a {@code RowDataDebeziumDeserializeSchema}
 *        it is handed the metric object in {@code open}
 * @param properties properties stored on this function
 * @param specificOffset optional offset stored on this function; may be {@code null}
 * @param validator validator stored on this function
 * @param metricOption option used in {@code open} to build the {@code SourceExactlyMetric}; when
 *        {@code null} no metric object is created
 */
public DebeziumSourceFunction(
        DebeziumDeserializationSchema<T> deserializer,
        Properties properties,
        @Nullable DebeziumOffset specificOffset,
        Validator validator,
        @Nullable MetricOption metricOption) {
    this.deserializer = deserializer;
    this.properties = properties;
    this.specificOffset = specificOffset;
    this.validator = validator;
    this.metricOption = metricOption;
}

@Override
Expand All @@ -221,6 +236,15 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.setSourceExactlyMetric(sourceExactlyMetric);
}
// instantiate checkpointStartTimeMap after restoring from checkpoint
checkpointStartTimeMap = new HashMap<>();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -305,6 +329,17 @@ private void restoreHistoryRecordsState() throws Exception {

/**
 * Delegates to {@code doSnapshotState} and, if that throws an {@link Exception}, bumps the
 * snapshot-error counter (when metrics are available) before rethrowing the same exception.
 */
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    try {
        doSnapshotState(functionSnapshotContext);
    } catch (Exception snapshotFailure) {
        // Without a metric object there is nothing to count; propagate right away.
        if (sourceExactlyMetric == null) {
            throw snapshotFailure;
        }
        sourceExactlyMetric.incNumSnapshotError();
        throw snapshotFailure;
    }
}

private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
Expand All @@ -317,6 +352,15 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw
((RowDataDebeziumDeserializeSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
}
// record the start time of each checkpoint
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
} else {
LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}
}

private void snapshotOffsetState(long checkpointId) throws Exception {
Expand Down Expand Up @@ -496,6 +540,17 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
// get the start time of the currently completed checkpoint
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.debezium.Validator;
import io.debezium.connector.postgresql.PostgresConnector;

Expand Down Expand Up @@ -53,6 +55,7 @@ public static class Builder<T> {
private String[] tableList;
private Properties dbzProperties;
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

/**
* The name of the Postgres logical decoding plug-in installed on the server. Supported
Expand Down Expand Up @@ -146,6 +149,12 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
return this;
}

/**
 * Sets the {@link MetricOption} that {@link #build()} passes to the created
 * {@link DebeziumSourceFunction}, where it is used to instantiate the SourceExactlyMetric.
 *
 * @param metricOption metric option to pass along
 * @return this builder
 */
public Builder<T> metricOption(final MetricOption metricOption) {
    this.metricOption = metricOption;
    return this;
}

public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
Expand Down Expand Up @@ -178,7 +187,7 @@ public DebeziumSourceFunction<T> build() {
}

return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
deserializer, props, null, Validator.getDefaultValidator(), metricOption);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
PostgreSQLDeserializationConverterFactory.instance())
.setValueValidator(new PostgresValueValidator(schemaName, tableName))
.setChangelogMode(changelogMode)
.setMetricOption(metricOption)
.build();
DebeziumSourceFunction<RowData> sourceFunction =
PostgreSQLSource.<RowData>builder()
Expand All @@ -150,6 +149,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.slotName(slotName)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.metricOption(metricOption)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
Expand Down
Loading

0 comments on commit 0122aa4

Please sign in to comment.