PR for release #160

Merged · 23 commits · Apr 2, 2024
Commits
82fe0dc
Merge pull request #4 from swisspush/develop
lbovet Jan 8, 2018
913ee4e
updating poms for branch'release-3.1.1' with non-snapshot versions
Jan 23, 2024
b861e4a
updating poms for 3.1.2-SNAPSHOT development
Jan 23, 2024
1781490
Merge branch 'release-3.1.1'
Jan 23, 2024
9efaba1
updating develop poms to master versions to avoid merge conflicts
Jan 23, 2024
3b8c16f
Merge branch 'master' into develop
Jan 23, 2024
60d8a17
Updating develop poms back to pre merge state
Jan 23, 2024
9b28b3e
Merge branch 'lbovet:master' into develop
Kusig Feb 19, 2024
0de96ef
[SDCISA-15223] Drop logger implementation dependency
hiddenalpha Mar 19, 2024
f249d1c
Merge pull request #153 from hiddenalpha/libraries-must-not-include-l…
hiddenalpha Mar 21, 2024
52b5ced
[SDCISA-13746] Add simple time measuring log.
hiddenalpha Mar 26, 2024
973c589
[SDCISA-13746] Add formatting bloat.
hiddenalpha Mar 27, 2024
4111082
Fix few bad habits in RedisquesHttpRequestHandler
hiddenalpha Mar 27, 2024
e7b4c46
Merge pull request #156 from hiddenalpha/SDCISA-13746-AddSimpleTimeMe…
hiddenalpha Mar 27, 2024
e10d8e1
[SDCISA-13746] ReWrite QueueStats merging from an exponential algorit…
hiddenalpha Mar 28, 2024
7ec89e1
[SDCISA-13746] Apply PR feedback.
hiddenalpha Mar 28, 2024
71d648f
[SDCISA-13746] Apply PR feedback (#2)
hiddenalpha Apr 2, 2024
b9d47aa
Apply PR feedback.
hiddenalpha Apr 2, 2024
91309a9
Update Vert.x to 4.5.2 to solve CVE-2024-1023
mcweba Apr 2, 2024
b8a8eca
Merge pull request #157 from hiddenalpha/FixBadHabitsInRedisquesHttpR…
hiddenalpha Apr 2, 2024
b9308de
Merge pull request #159 from swisspost/feature/Update_vertx_452
mcweba Apr 2, 2024
b529ebe
Merge pull request #158 from hiddenalpha/SDCISA-13746-QueuestatsMustN…
hiddenalpha Apr 2, 2024
de6bdb1
Update pom.xml
mcweba Apr 2, 2024

11 changes: 3 additions & 8 deletions pom.xml
@@ -2,7 +2,7 @@
 <modelVersion>4.0.0</modelVersion>
 <groupId>org.swisspush</groupId>
 <artifactId>redisques</artifactId>
-<version>3.1.1-SNAPSHOT</version>
+<version>3.1.2-SNAPSHOT</version>
 <name>redisques</name>
 <description>
 A highly scalable redis-persistent queuing system for vertx
@@ -70,11 +70,6 @@
 <artifactId>slf4j-api</artifactId>
 <version>${slf4j.version}</version>
 </dependency>
-<dependency>
-<groupId>org.slf4j</groupId>
-<artifactId>slf4j-reload4j</artifactId>
-<version>${slf4j.version}</version>
-</dependency>

 <!-- TEST dependencies -->
 <dependency>
@@ -389,15 +384,15 @@
 </profile>
 </profiles>
 <properties>
-<vertx.version>4.5.1</vertx.version>
+<vertx.version>4.5.2</vertx.version>
 <slf4j.version>2.0.10</slf4j.version>
 <mockito.version>5.8.0</mockito.version>
 <junit.version>4.13.2</junit.version>
 <awaitility.version>4.2.0</awaitility.version>
 <jedis.version>3.7.0</jedis.version>
 <rest-assured.version>5.4.0</rest-assured.version>
 <commons-coded.version>1.16.0</commons-coded.version>
-<jackson-databind.version>2.15.0</jackson-databind.version>
+<jackson-databind.version>2.17.0</jackson-databind.version>
 <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
 <sonatypeOssDistMgmtSnapshotsUrl>
 https://oss.sonatype.org/content/repositories/snapshots/
225 changes: 225 additions & 0 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
@@ -0,0 +1,225 @@
package org.swisspush.redisques;

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.QueueStatisticsCollector;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static java.lang.Long.compare;
import static java.lang.System.currentTimeMillis;
import static java.util.Collections.emptyList;
import static org.slf4j.LoggerFactory.getLogger;
import static org.swisspush.redisques.util.RedisquesAPI.*;


/**
* <p>The old implementation fetched all queues (take 2'000 as a typical number from
* PaISA prod) and then did a nested iteration, so in the worst case it iterated 2'000
* times over 2'000 queues to find the matching queue (2'000 * 2'000 = 4'000'000
* iterations). The new implementation first sets up a dictionary and then uses indexed
* access while iterating over the 2'000 entries (2'000 + 2'000 = 4'000 iterations).
* Keep in mind that 2'000 is just a typical value. Under load this value can grow
* further, so the old, quadratic approach exploded in terms of computational effort.</p>
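* <p>Illustrative sketch of the difference (simplified, not the literal old code;
* {@code indexByName()} and {@code merge()} are placeholder helpers):</p>
* <pre>
*   // old: nested scan over the details for every queue, O(n * n)
*   for (Queue q : queues)
*       for (JsonObject d : detailsJsonArr)
*           if (q.name.equals(d.getString(MONITOR_QUEUE_NAME))) { merge(q, d); break; }
*
*   // new: build one lookup table, then one indexed access per queue, O(n + n)
*   Map&lt;String, JsonObject&gt; byName = indexByName(detailsJsonArr);
*   for (Queue q : queues) merge(q, byName.get(q.name));
* </pre>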
*/
public class QueueStatsService {

private static final Logger log = getLogger(QueueStatsService.class);
private final Vertx vertx;
private final EventBus eventBus;
private final String redisquesAddress;
private final QueueStatisticsCollector queueStatisticsCollector;

public QueueStatsService(Vertx vertx, EventBus eventBus, String redisquesAddress, QueueStatisticsCollector queueStatisticsCollector) {
this.vertx = vertx;
this.eventBus = eventBus;
this.redisquesAddress = redisquesAddress;
this.queueStatisticsCollector = queueStatisticsCollector;
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
var req0 = new GetQueueStatsRequest<CTX>();
req0.mCtx = mCtx;
req0.mentor = mentor;
fetchQueueNamesAndSize(req0, (ex1, req1) -> {
if (ex1 != null) { req1.mentor.onError(ex1, req1.mCtx); return; }
// Prepare a list of queue names as it is needed to fetch retryDetails.
req1.queueNames = new ArrayList<>(req1.queues.size());
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { req2.mentor.onError(ex2, req2.mCtx); return; }
mergeRetryDetailsIntoCollectedData(req2, (ex3, req3) -> {
if (ex3 != null) { req3.mentor.onError(ex3, req3.mCtx); return; }
req3.mentor.onQueueStatistics(req3.queues, req3.mCtx);
});
});
});
}

private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
String filter = req.mentor.filter(req.mCtx);
JsonObject operation = buildGetQueuesItemsCountOperation(filter);
eventBus.<JsonObject>request(redisquesAddress, operation, ev -> {
if (ev.failed()) {
onDone.accept(new Exception("eventBus.request()", ev.cause()), null);
return;
}
Message<JsonObject> msg = ev.result();
JsonObject body = msg.body();
String status = body.getString(STATUS);
if (!OK.equals(status)) {
onDone.accept(new Exception("Unexpected status " + status), null);
return;
}
JsonArray queuesJsonArr = body.getJsonArray(QUEUES);
if (queuesJsonArr == null || queuesJsonArr.isEmpty()) {
log.debug("result was {}, we return an empty result.", queuesJsonArr == null ? "null" : "empty");
req.queues = emptyList();
onDone.accept(null, req);
return;
}
boolean includeEmptyQueues = req.mentor.includeEmptyQueues(req.mCtx);
List<Queue> queues = new ArrayList<>(queuesJsonArr.size());
for (var it = queuesJsonArr.iterator(); it.hasNext(); ) {
JsonObject queueJson = (JsonObject) it.next();
String name = queueJson.getString(MONITOR_QUEUE_NAME);
Long size = queueJson.getLong(MONITOR_QUEUE_SIZE);
// No need to process empty queues any further if caller is not interested
// in them anyway.
if (!includeEmptyQueues && (size == null || size == 0)) continue;
Queue queue = new Queue(name, size);
queues.add(queue);
}
queues.sort(this::compareLargestFirst);
// Only the most filled queues were requested, so drop all queues beyond that limit.
int limit = req.mentor.limit(req.mCtx);
if (limit != 0 && queues.size() > limit) queues = queues.subList(0, limit);
req.queues = queues;
onDone.accept(null, req);
});
}

private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
long begGetQueueStatsMs = currentTimeMillis();
assert req.queueNames != null;
queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> {
req.queueNames = null; // <- no longer needed
long durGetQueueStatsMs = currentTimeMillis() - begGetQueueStatsMs;
log.debug("queueStatisticsCollector.getQueueStatistics() took {}ms", durGetQueueStatsMs);
if (ev.failed()) {
log.warn("queueStatisticsCollector.getQueueStatistics() failed. Fallback to empty result.", ev.cause());
req.queuesJsonArr = new JsonArray();
onDone.accept(null, req);
return;
}
JsonObject queStatsJsonObj = ev.result();
String status = queStatsJsonObj.getString(STATUS);
if (!OK.equals(status)) {
log.warn("queueStatisticsCollector.getQueueStatistics() responded '" + status + "'. Fallback to empty result.", ev.cause());
req.queuesJsonArr = new JsonArray();
onDone.accept(null, req);
return;
}
req.queuesJsonArr = queStatsJsonObj.getJsonArray(QUEUES);
onDone.accept(null, req);
});
}

private <CTX> void mergeRetryDetailsIntoCollectedData(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
// Set up a lookup table, as we need to find entries by name further below.
Map<String, JsonObject> detailsByName = new HashMap<>(req.queuesJsonArr.size());
for (var it = (Iterator<JsonObject>) (Object) req.queuesJsonArr.iterator(); it.hasNext(); ) {
JsonObject detailJson = it.next();
String name = detailJson.getString(MONITOR_QUEUE_NAME);
detailsByName.put(name, detailJson);
}
for (Queue queue : req.queues) {
JsonObject detail = detailsByName.get(queue.name);
if (detail == null) continue; // no details to enrich.
JsonObject dequeueStatsJson = detail.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC);
if (dequeueStatsJson == null) continue; // no dequeue stats we could enrich
DequeueStatistic dequeueStats = dequeueStatsJson.mapTo(DequeueStatistic.class);
// Attach whatever details we got.
queue.lastDequeueAttemptEpochMs = dequeueStats.lastDequeueAttemptTimestamp;
queue.lastDequeueSuccessEpochMs = dequeueStats.lastDequeueSuccessTimestamp;
queue.nextDequeueDueTimestampEpochMs = dequeueStats.nextDequeueDueTimestamp;
}
onDone.accept(null, req);
}

private int compareLargestFirst(Queue a, Queue b) {
return compare(b.size, a.size);
}


private static class GetQueueStatsRequest<CTX> {
private CTX mCtx;
private GetQueueStatsMentor<CTX> mentor;
private List<String> queueNames;
private JsonArray queuesJsonArr;
private List<Queue> queues;
}


public static class Queue {
private final String name;
private final long size;
private Long lastDequeueAttemptEpochMs;
private Long lastDequeueSuccessEpochMs;
private Long nextDequeueDueTimestampEpochMs;
private Queue(String name, long size){
assert name != null;
this.name = name;
this.size = size;
}

public String getName() { return name; }
public long getSize() { return size; }
public Long getLastDequeueAttemptEpochMs() { return lastDequeueAttemptEpochMs; }
public Long getLastDequeueSuccessEpochMs() { return lastDequeueSuccessEpochMs; }
public Long getNextDequeueDueTimestampEpochMs() { return nextDequeueDueTimestampEpochMs; }
}


/**
* <p>Mentors the fetch operation by providing the fetcher with the required
* information, and finally receives the operation's result.</p>
*
* @param <CTX>
* The context object of choice that is handed back to each callback, so the mentor
* knows which request the fetcher is talking about.
*/
public static interface GetQueueStatsMentor<CTX> {

/**
* <p>Returning true means that all queues will be present in the result. If
* false, empty queues won't show up in the result.</p>
*
* @param ctx See {@link GetQueueStatsMentor}.
*/
public boolean includeEmptyQueues( CTX ctx );

/** <p>Limits the result to the largest N queues.</p> */
public int limit( CTX ctx );

public String filter( CTX ctx);

/** <p>Called ONCE with the final result.</p> */
public void onQueueStatistics(List<Queue> queues, CTX ctx);

/**
* <p>Called as soon as an error occurs. Once an error has occurred, {@link #onQueueStatistics(List, Object)}
* will NOT be called, as the operation has failed.</p>
*/
public void onError(Throwable ex, CTX ctx);
}

}
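
For orientation, here is a minimal usage sketch of the new QueueStatsService API (not part of this PR). The event-bus address "redisques", the String context type, the filter pattern and the null collector placeholder are assumptions chosen for illustration; in redisques the collector and address come from the verticle configuration.

import io.vertx.core.Vertx;
import org.swisspush.redisques.QueueStatsService;
import org.swisspush.redisques.QueueStatsService.GetQueueStatsMentor;
import org.swisspush.redisques.QueueStatsService.Queue;
import org.swisspush.redisques.util.QueueStatisticsCollector;

import java.util.List;

public class QueueStatsUsageSketch {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // Placeholder: in redisques this collector is created during verticle setup.
        QueueStatisticsCollector queueStatisticsCollector = null;
        QueueStatsService service = new QueueStatsService(
                vertx, vertx.eventBus(), "redisques", queueStatisticsCollector);

        // The context type is free to choose; here the filter pattern itself is used as context.
        GetQueueStatsMentor<String> mentor = new GetQueueStatsMentor<String>() {
            @Override public boolean includeEmptyQueues(String filterCtx) { return false; }
            @Override public int limit(String filterCtx) { return 10; } // ten largest queues only
            @Override public String filter(String filterCtx) { return filterCtx; }
            @Override public void onQueueStatistics(List<Queue> queues, String filterCtx) {
                queues.forEach(q -> System.out.println(
                        q.getName() + " size=" + q.getSize()
                        + " lastDequeueAttempt=" + q.getLastDequeueAttemptEpochMs()));
            }
            @Override public void onError(Throwable ex, String filterCtx) {
                ex.printStackTrace();
            }
        };

        service.getQueueStats("my-queues-.*", mentor);
    }
}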