From 713a31256f1b97ccc647f74b1c3a4f1cda7341ff Mon Sep 17 00:00:00 2001 From: Neelkanth Kaushik Date: Fri, 23 May 2025 10:19:18 +0530 Subject: [PATCH 1/4] LIBRARIES-2612 - Refactor Dequeue Handling Operation --- .../core/lib/plugins/segment_destination.dart | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/packages/core/lib/plugins/segment_destination.dart b/packages/core/lib/plugins/segment_destination.dart index 501d213..55c0329 100644 --- a/packages/core/lib/plugins/segment_destination.dart +++ b/packages/core/lib/plugins/segment_destination.dart @@ -18,43 +18,57 @@ class SegmentDestination extends DestinationPlugin with Flushable { _queuePlugin = QueueFlushingPlugin(sendEvents); } + /// Splits a list of raw events into size-limited batches. + /// Sends them to the Segment API one by one. + /// Tracks which batches succeed or fail. + /// Only dequeues (removes) events that were successfully sent. + /// Logs everything for monitoring and debugging. Future sendEvents(List events) async { + + // If the events list is empty, the function exits early without doing anything. if (events.isEmpty) { return; } + // Break the list of events into smaller batches using the chunk() utility. final List> chunkedEvents = chunk(events, analytics?.state.configuration.state.maxBatchSize ?? maxEventsPerBatch, maxKB: maxPayloadSizeInKb); - final List sentEvents = []; - var numFailedEvents = 0; + final List sentEvents = []; // a list to collect events that were successfully sent. + var numFailedEvents = 0; // a counter to keep track of how many events failed to send. + // Iterate over each batch in chunkedEvents sequentially using await await Future.forEach(chunkedEvents, (batch) async { try { + // Send the current batch to the server. final succeeded = await analytics?.httpClient.startBatchUpload( analytics!.state.configuration.state.writeKey, batch, host: _apiHost); - if (succeeded == null || !succeeded) { - numFailedEvents += batch.length; + // succeeded is true if the server confirms the batch was accepted. + if (succeeded == true) { + sentEvents.addAll(batch); // If the upload succeeded, all events in the batch are added to sentEvents. + } else { + numFailedEvents += batch.length; // If failed, increase the numFailedEvents counter by the number of events in the failed batch. } - sentEvents.addAll(batch); } catch (e) { numFailedEvents += batch.length; - } finally { - _queuePlugin.dequeue(sentEvents); } }); + // After all batches have been processed if (sentEvents.isNotEmpty) { - log("Sent ${sentEvents.length} events", kind: LogFilterKind.debug); + // Remove successfully sent events from the queue and log them + _queuePlugin.dequeue(sentEvents); + log("Successfully Sent ${sentEvents.length} events.", kind: LogFilterKind.debug); } if (numFailedEvents > 0) { - log("Failed to send $numFailedEvents events", kind: LogFilterKind.error); + // If any events failed to send, log an error message indicating how many. + log("Failed to send $numFailedEvents events.", kind: LogFilterKind.error); } - return; + return; // No value is returned, but it signals that async processing is complete. } @override From 05dde325f6b6040ce5485ddb57556d0674af3416 Mon Sep 17 00:00:00 2001 From: Neelkanth Kaushik Date: Fri, 23 May 2025 10:23:42 +0530 Subject: [PATCH 2/4] LIBRARIES-2612 - Refactor Dequeue Handling Operation --- .../core/lib/plugins/segment_destination.dart | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/packages/core/lib/plugins/segment_destination.dart b/packages/core/lib/plugins/segment_destination.dart index 55c0329..36f4643 100644 --- a/packages/core/lib/plugins/segment_destination.dart +++ b/packages/core/lib/plugins/segment_destination.dart @@ -18,57 +18,43 @@ class SegmentDestination extends DestinationPlugin with Flushable { _queuePlugin = QueueFlushingPlugin(sendEvents); } - /// Splits a list of raw events into size-limited batches. - /// Sends them to the Segment API one by one. - /// Tracks which batches succeed or fail. - /// Only dequeues (removes) events that were successfully sent. - /// Logs everything for monitoring and debugging. Future sendEvents(List events) async { - - // If the events list is empty, the function exits early without doing anything. if (events.isEmpty) { return; } - // Break the list of events into smaller batches using the chunk() utility. final List> chunkedEvents = chunk(events, analytics?.state.configuration.state.maxBatchSize ?? maxEventsPerBatch, maxKB: maxPayloadSizeInKb); - final List sentEvents = []; // a list to collect events that were successfully sent. - var numFailedEvents = 0; // a counter to keep track of how many events failed to send. + final List sentEvents = []; + var numFailedEvents = 0; - // Iterate over each batch in chunkedEvents sequentially using await await Future.forEach(chunkedEvents, (batch) async { try { - // Send the current batch to the server. final succeeded = await analytics?.httpClient.startBatchUpload( analytics!.state.configuration.state.writeKey, batch, host: _apiHost); - // succeeded is true if the server confirms the batch was accepted. if (succeeded == true) { - sentEvents.addAll(batch); // If the upload succeeded, all events in the batch are added to sentEvents. + sentEvents.addAll(batch); } else { - numFailedEvents += batch.length; // If failed, increase the numFailedEvents counter by the number of events in the failed batch. + numFailedEvents += batch.length; } } catch (e) { numFailedEvents += batch.length; } }); - // After all batches have been processed if (sentEvents.isNotEmpty) { - // Remove successfully sent events from the queue and log them _queuePlugin.dequeue(sentEvents); log("Successfully Sent ${sentEvents.length} events.", kind: LogFilterKind.debug); } if (numFailedEvents > 0) { - // If any events failed to send, log an error message indicating how many. log("Failed to send $numFailedEvents events.", kind: LogFilterKind.error); } - return; // No value is returned, but it signals that async processing is complete. + return; } @override From 78bfc961789a10dd8a32e9b147238113f773ab02 Mon Sep 17 00:00:00 2001 From: Neelkanth Kaushik Date: Fri, 23 May 2025 10:24:43 +0530 Subject: [PATCH 3/4] LIBRARIES-2612 - Refactor Dequeue Handling Operation --- packages/core/lib/plugins/segment_destination.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/core/lib/plugins/segment_destination.dart b/packages/core/lib/plugins/segment_destination.dart index 36f4643..5f0df7c 100644 --- a/packages/core/lib/plugins/segment_destination.dart +++ b/packages/core/lib/plugins/segment_destination.dart @@ -47,11 +47,11 @@ class SegmentDestination extends DestinationPlugin with Flushable { if (sentEvents.isNotEmpty) { _queuePlugin.dequeue(sentEvents); - log("Successfully Sent ${sentEvents.length} events.", kind: LogFilterKind.debug); + log("Successfully Sent ${sentEvents.length} events", kind: LogFilterKind.debug); } if (numFailedEvents > 0) { - log("Failed to send $numFailedEvents events.", kind: LogFilterKind.error); + log("Failed to send $numFailedEvents events", kind: LogFilterKind.error); } return; From 8b31680c06f05b7d779c766f4ca6f9275efb4a81 Mon Sep 17 00:00:00 2001 From: Neelkanth Kaushik Date: Fri, 23 May 2025 10:30:38 +0530 Subject: [PATCH 4/4] Refactor Dequeue Handling Operation --- packages/core/lib/plugins/segment_destination.dart | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/core/lib/plugins/segment_destination.dart b/packages/core/lib/plugins/segment_destination.dart index 5f0df7c..e33201e 100644 --- a/packages/core/lib/plugins/segment_destination.dart +++ b/packages/core/lib/plugins/segment_destination.dart @@ -30,14 +30,15 @@ class SegmentDestination extends DestinationPlugin with Flushable { final List sentEvents = []; var numFailedEvents = 0; + // Iterate over each batch in chunkedEvents sequentially await Future.forEach(chunkedEvents, (batch) async { try { final succeeded = await analytics?.httpClient.startBatchUpload( analytics!.state.configuration.state.writeKey, batch, host: _apiHost); - if (succeeded == true) { + if (succeeded == true) { // If the upload succeeded, all events in the batch are added to sentEvents. sentEvents.addAll(batch); - } else { + } else { // If it failed, increase the numFailedEvents counter by the number of events in the failed batch. numFailedEvents += batch.length; } } catch (e) { @@ -46,7 +47,7 @@ class SegmentDestination extends DestinationPlugin with Flushable { }); if (sentEvents.isNotEmpty) { - _queuePlugin.dequeue(sentEvents); + _queuePlugin.dequeue(sentEvents); // Removed events that were successfully sent from the internal queue log("Successfully Sent ${sentEvents.length} events", kind: LogFilterKind.debug); }