diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java index 4edb52b9c7..d89fdc6232 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -353,9 +353,8 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); } // record the start time of each checkpoint - Long checkpointId = functionSnapshotContext.getCheckpointId(); if (checkpointStartTimeMap != null) { - checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis()); + checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis()); } else { LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index 33d51a013b..8afca47c94 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -34,7 +34,6 @@ import io.debezium.time.NanoTime; import io.debezium.time.NanoTimestamp; import io.debezium.time.Timestamp; -import lombok.Setter; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -102,7 +101,6 @@ public interface ValueValidator extends Serializable { private final DebeziumChangelogMode changelogMode; /** Self-defined Flink metrics, which will be set by DebeziumSourceFunction with setter */ - @Setter private SourceExactlyMetric sourceExactlyMetric; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ @@ -712,4 +710,9 @@ public void updateLastCheckpointId(long checkpointId) { sourceExactlyMetric.updateLastCheckpointId(checkpointId); } } + + /** setter to enable DebeziumSourceFunction to set the metric */ + public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) { + this.sourceExactlyMetric = sourceExactlyMetric; + } }