diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java index 6f32d15f76..b05078ad9c 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java @@ -124,6 +124,7 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) { registerMetricsForCurrentFetchEventTimeLag(); registerMetricsForCurrentEmitEventTimeLag(); registerMetricsForDeserializeTimeLag(); + registerMetricsForNumCompletedSnapshots(new ThreadSafeCounter()); registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter()); registerMetricsForNumDeserializeError(new ThreadSafeCounter()); registerMetricsForNumSnapshotCreate(new ThreadSafeCounter()); @@ -229,7 +230,7 @@ public void registerMetricsForNumSnapshotError(Counter counter) { numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter); } - public void registerMetricsForNumCompletedCheckpoints(Counter counter) { + public void registerMetricsForNumCompletedSnapshots(Counter counter) { numCompletedSnapshots = registerCounter(NUM_COMPLETED_SNAPSHOTS, counter); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index 5ef6abacb9..689e14b40e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.postgre; +import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.SourceExactlyMetric; import com.ververica.cdc.debezium.Validator; @@ -202,10 +203,13 @@ public class DebeziumSourceFunction extends RichSourceFunction /** Buffer the events from the source and record the errors from the debezium. */ private transient Handover handover; - private SourceExactlyMetric sourceExactlyMetric; + /** Self-defined Flink metrics. */ + private transient SourceExactlyMetric sourceExactlyMetric; - // record the start time of each checkpoint - private final transient Map checkpointStartTimeMap = new HashMap<>(); + private final MetricOption metricOption; + + /** The map to store the start time of each checkpoint. */ + private transient Map checkpointStartTimeMap = new HashMap<>(); // --------------------------------------------------------------------------------------- @@ -213,11 +217,13 @@ public DebeziumSourceFunction( DebeziumDeserializationSchema deserializer, Properties properties, @Nullable DebeziumOffset specificOffset, - Validator validator) { + Validator validator, + MetricOption metricOption) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; + this.metricOption = metricOption; } @Override @@ -230,9 +236,16 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); - // get sourceExactlyMetric from deserializer to record metrics - if (sourceExactlyMetric == null && deserializer instanceof RowDataDebeziumDeserializeSchema) { - sourceExactlyMetric = ((RowDataDebeziumDeserializeSchema) deserializer).getSourceExactlyMetric(); + if (sourceExactlyMetric == null) { + sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); + } + if (deserializer instanceof RowDataDebeziumDeserializeSchema) { + ((RowDataDebeziumDeserializeSchema) deserializer) + .setSourceExactlyMetric(sourceExactlyMetric); + } + // instantiate checkpointStartTimeMap after restoring from checkpoint + if (checkpointStartTimeMap == null) { + checkpointStartTimeMap = new HashMap<>(); } } @@ -342,8 +355,12 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); } // record the start time of each checkpoint - long checkpointId = functionSnapshotContext.getCheckpointId(); - checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + Long checkpointId = functionSnapshotContext.getCheckpointId(); + if (checkpointStartTimeMap != null) { + checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + } else { + LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); + } sourceExactlyMetric.incNumSnapshotCreate(); } @@ -525,10 +542,15 @@ public void notifyCheckpointComplete(long checkpointId) { schema.updateLastCheckpointId(checkpointId); } // get the start time of the currently completed checkpoint - Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); - if (snapShotStartTimeById != null) { - sourceExactlyMetric.incNumCompletedSnapshots(); - sourceExactlyMetric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + if (checkpointStartTimeMap != null) { + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null) { + sourceExactlyMetric.incNumCompletedSnapshots(); + sourceExactlyMetric + .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } + } else { + LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint"); } } catch (Exception e) { // ignore exception if we are no longer running diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java index 040541b827..cbd1eb2679 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.postgre; +import org.apache.inlong.sort.base.metric.MetricOption; + import com.ververica.cdc.debezium.Validator; import io.debezium.connector.postgresql.PostgresConnector; @@ -53,6 +55,7 @@ public static class Builder { private String[] tableList; private Properties dbzProperties; private DebeziumDeserializationSchema deserializer; + private MetricOption metricOption; /** * The name of the Postgres logical decoding plug-in installed on the server. Supported @@ -146,6 +149,12 @@ public Builder deserializer(DebeziumDeserializationSchema deserializer) { return this; } + /** metricOption used to instantiate SourceExactlyMetric */ + public Builder metricOption(MetricOption metricOption) { + this.metricOption = metricOption; + return this; + } + public DebeziumSourceFunction build() { Properties props = new Properties(); props.setProperty("connector.class", PostgresConnector.class.getCanonicalName()); @@ -178,7 +187,7 @@ public DebeziumSourceFunction build() { } return new DebeziumSourceFunction<>( - deserializer, props, null, Validator.getDefaultValidator()); + deserializer, props, null, Validator.getDefaultValidator(), metricOption); } } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index 6e4bd7c922..cabfe255fc 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -135,7 +135,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { PostgreSQLDeserializationConverterFactory.instance()) .setValueValidator(new PostgresValueValidator(schemaName, tableName)) .setChangelogMode(changelogMode) - .setMetricOption(metricOption) .build(); DebeziumSourceFunction sourceFunction = PostgreSQLSource.builder() @@ -150,6 +149,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .slotName(slotName) .debeziumProperties(dbzProperties) .deserializer(deserializer) + .metricOption(metricOption) .build(); return SourceFunctionProvider.of(sourceFunction, false); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index f5075ed4e7..33d51a013b 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -17,7 +17,6 @@ package org.apache.inlong.sort.postgre; -import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; import org.apache.inlong.sort.base.metric.SourceExactlyMetric; @@ -35,7 +34,7 @@ import io.debezium.time.NanoTime; import io.debezium.time.NanoTimestamp; import io.debezium.time.Timestamp; -import lombok.Getter; +import lombok.Setter; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -101,10 +100,9 @@ public interface ValueValidator extends Serializable { /** Changelog Mode to use for encoding changes in Flink internal data structure. */ private final DebeziumChangelogMode changelogMode; - private final MetricOption metricOption; - // Getter to make sourceExactlyMetric accessible to DebeziumSourceFunction - @Getter + /** Self-defined Flink metrics, which will be set by DebeziumSourceFunction with setter */ + @Setter private SourceExactlyMetric sourceExactlyMetric; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ @@ -119,8 +117,7 @@ public static Builder newBuilder() { ValueValidator validator, ZoneId serverTimeZone, DeserializationRuntimeConverterFactory userDefinedConverterFactory, - DebeziumChangelogMode changelogMode, - MetricOption metricOption) { + DebeziumChangelogMode changelogMode) { this.hasMetadata = checkNotNull(metadataConverters).length > 0; this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters); this.physicalConverter = @@ -131,21 +128,17 @@ public static Builder newBuilder() { this.resultTypeInfo = checkNotNull(resultTypeInfo); this.validator = checkNotNull(validator); this.changelogMode = checkNotNull(changelogMode); - this.metricOption = metricOption; } @Override public void open() { - if (metricOption != null) { - sourceExactlyMetric = new SourceExactlyMetric(metricOption); - } } @Override public void deserialize(SourceRecord record, Collector out) throws Exception { - long deseializeStartTime = System.currentTimeMillis(); + long deserializeStartTime = System.currentTimeMillis(); try { - doDeserialize(record, out, deseializeStartTime); + doDeserialize(record, out, deserializeStartTime); } catch (Exception e) { if (sourceExactlyMetric != null) { sourceExactlyMetric.incNumDeserializeError(); @@ -154,7 +147,8 @@ public void deserialize(SourceRecord record, Collector out) throws Exce } } - private void doDeserialize(SourceRecord record, Collector out, long deseializeStartTime) throws Exception { + private void doDeserialize(SourceRecord record, Collector out, long deserializeStartTime) + throws Exception { Envelope.Operation op = Envelope.operationFor(record); Struct value = (Struct) record.value(); Schema valueSchema = record.valueSchema(); @@ -192,7 +186,7 @@ private void doDeserialize(SourceRecord record, Collector out, long des } if (sourceExactlyMetric != null) { sourceExactlyMetric.incNumDeserializeSuccess(); - sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deseializeStartTime); + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime); } } @@ -239,7 +233,6 @@ public static class Builder { private DeserializationRuntimeConverterFactory userDefinedConverterFactory = DeserializationRuntimeConverterFactory.DEFAULT; private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL; - private MetricOption metricOption; public Builder setPhysicalRowType(RowType physicalRowType) { this.physicalRowType = physicalRowType; @@ -271,10 +264,6 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) { this.changelogMode = changelogMode; return this; } - public Builder setMetricOption(MetricOption metricOption) { - this.metricOption = metricOption; - return this; - } public RowDataDebeziumDeserializeSchema build() { return new RowDataDebeziumDeserializeSchema( @@ -284,8 +273,7 @@ public RowDataDebeziumDeserializeSchema build() { validator, serverTimeZone, userDefinedConverterFactory, - changelogMode, - metricOption); + changelogMode); } }