Skip to content

Commit

Permalink
Rollback class.
Browse files Browse the repository at this point in the history
  • Loading branch information
almeidast committed Sep 19, 2024
1 parent f2941cf commit 6973703
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import org.swisspush.redisques.util.QueueStatisticsCollector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -49,13 +52,13 @@ public class QueueStatsService {
private final Semaphore incomingRequestQuota;

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota
) {
this.vertx = vertx;
this.eventBus = eventBus;
Expand Down Expand Up @@ -94,10 +97,10 @@ public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { onDone.accept(ex2, null); return; }
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { onDone.accept(ex3, null); return; }
onDone.accept(null, req3.queues);
});
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { onDone.accept(ex3, null); return; }
onDone.accept(null, req3.queues);
});
});
});
} catch (Exception ex) {
Expand All @@ -110,7 +113,7 @@ public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
}
}

private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
String filter = req.mentor.filter(req.mCtx);
JsonObject operation = buildGetQueuesItemsCountOperation(filter);
eventBus.<JsonObject>request(redisquesAddress, operation, ev -> {
Expand Down Expand Up @@ -157,7 +160,7 @@ private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsu
});
}

private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
long begGetQueueStatsMs = currentTimeMillis();
assert req.queueNames != null;
queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> {
Expand All @@ -183,7 +186,7 @@ private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<T
});
}

private <CTX> void attachDequeueStats(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
private <CTX> void attachDequeueStats(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> {
for (Queue queue : req.queues) {
if (event.containsKey(queue.name)) {
Expand All @@ -206,13 +209,13 @@ private int compareLargestFirst(Queue a, Queue b) {
}


static class GetQueueStatsRequest<CTX> {
private static class GetQueueStatsRequest<CTX> {
private CTX mCtx;
private GetQueueStatsMentor<CTX> mentor;
private List<String> queueNames;
/* TODO: Why is 'queuesJsonArr' never accessed? Isn't this the reason of our class in the first place? */
private JsonArray queuesJsonArr;
List<Queue> queues;
private List<Queue> queues;
}


Expand All @@ -222,7 +225,7 @@ public static class Queue {
private Long lastDequeueAttemptEpochMs;
private Long lastDequeueSuccessEpochMs;
private Long nextDequeueDueTimestampEpochMs;
Queue(String name, long size){
private Queue(String name, long size){
assert name != null;
this.name = name;
this.size = size;
Expand Down

0 comments on commit 6973703

Please sign in to comment.