Skip to content

Commit

Permalink
fix: self-defined metrics not present in log and added one missing me…
Browse files Browse the repository at this point in the history
…tric
  • Loading branch information
PeterZh6 committed Sep 18, 2024
1 parent 02cc2bf commit b7cc498
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
registerMetricsForCurrentFetchEventTimeLag();
registerMetricsForCurrentEmitEventTimeLag();
registerMetricsForDeserializeTimeLag();
registerMetricsForNumCompletedSnapshots(new ThreadSafeCounter());
registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter());
registerMetricsForNumDeserializeError(new ThreadSafeCounter());
registerMetricsForNumSnapshotCreate(new ThreadSafeCounter());
Expand Down Expand Up @@ -229,7 +230,7 @@ public void registerMetricsForNumSnapshotError(Counter counter) {
numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter);
}

public void registerMetricsForNumCompletedCheckpoints(Counter counter) {
public void registerMetricsForNumCompletedSnapshots(Counter counter) {
numCompletedSnapshots = registerCounter(NUM_COMPLETED_SNAPSHOTS, counter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
Expand Down Expand Up @@ -202,22 +203,27 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

private SourceExactlyMetric sourceExactlyMetric;
/** Self-defined Flink metrics. */
private transient SourceExactlyMetric sourceExactlyMetric;

// record the start time of each checkpoint
private final transient Map<Long, Long> checkpointStartTimeMap = new HashMap<>();
private final MetricOption metricOption;

/** The map to store the start time of each checkpoint. */
private transient Map<Long, Long> checkpointStartTimeMap = new HashMap<>();

// ---------------------------------------------------------------------------------------

public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
Validator validator) {
Validator validator,
MetricOption metricOption) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.metricOption = metricOption;
}

@Override
Expand All @@ -230,9 +236,16 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
// get sourceExactlyMetric from deserializer to record metrics
if (sourceExactlyMetric == null && deserializer instanceof RowDataDebeziumDeserializeSchema) {
sourceExactlyMetric = ((RowDataDebeziumDeserializeSchema) deserializer).getSourceExactlyMetric();
if (sourceExactlyMetric == null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.setSourceExactlyMetric(sourceExactlyMetric);
}
// instantiate checkpointStartTimeMap after restoring from checkpoint
if (checkpointStartTimeMap == null) {
checkpointStartTimeMap = new HashMap<>();
}
}

Expand Down Expand Up @@ -342,8 +355,12 @@ private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) th
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
}
// record the start time of each checkpoint
long checkpointId = functionSnapshotContext.getCheckpointId();
checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
Long checkpointId = functionSnapshotContext.getCheckpointId();
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
} else {
LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
}
sourceExactlyMetric.incNumSnapshotCreate();
}

Expand Down Expand Up @@ -525,10 +542,15 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.updateLastCheckpointId(checkpointId);
}
// get the start time of the currently completed checkpoint
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null) {
sourceExactlyMetric.incNumCompletedSnapshots();
sourceExactlyMetric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null) {
sourceExactlyMetric.incNumCompletedSnapshots();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
// ignore exception if we are no longer running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.debezium.Validator;
import io.debezium.connector.postgresql.PostgresConnector;

Expand Down Expand Up @@ -53,6 +55,7 @@ public static class Builder<T> {
private String[] tableList;
private Properties dbzProperties;
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

/**
* The name of the Postgres logical decoding plug-in installed on the server. Supported
Expand Down Expand Up @@ -146,6 +149,12 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
return this;
}

/** metricOption used to instantiate SourceExactlyMetric */
public Builder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
Expand Down Expand Up @@ -178,7 +187,7 @@ public DebeziumSourceFunction<T> build() {
}

return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
deserializer, props, null, Validator.getDefaultValidator(), metricOption);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
PostgreSQLDeserializationConverterFactory.instance())
.setValueValidator(new PostgresValueValidator(schemaName, tableName))
.setChangelogMode(changelogMode)
.setMetricOption(metricOption)
.build();
DebeziumSourceFunction<RowData> sourceFunction =
PostgreSQLSource.<RowData>builder()
Expand All @@ -150,6 +149,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.slotName(slotName)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.metricOption(metricOption)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.inlong.sort.postgre;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

Expand All @@ -35,7 +34,7 @@
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import lombok.Getter;
import lombok.Setter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
Expand Down Expand Up @@ -101,10 +100,9 @@ public interface ValueValidator extends Serializable {

/** Changelog Mode to use for encoding changes in Flink internal data structure. */
private final DebeziumChangelogMode changelogMode;
private final MetricOption metricOption;

// Getter to make sourceExactlyMetric accessible to DebeziumSourceFunction
@Getter
/** Self-defined Flink metrics, which will be set by DebeziumSourceFunction with setter */
@Setter
private SourceExactlyMetric sourceExactlyMetric;

/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
Expand All @@ -119,8 +117,7 @@ public static Builder newBuilder() {
ValueValidator validator,
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory userDefinedConverterFactory,
DebeziumChangelogMode changelogMode,
MetricOption metricOption) {
DebeziumChangelogMode changelogMode) {
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
this.physicalConverter =
Expand All @@ -131,21 +128,17 @@ public static Builder newBuilder() {
this.resultTypeInfo = checkNotNull(resultTypeInfo);
this.validator = checkNotNull(validator);
this.changelogMode = checkNotNull(changelogMode);
this.metricOption = metricOption;
}

@Override
public void open() {
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
}

@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
long deseializeStartTime = System.currentTimeMillis();
long deserializeStartTime = System.currentTimeMillis();
try {
doDeserialize(record, out, deseializeStartTime);
doDeserialize(record, out, deserializeStartTime);
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
Expand All @@ -154,7 +147,8 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
}
}

private void doDeserialize(SourceRecord record, Collector<RowData> out, long deseializeStartTime) throws Exception {
private void doDeserialize(SourceRecord record, Collector<RowData> out, long deserializeStartTime)
throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
Expand Down Expand Up @@ -192,7 +186,7 @@ private void doDeserialize(SourceRecord record, Collector<RowData> out, long des
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deseializeStartTime);
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
}
}

Expand Down Expand Up @@ -239,7 +233,6 @@ public static class Builder {
private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
DeserializationRuntimeConverterFactory.DEFAULT;
private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
private MetricOption metricOption;

public Builder setPhysicalRowType(RowType physicalRowType) {
this.physicalRowType = physicalRowType;
Expand Down Expand Up @@ -271,10 +264,6 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
this.changelogMode = changelogMode;
return this;
}
public Builder setMetricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

public RowDataDebeziumDeserializeSchema build() {
return new RowDataDebeziumDeserializeSchema(
Expand All @@ -284,8 +273,7 @@ public RowDataDebeziumDeserializeSchema build() {
validator,
serverTimeZone,
userDefinedConverterFactory,
changelogMode,
metricOption);
changelogMode);
}
}

Expand Down

0 comments on commit b7cc498

Please sign in to comment.