Skip to content

Commit

Permalink
fix: modify execution time for streaming jobs
Browse files Browse the repository at this point in the history
- During windows processing the execution time will be set to actual time when window processing starts.
  • Loading branch information
gabb1er committed Feb 6, 2024
1 parent 234f9e1 commit 3e1211a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ final case class DQStreamWindowJob(settings: AppSettings,
private val bufferStage: String = RunStage.CheckProcessorBuffer.entryName

/**
* Returns copy of the application settings object with reference and execution date being
* set for the provided window start time.
* Returns copy of the application settings object with modified execution and reference dates:
* - reference date is set to the window start time
* - execution date is set to current time (time when windows processing has started)
*
* @param windowId Window start time as unix epoch
* @return Copy of the application settings with updated reference and execution datetime.
*/
private def copySettings(windowId: Long): AppSettings = settings.copy(
executionDateTime = EnrichedDT.fromEpoch(
windowId, settings.executionDateTime.dateFormat, settings.executionDateTime.timeZone),
executionDateTime = settings.executionDateTime.resetToCurrentTime,
referenceDateTime = EnrichedDT.fromEpoch(
windowId, settings.referenceDateTime.dateFormat, settings.referenceDateTime.timeZone)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,15 @@ case class EnrichedDT(dateFormat: DateFormat, timeZone: ZoneId, dateString: Opti
def getUtcTsWithOffset(offset: Duration): Timestamp = Timestamp.valueOf(
zonedDT.minusSeconds(offset.toSeconds).withZoneSameInstant(ZoneId.of("UTC")).toLocalDateTime
)

/**
* Returns a new EnrichedDT instance with the same date format and time zone, but with time
* being set to current time.
* @return New instance of EnrichedDT for current time.
*/
def resetToCurrentTime: EnrichedDT = EnrichedDT(this.dateFormat, this.timeZone)
}

object EnrichedDT {
/**
* Builds EnrichedDT instance from Unix epoch (in seconds)
Expand Down

0 comments on commit 3e1211a

Please sign in to comment.