Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11129][Sort] Enhanced source metric instrumentation for InLong Sort Flink Connector #11130

Merged
merged 7 commits into from
Sep 25, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ public final class Constants {

public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag";

public static final String DESERIALIZE_TIME_LAG = "deserializeTimeLag";

public static final String NUM_DESERIALIZE_SUCCESS = "numDeserializeSuccess";

public static final String NUM_DESERIALIZE_ERROR = "numDeserializeError";

public static final String NUM_SNAPSHOT_CREATE = "numSnapshotCreate";

public static final String NUM_SNAPSHOT_ERROR = "numSnapshotError";

public static final String NUM_SNAPSHOT_COMPLETE = "numSnapshotComplete";

public static final String SNAPSHOT_TO_CHECKPOINT_TIME_LAG = "snapshotToCheckpointTimeLag";

/**
* Timestamp when the read phase changed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@
import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
import static org.apache.inlong.sort.base.Constants.CURRENT_EMIT_EVENT_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.CURRENT_FETCH_EVENT_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.DESERIALIZE_TIME_LAG;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_ERROR;
import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_SUCCESS;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_COMPLETE;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_CREATE;
import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_ERROR;
import static org.apache.inlong.sort.base.Constants.SNAPSHOT_TO_CHECKPOINT_TIME_LAG;
import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;

public class SourceExactlyMetric implements MetricData, Serializable, SourceMetricsReporter {
Expand All @@ -50,6 +57,13 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr
private Counter numBytesIn;
private Counter numRecordsInForMeter;
private Counter numBytesInForMeter;
private Counter numDeserializeSuccess;
private Counter numDeserializeError;
private Gauge<Long> deserializeTimeLag;
private Counter numSnapshotCreate;
private Counter numSnapshotError;
private Counter numSnapshotComplete;
private Gauge<Long> snapshotToCheckpointTimeLag;
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
private AuditReporterImpl auditReporter;
Expand Down Expand Up @@ -80,6 +94,17 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr
*/
private volatile long emitDelay = 0L;

/**
* deserializeDelay = deserializeEndTime - deserializeStartTime, where the deserializeStartTime is the time method deserialize is called,
* and deserializeEndTime is the time the record is emitted
*/
private volatile long deserializeDelay = 0L;

/**
* snapshotToCheckpointDelay = snapShotCompleteTime - snapShotStartTimeById, where the snapShotCompleteTime is the time the logic of notifyCheckpointComplete is finished
*/
private volatile long snapshotToCheckpointDelay = 0L;

public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
this.labels = option.getLabels();
Expand All @@ -98,6 +123,13 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
registerMetricsForNumRecordsInPerSecond();
registerMetricsForCurrentFetchEventTimeLag();
registerMetricsForCurrentEmitEventTimeLag();
registerMetricsForDeserializeTimeLag();
registerMetricsForNumSnapshotComplete(new ThreadSafeCounter());
registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter());
registerMetricsForNumDeserializeError(new ThreadSafeCounter());
registerMetricsForNumSnapshotCreate(new ThreadSafeCounter());
registerMetricsForNumSnapshotError(new ThreadSafeCounter());
registerMetricsForSnapshotToCheckpointTimeLag();
break;
}

Expand Down Expand Up @@ -178,6 +210,58 @@ public void registerMetricsForCurrentFetchEventTimeLag() {
public void registerMetricsForCurrentEmitEventTimeLag() {
currentEmitEventTimeLag = registerGauge(CURRENT_EMIT_EVENT_TIME_LAG, (Gauge<Long>) this::getEmitDelay);
}
public void registerMetricsForDeserializeTimeLag() {
deserializeTimeLag = registerGauge(DESERIALIZE_TIME_LAG, (Gauge<Long>) this::getDeserializeDelay);
}

public void registerMetricsForNumDeserializeSuccess(Counter counter) {
numDeserializeSuccess = registerCounter(NUM_DESERIALIZE_SUCCESS, counter);
}

public void registerMetricsForNumDeserializeError(Counter counter) {
numDeserializeError = registerCounter(NUM_DESERIALIZE_ERROR, counter);
}

public void registerMetricsForNumSnapshotCreate(Counter counter) {
numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, counter);
}

public void registerMetricsForNumSnapshotError(Counter counter) {
numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter);
}

public void registerMetricsForNumSnapshotComplete(Counter counter) {
numSnapshotComplete = registerCounter(NUM_SNAPSHOT_COMPLETE, counter);
}

public void registerMetricsForSnapshotToCheckpointTimeLag() {
snapshotToCheckpointTimeLag =
registerGauge(SNAPSHOT_TO_CHECKPOINT_TIME_LAG, (Gauge<Long>) this::getSnapshotToCheckpointDelay);
}

public Gauge getDeserializeTimeLag() {
return deserializeTimeLag;
}

public Gauge getSnapshotToCheckpointTimeLag() {
return snapshotToCheckpointTimeLag;
}

public Counter getNumDeserializeSuccess() {
return numDeserializeSuccess;
}

public Counter getNumDeserializeError() {
return numDeserializeError;
}

public Counter getNumSnapshotCreate() {
return numSnapshotCreate;
}

public Counter getNumSnapshotError() {
return numSnapshotError;
}

public Counter getNumRecordsIn() {
return numRecordsIn;
Expand Down Expand Up @@ -211,6 +295,26 @@ public long getEmitDelay() {
return emitDelay;
}

public long getDeserializeDelay() {
return deserializeDelay;
}

public long getSnapshotToCheckpointDelay() {
return snapshotToCheckpointDelay;
}

public Counter getNumSnapshotComplete() {
return numSnapshotComplete;
}

public void recordDeserializeDelay(long deserializeDelay) {
this.deserializeDelay = deserializeDelay;
}

public void recordSnapshotToCheckpointDelay(long snapshotToCheckpointDelay) {
this.snapshotToCheckpointDelay = snapshotToCheckpointDelay;
}

@Override
public MetricGroup getMetricGroup() {
return metricGroup;
Expand Down Expand Up @@ -262,6 +366,36 @@ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
}
}

public void incNumDeserializeSuccess() {
if (numDeserializeSuccess != null) {
numDeserializeSuccess.inc();
}
}

public void incNumDeserializeError() {
if (numDeserializeError != null) {
numDeserializeError.inc();
}
}

public void incNumSnapshotCreate() {
if (numSnapshotCreate != null) {
numSnapshotCreate.inc();
}
}

public void incNumSnapshotError() {
if (numSnapshotError != null) {
numSnapshotError.inc();
}
}

public void incNumSnapshotComplete() {
if (numSnapshotComplete != null) {
numSnapshotComplete.inc();
}
}

/**
* flush audit data
* usually call this method in close method or when checkpointing
Expand Down Expand Up @@ -292,6 +426,15 @@ public String toString() {
+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+ ", currentFetchEventTimeLag=" + currentFetchEventTimeLag.getValue()
+ ", currentEmitEventTimeLag=" + currentEmitEventTimeLag.getValue()
+ ", deserializeTimeLag=" + deserializeTimeLag.getValue()
+ ", numDeserializeSuccess=" + numDeserializeSuccess.getCount()
+ ", numDeserializeError=" + numDeserializeError.getCount()
+ ", numSnapshotCreate=" + numSnapshotCreate.getCount()
+ ", numSnapshotError=" + numSnapshotError.getCount()
+ ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue()
+ ", numSnapshotComplete=" + numSnapshotComplete.getCount()
+ ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+ ", auditReporter=" + auditReporter
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
Expand Down Expand Up @@ -62,6 +65,8 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -198,17 +203,27 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

/** Self-defined Flink metrics. */
private transient SourceExactlyMetric sourceExactlyMetric;

private final MetricOption metricOption;

/** The map to store the start time of each checkpoint. */
private transient Map<Long, Long> checkpointStartTimeMap;

// ---------------------------------------------------------------------------------------

public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
Validator validator) {
Validator validator,
MetricOption metricOption) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.metricOption = metricOption;
}

@Override
Expand All @@ -221,6 +236,15 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.setSourceExactlyMetric(sourceExactlyMetric);
}
// instantiate checkpointStartTimeMap after restoring from checkpoint
checkpointStartTimeMap = new HashMap<>();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -305,6 +329,17 @@ private void restoreHistoryRecordsState() throws Exception {

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
try {
doSnapshotState(functionSnapshotContext);
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotError();
}
throw e;
}
}

private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
Expand All @@ -317,6 +352,15 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw
((RowDataDebeziumDeserializeSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
}
// record the start time of each checkpoint
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
} else {
LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}
}

private void snapshotOffsetState(long checkpointId) throws Exception {
Expand Down Expand Up @@ -496,6 +540,17 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
// get the start time of the currently completed checkpoint
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.debezium.Validator;
import io.debezium.connector.postgresql.PostgresConnector;

Expand Down Expand Up @@ -53,6 +55,7 @@ public static class Builder<T> {
private String[] tableList;
private Properties dbzProperties;
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

/**
* The name of the Postgres logical decoding plug-in installed on the server. Supported
Expand Down Expand Up @@ -146,6 +149,12 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
return this;
}

/** metricOption used to instantiate SourceExactlyMetric */
public Builder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
Expand Down Expand Up @@ -178,7 +187,7 @@ public DebeziumSourceFunction<T> build() {
}

return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
deserializer, props, null, Validator.getDefaultValidator(), metricOption);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
PostgreSQLDeserializationConverterFactory.instance())
.setValueValidator(new PostgresValueValidator(schemaName, tableName))
.setChangelogMode(changelogMode)
.setMetricOption(metricOption)
.build();
DebeziumSourceFunction<RowData> sourceFunction =
PostgreSQLSource.<RowData>builder()
Expand All @@ -150,6 +149,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.slotName(slotName)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.metricOption(metricOption)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
Expand Down
Loading
Loading