Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge to master to trigger release #135

Merged
merged 21 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>3.0.31-SNAPSHOT</version>
<version>3.0.32-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
29 changes: 16 additions & 13 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.action.QueueAction;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.*;

import java.util.*;
Expand Down Expand Up @@ -92,8 +91,6 @@ private enum QueueState {
private QueueActionFactory queueActionFactory;
private RedisquesConfigurationProvider configurationProvider;

private LuaScriptManager luaScriptManager;

private Map<QueueOperation, QueueAction> queueActions = new HashMap<>();

public RedisQues() {
Expand Down Expand Up @@ -195,8 +192,7 @@ public void start(Promise<Void> promise) {

private void initialize() {
RedisquesConfiguration configuration = configurationProvider.configuration();
this.luaScriptManager = new LuaScriptManager(redisProvider);
this.queueStatisticsCollector = new QueueStatisticsCollector(redisProvider, luaScriptManager,
this.queueStatisticsCollector = new QueueStatisticsCollector(redisProvider,
queuesPrefix, vertx, configuration.getQueueSpeedIntervalSec());

RedisquesHttpRequestHandler.init(vertx, configuration);
Expand All @@ -207,7 +203,7 @@ private void initialize() {
configurationProvider.configuration().getMemoryUsageCheckIntervalSec());
}

queueActionFactory = new QueueActionFactory(luaScriptManager, redisProvider, vertx, log,
queueActionFactory = new QueueActionFactory(redisProvider, vertx, log,
queuesKey, queuesPrefix, consumersPrefix, locksKey, queueStatisticsCollector, memoryUsageProvider,
configurationProvider);

Expand Down Expand Up @@ -356,13 +352,20 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s
}

private void registerQueueCheck() {
vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> luaScriptManager.handleQueueCheck(queueCheckLastexecKey,
configurationProvider.configuration().getCheckInterval(), shouldCheck -> {
if (shouldCheck) {
log.info("periodic queue check is triggered now");
checkQueues();
}
}));
vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent ->
{
redisProvider.connection().onSuccess(conn -> {
conn.send(Request.cmd(Command.SET, queueCheckLastexecKey, System.currentTimeMillis(),
"NX", "EX", configurationProvider.configuration().getCheckInterval()))
.onFailure(throwable -> log.error("Unexepected queue check result")).onSuccess(response -> {
log.info("periodic queue check is triggered now");
checkQueues();
});
}).onFailure(throwable -> {
log.warn("Redis: Failed to trigger queue check.", throwable);
});;

});
}

private void unsupportedOperation(String operation, Message<JsonObject> event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -25,7 +24,6 @@ public abstract class AbstractQueueAction implements QueueAction {

private static final int MAX_AGE_MILLISECONDS = 120000; // 120 seconds

protected final LuaScriptManager luaScriptManager;
protected final RedisProvider redisProvider;
protected final Vertx vertx;
protected final Logger log;
Expand All @@ -37,11 +35,10 @@ public abstract class AbstractQueueAction implements QueueAction {
protected final List<QueueConfiguration> queueConfigurations;
protected final QueueStatisticsCollector queueStatisticsCollector;

public AbstractQueueAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey,
public AbstractQueueAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey,
String queuesPrefix, String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
this.vertx = vertx;
this.luaScriptManager = luaScriptManager;
this.redisProvider = redisProvider;
this.address = address;
this.queuesKey = queuesKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.handler.AddQueueItemHandler;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -17,10 +16,10 @@

public class AddQueueItemAction extends AbstractQueueAction {

public AddQueueItemAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public AddQueueItemAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.vertx.redis.client.impl.types.MultiType;
import io.vertx.redis.client.impl.types.SimpleStringType;
import org.slf4j.Logger;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -19,10 +18,10 @@

public class BulkDeleteLocksAction extends AbstractQueueAction {

public BulkDeleteLocksAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public BulkDeleteLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -16,10 +15,10 @@

public class BulkDeleteQueuesAction extends AbstractQueueAction {

public BulkDeleteQueuesAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public BulkDeleteQueuesAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.handler.PutLockHandler;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -18,10 +17,10 @@
public class BulkPutLocksAction extends AbstractQueueAction {


public BulkPutLocksAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public BulkPutLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -16,10 +15,10 @@

public class DeleteAllLocksAction extends AbstractQueueAction {

public DeleteAllLocksAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public DeleteAllLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -19,10 +18,10 @@

public class DeleteAllQueueItemsAction extends AbstractQueueAction {

public DeleteAllQueueItemsAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public DeleteAllQueueItemsAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.handler.DeleteLockHandler;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -19,10 +18,10 @@

public class DeleteLockAction extends AbstractQueueAction {

public DeleteLockAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public DeleteLockAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -15,10 +14,10 @@

public class DeleteQueueItemAction extends AbstractQueueAction {

public DeleteQueueItemAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public DeleteQueueItemAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.MemoryUsageProvider;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
Expand All @@ -20,10 +19,10 @@ public class EnqueueAction extends AbstractQueueAction {
private final MemoryUsageProvider memoryUsageProvider;
private final int memoryUsageLimitPercent;

public EnqueueAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public EnqueueAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log, MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
this.memoryUsageProvider = memoryUsageProvider;
this.memoryUsageLimitPercent = memoryUsageLimitPercent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.handler.GetAllLocksHandler;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.*;

import java.util.List;
Expand All @@ -16,10 +15,10 @@

public class GetAllLocksAction extends AbstractQueueAction {

public GetAllLocksAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public GetAllLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.handler.GetLockHandler;
import org.swisspush.redisques.lua.LuaScriptManager;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -18,10 +17,10 @@

public class GetLockAction extends AbstractQueueAction {

public GetLockAction(Vertx vertx, LuaScriptManager luaScriptManager, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
public GetLockAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, luaScriptManager, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
}

Expand Down
Loading