Skip to content

Commit

Permalink
fix: handle invalid json issue (#486)
Browse files Browse the repository at this point in the history
* fix: handle invalid message json

* refactor: use cleanUpEvents()

* chore: add doc

* fix: handle scenario with multiple invalid and valid events in a single batch

Previously, Invalid events were not being marked as device mode processing done.
  • Loading branch information
1abhishekpandey authored Dec 6, 2024
1 parent 179c262 commit 4579fc2
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
32 changes: 20 additions & 12 deletions core/src/main/java/com/rudderstack/android/sdk/core/FlushUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static com.rudderstack.android.sdk.core.RudderNetworkManager.Result;
import static com.rudderstack.android.sdk.core.util.Utils.getBatch;
import static com.rudderstack.android.sdk.core.util.Utils.getNumberOfBatches;
import androidx.annotation.Nullable;

import android.text.TextUtils;

Expand Down Expand Up @@ -88,6 +89,7 @@ static boolean flushToServer(int flushQueueSize, String dataPlaneUrl, DBPersiste

} else {
lastErrorMessage = ReportManager.LABEL_TYPE_PAYLOAD_NULL;
dbManager.markCloudModeDone(batchMessageIds);
}
RudderLogger.logWarn(String.format(Locale.US, "EventRepository: flush: Failed to send batch %d/%d retrying again, %d retries left", i, numberOfBatches, retries));
}
Expand Down Expand Up @@ -132,6 +134,7 @@ private static void reportBatchesAndMessages(int numberOfBatches, int messagesSi
* of deserialization and forming the payload object and creating the json string
* again from the object
* */
@Nullable
static String getPayloadFromMessages(List<Integer> messageIds, List<String> messages) {
if (messageIds.isEmpty() || messages.isEmpty()) {
RudderLogger.logWarn("FlushUtils: getPayloadFromMessages: Payload Construction failed: no messages to send");
Expand Down Expand Up @@ -159,22 +162,27 @@ static String getPayloadFromMessages(List<Integer> messageIds, List<String> mess
String message = messages.get(index);
// strip last ending object character
message = message.substring(0, message.length() - 1);
// add sentAt time stamp
message = String.format("%s,\"sentAt\":\"%s\"},", message, sentAtTimestamp);
// add message size to batch size
messageSize = Utils.getUTF8Length(message);
totalBatchSize += messageSize;
// check batch size
if (totalBatchSize >= Utils.MAX_BATCH_SIZE) {
RudderLogger.logDebug(String.format(Locale.US, "FlushUtils: getPayloadFromMessages: MAX_BATCH_SIZE reached at index: %d | Total: %d", index, totalBatchSize));
incrementDiscardedCounter(1, Collections.singletonMap(LABEL_TYPE, ReportManager.LABEL_TYPE_BATCH_SIZE_INVALID));
break;
// Handle Invalid Message whose length is 0
if (!message.isEmpty()) {
// add sentAt time stamp
message = String.format("%s,\"sentAt\":\"%s\"},", message, sentAtTimestamp);
// add message size to batch size
messageSize = Utils.getUTF8Length(message);
totalBatchSize += messageSize;
// check batch size
if (totalBatchSize >= Utils.MAX_BATCH_SIZE) {
RudderLogger.logDebug(String.format(Locale.US, "FlushUtils: getPayloadFromMessages: MAX_BATCH_SIZE reached at index: %d | Total: %d", index, totalBatchSize));
incrementDiscardedCounter(1, Collections.singletonMap(LABEL_TYPE, ReportManager.LABEL_TYPE_BATCH_SIZE_INVALID));
break;
}
// finally add message string to builder
batchMessagesBuilder.append(message);
}
// finally add message string to builder
batchMessagesBuilder.append(message);
// add message to batch ArrayLists
batchMessageIds.add(messageIds.get(index));
}
// If the batchMessagesBuilder is empty, return null
if (batchMessagesBuilder.length() == 0) return null;
if (batchMessagesBuilder.charAt(batchMessagesBuilder.length() - 1) == ',') {
// remove trailing ','
batchMessagesBuilder.deleteCharAt(batchMessagesBuilder.length() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ public void run() {
RudderLogger.logInfo(String.format(Locale.US, "CloudModeManager: cloudModeProcessor: ServerResponse: %d", result.statusCode));
if (result.status == NetworkResponses.SUCCESS) {
ReportManager.incrementCloudModeUploadSuccessCounter(messageIds.size());
dbManager.markCloudModeDone(messageIds);
dbManager.runGcForEvents();
cleanUpEvents(messageIds);
exponentialBackOff.resetBackOff();
upTimeInMillis = Utils.getUpTimeInMillis();
sleepCount = Utils.getSleepDurationInSecond(upTimeInMillis, Utils.getUpTimeInMillis());
} else {
incrementCloudModeUploadRetryCounter(1);
}
} else {
cleanUpEvents(messageIds);
}
}
}
Expand Down Expand Up @@ -112,6 +113,11 @@ public void run() {
}.start();
}

private void cleanUpEvents(List<Integer> messageIds) {
dbManager.markCloudModeDone(messageIds);
dbManager.runGcForEvents();
}

private void deleteEventsWithoutAnonymousId(ArrayList<String> messages, ArrayList<Integer> messageIds) {
List<Integer> eventsToDelete = new ArrayList<>();
for (int i = 0; i < messages.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ private void replayMessageQueue() {
RudderMessage message = RudderGson.deserialize(messages.get(i), RudderMessage.class);
if (message != null) {
processMessage(message, messageIds.get(i), true);
} else {
markDeviceModeTransformationDone(messageIds.get(i));
}
} catch (Exception e) {
ReportManager.reportError(e);
Expand Down

0 comments on commit 4579fc2

Please sign in to comment.