Skip to content

Commit

Permalink
[INLONG-8629][agent] fix: sending invalid data to dataproxy failed bl…
Browse files Browse the repository at this point in the history
…ocks normal data sending

1. clean the resend queue and release the semaphore
2. sender manager will send the data in the resend queue first
  • Loading branch information
justinwwhuang committed Aug 4, 2023
1 parent 2fc03b4 commit 1943f4a
Showing 1 changed file with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ public void Stop() {
shutdown = true;
resendExecutorService.shutdown();
sender.close();
cleanResendQueue();
}

private void cleanResendQueue() {
while (!resendQueue.isEmpty()) {
try {
AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS);
if (callback != null) {
MemoryManager.getInstance()
.release(AGENT_GLOBAL_WRITER_PERMIT, (int) callback.batchMessage.getTotalSize());
}
} catch (InterruptedException e) {
LOGGER.error("clean resend queue error{}", e.getMessage());
}
}
}

private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) {
Expand Down Expand Up @@ -222,6 +237,10 @@ private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int retry)
boolean suc = false;
while (!suc) {
try {
if (!resendQueue.isEmpty()) {
AgentUtils.silenceSleepInMs(retrySleepTime);
continue;
}
sender.asyncSendMessage(new AgentSenderCallback(batchMessage, retry),
batchMessage.getDataList(), batchMessage.getGroupId(), batchMessage.getStreamId(),
batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,
Expand Down

0 comments on commit 1943f4a

Please sign in to comment.