From 3a0eef3f3bac1c65619131ab8bb21fc35b73eb2f Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Mon, 16 Sep 2024 10:23:27 +0800 Subject: [PATCH 1/5] [Feature][Sort] Enhanced Metric Instrumentation for InLong Sort Flink Connector --- .../apache/inlong/sort/base/Constants.java | 14 ++ .../sort/base/metric/SourceExactlyMetric.java | 141 ++++++++++++++++++ .../sort/postgre/DebeziumSourceFunction.java | 34 +++++ .../RowDataDebeziumDeserializeSchema.java | 20 +++ 4 files changed, 209 insertions(+) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 8fd698fa126..ff3c6946a4c 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -67,6 +67,20 @@ public final class Constants { public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag"; + public static final String DESERIALIZE_TIME_LAG = "deserializeTimeLag"; + + public static final String NUM_DESERIALIZE_SUCCESS = "numDeserializeSuccess"; + + public static final String NUM_DESERIALIZE_ERROR = "numDeserializeError"; + + public static final String NUM_SNAPSHOT_CREATE = "numSnapshotCreate"; + + public static final String NUM_SNAPSHOT_ERROR = "numSnapshotError"; + + public static final String NUM_COMPLETED_SNAPSHOTS = "numCompletedSnapshots"; + + public static final String SNAPSHOT_TO_CHECKPOINT_TIME_LAG = "snapshotToCheckpointTimeLag"; + /** * Timestamp when the read phase changed */ diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java index 19f9f1eda92..6f32d15f767 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java @@ -33,12 +33,19 @@ import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION; import static org.apache.inlong.sort.base.Constants.CURRENT_EMIT_EVENT_TIME_LAG; import static org.apache.inlong.sort.base.Constants.CURRENT_FETCH_EVENT_TIME_LAG; +import static org.apache.inlong.sort.base.Constants.DESERIALIZE_TIME_LAG; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND; +import static org.apache.inlong.sort.base.Constants.NUM_COMPLETED_SNAPSHOTS; +import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_ERROR; +import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_SUCCESS; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND; +import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_CREATE; +import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_ERROR; +import static org.apache.inlong.sort.base.Constants.SNAPSHOT_TO_CHECKPOINT_TIME_LAG; import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; public class SourceExactlyMetric implements MetricData, Serializable, SourceMetricsReporter { @@ -50,6 +57,13 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr private Counter numBytesIn; private Counter numRecordsInForMeter; private Counter numBytesInForMeter; + private Counter numDeserializeSuccess; + private Counter numDeserializeError; + private Gauge deserializeTimeLag; + private Counter numSnapshotCreate; + private Counter numSnapshotError; + private Counter numCompletedSnapshots; + private Gauge snapshotToCheckpointTimeLag; private Meter numRecordsInPerSecond; private Meter numBytesInPerSecond; private AuditReporterImpl auditReporter; @@ -80,6 +94,17 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr */ private volatile long emitDelay = 0L; + /** + * deserializeDelay = deserializeEndTime - deserializeStartTime, where the deserializeStartTime is the time method deserialize is called, + * and deserializeEndTime is the time the record is emitted + */ + private volatile long deserializeDelay = 0L; + + /** + * snapshotToCheckpointDelay = snapShotCompleteTime - snapShotStartTimeById, where the snapShotCompleteTime is the time the logic of notifyCheckpointComplete is finished + */ + private volatile long snapshotToCheckpointDelay = 0L; + public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) { this.metricGroup = metricGroup; this.labels = option.getLabels(); @@ -98,6 +123,12 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) { registerMetricsForNumRecordsInPerSecond(); registerMetricsForCurrentFetchEventTimeLag(); registerMetricsForCurrentEmitEventTimeLag(); + registerMetricsForDeserializeTimeLag(); + registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter()); + registerMetricsForNumDeserializeError(new ThreadSafeCounter()); + registerMetricsForNumSnapshotCreate(new ThreadSafeCounter()); + registerMetricsForNumSnapshotError(new ThreadSafeCounter()); + registerMetricsForSnapshotToCheckpointTimeLag(); break; } @@ -178,6 +209,58 @@ public void registerMetricsForCurrentFetchEventTimeLag() { public void registerMetricsForCurrentEmitEventTimeLag() { currentEmitEventTimeLag = registerGauge(CURRENT_EMIT_EVENT_TIME_LAG, (Gauge) this::getEmitDelay); } + public void registerMetricsForDeserializeTimeLag() { + deserializeTimeLag = registerGauge(DESERIALIZE_TIME_LAG, (Gauge) this::getDeserializeDelay); + } + + public void registerMetricsForNumDeserializeSuccess(Counter counter) { + numDeserializeSuccess = registerCounter(NUM_DESERIALIZE_SUCCESS, counter); + } + + public void registerMetricsForNumDeserializeError(Counter counter) { + numDeserializeError = registerCounter(NUM_DESERIALIZE_ERROR, counter); + } + + public void registerMetricsForNumSnapshotCreate(Counter counter) { + numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, counter); + } + + public void registerMetricsForNumSnapshotError(Counter counter) { + numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter); + } + + public void registerMetricsForNumCompletedCheckpoints(Counter counter) { + numCompletedSnapshots = registerCounter(NUM_COMPLETED_SNAPSHOTS, counter); + } + + public void registerMetricsForSnapshotToCheckpointTimeLag() { + snapshotToCheckpointTimeLag = + registerGauge(SNAPSHOT_TO_CHECKPOINT_TIME_LAG, (Gauge) this::getSnapshotToCheckpointDelay); + } + + public Gauge getDeserializeTimeLag() { + return deserializeTimeLag; + } + + public Gauge getSnapshotToCheckpointTimeLag() { + return snapshotToCheckpointTimeLag; + } + + public Counter getNumDeserializeSuccess() { + return numDeserializeSuccess; + } + + public Counter getNumDeserializeError() { + return numDeserializeError; + } + + public Counter getNumSnapshotCreate() { + return numSnapshotCreate; + } + + public Counter getNumSnapshotError() { + return numSnapshotError; + } public Counter getNumRecordsIn() { return numRecordsIn; @@ -211,6 +294,26 @@ public long getEmitDelay() { return emitDelay; } + public long getDeserializeDelay() { + return deserializeDelay; + } + + public long getSnapshotToCheckpointDelay() { + return snapshotToCheckpointDelay; + } + + public Counter getNumCompletedSnapshots() { + return numCompletedSnapshots; + } + + public void recordDeserializeDelay(long deserializeDelay) { + this.deserializeDelay = deserializeDelay; + } + + public void recordSnapshotToCheckpointDelay(long snapshotToCheckpointDelay) { + this.snapshotToCheckpointDelay = snapshotToCheckpointDelay; + } + @Override public MetricGroup getMetricGroup() { return metricGroup; @@ -262,6 +365,36 @@ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) { } } + public void incNumDeserializeSuccess() { + if (numDeserializeSuccess != null) { + numDeserializeSuccess.inc(); + } + } + + public void incNumDeserializeError() { + if (numDeserializeError != null) { + numDeserializeError.inc(); + } + } + + public void incNumSnapshotCreate() { + if (numSnapshotCreate != null) { + numSnapshotCreate.inc(); + } + } + + public void incNumSnapshotError() { + if (numSnapshotError != null) { + numSnapshotError.inc(); + } + } + + public void incNumCompletedSnapshots() { + if (numCompletedSnapshots != null) { + numCompletedSnapshots.inc(); + } + } + /** * flush audit data * usually call this method in close method or when checkpointing @@ -292,6 +425,14 @@ public String toString() { + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate() + ", currentFetchEventTimeLag=" + currentFetchEventTimeLag.getValue() + ", currentEmitEventTimeLag=" + currentEmitEventTimeLag.getValue() + + ", deserializeTimeLag=" + deserializeTimeLag.getValue() + + ", numDeserializeSuccess=" + numDeserializeSuccess.getCount() + + ", numDeserializeError=" + numDeserializeError.getCount() + + ", numSnapshotCreate=" + numSnapshotCreate.getCount() + + ", numSnapshotError=" + numSnapshotError.getCount() + + ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue() + + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate() + + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate() + ", auditReporter=" + auditReporter + '}'; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index 5efc6c6ea5d..5ef6abacb98 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.postgre; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; + import com.ververica.cdc.debezium.Validator; import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer; import com.ververica.cdc.debezium.internal.DebeziumOffset; @@ -62,6 +64,8 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -198,6 +202,11 @@ public class DebeziumSourceFunction extends RichSourceFunction /** Buffer the events from the source and record the errors from the debezium. */ private transient Handover handover; + private SourceExactlyMetric sourceExactlyMetric; + + // record the start time of each checkpoint + private final transient Map checkpointStartTimeMap = new HashMap<>(); + // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( @@ -221,6 +230,10 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); + // get sourceExactlyMetric from deserializer to record metrics + if (sourceExactlyMetric == null && deserializer instanceof RowDataDebeziumDeserializeSchema) { + sourceExactlyMetric = ((RowDataDebeziumDeserializeSchema) deserializer).getSourceExactlyMetric(); + } } // ------------------------------------------------------------------------ @@ -305,6 +318,17 @@ private void restoreHistoryRecordsState() throws Exception { @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + try { + doSnapshotState(functionSnapshotContext); + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotError(); + } + throw e; + } + } + + private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { if (handover.hasError()) { LOG.debug("snapshotState() called on closed source"); throw new FlinkRuntimeException( @@ -317,6 +341,10 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw ((RowDataDebeziumDeserializeSchema) deserializer) .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); } + // record the start time of each checkpoint + long checkpointId = functionSnapshotContext.getCheckpointId(); + checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + sourceExactlyMetric.incNumSnapshotCreate(); } private void snapshotOffsetState(long checkpointId) throws Exception { @@ -496,6 +524,12 @@ public void notifyCheckpointComplete(long checkpointId) { schema.flushAudit(); schema.updateLastCheckpointId(checkpointId); } + // get the start time of the currently completed checkpoint + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null) { + sourceExactlyMetric.incNumCompletedSnapshots(); + sourceExactlyMetric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } } catch (Exception e) { // ignore exception if we are no longer running LOG.warn("Ignore error when committing offset to database.", e); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index fdf2d013279..f5075ed4e7a 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -35,6 +35,7 @@ import io.debezium.time.NanoTime; import io.debezium.time.NanoTimestamp; import io.debezium.time.Timestamp; +import lombok.Getter; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -101,6 +102,9 @@ public interface ValueValidator extends Serializable { /** Changelog Mode to use for encoding changes in Flink internal data structure. */ private final DebeziumChangelogMode changelogMode; private final MetricOption metricOption; + + // Getter to make sourceExactlyMetric accessible to DebeziumSourceFunction + @Getter private SourceExactlyMetric sourceExactlyMetric; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ @@ -139,6 +143,18 @@ public void open() { @Override public void deserialize(SourceRecord record, Collector out) throws Exception { + long deseializeStartTime = System.currentTimeMillis(); + try { + doDeserialize(record, out, deseializeStartTime); + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeError(); + } + throw e; + } + } + + private void doDeserialize(SourceRecord record, Collector out, long deseializeStartTime) throws Exception { Envelope.Operation op = Envelope.operationFor(record); Struct value = (Struct) record.value(); Schema valueSchema = record.valueSchema(); @@ -174,6 +190,10 @@ public void deserialize(SourceRecord record, Collector out) throws Exce } emit(record, after, out); } + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeSuccess(); + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deseializeStartTime); + } } private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception { From b68633090f1c75356c8e2d5c0d80c957ba25e778 Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Wed, 18 Sep 2024 23:25:52 +0800 Subject: [PATCH 2/5] fix: self-defined metrics not present in log and added one missing metric --- .../sort/base/metric/SourceExactlyMetric.java | 3 +- .../sort/postgre/DebeziumSourceFunction.java | 48 ++++++++++++++----- .../inlong/sort/postgre/PostgreSQLSource.java | 11 ++++- .../sort/postgre/PostgreSQLTableSource.java | 2 +- .../RowDataDebeziumDeserializeSchema.java | 32 ++++--------- 5 files changed, 58 insertions(+), 38 deletions(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java index 6f32d15f767..b05078ad9c1 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java @@ -124,6 +124,7 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) { registerMetricsForCurrentFetchEventTimeLag(); registerMetricsForCurrentEmitEventTimeLag(); registerMetricsForDeserializeTimeLag(); + registerMetricsForNumCompletedSnapshots(new ThreadSafeCounter()); registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter()); registerMetricsForNumDeserializeError(new ThreadSafeCounter()); registerMetricsForNumSnapshotCreate(new ThreadSafeCounter()); @@ -229,7 +230,7 @@ public void registerMetricsForNumSnapshotError(Counter counter) { numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter); } - public void registerMetricsForNumCompletedCheckpoints(Counter counter) { + public void registerMetricsForNumCompletedSnapshots(Counter counter) { numCompletedSnapshots = registerCounter(NUM_COMPLETED_SNAPSHOTS, counter); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index 5ef6abacb98..689e14b40e3 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.postgre; +import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.SourceExactlyMetric; import com.ververica.cdc.debezium.Validator; @@ -202,10 +203,13 @@ public class DebeziumSourceFunction extends RichSourceFunction /** Buffer the events from the source and record the errors from the debezium. */ private transient Handover handover; - private SourceExactlyMetric sourceExactlyMetric; + /** Self-defined Flink metrics. */ + private transient SourceExactlyMetric sourceExactlyMetric; - // record the start time of each checkpoint - private final transient Map checkpointStartTimeMap = new HashMap<>(); + private final MetricOption metricOption; + + /** The map to store the start time of each checkpoint. */ + private transient Map checkpointStartTimeMap = new HashMap<>(); // --------------------------------------------------------------------------------------- @@ -213,11 +217,13 @@ public DebeziumSourceFunction( DebeziumDeserializationSchema deserializer, Properties properties, @Nullable DebeziumOffset specificOffset, - Validator validator) { + Validator validator, + MetricOption metricOption) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; + this.metricOption = metricOption; } @Override @@ -230,9 +236,16 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); - // get sourceExactlyMetric from deserializer to record metrics - if (sourceExactlyMetric == null && deserializer instanceof RowDataDebeziumDeserializeSchema) { - sourceExactlyMetric = ((RowDataDebeziumDeserializeSchema) deserializer).getSourceExactlyMetric(); + if (sourceExactlyMetric == null) { + sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); + } + if (deserializer instanceof RowDataDebeziumDeserializeSchema) { + ((RowDataDebeziumDeserializeSchema) deserializer) + .setSourceExactlyMetric(sourceExactlyMetric); + } + // instantiate checkpointStartTimeMap after restoring from checkpoint + if (checkpointStartTimeMap == null) { + checkpointStartTimeMap = new HashMap<>(); } } @@ -342,8 +355,12 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); } // record the start time of each checkpoint - long checkpointId = functionSnapshotContext.getCheckpointId(); - checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + Long checkpointId = functionSnapshotContext.getCheckpointId(); + if (checkpointStartTimeMap != null) { + checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + } else { + LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); + } sourceExactlyMetric.incNumSnapshotCreate(); } @@ -525,10 +542,15 @@ public void notifyCheckpointComplete(long checkpointId) { schema.updateLastCheckpointId(checkpointId); } // get the start time of the currently completed checkpoint - Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); - if (snapShotStartTimeById != null) { - sourceExactlyMetric.incNumCompletedSnapshots(); - sourceExactlyMetric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + if (checkpointStartTimeMap != null) { + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null) { + sourceExactlyMetric.incNumCompletedSnapshots(); + sourceExactlyMetric + .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } + } else { + LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint"); } } catch (Exception e) { // ignore exception if we are no longer running diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java index 040541b8272..cbd1eb2679d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.postgre; +import org.apache.inlong.sort.base.metric.MetricOption; + import com.ververica.cdc.debezium.Validator; import io.debezium.connector.postgresql.PostgresConnector; @@ -53,6 +55,7 @@ public static class Builder { private String[] tableList; private Properties dbzProperties; private DebeziumDeserializationSchema deserializer; + private MetricOption metricOption; /** * The name of the Postgres logical decoding plug-in installed on the server. Supported @@ -146,6 +149,12 @@ public Builder deserializer(DebeziumDeserializationSchema deserializer) { return this; } + /** metricOption used to instantiate SourceExactlyMetric */ + public Builder metricOption(MetricOption metricOption) { + this.metricOption = metricOption; + return this; + } + public DebeziumSourceFunction build() { Properties props = new Properties(); props.setProperty("connector.class", PostgresConnector.class.getCanonicalName()); @@ -178,7 +187,7 @@ public DebeziumSourceFunction build() { } return new DebeziumSourceFunction<>( - deserializer, props, null, Validator.getDefaultValidator()); + deserializer, props, null, Validator.getDefaultValidator(), metricOption); } } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index 6e4bd7c9229..cabfe255fc4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -135,7 +135,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { PostgreSQLDeserializationConverterFactory.instance()) .setValueValidator(new PostgresValueValidator(schemaName, tableName)) .setChangelogMode(changelogMode) - .setMetricOption(metricOption) .build(); DebeziumSourceFunction sourceFunction = PostgreSQLSource.builder() @@ -150,6 +149,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .slotName(slotName) .debeziumProperties(dbzProperties) .deserializer(deserializer) + .metricOption(metricOption) .build(); return SourceFunctionProvider.of(sourceFunction, false); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index f5075ed4e7a..33d51a013b8 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -17,7 +17,6 @@ package org.apache.inlong.sort.postgre; -import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; import org.apache.inlong.sort.base.metric.SourceExactlyMetric; @@ -35,7 +34,7 @@ import io.debezium.time.NanoTime; import io.debezium.time.NanoTimestamp; import io.debezium.time.Timestamp; -import lombok.Getter; +import lombok.Setter; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -101,10 +100,9 @@ public interface ValueValidator extends Serializable { /** Changelog Mode to use for encoding changes in Flink internal data structure. */ private final DebeziumChangelogMode changelogMode; - private final MetricOption metricOption; - // Getter to make sourceExactlyMetric accessible to DebeziumSourceFunction - @Getter + /** Self-defined Flink metrics, which will be set by DebeziumSourceFunction with setter */ + @Setter private SourceExactlyMetric sourceExactlyMetric; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ @@ -119,8 +117,7 @@ public static Builder newBuilder() { ValueValidator validator, ZoneId serverTimeZone, DeserializationRuntimeConverterFactory userDefinedConverterFactory, - DebeziumChangelogMode changelogMode, - MetricOption metricOption) { + DebeziumChangelogMode changelogMode) { this.hasMetadata = checkNotNull(metadataConverters).length > 0; this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters); this.physicalConverter = @@ -131,21 +128,17 @@ public static Builder newBuilder() { this.resultTypeInfo = checkNotNull(resultTypeInfo); this.validator = checkNotNull(validator); this.changelogMode = checkNotNull(changelogMode); - this.metricOption = metricOption; } @Override public void open() { - if (metricOption != null) { - sourceExactlyMetric = new SourceExactlyMetric(metricOption); - } } @Override public void deserialize(SourceRecord record, Collector out) throws Exception { - long deseializeStartTime = System.currentTimeMillis(); + long deserializeStartTime = System.currentTimeMillis(); try { - doDeserialize(record, out, deseializeStartTime); + doDeserialize(record, out, deserializeStartTime); } catch (Exception e) { if (sourceExactlyMetric != null) { sourceExactlyMetric.incNumDeserializeError(); @@ -154,7 +147,8 @@ public void deserialize(SourceRecord record, Collector out) throws Exce } } - private void doDeserialize(SourceRecord record, Collector out, long deseializeStartTime) throws Exception { + private void doDeserialize(SourceRecord record, Collector out, long deserializeStartTime) + throws Exception { Envelope.Operation op = Envelope.operationFor(record); Struct value = (Struct) record.value(); Schema valueSchema = record.valueSchema(); @@ -192,7 +186,7 @@ private void doDeserialize(SourceRecord record, Collector out, long des } if (sourceExactlyMetric != null) { sourceExactlyMetric.incNumDeserializeSuccess(); - sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deseializeStartTime); + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime); } } @@ -239,7 +233,6 @@ public static class Builder { private DeserializationRuntimeConverterFactory userDefinedConverterFactory = DeserializationRuntimeConverterFactory.DEFAULT; private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL; - private MetricOption metricOption; public Builder setPhysicalRowType(RowType physicalRowType) { this.physicalRowType = physicalRowType; @@ -271,10 +264,6 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) { this.changelogMode = changelogMode; return this; } - public Builder setMetricOption(MetricOption metricOption) { - this.metricOption = metricOption; - return this; - } public RowDataDebeziumDeserializeSchema build() { return new RowDataDebeziumDeserializeSchema( @@ -284,8 +273,7 @@ public RowDataDebeziumDeserializeSchema build() { validator, serverTimeZone, userDefinedConverterFactory, - changelogMode, - metricOption); + changelogMode); } } From 208d320408ab85bb6a5b8dacae9444fe916d490d Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Thu, 19 Sep 2024 14:49:51 +0800 Subject: [PATCH 3/5] fix: SourceExactlyMetric failed to instantiate without inlong.metric.labels set --- .../org/apache/inlong/sort/postgre/DebeziumSourceFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index 689e14b40e3..d6de2d1f171 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -236,7 +236,7 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); - if (sourceExactlyMetric == null) { + if (sourceExactlyMetric == null && metricOption != null) { sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); } if (deserializer instanceof RowDataDebeziumDeserializeSchema) { From e13b7253695fddb0ca8763e58901d12ad20aecaa Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Thu, 19 Sep 2024 19:11:52 +0800 Subject: [PATCH 4/5] fix: add null check when trying to change metrics --- .../sort/postgre/DebeziumSourceFunction.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index d6de2d1f171..4edb52b9c7c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -209,7 +209,7 @@ public class DebeziumSourceFunction extends RichSourceFunction private final MetricOption metricOption; /** The map to store the start time of each checkpoint. */ - private transient Map checkpointStartTimeMap = new HashMap<>(); + private transient Map checkpointStartTimeMap; // --------------------------------------------------------------------------------------- @@ -236,7 +236,7 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); - if (sourceExactlyMetric == null && metricOption != null) { + if (metricOption != null) { sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); } if (deserializer instanceof RowDataDebeziumDeserializeSchema) { @@ -244,9 +244,7 @@ public void open(Configuration parameters) throws Exception { .setSourceExactlyMetric(sourceExactlyMetric); } // instantiate checkpointStartTimeMap after restoring from checkpoint - if (checkpointStartTimeMap == null) { - checkpointStartTimeMap = new HashMap<>(); - } + checkpointStartTimeMap = new HashMap<>(); } // ------------------------------------------------------------------------ @@ -361,7 +359,9 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th } else { LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); } - sourceExactlyMetric.incNumSnapshotCreate(); + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotCreate(); + } } private void snapshotOffsetState(long checkpointId) throws Exception { @@ -544,7 +544,7 @@ public void notifyCheckpointComplete(long checkpointId) { // get the start time of the currently completed checkpoint if (checkpointStartTimeMap != null) { Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); - if (snapShotStartTimeById != null) { + if (snapShotStartTimeById != null && sourceExactlyMetric != null) { sourceExactlyMetric.incNumCompletedSnapshots(); sourceExactlyMetric .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); From ab4420fbf17588e50563053bf25b0baa0de0c85b Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Mon, 23 Sep 2024 20:40:09 +0800 Subject: [PATCH 5/5] refactor: used method instead of @Setter and removed redundant variables --- .../apache/inlong/sort/postgre/DebeziumSourceFunction.java | 3 +-- .../sort/postgre/RowDataDebeziumDeserializeSchema.java | 7 +++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index 4edb52b9c7c..d89fdc62325 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -353,9 +353,8 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); } // record the start time of each checkpoint - Long checkpointId = functionSnapshotContext.getCheckpointId(); if (checkpointStartTimeMap != null) { - checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis()); } else { LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index 33d51a013b8..8afca47c949 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -34,7 +34,6 @@ import io.debezium.time.NanoTime; import io.debezium.time.NanoTimestamp; import io.debezium.time.Timestamp; -import lombok.Setter; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -102,7 +101,6 @@ public interface ValueValidator extends Serializable { private final DebeziumChangelogMode changelogMode; /** Self-defined Flink metrics, which will be set by DebeziumSourceFunction with setter */ - @Setter private SourceExactlyMetric sourceExactlyMetric; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ @@ -712,4 +710,9 @@ public void updateLastCheckpointId(long checkpointId) { sourceExactlyMetric.updateLastCheckpointId(checkpointId); } } + + /** setter to enable DebeziumSourceFunction to set the metric */ + public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) { + this.sourceExactlyMetric = sourceExactlyMetric; + } }