Commit
fix: refactor late records processing (#73)
- fix micro-batch processing logic so that calculators are not initialized or retrieved for late records (pattern sketched below).
- add additional debug logging, including the number of late records per batch.
gabb1er authored Dec 27, 2024
1 parent 3297b90 commit bdf8f00
Showing 2 changed files with 47 additions and 25 deletions.
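
The heart of the change is visible in the second file below: the `rowEventTime > watermark` check now happens before any per-window calculators are retrieved or initialized, so late records do no calculator work at all and only increment a counter carried through the aggregation state. A minimal sketch of that pattern, using simplified stand-in types rather than the framework's actual `Row`/`GroupedCalculators` machinery:

```scala
// Minimal sketch of the pattern this commit applies (simplified stand-in types only;
// GroupedCalculators, Row and the framework's helpers are not used here).
object LateRecordSketch {
  type Window      = Long
  type Calculators = Vector[Long] // stand-in for per-window grouped calculators
  // state: (max observed event time, calculators per window, number of late records)
  type State = (Long, Map[Window, Calculators], Int)

  def processRecord(state: State, eventTime: Long, windowStart: Window, watermark: Long): State =
    if (eventTime > watermark) {
      // On-time record: only now look up (or create) the window's calculators and update them.
      val calculators = state._2.getOrElse(windowStart, Vector.empty)
      val updated     = calculators :+ eventTime // stand-in for updateGroupedCalculators
      (math.max(state._1, eventTime), state._2.updated(windowStart, updated), state._3)
    } else {
      // Late record: skip all calculator work and just count it.
      state.copy(_3 = state._3 + 1)
    }

  def main(args: Array[String]): Unit = {
    val init: State = (0L, Map.empty, 0)
    val records     = Seq((120L, 100L), (95L, 60L), (130L, 120L)) // (eventTime, windowStart)
    val result = records.foldLeft(init) { case (s, (ts, w)) => processRecord(s, ts, w, watermark = 100L) }
    println(result) // (130, Map(100 -> Vector(120), 120 -> Vector(130)), 1) -- one late record skipped
  }
}
```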
@@ -91,10 +91,12 @@ final case class DQStreamWindowJob(jobConfig: JobConfig,
* @return Sequence of windows ready to be processed.
*/
private def getWindowsToProcess: Seq[Long] = {
val minWatermark = buffer.watermarks.readOnlySnapshot()
val watermarks = buffer.watermarks.readOnlySnapshot()
.filter{ case (k, _) => processedSources.contains(k) }
.values.min

val minWatermark = watermarks.values.min

log.debug(s"$bufferStage Watermarks per stream: ${watermarks.map(_.productIterator.mkString(":")).mkString("{", ",", "}")}")
log.debug(s"$bufferStage Minimum watermark: $minWatermark")

val filterWindows = (windows: Iterable[(String, Long)]) =>
@@ -2,6 +2,7 @@ package org.checkita.dqf.core.metrics.rdd

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.util.CollectionAccumulator
import org.checkita.dqf.config.appconf.StreamConfig
import org.checkita.dqf.core.metrics.ErrorCollection.AccumulatedErrors
import org.checkita.dqf.core.metrics.{BasicMetricProcessor, RegularMetric}
@@ -11,6 +12,7 @@ import org.checkita.dqf.utils.Common._
import org.checkita.dqf.utils.Logging
import org.checkita.dqf.utils.ResultUtils._

import scala.:+
import scala.jdk.CollectionConverters._
import scala.util.Try

@@ -28,12 +30,13 @@ object RDDMetricStreamProcessor extends RDDMetricProcessor with Logging {
* Type alias for micro-batch state used in streaming applications only.
* State includes following:
* - Maximum observed event timestamp
* - Map of grouped calculators per each window in a stream.
* - Map of grouped calculators per each window in a stream
* - Updated state of checkpoint (in case if checkpoint provided)
* - Number of late records that were skipped
*
* @note window is identified by its start time as unix epoch (in seconds).
*/
type MicroBatchState = (Long, Map[Long, GroupedCalculators], Option[Checkpoint])

type MicroBatchState = (Long, Map[Long, GroupedCalculators], Option[Checkpoint], Int)
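
As an illustrative aside, the sketch below shows how two partial states of this four-element shape might be combined, mirroring the `combOp` further down: the maximum event time wins, calculators are merged window by window, a merged checkpoint exists only when both sides carry one, and late-record counts add up. `Calcs`, `Chk` and `mergeCalcs` are simplified stand-ins, not the framework's actual types or merge helpers:

```scala
// Illustrative only: merging two partial states shaped like MicroBatchState.
object StateMergeSketch {
  type Calcs = Vector[Int]  // stand-in for GroupedCalculators
  type Chk   = List[String] // stand-in for Checkpoint
  type State = (Long, Map[Long, Calcs], Option[Chk], Int)

  // Placeholder for whatever per-window calculator merge the framework performs.
  private def mergeCalcs(l: Calcs, r: Calcs): Calcs = l ++ r

  def merge(l: State, r: State): State = {
    val maxEventTime = math.max(l._1, r._1)
    // Merge calculators window by window, combining windows present on both sides:
    val mergedCalcs = (l._2.keySet ++ r._2.keySet).map { w =>
      val calcs = (l._2.get(w), r._2.get(w)) match {
        case (Some(a), Some(b)) => mergeCalcs(a, b)
        case (a, b)             => a.orElse(b).get // at least one side has this window
      }
      w -> calcs
    }.toMap
    // A merged checkpoint only exists if both partial states carry one:
    val mergedChk = for { a <- l._3; b <- r._3 } yield a ++ b
    // Late-record counts simply add up:
    (maxEventTime, mergedCalcs, mergedChk, l._4 + r._4)
  }

  def main(args: Array[String]): Unit = {
    val left: State  = (100L, Map(60L -> Vector(1, 2)), Some(List("a")), 1)
    val right: State = (130L, Map(60L -> Vector(3), 120L -> Vector(4)), Some(List("b")), 2)
    println(merge(left, right)) // max time 130, window 60 merged, late count 1 + 2 = 3
  }
}
```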

/**
* Processes all regular metrics for micro-batch of the given stream.
@@ -61,9 +64,12 @@
if (batchDf.isEmpty)
log.info(s"[STREAM '$streamId'] Skipping metric calculation for empty batch (batchId = $batchId).")
else {
log.debug(s"[STREAM '$streamId'] Start processing regular metrics for non-empty batch (batchId = $batchId).")

val watermark = buffer.watermarks.getOrElse(streamId, throw new NoSuchElementException(
s"Watermark for stream '$streamId' not found."
))
log.debug(s"[STREAM '$streamId'] Current watermark: $watermark.")

val df = if (caseSensitive) batchDf else
batchDf.select(batchDf.columns.map(c => col(c).as(c.toLowerCase)).toSeqUnsafe: _*)
@@ -93,33 +99,38 @@
// get and register metric error accumulator:
val metricErrorAccumulator = getAndRegisterErrorAccumulator

val initState: MicroBatchState = (0L, Map.empty, streamCheckpoint)
val initState: MicroBatchState = (0L, Map.empty, streamCheckpoint, 0)
val updatedState = df.rdd.treeAggregate(initState)(
seqOp = {
case (state: MicroBatchState, row: Row) =>
val rowEventTime = row.get(columnIndexes(streamConf.eventTsCol)).asInstanceOf[Long]
val rowWindowStart = row.get(columnIndexes(streamConf.windowTsCol)).asInstanceOf[Long]

// getting either existing calculators and accumulator for this window or initializing them:
val groupedCalculators = state._2.getOrElse(rowWindowStart, getGroupedCalculators(metricsByColumns))
if (rowEventTime > watermark) {
// record is above watermark and will be processed

// update calculators:
val updatedCalculators: GroupedCalculators = if (rowEventTime > watermark)
updateGroupedCalculators(groupedCalculators, row, columnIndexes)
else {
log.info(s"[STREAM '$streamId'] Records is late to watermark of $watermark: $row")
groupedCalculators
}
// getting either existing calculators and accumulator for this window or initializing them:
val groupedCalculators = state._2.getOrElse(rowWindowStart, getGroupedCalculators(metricsByColumns))

// collect errors:
getErrorsFromGroupedCalculators(updatedCalculators, row, columnIndexes, columnNames, streamKeyIds)
.foreach(err => metricErrorAccumulator.add(rowWindowStart, err))
// update calculators:
val updatedCalculators: GroupedCalculators =
updateGroupedCalculators(groupedCalculators, row, columnIndexes)

// update checkpoint:
val updatedCheckpoint = state._3.map(_.update(row, columnIndexes))

// yield updated state:
(math.max(state._1, rowEventTime), state._2.updated(rowWindowStart, updatedCalculators), updatedCheckpoint)
// collect errors:
getErrorsFromGroupedCalculators(updatedCalculators, row, columnIndexes, columnNames, streamKeyIds)
.foreach(err => metricErrorAccumulator.add(rowWindowStart, err))

// update checkpoint:
val updatedCheckpoint = state._3.map(_.update(row, columnIndexes))

// yield updated state:
(math.max(state._1, rowEventTime), state._2.updated(rowWindowStart, updatedCalculators), updatedCheckpoint, state._4)
} else {
// record is below watermark and will be skipped.
// Return the same state with late records count incremented.
log.debug(s"[STREAM '$streamId'] Record is late to watermark of $watermark: $row")
state.copy(_4 = state._4 + 1)
}
},
combOp = (l, r) => {
val maxEventTime = math.max(l._1, r._1)
@@ -134,11 +145,20 @@
lChk <- l._3
rChk <- r._3
} yield lChk.merge(rChk)

(maxEventTime, mergedState, mergedCheckpoint)

val numLateRecords = l._4 + r._4

(maxEventTime, mergedState, mergedCheckpoint, numLateRecords)
}
)

log.debug(
s"[STREAM '$streamId'] Finished processing with state: " +
s"maxEventTime = ${updatedState._1}; " +
s"numLateRecords = ${updatedState._4}; " +
s"updatedWindows = ${updatedState._2.keySet.toSeq.sorted.mkString("[", ", ", "]")}."
)

// update buffer watermarks:
buffer.watermarks.update(streamId, math.max(watermark, updatedState._1 - streamConf.watermark.toSeconds))

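
As a closing aside on the watermark update above: the new watermark is the larger of the previous watermark and the batch's maximum observed event time minus `streamConf.watermark.toSeconds` (which appears to represent the allowed lateness), so the watermark never moves backwards. A small illustration with made-up numbers:

```scala
// Made-up values for illustration only.
val previousWatermark = 100L // buffer watermark before this batch
val maxEventTime      = 180L // updatedState._1 for the batch
val allowedLateness   = 30L  // streamConf.watermark.toSeconds in the real code
val newWatermark = math.max(previousWatermark, maxEventTime - allowedLateness)
// newWatermark == 150; a batch whose records are all late (maxEventTime stays 0)
// would leave the watermark at 100 rather than moving it backwards.
```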
