[INLONG-11129][Sort] Enhanced source metric instrumentation for InLong Sort Flink Connector #11130

Open. Wants to merge 5 commits into base: master


PeterZh6
Contributor

@PeterZh6 PeterZh6 commented Sep 16, 2024

Fixes #11129

Motivation

This PR enhances metric instrumentation for the InLong Sort Flink Connector, specifically for the Postgres-CDC connector. It improves observability by introducing additional metrics that track deserialization, snapshot state, and checkpoint completion.

Modifications

This feature focuses on SourceMetric only

Deserialization Metrics:
Added counters to track successful and failed deserialization attempts (numDeserializeSuccess, numDeserializeError).
Added a latency gauge to measure the time taken for deserialization (deserializeTimeLag).
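
For readers who want a concrete picture of the deserialization metrics above, here is a minimal sketch of the pattern. It is not the InLong implementation: the class `DeserializeMetricsSketch` is hypothetical, plain `AtomicLong` fields stand in for Flink counters and gauges, and the millisecond unit for `deserializeTimeLag` is an assumption.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Hypothetical stand-in for the deserialization metrics; not the InLong SourceExactlyMetric API. */
class DeserializeMetricsSketch {
    final AtomicLong numDeserializeSuccess = new AtomicLong();
    final AtomicLong numDeserializeError = new AtomicLong();
    // Gauge-style value: duration of the most recent attempt (milliseconds are an assumption).
    volatile long deserializeTimeLag;

    /** Runs the deserializer and updates the counters and the latency gauge. */
    Integer deserialize(String raw, Function<String, Integer> deserializer) {
        long startNanos = System.nanoTime();
        try {
            Integer result = deserializer.apply(raw);
            numDeserializeSuccess.incrementAndGet();
            return result;
        } catch (RuntimeException e) {
            // Count the failure, then let the caller decide how to handle it.
            numDeserializeError.incrementAndGet();
            throw e;
        } finally {
            deserializeTimeLag = (System.nanoTime() - startNanos) / 1_000_000;
        }
    }

    public static void main(String[] args) {
        DeserializeMetricsSketch metrics = new DeserializeMetricsSketch();
        Function<String, Integer> parser = Integer::parseInt;
        metrics.deserialize("42", parser);
        try {
            metrics.deserialize("not-a-number", parser);
        } catch (NumberFormatException expected) {
            // The error counter was already incremented before the exception propagated.
        }
        // prints "success=1 error=1"
        System.out.println("success=" + metrics.numDeserializeSuccess.get()
                + " error=" + metrics.numDeserializeError.get());
    }
}
```

Updating the gauge in `finally` means failed attempts also contribute a latency sample; whether the PR does the same is not stated here.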

SnapshotState Metrics:
Added counters for the number of snapshots created (numSnapshotCreate) and errors encountered during snapshot operations (numSnapshotError).

NotifyComplete Metrics:
Added a counter to track completed snapshots (numCompletedSnapshots).
Added a latency gauge for the time between snapshot creation and checkpoint completion (snapshotToCheckpointTimeLag).
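
The snapshot and checkpoint-completion metrics can be sketched in the same spirit. This is a hedged illustration only: `SnapshotMetricsSketch`, `onSnapshotState`, and `onCheckpointComplete` are hypothetical names, timestamps are passed in explicitly to keep the example deterministic, and keeping a per-checkpoint start time in a map is one possible bookkeeping choice, not necessarily what the PR does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the snapshot metrics; not the InLong API. */
class SnapshotMetricsSketch {
    final AtomicLong numSnapshotCreate = new AtomicLong();
    final AtomicLong numSnapshotError = new AtomicLong();
    final AtomicLong numCompletedSnapshots = new AtomicLong();
    // Gauge-style value: time from snapshot creation to checkpoint completion.
    volatile long snapshotToCheckpointTimeLag;
    // Creation time per checkpoint id, kept until completion is notified.
    private final Map<Long, Long> snapshotStartMillis = new ConcurrentHashMap<>();

    /** Would be called from the source's snapshot hook. */
    void onSnapshotState(long checkpointId, long nowMillis, Runnable snapshotWork) {
        try {
            snapshotWork.run();
            snapshotStartMillis.put(checkpointId, nowMillis);
            numSnapshotCreate.incrementAndGet();
        } catch (RuntimeException e) {
            numSnapshotError.incrementAndGet();
            throw e;
        }
    }

    /** Would be called from the source's checkpoint-complete hook. */
    void onCheckpointComplete(long checkpointId, long nowMillis) {
        Long startedAt = snapshotStartMillis.remove(checkpointId);
        if (startedAt != null) {
            numCompletedSnapshots.incrementAndGet();
            snapshotToCheckpointTimeLag = nowMillis - startedAt;
        }
    }

    public static void main(String[] args) {
        SnapshotMetricsSketch metrics = new SnapshotMetricsSketch();
        metrics.onSnapshotState(1L, 1_000L, () -> { });
        metrics.onCheckpointComplete(1L, 1_489L);
        // prints "completed=1 lagMs=489"
        System.out.println("completed=" + metrics.numCompletedSnapshots.get()
                + " lagMs=" + metrics.snapshotToCheckpointTimeLag);
    }
}
```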

Verifying this change

(Please pick either of the following options)

  • This change is a trivial rework/code cleanup without any test coverage.

  • This change is already covered by existing tests, such as:
    (please describe tests)

  • This change added tests and can be verified as follows:

Configure the Flink TaskManager to report metrics, for example with Slf4jReporter.
Add the following configuration to conf/flink-conf.yaml of the Flink TaskManager; the metrics listed above will then be printed to the log file once inlong-sort starts processing data.

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter 
metrics.reporter.slf4j.interval: 15 SECONDS

Documentation

  • Does this pull request introduce a new feature? Yes
  • If yes, how is the feature documented? JavaDocs

woofyzhao
woofyzhao previously approved these changes Sep 16, 2024
@XiaoYou201
Contributor

plz provide some test result screenshots in the comment~, thx~

@PeterZh6 PeterZh6 marked this pull request as draft September 18, 2024 07:20
@PeterZh6 PeterZh6 marked this pull request as ready for review September 18, 2024 15:27
@PeterZh6
Contributor Author

PeterZh6 commented Sep 18, 2024

plz provide some test result screenshots in the comment~, thx~

Thanks for your question :)

The first image shows the newly-added metrics in Flink Web Dashboard.

[Image: pgDemo1]

The second image shows all the metrics reported by Slf4jReporter and the newly-added metrics have been highlighted with red rectangles.

[Image]

The following log is the metrics copied from the log of the docker container of Flink taskmanager, just for your reference.


2024-09-18 22:42:47 2024-09-18 14:42:47,470 INFO  org.apache.flink.metrics.slf4j.Slf4jReporter                 [] - 
2024-09-18 22:42:47 =========================== Starting metrics report ===========================
2024-09-18 22:42:47 
2024-09-18 22:42:47 -- Counters -------------------------------------------------------------------
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.numRecordsOut: 306
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.totalFlushBytes: 25648
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.numBytesInLocal: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.totalFlushFailedTimes: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numDeserializeError: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.groupId.srgroup.streamId.srStream.nodeId.srNode.dirtyBytesOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.groupId.srgroup.streamId.srStream.nodeId.srNode.dirtyRecordsOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.totalFilteredRows: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.numBytesInRemote: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBuffersInRemote: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numRecordsIn: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.numBytesOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numRecordsOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.numBytesIn: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.totalFlushTimeNsWithoutRetries: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.groupId.srgroup.streamId.srStream.nodeId.srNode.numBytesOut: 14363
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.totalFlushTimeNs: 88398294
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.numBuffersInLocal: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.numRecordsIn: 306
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numRecordsInForMeter: 305
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.numBytesOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.numRecordsIn: 306
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.totalFlushSucceededTimes: 15
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.numRecordsOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBytesOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numBytesInForMeter: 14363
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.numRecordsOut: 306
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBuffersOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBuffersInLocal: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numSnapshotError: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numBytesIn: 14363
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numCompletedSnapshots: 1
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numDeserializeSuccess: 305
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.numBytesIn: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numRecordsIn: 305
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBytesIn: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.groupId.srgroup.streamId.srStream.nodeId.srNode.numRecordsOutForMeter: 305
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.totalFlushRows: 305
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.groupId.srgroup.streamId.srStream.nodeId.srNode.numBytesOutForMeter: 14363
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.numBuffersInRemote: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.numBytesOut: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBytesInLocal: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBytesInRemote: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.numRecordsIn: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.numBytesIn: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numSnapshotCreate: 1
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.groupId.srgroup.streamId.srStream.nodeId.srNode.numRecordsOut: 305
2024-09-18 22:42:47 
2024-09-18 22:42:47 -- Gauges ---------------------------------------------------------------------
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Output.Buffers.outPoolUsage: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Threads.Count: 64
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.Buffers.inputExclusiveBuffersUsage: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.buffers.inputFloatingBuffersUsage: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.GarbageCollector.G1 Old Generation.Count: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.currentOutputWatermark: -9223372036854775808
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.Buffers.inPoolUsage: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.GarbageCollector.G1 Young Generation.Count: 5
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Shuffle.Netty.RequestedMemoryUsage: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.ClassLoader.ClassesUnloaded: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.NonHeap.Committed: 104988672
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Mapped.MemoryUsed: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.isBackPressured: false
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.currentInputWatermark: -9223372036854775808
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.CPU.Time: 14110000000
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.buffers.outPoolUsage: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.deserializeTimeLag: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.maxHardBackPressureTimeMs: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.buffers.inputExclusiveBuffersUsage: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.currentOutputWatermark: -9223372036854775808
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.CPU.Load: 4.369538077403246E-4
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.Buffers.inputQueueLength: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Output.Buffers.outputQueueLength: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.Buffers.inputQueueSize: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.Buffers.inputFloatingBuffersUsage: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.GarbageCollector.G1 Old Generation.Time: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.buffers.outputQueueLength: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.maxSoftBackPressureTimeMs: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Shuffle.Netty.AvailableMemory: 134217728
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.buffers.inputQueueSize: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.snapshotToCheckpointTimeLag: 489
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.softBackPressuredTimeMsPerSecond: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.currentEmitEventTimeLag: 399
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Shuffle.Netty.AvailableMemorySegments: 4096
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Output.Buffers.outputQueueSize: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Network.AvailableMemorySegments: 4096
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Direct.TotalCapacity: 135851008
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Heap.Committed: 536870912
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.hardBackPressuredTimeMsPerSecond: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Direct.Count: 4140
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.NonHeap.Max: 780140544
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.idleTimeMsPerSecond: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Flink.Memory.Managed.Total: 536870920
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.currentFetchEventTimeLag: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Metaspace.Committed: 70778880
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.currentInputWatermark: -9223372036854775808
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.ClassLoader.ClassesLoaded: 11545
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.buffers.inputQueueLength: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.GarbageCollector.G1 Young Generation.Time: 46
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Metaspace.Used: 67911496
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Shuffle.Netty.TotalMemory: 134217728
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.buffers.outputQueueSize: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Mapped.Count: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.currentFetchEventTimeLag: 396
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Heap.Used: 135028752
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.sourceIdleTime: 75195
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Heap.Max: 536870912
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Network.TotalMemorySegments: 4096
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Shuffle.Netty.UsedMemory: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Mapped.TotalCapacity: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.checkpointStartDelayNanos: 13000000
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.currentEmitEventTimeLag: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Direct.MemoryUsed: 135851009
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.buffers.inPoolUsage: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.NonHeap.Used: 98140368
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Shuffle.Netty.UsedMemorySegments: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.JVM.Memory.Metaspace.Max: 268435456
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Flink.Memory.Managed.Used: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.backPressuredTimeMsPerSecond: 0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.Status.Shuffle.Netty.TotalMemorySegments: 4096
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.currentOutputWatermark: -9223372036854775808
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.busyTimeMsPerSecond: NaN
2024-09-18 22:42:47 
2024-09-18 22:42:47 -- Meters ---------------------------------------------------------------------
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.numBuffersInRemotePerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numRecordsInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.numBytesOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.numRecordsOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBytesInRemotePerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBuffersInRemotePerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBytesInLocalPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.numBytesInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.numBuffersInLocalPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.numRecordsInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.numBytesInLocalPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.Shuffle.Netty.Input.numBytesInRemotePerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numBytesInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBuffersInLocalPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.numBytesInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.groupId.srgroup.streamId.srStream.nodeId.srNode.numBytesOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numRecordsOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBytesInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.numRecordsInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.groupId.pggroup.streamId.pgStream.nodeId.pgNode.numRecordsInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.numRecordsOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.numBytesInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.numRecordsInPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.groupId.srgroup.streamId.srStream.nodeId.srNode.numRecordsOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBytesOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1] -> ConstraintEnforcer[2] -> Sink: test_output1[2].0.numBuffersOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.ConstraintEnforcer[2].0.numRecordsOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Source: test_input1[1].0.numBytesOutPerSecond: 0.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.numBytesOutPerSecond: 0.0
2024-09-18 22:42:47 
2024-09-18 22:42:47 -- Histograms -----------------------------------------------------------------
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.streamLoadPutTimeMs: count=10, min=1, max=47, mean=10.2, stddev=19.395302982366058, p50=1.0, p75=12.5, p95=47.0, p98=47.0, p99=47.0, p999=47.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.writeDataTimeMs: count=5, min=62, max=79, mean=69.0, stddev=7.1763500472036625, p50=65.0, p75=76.5, p95=79.0, p98=79.0, p99=79.0, p999=79.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.readDataTimeMs: count=0, min=0, max=0, mean=NaN, stddev=NaN, p50=NaN, p75=NaN, p95=NaN, p98=NaN, p99=NaN, p999=NaN
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.loadTimeMs: count=5, min=81, max=171, mean=102.6, stddev=38.91400776070231, p50=82.0, p75=134.5, p95=171.0, p98=171.0, p99=171.0, p999=171.0
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.offerTimeNs: count=0, min=0, max=0, mean=NaN, stddev=NaN, p50=NaN, p75=NaN, p95=NaN, p98=NaN, p99=NaN, p999=NaN
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.flushTimeNs: count=5, min=5923688, max=59784259, mean=1.76796588E7, stddev=2.3563572359127566E7, p50=7772864.0, p75=3.42224035E7, p95=5.9784259E7, p98=5.9784259E7, p99=5.9784259E7, p999=5.9784259E7
2024-09-18 22:42:47 192_168_16_5.taskmanager.192_168_16_5:35611-a1b33e.InLong-Sort-Job.Sink: test_output1[2].0.commitAndPublishTimeMs: count=5, min=8, max=22, mean=11.0, stddev=6.164414002968976, p50=8.0, p75=15.5, p95=22.0, p98=22.0, p99=22.0, p999=22.0
2024-09-18 22:42:47 
2024-09-18 22:42:47 =========================== Finished metrics report ===========================
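
A report this long is tedious to scan by eye. As a rough aid, the sketch below pulls only the newly added source metrics out of Slf4jReporter-style lines. The class `NewMetricLogFilter` is hypothetical, and the parsing rule (split at the last `": "`, then take the text after the last `.` as the metric name) is an assumption based on the log format shown above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper that filters a metrics report for the metrics added in this PR. */
class NewMetricLogFilter {
    static final Set<String> NEW_METRICS = new HashSet<>(Arrays.asList(
            "numDeserializeSuccess", "numDeserializeError", "deserializeTimeLag",
            "numSnapshotCreate", "numSnapshotError", "numCompletedSnapshots",
            "snapshotToCheckpointTimeLag"));

    /** Returns metric name -> reported value for every new metric found, in log order. */
    static Map<String, String> extract(List<String> lines) {
        Map<String, String> found = new LinkedHashMap<>();
        for (String line : lines) {
            // The identifier itself contains ": " (e.g. "Source: test_input1"), so split at the last one.
            int sep = line.lastIndexOf(": ");
            if (sep < 0) {
                continue;
            }
            String identifier = line.substring(0, sep);
            String value = line.substring(sep + 2).trim();
            String shortName = identifier.substring(identifier.lastIndexOf('.') + 1);
            if (NEW_METRICS.contains(shortName)) {
                found.put(shortName, value);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "host.taskmanager.tm.InLong-Sort-Job.Source: test_input1[1].0.nodeId.pgNode.numDeserializeSuccess: 305",
                "host.taskmanager.tm.Status.JVM.Threads.Count: 64",
                "host.taskmanager.tm.InLong-Sort-Job.Source: test_input1[1].0.nodeId.pgNode.snapshotToCheckpointTimeLag: 489");
        // prints "{numDeserializeSuccess=305, snapshotToCheckpointTimeLag=489}"
        System.out.println(extract(sample));
    }
}
```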

@PeterZh6
Contributor Author

Briefly explaining the fixes made to the instantiation of SourceExactlyMetric: When only the MetricOptions parameter is passed to the SourceExactlyMetric constructor, self-defined metrics are not registered with the metricGroup. Therefore, to ensure proper registration of self-defined metrics, it is necessary to retrieve the metricGroup from the runtimeContext within the sourceFunction and pass it as the second parameter to the SourceExactlyMetric constructor.
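
To illustrate the registration issue described above, here is a simplified sketch. Everything in it is hypothetical: `MetricGroupSketch` stands in for a metric group obtained from the runtime context, and `SourceMetricSketch` only mimics the described behaviour of the two constructor forms; it is not the real SourceExactlyMetric code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a metric group; reporters would only see counters stored here. */
class MetricGroupSketch {
    final Map<String, AtomicLong> counters = new HashMap<>();

    AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, key -> new AtomicLong());
    }
}

/** Hypothetical metric holder mimicking the two construction paths described in the comment. */
class SourceMetricSketch {
    final AtomicLong numDeserializeSuccess;

    /** Options-only form: the counter exists but is never registered anywhere. */
    SourceMetricSketch(String labels) {
        this(labels, null);
    }

    /** Form with a metric group: the counter is registered and therefore reportable. */
    SourceMetricSketch(String labels, MetricGroupSketch group) {
        if (group != null) {
            numDeserializeSuccess = group.counter(labels + ".numDeserializeSuccess");
        } else {
            numDeserializeSuccess = new AtomicLong();
        }
    }

    public static void main(String[] args) {
        MetricGroupSketch group = new MetricGroupSketch();

        SourceMetricSketch unregistered = new SourceMetricSketch("pggroup.pgStream.pgNode");
        unregistered.numDeserializeSuccess.incrementAndGet();
        System.out.println("registered after options-only construction: " + group.counters.size());

        SourceMetricSketch registered = new SourceMetricSketch("pggroup.pgStream.pgNode", group);
        registered.numDeserializeSuccess.incrementAndGet();
        System.out.println("registered after passing the group: " + group.counters.keySet());
    }
}
```

In the first case the increment is invisible to any reporter, which matches the symptom described in the comment.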

@PeterZh6
Contributor Author

bugs fixed

@dockerzhang
Contributor

@PeterZh6 please fix the failed workflow, thanks.

@PeterZh6
Contributor Author

@PeterZh6 please fix the failed workflow, thanks.

Thanks for following up on this. However, I am unable to reproduce the test failure locally, so I may still need some time.

@PeterZh6
Contributor Author

@PeterZh6 please fix the failed workflow, thanks.

Thanks for following up on this. However, I am unable to reproduce the test failure locally, so I may still need some time.

The failure is linked to [Bug][Sort] Mongodb2StarRocksTest Failure Due to Potential Dependency Conflicts #11166
I am currently working on a solution and will submit a pull request to address the problem as soon as possible.

vernedeng
vernedeng previously approved these changes Sep 23, 2024
luchunliang
luchunliang previously approved these changes Sep 23, 2024
@PeterZh6 PeterZh6 changed the title [INLONG-11129][Sort] Enhanced Metric Instrumentation for InLong Sort Flink Connector [INLONG-11129][Sort] Enhanced source metric instrumentation for InLong Sort Flink Connector Sep 23, 2024
Successfully merging this pull request may close these issues.

[Feature][Sort] Enhanced source metric instrumentation for InLong Sort Flink Connector