Skip to content

Commit

Permalink
fix: add null check when trying to change metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Sep 23, 2024
1 parent 208d320 commit e13b725
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private final MetricOption metricOption;

/** The map to store the start time of each checkpoint. */
private transient Map<Long, Long> checkpointStartTimeMap = new HashMap<>();
private transient Map<Long, Long> checkpointStartTimeMap;

// ---------------------------------------------------------------------------------------

Expand All @@ -236,17 +236,15 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
if (sourceExactlyMetric == null && metricOption != null) {
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.setSourceExactlyMetric(sourceExactlyMetric);
}
// instantiate checkpointStartTimeMap after restoring from checkpoint
if (checkpointStartTimeMap == null) {
checkpointStartTimeMap = new HashMap<>();
}
checkpointStartTimeMap = new HashMap<>();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -361,7 +359,9 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th
} else {
LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
}
sourceExactlyMetric.incNumSnapshotCreate();
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}
}

private void snapshotOffsetState(long checkpointId) throws Exception {
Expand Down Expand Up @@ -544,7 +544,7 @@ public void notifyCheckpointComplete(long checkpointId) {
// get the start time of the currently completed checkpoint
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null) {
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumCompletedSnapshots();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
Expand Down

0 comments on commit e13b725

Please sign in to comment.