11import { Heap } from 'heap-js' ;
22import { SdkComponent } from '@temporalio/common' ;
33import { native } from '@temporalio/core-bridge' ;
4- import { DefaultLogger , LogEntry , Logger , LogTimestamp } from './logger' ;
4+ import { DefaultLogger , FlushableLogger , LogEntry , Logger , LogTimestamp } from './logger' ;
55
66/**
77 * A log collector that accepts log entries either through the TS `Logger` interface (e.g. used by
@@ -25,10 +25,61 @@ export class NativeLogCollector {
2525
2626 protected buffer = new Heap < LogEntry > ( ( a , b ) => Number ( a . timestampNanos - b . timestampNanos ) ) ;
2727
28+ /**
29+ * A timer that periodically flushes the buffer to the downstream logger.
30+ */
31+ protected flushIntervalTimer : NodeJS . Timeout ;
32+
33+ /**
34+ * The minimum time an entry should be buffered before getting flushed.
35+ *
36+ * Increasing this value allows the buffer to do a better job of correctly reordering messages
37+ * emitted from different sources (notably from Workflow executions through Sinks, and from Core)
38+ * based on their absolute timestamps, but also increases latency of logs.
39+ *
40+ * The minimum buffer time requirement only applies as long as the buffer is not full. Once the
41+ * buffer reaches its maximum size, older messages are unconditionally flushed, to prevent
42+ * unbounded growth of the buffer.
43+ *
44+ * TODO(JWH): Is 100ms a reasonable compromise? That might seem a little high on latency, but to
45+ * be useful, that value needs to exceed the time it typically takes to process
46+ * Workflow Activations, let's say above the expected P90, but that's highly variable
47+ * across our user base, and we don't really have field data anyway.
48+ * We can revisit depending on user feedback.
49+ */
50+ protected readonly minBufferTimeMs = 100 ;
51+
52+ /**
53+ * Interval between flush passes checking for expired messages.
54+ *
55+ * This really is redundant, since Core itself is expected to flush its buffer every 10 ms, and
56+ * we're checking for expired messages when it does. However, Core will only flush if it has
57+ * accumulated at least one message; when Core's log level is set to WARN or higher, it may be
58+ * many seconds, and even minutes, between Core's log messages, resulting in very rare flush
59+ * from that end, which cause considerable delay on flushing log messages from other sources.
60+ */
61+ protected readonly flushPassIntervalMs = 100 ;
62+
63+ /**
64+ * The maximum number of log messages to buffer before flushing.
65+ *
66+ * When the buffer reaches this limit, older messages are unconditionally flushed (i.e. without
67+ * regard to the minimum buffer time requirement), to prevent unbounded growth of the buffer.
68+ */
69+ protected readonly maxBufferSize = 2000 ;
70+
2871 constructor ( downstream : Logger ) {
29- this . logger = new DefaultLogger ( 'TRACE' , ( entry ) => this . buffer . add ( entry ) ) ;
72+ this . logger = new DefaultLogger ( 'TRACE' , this . appendOne . bind ( this ) ) ;
73+ ( this . logger as FlushableLogger ) . flush = this . flush . bind ( this ) ;
74+ ( this . logger as FlushableLogger ) . close = this . close . bind ( this ) ;
75+
3076 this . downstream = downstream ;
3177 this . receive = this . receive . bind ( this ) ;
78+
79+ // Flush matured messages from the buffer every so often.
80+ // Unref'ed so that it doesn't prevent the process from exiting if ever the
81+ // runtime doesn't close the logger properly for whatever reason.
82+ this . flushIntervalTimer = setInterval ( this . flushMatured . bind ( this ) , this . flushPassIntervalMs ) . unref ( ) ;
3283 }
3384
3485 /**
@@ -44,14 +95,20 @@ export class NativeLogCollector {
4495 this . buffer . add ( log ) ;
4596 }
4697 }
47- this . flush ( ) ;
98+ this . flushExcess ( ) ;
99+ this . flushMatured ( ) ;
48100 } catch ( _e ) {
49101 // We're not allowed to throw from here, and conversion errors have already been handled in
50102 // convertFromNativeLogEntry(), so an error at this point almost certainly indicates a problem
51103 // with the downstream logger. Just swallow it, there's really nothing else we can do.
52104 }
53105 }
54106
107+ private appendOne ( entry : LogEntry ) : void {
108+ this . buffer . add ( entry ) ;
109+ this . flushExcess ( ) ;
110+ }
111+
55112 private convertFromNativeLogEntry ( entry : native . JsonString < native . LogEntry > ) : LogEntry | undefined {
56113 try {
57114 const log = JSON . parse ( entry ) as native . LogEntry ;
@@ -78,15 +135,62 @@ export class NativeLogCollector {
78135 }
79136
80137 /**
81- * Flush all buffered logs into the logger supplied to the constructor/
138+ * Flush messages that have exceeded their required minimal buffering time.
82139 */
83- flush ( ) : void {
140+ private flushMatured ( ) : void {
141+ const threadholdTimeNanos = BigInt ( Date . now ( ) - this . minBufferTimeMs ) * 1_000_000n ;
142+ for ( ; ; ) {
143+ const entry = this . buffer . peek ( ) ;
144+ if ( ! entry || entry . timestampNanos > threadholdTimeNanos ) break ;
145+ this . buffer . pop ( ) ;
146+
147+ this . downstream . log ( entry . level , entry . message , {
148+ [ LogTimestamp ] : entry . timestampNanos ,
149+ ...entry . meta ,
150+ } ) ;
151+ }
152+ }
153+
154+ /**
155+ * Flush messages in excess of the buffer size limit, starting with oldest ones, without regard
156+ * to the `minBufferTimeMs` requirement. This is called every time messages are appended to the
157+ * buffer, to prevent unbounded growth of the buffer when messages are being emitted at high rate.
158+ *
159+ * The only downside of flushing messages before their time is that it increases the probability
160+ * that messages from different sources might end up being passed down to the downstream logger
161+ * in the wrong order; e.g. if an "older" message emitted by the Workflow Logger is received by
162+ * the Collector after we've already flushed a "newer" message emitted by Core. This is totally
163+ * acceptable, and definitely better than a memory leak caused by unbounded growth of the buffer.
164+ */
165+ private flushExcess ( ) : void {
166+ let excess = this . buffer . size ( ) - this . maxBufferSize ;
167+ while ( excess -- > 0 ) {
168+ const entry = this . buffer . pop ( ) ;
169+ if ( ! entry ) break ;
170+
171+ this . downstream . log ( entry . level , entry . message , {
172+ [ LogTimestamp ] : entry . timestampNanos ,
173+ ...entry . meta ,
174+ } ) ;
175+ }
176+ }
177+
178+ /**
179+ * Flush all messages contained in the buffer, without regard to the `minBufferTimeMs` requirement.
180+ *
181+ * This is called on Runtime and on Worker shutdown.
182+ */
183+ public flush ( ) : void {
84184 for ( const entry of this . buffer ) {
85185 this . downstream . log ( entry . level , entry . message , {
86186 [ LogTimestamp ] : entry . timestampNanos ,
87187 ...entry . meta ,
88188 } ) ;
89189 }
90- this . buffer . clear ( ) ;
190+ }
191+
192+ public close ( ) : void {
193+ this . flush ( ) ;
194+ clearInterval ( this . flushIntervalTimer ) ;
91195 }
92196}
0 commit comments