diff --git a/Common/src/main/java/me/egg82/antivpn/config/CachedConfig.java b/Common/src/main/java/me/egg82/antivpn/config/CachedConfig.java index a1dc5b97..aaf92166 100644 --- a/Common/src/main/java/me/egg82/antivpn/config/CachedConfig.java +++ b/Common/src/main/java/me/egg82/antivpn/config/CachedConfig.java @@ -29,6 +29,10 @@ private CachedConfig() { } private long sourceCacheTime = new TimeUtil.Time(6L, TimeUnit.HOURS).getMillis(); + private boolean messagingRedundancy = true; + + public boolean getMessagingRedundancy() { return messagingRedundancy; } + public long getSourceCacheTime() { return sourceCacheTime; } private long mcleaksCacheTime = new TimeUtil.Time(1L, TimeUnit.DAYS).getMillis(); @@ -148,6 +152,11 @@ public CachedConfig.Builder messaging(@NotNull List<@NotNull MessagingService> v return this; } + public @NotNull CachedConfig.Builder messagingRedundancy(boolean value) { + values.messagingRedundancy = value; + return this; + } + @NotNull public CachedConfig.Builder sourceCacheTime(@NotNull TimeUtil.Time value) { if (value.getMillis() <= 0L) { diff --git a/Common/src/main/java/me/egg82/antivpn/config/ConfigurationFileUtil.java b/Common/src/main/java/me/egg82/antivpn/config/ConfigurationFileUtil.java index ba9fd2c6..5eb9d248 100644 --- a/Common/src/main/java/me/egg82/antivpn/config/ConfigurationFileUtil.java +++ b/Common/src/main/java/me/egg82/antivpn/config/ConfigurationFileUtil.java @@ -105,11 +105,17 @@ public static , B> void reloadConfig( AlgorithmMethod vpnAlgorithmMethod = getVpnAlgorithmMethod(config, debug, console); + boolean messagingRedundancy = config.node("messaging", "settings", "redundancy").getBoolean(true); + if (debug) { + console.sendMessage("Messaging systems are redundant: " + messagingRedundancy + ""); + } + CachedConfig cachedConfig = CachedConfig.builder() .debug(debug) .language(language) .storage(getStorage(config, dataDirectory, debug, console)) .messaging(getMessaging(config, serverId, messagingHandler, new File(dataDirectory, "packets"), debug, console)) + .messagingRedundancy(messagingRedundancy) .sourceCacheTime(getSourceCacheTime(config, debug, console)) .mcleaksCacheTime(getMcLeaksCacheTime(config, debug, console)) .cacheTime(getCacheTime(config, debug, console)) @@ -459,7 +465,7 @@ private static , B> MessagingService getM .getString("/") + ""); } try { - return RabbitMQMessagingService.builder(name, serverId, handler, packetDirectory) + return RabbitMQMessagingService.builder(name, serverId, handler, poolSettings.delay, packetDirectory) .url(url.address, url.port, connectionNode.node("v-host").getString("/")) .credentials(connectionNode.node("username").getString("guest"), connectionNode.node("password").getString("guest")) .timeout((int) poolSettings.timeout) @@ -475,7 +481,7 @@ private static , B> MessagingService getM console.sendMessage("Creating engine " + name + " of type redis with address " + url.getAddress() + ":" + url.getPort() + ""); } try { - return RedisMessagingService.builder(name, serverId, handler, packetDirectory) + return RedisMessagingService.builder(name, serverId, handler, poolSettings.delay, packetDirectory) .url(url.address, url.port) .credentials(connectionNode.node("password").getString("")) .poolSize(poolSettings.minPoolSize, poolSettings.maxPoolSize) @@ -492,7 +498,7 @@ private static , B> MessagingService getM console.sendMessage("Creating engine " + name + " of type NATS with address " + url.getAddress() + ":" + url.getPort() + ""); } try { - return NATSMessagingService.builder(name, serverId, handler, packetDirectory) + return NATSMessagingService.builder(name, serverId, handler, poolSettings.delay, packetDirectory) .url(url.address, url.port) .credentials(connectionNode.node("file").getString("")) .life((int) poolSettings.timeout) @@ -817,7 +823,7 @@ private static , B> CommentedConfiguratio } private static class AddressPort { - private final @NotNull String address; + private final String address; private final int port; public , B> AddressPort( @@ -841,8 +847,7 @@ public , B> AddressPort( this.port = p; } - @NotNull - public String getAddress() { return address; } + public @NotNull String getAddress() { return address; } public int getPort() { return port; } } @@ -852,6 +857,7 @@ private static class PoolSettings { private final int maxPoolSize; private final long maxLifetime; private final long timeout; + private final long delay; public PoolSettings(ConfigurationNode settingsNode) { minPoolSize = settingsNode.node("min-idle").getInt(); @@ -868,6 +874,13 @@ public PoolSettings(ConfigurationNode settingsNode) { t = new TimeUtil.Time(5L, TimeUnit.SECONDS); } timeout = t.getMillis(); + + String delayStr = settingsNode.node("delay").getString("1second"); + t = delayStr.trim().equals("0") ? new TimeUtil.Time(0L, TimeUnit.SECONDS) : TimeUtil.getTime(delayStr); + if (t == null) { + t = new TimeUtil.Time(1L, TimeUnit.SECONDS); + } + delay = t.getMillis(); } public int getMinPoolSize() { return minPoolSize; } @@ -877,5 +890,7 @@ public PoolSettings(ConfigurationNode settingsNode) { public long getMaxLifetime() { return maxLifetime; } public long getTimeout() { return timeout; } + + public long getDelay() { return delay; } } } diff --git a/Common/src/main/java/me/egg82/antivpn/config/ConfigurationVersionUtil.java b/Common/src/main/java/me/egg82/antivpn/config/ConfigurationVersionUtil.java index c925f3a1..7bbc67c2 100644 --- a/Common/src/main/java/me/egg82/antivpn/config/ConfigurationVersionUtil.java +++ b/Common/src/main/java/me/egg82/antivpn/config/ConfigurationVersionUtil.java @@ -76,6 +76,9 @@ public static void conformVersion( if (config.node("version").getDouble() == 5.1d) { to52(config); } + if (config.node("version").getDouble() == 5.2d) { + to53(config); + } if (config.node("version").getDouble() != oldVersion) { File backupFile = new File(fileOnDisk.getParent(), fileOnDisk.getName() + ".bak"); @@ -715,4 +718,14 @@ private static void to52(@NotNull CommentedConfigurationNode config) throws Seri // Version config.node("version").set(5.2d); } + + private static void to53(@NotNull CommentedConfigurationNode config) throws SerializationException { + // Add messaging->settings->delay + config.node("messaging", "settings", "delay").set("1second"); + // Add messaging->settings->redundancy + config.node("messaging", "settings", "redundancy").set(Boolean.TRUE); + + // Version + config.node("version").set(5.3d); + } } diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/AbstractMessagingService.java b/Common/src/main/java/me/egg82/antivpn/messaging/AbstractMessagingService.java index e00c5ea2..b6445422 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/AbstractMessagingService.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/AbstractMessagingService.java @@ -6,6 +6,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import me.egg82.antivpn.config.ConfigUtil; +import me.egg82.antivpn.core.Pair; import me.egg82.antivpn.locale.LocaleUtil; import me.egg82.antivpn.locale.MessageKey; import me.egg82.antivpn.logging.GELFLogger; @@ -14,6 +15,7 @@ import me.egg82.antivpn.messaging.packets.Packet; import me.egg82.antivpn.messaging.packets.server.InitializationPacket; import me.egg82.antivpn.messaging.packets.server.PacketVersionPacket; +import me.egg82.antivpn.services.CollectionProvider; import me.egg82.antivpn.utils.MathUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -26,6 +28,8 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.text.DecimalFormat; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -49,15 +53,28 @@ public abstract class AbstractMessagingService implements MessagingService { protected final File sentPacketDirectory; protected final File receivedPacketDirectory; - protected AbstractMessagingService(@NotNull String name, @NotNull File packetDirectory) { + protected final long startupDelay; + + protected AbstractMessagingService(@NotNull String name, long startupDelay, @NotNull File packetDirectory) { this.name = name; this.sentPacketDirectory = new File(packetDirectory, "sent"); this.receivedPacketDirectory = new File(packetDirectory, "received"); + this.startupDelay = startupDelay; } @Override public @NotNull String getName() { return name; } + @Override + public void flushPacketQueue(@NotNull UUID forServer) { + CollectionProvider.getPacketProcessingQueue().computeIfPresent(forServer, (k, v) -> { + for (Pair p : v) { + handler.handlePacket(p.getT1(), name, p.getT2()); + } + return null; + }); + } + private static final double TOLERANCE = 1.1; // Compression ratio tolerance. Determines when compression should happen protected final byte @NotNull [] compressData(@Nullable ByteBuf data) throws IOException { @@ -89,14 +106,14 @@ protected AbstractMessagingService(@NotNull String name, @NotNull File packetDir nd.readBytes(out, 1, uncompressedBytes); if (ConfigUtil.getDebugOrFalse()) { - logger.info("Sent (no) compression: " + out.length + "/" + uncompressedBytes + " (" + ratioFormat.format((double) uncompressedBytes / (double) out.length) + ")"); + logger.debug("Sent (no) compression: " + out.length + "/" + uncompressedBytes + " (" + ratioFormat.format((double) uncompressedBytes / (double) out.length) + ")"); } return out; } if (ConfigUtil.getDebugOrFalse()) { - logger.info("Sent compression: " + (compressedBytes + 5) + "/" + uncompressedBytes + " (" + ratioFormat.format((double) uncompressedBytes / (double) (compressedBytes + 5)) + ")"); + logger.debug("Sent compression: " + (compressedBytes + 5) + "/" + uncompressedBytes + " (" + ratioFormat.format((double) uncompressedBytes / (double) (compressedBytes + 5)) + ")"); } dest.put(0, (byte) 0x01); @@ -126,7 +143,7 @@ protected AbstractMessagingService(@NotNull String name, @NotNull File packetDir data.readBytes(retVal); if (ConfigUtil.getDebugOrFalse()) { - logger.info("Received (no) compression: " + compressedBytes + "/" + (compressedBytes - 1) + " (" + ratioFormat.format((double) (compressedBytes - 1) / (double) compressedBytes) + ")"); + logger.debug("Received (no) compression: " + compressedBytes + "/" + (compressedBytes - 1) + " (" + ratioFormat.format((double) (compressedBytes - 1) / (double) compressedBytes) + ")"); } return retVal; @@ -147,7 +164,7 @@ protected AbstractMessagingService(@NotNull String name, @NotNull File packetDir } if (ConfigUtil.getDebugOrFalse()) { - logger.info("Received compression: " + compressedBytes + "/" + uncompressedBytes + " (" + ratioFormat.format((double) uncompressedBytes / (double) compressedBytes) + ")"); + logger.debug("Received compression: " + compressedBytes + "/" + uncompressedBytes + " (" + ratioFormat.format((double) uncompressedBytes / (double) compressedBytes) + ")"); } dest.rewind(); @@ -278,17 +295,40 @@ protected static boolean hasVersion(@NotNull Packet packet) { return true; } + int i = 0; if (packet instanceof MultiPacket) { MultiPacket mult = (MultiPacket) packet; for (Packet p : mult.getPackets()) { if (p instanceof InitializationPacket || p instanceof PacketVersionPacket) { + if (i > 0) { + reorder(mult); + } return true; } + i++; } } return false; } + private static void reorder(@NotNull MultiPacket packet) { + // TODO: There is definitely a more efficient way to do this, probably using streams + + Set removedPackets = new LinkedHashSet<>(); + Set keptPackets = new LinkedHashSet<>(); + + for (Packet p : packet.getPackets()) { + if (p instanceof InitializationPacket || p instanceof PacketVersionPacket) { + removedPackets.add(p); + } else { + keptPackets.add(p); + } + } + + removedPackets.addAll(keptPackets); + packet.setPackets(removedPackets); + } + private void printBytes(@NotNull ByteBuf buffer) { StringBuilder sb = new StringBuilder(); diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/MessagingService.java b/Common/src/main/java/me/egg82/antivpn/messaging/MessagingService.java index e62e6eb7..66048b99 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/MessagingService.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/MessagingService.java @@ -15,4 +15,6 @@ public interface MessagingService { boolean isClosed(); void sendPacket(@NotNull UUID messageId, @NotNull Packet packet) throws IOException, TimeoutException; + + void flushPacketQueue(@NotNull UUID forServer); } diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/NATSMessagingService.java b/Common/src/main/java/me/egg82/antivpn/messaging/NATSMessagingService.java index a82d688c..84fda07a 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/NATSMessagingService.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/NATSMessagingService.java @@ -6,17 +6,23 @@ import io.nats.client.Options; import io.netty.buffer.ByteBuf; import me.egg82.antivpn.config.ConfigUtil; +import me.egg82.antivpn.core.Pair; import me.egg82.antivpn.locale.LocaleUtil; import me.egg82.antivpn.locale.MessageKey; import me.egg82.antivpn.messaging.handler.MessagingHandler; import me.egg82.antivpn.messaging.packets.Packet; +import me.egg82.antivpn.messaging.packets.server.KeepAlivePacket; +import me.egg82.antivpn.messaging.packets.server.PacketVersionRequestPacket; import me.egg82.antivpn.services.CollectionProvider; +import me.egg82.antivpn.utils.PacketUtil; import org.jetbrains.annotations.NotNull; import java.io.File; import java.io.IOException; import java.time.Duration; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -29,8 +35,8 @@ public class NATSMessagingService extends AbstractMessagingService { private static final String SUBJECT_NAME = "avpn-data"; - private NATSMessagingService(@NotNull String name, @NotNull File packetDirectory) { - super(name, packetDirectory); + private NATSMessagingService(@NotNull String name, long startupDelay, @NotNull File packetDirectory) { + super(name, startupDelay, packetDirectory); } @Override @@ -56,15 +62,16 @@ public void close() { @NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, + long startupDelay, @NotNull File packetDirectory - ) { return new Builder(name, serverId, handler, packetDirectory); } + ) { return new Builder(name, serverId, handler, startupDelay, packetDirectory); } public static class Builder { private final NATSMessagingService service; private final Options.Builder config = new Options.Builder(); - public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, @NotNull File packetDirectory) { - service = new NATSMessagingService(name, packetDirectory); + public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, long startupDelay, @NotNull File packetDirectory) { + service = new NATSMessagingService(name, startupDelay, packetDirectory); service.serverId = serverId; service.serverIdString = serverId.toString(); ByteBuf buffer = alloc.buffer(16, 16); @@ -102,7 +109,18 @@ public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingH public @NotNull NATSMessagingService build() throws IOException, InterruptedException { service.connection = Nats.connect(config.build()); // Indefinite subscription - subscribe(); + if (service.startupDelay == 0L) { + subscribe(); + } else { + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(service.startupDelay); + } catch (InterruptedException ex) { + service.logger.error(ex.getClass().getName() + ": " + ex.getMessage(), ex); + Thread.currentThread().interrupt(); + } + }).thenRun(this::subscribe); + } return service; } @@ -110,7 +128,7 @@ private void subscribe() { service.dispatcher = service.connection.createDispatcher(message -> { String subject = message.getSubject(); if (ConfigUtil.getDebugOrFalse()) { - service.logger.info("Got message from subject: " + subject); + service.logger.debug("Got message from subject: " + subject); } try { @@ -170,8 +188,33 @@ private void handleMessage(byte @NotNull [] body) throws IOException { return; } + if (packetVersion == -1 && packet instanceof KeepAlivePacket) { + // Don't send warning + return; + } + if (packetVersion == -1 && !hasVersion(packet)) { service.logger.warn("Server " + sender + " packet version is unknown, and packet type is of " + packet.getClass().getName() + ". Skipping packet."); + // There's a potential race condition here with double-sending a request, but it doesn't really matter + ByteBuf finalData = data; + CollectionProvider.getPacketProcessingQueue().compute(sender, (k, v) -> { + if (v == null) { + v = new CopyOnWriteArrayList<>(); + } + + if (v.isEmpty()) { + if (packet.verifyFullRead(finalData)) { + v.add(new Pair<>(messageId, packet)); + } + PacketUtil.queuePacket(new PacketVersionRequestPacket(sender, service.serverId)); + } else { + if (packet.verifyFullRead(finalData)) { + v.add(new Pair<>(messageId, packet)); + } + } + + return v; + }); return; } diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/PacketManager.java b/Common/src/main/java/me/egg82/antivpn/messaging/PacketManager.java index ccd60ac0..1deaaf15 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/PacketManager.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/PacketManager.java @@ -28,7 +28,7 @@ public static void register(@NotNull Class clazz, @NotNull throw new IllegalStateException("Packet " + clazz.getName() + " has already been registered."); } - int id = currentId.getAndIncrement(); + int id = currentId.incrementAndGet(); // A packet ID should never be 0, as that's the "id" assigned to an end-of-stream on a MultiPacket packetsById.put(id, clazz); packets.put(clazz, id); suppliersById.put(id, supplier); diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/RabbitMQMessagingService.java b/Common/src/main/java/me/egg82/antivpn/messaging/RabbitMQMessagingService.java index 5957bc35..98cf1919 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/RabbitMQMessagingService.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/RabbitMQMessagingService.java @@ -3,11 +3,15 @@ import com.rabbitmq.client.*; import io.netty.buffer.ByteBuf; import me.egg82.antivpn.config.ConfigUtil; +import me.egg82.antivpn.core.Pair; import me.egg82.antivpn.locale.LocaleUtil; import me.egg82.antivpn.locale.MessageKey; import me.egg82.antivpn.messaging.handler.MessagingHandler; import me.egg82.antivpn.messaging.packets.Packet; +import me.egg82.antivpn.messaging.packets.server.KeepAlivePacket; +import me.egg82.antivpn.messaging.packets.server.PacketVersionRequestPacket; import me.egg82.antivpn.services.CollectionProvider; +import me.egg82.antivpn.utils.PacketUtil; import me.egg82.antivpn.utils.ValidationUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -17,6 +21,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -33,8 +40,8 @@ public class RabbitMQMessagingService extends AbstractMessagingService { private static final String EXCHANGE_NAME = "avpn-data"; - private RabbitMQMessagingService(@NotNull String name, @NotNull File packetDirectory) { - super(name, packetDirectory); + private RabbitMQMessagingService(@NotNull String name, long startupDelay, @NotNull File packetDirectory) { + super(name, startupDelay, packetDirectory); } @Override @@ -58,15 +65,16 @@ public void close() { @NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, + long startupDelay, @NotNull File packetDirectory - ) { return new Builder(name, serverId, handler, packetDirectory); } + ) { return new Builder(name, serverId, handler, startupDelay, packetDirectory); } public static class Builder { private final RabbitMQMessagingService service; private final ConnectionFactory config = new ConnectionFactory(); - public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, @NotNull File packetDirectory) { - service = new RabbitMQMessagingService(name, packetDirectory); + public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, long startupDelay, @NotNull File packetDirectory) { + service = new RabbitMQMessagingService(name, startupDelay, packetDirectory); service.serverId = serverId; service.serverIdString = serverId.toString(); ByteBuf buffer = alloc.buffer(16, 16); @@ -110,7 +118,25 @@ public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingH public @NotNull RabbitMQMessagingService build() throws IOException, TimeoutException { service.factory = config; service.connection = service.getConnection(); - service.bind(); + if (service.startupDelay == 0L) { + service.bind(); + } else { + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(service.startupDelay); + } catch (InterruptedException ex) { + service.logger.error(ex.getClass().getName() + ": " + ex.getMessage(), ex); + Thread.currentThread().interrupt(); + } + }).thenRun(() -> { + try { + service.bind(); + } catch (IOException ex) { + service.logger.error(ex.getClass().getName() + ": " + ex.getMessage(), ex); + throw new CompletionException(ex); + } + }); + } return service; } } @@ -124,7 +150,7 @@ private void bind() throws IOException { @Override public void handleDelivery(String tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (ConfigUtil.getDebugOrFalse()) { - logger.info("Got message from exchange: " + envelope.getExchange()); + logger.debug("Got message from exchange: " + envelope.getExchange()); } UUID sender = validateProperties(properties); @@ -168,8 +194,33 @@ public void handleDelivery(String tag, Envelope envelope, AMQP.BasicProperties p return; } + if (packetVersion == -1 && packet instanceof KeepAlivePacket) { + // Don't send warning + return; + } + if (packetVersion == -1 && !hasVersion(packet)) { logger.warn("Server " + sender + " packet version is unknown, and packet type is of " + packet.getClass().getName() + ". Skipping packet."); + // There's a potential race condition here with double-sending a request, but it doesn't really matter + ByteBuf finalData = data; + CollectionProvider.getPacketProcessingQueue().compute(sender, (k, v) -> { + if (v == null) { + v = new CopyOnWriteArrayList<>(); + } + + if (v.isEmpty()) { + if (packet.verifyFullRead(finalData)) { + v.add(new Pair<>(UUID.fromString(properties.getMessageId()), packet)); + } + PacketUtil.queuePacket(new PacketVersionRequestPacket(sender, serverId)); + } else { + if (packet.verifyFullRead(finalData)) { + v.add(new Pair<>(UUID.fromString(properties.getMessageId()), packet)); + } + } + + return v; + }); return; } @@ -267,9 +318,7 @@ private enum DeliveryMode { private final int mode; - DeliveryMode(int mode) { - this.mode = mode; - } + DeliveryMode(int mode) { this.mode = mode; } public int getMode() { return mode; } } @@ -282,9 +331,7 @@ private enum ExchangeType { private final String type; - ExchangeType(@NotNull String type) { - this.type = type; - } + ExchangeType(@NotNull String type) { this.type = type; } public @NotNull String getType() { return type; } } diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/RedisMessagingService.java b/Common/src/main/java/me/egg82/antivpn/messaging/RedisMessagingService.java index 87d2e48e..f84e1a32 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/RedisMessagingService.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/RedisMessagingService.java @@ -3,11 +3,15 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.buffer.ByteBuf; import me.egg82.antivpn.config.ConfigUtil; +import me.egg82.antivpn.core.Pair; import me.egg82.antivpn.locale.LocaleUtil; import me.egg82.antivpn.locale.MessageKey; import me.egg82.antivpn.messaging.handler.MessagingHandler; import me.egg82.antivpn.messaging.packets.Packet; +import me.egg82.antivpn.messaging.packets.server.KeepAlivePacket; +import me.egg82.antivpn.messaging.packets.server.PacketVersionRequestPacket; import me.egg82.antivpn.services.CollectionProvider; +import me.egg82.antivpn.utils.PacketUtil; import org.jetbrains.annotations.NotNull; import redis.clients.jedis.BinaryJedisPubSub; import redis.clients.jedis.Jedis; @@ -19,6 +23,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -37,8 +42,8 @@ public class RedisMessagingService extends AbstractMessagingService { private static final String CHANNEL_NAME = "avpn-data"; private static final byte[] CHANNEL_NAME_BYTES = CHANNEL_NAME.getBytes(StandardCharsets.UTF_8); - private RedisMessagingService(@NotNull String name, @NotNull File packetDirectory) { - super(name, packetDirectory); + private RedisMessagingService(@NotNull String name, long startupDelay, @NotNull File packetDirectory) { + super(name, startupDelay, packetDirectory); } @Override @@ -67,8 +72,9 @@ public void close() { @NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, + long startupDelay, @NotNull File packetDirectory - ) { return new Builder(name, serverId, handler, packetDirectory); } + ) { return new Builder(name, serverId, handler, startupDelay, packetDirectory); } public static class Builder { private final RedisMessagingService service; @@ -79,8 +85,8 @@ public static class Builder { private int timeout = 5000; private String pass = ""; - public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, @NotNull File packetDirectory) { - service = new RedisMessagingService(name, packetDirectory); + public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingHandler handler, long startupDelay, @NotNull File packetDirectory) { + service = new RedisMessagingService(name, startupDelay, packetDirectory); service.serverId = serverId; service.serverIdString = serverId.toString(); ByteBuf buffer = alloc.buffer(16, 16); @@ -136,6 +142,15 @@ public Builder(@NotNull String name, @NotNull UUID serverId, @NotNull MessagingH private void subscribe() { service.workPool.execute(() -> { + if (service.startupDelay > 0) { + try { + Thread.sleep(service.startupDelay); + } catch (InterruptedException ex) { + service.logger.error(ex.getClass().getName() + ": " + ex.getMessage(), ex); + Thread.currentThread().interrupt(); + } + } + while (!service.isClosed()) { try (Jedis redis = service.pool.getResource()) { redis.subscribe( @@ -172,15 +187,13 @@ private void warmup(@NotNull JedisPool pool) { private static class PubSub extends BinaryJedisPubSub { private final RedisMessagingService service; - private PubSub(@NotNull RedisMessagingService service) { - this.service = service; - } + private PubSub(@NotNull RedisMessagingService service) { this.service = service; } @Override public void onMessage(byte @NotNull [] c, byte @NotNull [] m) { String channel = new String(c, StandardCharsets.UTF_8); if (ConfigUtil.getDebugOrFalse()) { - service.logger.info("Got message from channel: " + channel); + service.logger.debug("Got message from channel: " + channel); } try { @@ -201,7 +214,7 @@ private void handleMessage(byte @NotNull [] body) throws IOException { b.writeBytes(body); data = service.decompressData(b); - if (ConfigUtil.getHiddenConfig().doPacketDump()) { + if (ConfigUtil.getHiddenConfig() != null && ConfigUtil.getHiddenConfig().doPacketDump()) { service.dumpReceivedPacket(data); } @@ -238,8 +251,33 @@ private void handleMessage(byte @NotNull [] body) throws IOException { return; } + if (packetVersion == -1 && packet instanceof KeepAlivePacket) { + // Don't send warning + return; + } + if (packetVersion == -1 && !hasVersion(packet)) { service.logger.warn("Server " + sender + " packet version is unknown, and packet type is of " + packet.getClass().getName() + ". Skipping packet."); + // There's a potential race condition here with double-sending a request, but it doesn't really matter + ByteBuf finalData = data; + CollectionProvider.getPacketProcessingQueue().compute(sender, (k, v) -> { + if (v == null) { + v = new CopyOnWriteArrayList<>(); + } + + if (v.isEmpty()) { + if (packet.verifyFullRead(finalData)) { + v.add(new Pair<>(messageId, packet)); + } + PacketUtil.queuePacket(new PacketVersionRequestPacket(sender, service.serverId)); + } else { + if (packet.verifyFullRead(finalData)) { + v.add(new Pair<>(messageId, packet)); + } + } + + return v; + }); return; } diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/handler/MessagingHandlerImpl.java b/Common/src/main/java/me/egg82/antivpn/messaging/handler/MessagingHandlerImpl.java index f2209918..ea309b84 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/handler/MessagingHandlerImpl.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/handler/MessagingHandlerImpl.java @@ -49,6 +49,8 @@ public void handlePacket(@NotNull UUID messageId, @NotNull String fromService, @ return; } + logger.debug("Handling " + packet.getClass().getSimpleName() + " from " + fromService); + try { if (!handlePacket(packet)) { logger.warn("Did not handle packet: " + packet.getClass().getName()); @@ -56,7 +58,9 @@ public void handlePacket(@NotNull UUID messageId, @NotNull String fromService, @ } catch (Exception ex) { logger.error(ex.getClass().getName() + ": " + ex.getMessage(), ex); } finally { - PacketUtil.repeatPacket(messageId, packet, fromService); + if (ConfigUtil.getCachedConfig().getMessagingRedundancy()) { + PacketUtil.repeatPacket(messageId, packet, fromService); + } } } @@ -78,7 +82,7 @@ protected boolean handlePacket(@NotNull Packet packet) { private void handleMulti(@NotNull MultiPacket packet) { if (ConfigUtil.getDebugOrFalse()) { - logger.info("Handling multi-packet."); + logger.debug("Handling multi-packet."); } for (Packet p : packet.getPackets()) { diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/handler/ServerMessagingHandler.java b/Common/src/main/java/me/egg82/antivpn/messaging/handler/ServerMessagingHandler.java index c6c67815..ce3d9c64 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/handler/ServerMessagingHandler.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/handler/ServerMessagingHandler.java @@ -1,25 +1,42 @@ package me.egg82.antivpn.messaging.handler; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; import me.egg82.antivpn.config.ConfigUtil; +import me.egg82.antivpn.messaging.MessagingService; import me.egg82.antivpn.messaging.packets.Packet; -import me.egg82.antivpn.messaging.packets.server.InitializationPacket; -import me.egg82.antivpn.messaging.packets.server.PacketVersionPacket; -import me.egg82.antivpn.messaging.packets.server.ShutdownPacket; +import me.egg82.antivpn.messaging.packets.server.*; import me.egg82.antivpn.services.CollectionProvider; import me.egg82.antivpn.utils.PacketUtil; import org.jetbrains.annotations.NotNull; import java.util.UUID; +import java.util.concurrent.TimeUnit; public class ServerMessagingHandler extends AbstractMessagingHandler { + private final Cache aliveServers = Caffeine.newBuilder().expireAfterWrite(20L, TimeUnit.SECONDS) + .evictionListener((RemovalListener) (uuid, timestamp, cause) -> { + logger.warn("Server " + uuid + " has either shut down or timed out. Clearing its data."); + if (uuid != null) { + handleShutdown(uuid); + } + }).build(); + @Override protected boolean handlePacket(@NotNull Packet packet) { - if (packet instanceof InitializationPacket) { + if (packet instanceof KeepAlivePacket) { + handleKeepalive((KeepAlivePacket) packet); + return true; + } else if (packet instanceof InitializationPacket) { handleInitialization((InitializationPacket) packet); return true; } else if (packet instanceof PacketVersionPacket) { handlePacketVersion((PacketVersionPacket) packet); return true; + } else if (packet instanceof PacketVersionRequestPacket) { + handlePacketVersionRequest((PacketVersionRequestPacket) packet); + return true; } else if (packet instanceof ShutdownPacket) { handleShutdown((ShutdownPacket) packet); return true; @@ -28,6 +45,14 @@ protected boolean handlePacket(@NotNull Packet packet) { return false; } + private void handleKeepalive(@NotNull KeepAlivePacket packet) { + if (ConfigUtil.getDebugOrFalse()) { + logger.debug("Handling keep alive for " + packet.getSender()); + } + + aliveServers.put(packet.getSender(), System.currentTimeMillis()); + } + private void handleInitialization(@NotNull InitializationPacket packet) { if (ConfigUtil.getDebugOrFalse()) { logger.info("Handling initialization for server " + packet.getServer()); @@ -47,6 +72,21 @@ private void handlePacketVersion(@NotNull PacketVersionPacket packet) { } CollectionProvider.getServerVersions().put(packet.getServer(), packet.getPacketVersion()); + + MessagingService firstService = ConfigUtil.getCachedConfig().getMessaging().get(0); + firstService.flushPacketQueue(packet.getServer()); + } + + private void handlePacketVersionRequest(@NotNull PacketVersionRequestPacket packet) { + if (!packet.getIntendedRecipient().equals(ConfigUtil.getCachedConfig().getServerId())) { + return; + } + + if (ConfigUtil.getDebugOrFalse()) { + logger.debug("Handling packet version request from server " + packet.getServer()); + } + + PacketUtil.queuePacket(new PacketVersionPacket(packet.getServer(), packet.getIntendedRecipient(), Packet.VERSION)); } private void handleShutdown(@NotNull ShutdownPacket packet) { @@ -54,6 +94,7 @@ private void handleShutdown(@NotNull ShutdownPacket packet) { logger.info("Handling shutdown for server " + packet.getServer()); } + aliveServers.invalidate(packet.getServer()); handleShutdown(packet.getServer()); } diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/packets/MultiPacket.java b/Common/src/main/java/me/egg82/antivpn/messaging/packets/MultiPacket.java index 8a1e8823..572f6690 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/packets/MultiPacket.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/packets/MultiPacket.java @@ -62,7 +62,7 @@ public void read(@NotNull ByteBuf buffer) { @Override public void write(@NotNull ByteBuf buffer) { if (packets.isEmpty()) { - buffer.writeByte((byte) 0x00); // End of multi-packet + buffer.writeByte(0x00); // End of multi-packet return; } @@ -78,14 +78,12 @@ public void write(@NotNull ByteBuf buffer) { buffer.setInt(start, buffer.writerIndex() - start - 4); // Write the packet length to the int at the head } - buffer.writeByte((byte) 0x00); // End of multi-packet + buffer.writeByte(0x00); // End of multi-packet } public @NotNull Set<@NotNull Packet> getPackets() { return packets; } - public void setPackets(@NotNull Set<@NotNull Packet> packets) { - this.packets = packets; - } + public void setPackets(@NotNull Set<@NotNull Packet> packets) { this.packets = packets; } @Override public boolean equals(Object o) { @@ -105,8 +103,7 @@ public boolean equals(Object o) { @Override public String toString() { return "MultiPacket{" + - "sender=" + sender + - ", packets=" + packets + + "packets=" + packets + '}'; } } diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/packets/Packet.java b/Common/src/main/java/me/egg82/antivpn/messaging/packets/Packet.java index 9ee33864..80e4bc3b 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/packets/Packet.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/packets/Packet.java @@ -7,7 +7,7 @@ import java.util.UUID; public interface Packet extends Serializable { - byte VERSION = (byte) 3; + byte VERSION = (byte) 4; void read(@NotNull ByteBuf buffer); diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/InitializationPacket.java b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/InitializationPacket.java index 04ddac20..d0b5f2ff 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/InitializationPacket.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/InitializationPacket.java @@ -41,15 +41,11 @@ public void write(@NotNull ByteBuf buffer) { public @NotNull UUID getServer() { return server; } - public void setServer(@NotNull UUID server) { - this.server = server; - } + public void setServer(@NotNull UUID server) { this.server = server; } public byte getPacketVersion() { return packetVersion; } - public void setPacketVersion(byte packetVersion) { - this.packetVersion = packetVersion; - } + public void setPacketVersion(byte packetVersion) { this.packetVersion = packetVersion; } @Override public boolean equals(Object o) { diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/KeepAlivePacket.java b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/KeepAlivePacket.java new file mode 100644 index 00000000..1ec9239c --- /dev/null +++ b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/KeepAlivePacket.java @@ -0,0 +1,64 @@ +package me.egg82.antivpn.messaging.packets.server; + +import io.netty.buffer.ByteBuf; +import me.egg82.antivpn.messaging.packets.AbstractPacket; +import me.egg82.antivpn.utils.UUIDUtil; +import org.jetbrains.annotations.NotNull; + +import java.util.Objects; +import java.util.UUID; + +public class KeepAlivePacket extends AbstractPacket { + private UUID server; + + public KeepAlivePacket(@NotNull UUID sender, @NotNull ByteBuf data) { + super(sender); + read(data); + } + + public KeepAlivePacket() { + super(UUIDUtil.EMPTY_UUID); + } + + public KeepAlivePacket(@NotNull UUID server) { + super(UUIDUtil.EMPTY_UUID); + this.server = server; + } + + @Override + public void read(@NotNull ByteBuf buffer) { + this.server = readUUID(buffer); + } + + @Override + public void write(@NotNull ByteBuf buffer) { + writeUUID(this.server, buffer); + } + + public @NotNull UUID getServer() { return server; } + + public void setServer(@NotNull UUID server) { this.server = server; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KeepAlivePacket)) { + return false; + } + KeepAlivePacket that = (KeepAlivePacket) o; + return server.equals(that.server); + } + + @Override + public int hashCode() { return Objects.hash(server); } + + @Override + public String toString() { + return "KeepAlivePacket{" + + "server=" + server + + ", sender=" + sender + + '}'; + } +} diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/PacketVersionPacket.java b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/PacketVersionPacket.java index d25c3bfa..5d24f7a8 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/PacketVersionPacket.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/PacketVersionPacket.java @@ -45,21 +45,15 @@ public void write(@NotNull ByteBuf buffer) { public @NotNull UUID getIntendedRecipient() { return intendedRecipient; } - public void setIntendedRecipient(@NotNull UUID intendedRecipient) { - this.intendedRecipient = intendedRecipient; - } + public void setIntendedRecipient(@NotNull UUID intendedRecipient) { this.intendedRecipient = intendedRecipient; } public @NotNull UUID getServer() { return server; } - public void setServer(@NotNull UUID server) { - this.server = server; - } + public void setServer(@NotNull UUID server) { this.server = server; } public byte getPacketVersion() { return packetVersion; } - public void setPacketVersion(byte packetVersion) { - this.packetVersion = packetVersion; - } + public void setPacketVersion(byte packetVersion) { this.packetVersion = packetVersion; } @Override public boolean equals(Object o) { diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/PacketVersionRequestPacket.java b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/PacketVersionRequestPacket.java new file mode 100644 index 00000000..929e4de2 --- /dev/null +++ b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/PacketVersionRequestPacket.java @@ -0,0 +1,73 @@ +package me.egg82.antivpn.messaging.packets.server; + +import io.netty.buffer.ByteBuf; +import me.egg82.antivpn.messaging.packets.AbstractPacket; +import me.egg82.antivpn.utils.UUIDUtil; +import org.jetbrains.annotations.NotNull; + +import java.util.Objects; +import java.util.UUID; + +public class PacketVersionRequestPacket extends AbstractPacket { + private UUID intendedRecipient; + private UUID server; + + public PacketVersionRequestPacket(@NotNull UUID sender, @NotNull ByteBuf data) { + super(sender); + read(data); + } + + public PacketVersionRequestPacket() { + super(UUIDUtil.EMPTY_UUID); + } + + public PacketVersionRequestPacket(@NotNull UUID intendedRecipient, @NotNull UUID server) { + super(UUIDUtil.EMPTY_UUID); + this.intendedRecipient = intendedRecipient; + this.server = server; + } + + @Override + public void read(@NotNull ByteBuf buffer) { + this.intendedRecipient = readUUID(buffer); + this.server = readUUID(buffer); + } + + @Override + public void write(@NotNull ByteBuf buffer) { + writeUUID(this.intendedRecipient, buffer); + writeUUID(this.server, buffer); + } + + public @NotNull UUID getIntendedRecipient() { return intendedRecipient; } + + public void setIntendedRecipient(@NotNull UUID intendedRecipient) { this.intendedRecipient = intendedRecipient; } + + public @NotNull UUID getServer() { return server; } + + public void setServer(@NotNull UUID server) { this.server = server; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PacketVersionRequestPacket)) { + return false; + } + PacketVersionRequestPacket that = (PacketVersionRequestPacket) o; + return intendedRecipient.equals(that.intendedRecipient) && server.equals(that.server); + } + + @Override + public int hashCode() { return Objects.hash(intendedRecipient, server); } + + @Override + public String toString() { + return "PacketVersionRequestPacket{" + + "sender=" + sender + + ", intendedRecipient=" + intendedRecipient + + ", server=" + server + + '}'; + } +} diff --git a/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/ShutdownPacket.java b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/ShutdownPacket.java index f7be3c1c..3e5b9a11 100644 --- a/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/ShutdownPacket.java +++ b/Common/src/main/java/me/egg82/antivpn/messaging/packets/server/ShutdownPacket.java @@ -37,9 +37,7 @@ public void write(@NotNull ByteBuf buffer) { public @NotNull UUID getServer() { return server; } - public void setServer(@NotNull UUID server) { - this.server = server; - } + public void setServer(@NotNull UUID server) { this.server = server; } @Override public boolean equals(Object o) { diff --git a/Common/src/main/java/me/egg82/antivpn/services/CollectionProvider.java b/Common/src/main/java/me/egg82/antivpn/services/CollectionProvider.java index 3d08a9c1..d8c478bc 100644 --- a/Common/src/main/java/me/egg82/antivpn/services/CollectionProvider.java +++ b/Common/src/main/java/me/egg82/antivpn/services/CollectionProvider.java @@ -4,9 +4,14 @@ import com.github.benmanes.caffeine.cache.Caffeine; import it.unimi.dsi.fastutil.objects.Object2ByteArrayMap; import it.unimi.dsi.fastutil.objects.Object2ByteMap; +import me.egg82.antivpn.core.Pair; +import me.egg82.antivpn.messaging.packets.Packet; import org.jetbrains.annotations.NotNull; +import java.util.List; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,4 +37,8 @@ public static boolean isDuplicateMessage(@NotNull UUID messageId) { }); return retVal.get(); } + + private static final ConcurrentMap>> packetProcessingQueue = new ConcurrentHashMap<>(); + + public static ConcurrentMap>> getPacketProcessingQueue() { return packetProcessingQueue; } } diff --git a/Common/src/main/java/me/egg82/antivpn/utils/PacketUtil.java b/Common/src/main/java/me/egg82/antivpn/utils/PacketUtil.java index 5e16cec7..9df56d2c 100644 --- a/Common/src/main/java/me/egg82/antivpn/utils/PacketUtil.java +++ b/Common/src/main/java/me/egg82/antivpn/utils/PacketUtil.java @@ -22,12 +22,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class PacketUtil { private static final Logger logger = new GELFLogger(LoggerFactory.getLogger(PacketUtil.class)); private static ExecutorService workPool = Executors.newFixedThreadPool(4, new ThreadFactoryBuilder().setNameFormat("Anti-VPN_Messaging_%d").build()); + private static AtomicInteger currentIndex = new AtomicInteger(-1); + private PacketUtil() { } public static void setPoolSize(int size) { @@ -63,9 +66,7 @@ public static void queuePacket(@NotNull Packet packet) { requiresSending.set(true); } - public static void repeatPacket(@NotNull UUID messageId, @NotNull Packet packet, @NotNull String fromService) { - sendPacket(messageId, packet, fromService); - } + public static void repeatPacket(@NotNull UUID messageId, @NotNull Packet packet, @NotNull String fromService) { sendPacket(messageId, packet, fromService); } public static void trySendQueue() { if (!requiresSending.compareAndSet(true, false)) { @@ -100,18 +101,56 @@ public static void trySendQueue() { private static void sendPacket(@NotNull UUID messageId, @NotNull Packet packet, @Nullable String fromService) { CachedConfig cachedConfig = ConfigUtil.getCachedConfig(); - for (MessagingService service : cachedConfig.getMessaging()) { - if (service.getName().equals(fromService)) { - continue; + if (cachedConfig.getMessagingRedundancy()) { + for (MessagingService service : cachedConfig.getMessaging()) { + if (service.getName().equals(fromService)) { + continue; + } + + workPool.execute(() -> { + try { + logger.debug("Sending " + packet.getClass().getSimpleName() + " through " + service.getName()); + service.sendPacket(messageId, packet); + } catch (IOException | TimeoutException ex) { + logger.warn("Could not broadcast packet " + packet.getClass().getSimpleName() + " through " + service.getName(), ex); + } + }); } + } else { + if (fromService != null) { + return; + } + + int index = getNextService(cachedConfig); + int initialIndex = index; + boolean sent = false; + + do { + MessagingService service = cachedConfig.getMessaging().get(index); - workPool.execute(() -> { try { + logger.debug("Sending " + packet.getClass().getSimpleName() + " through " + service.getName()); service.sendPacket(messageId, packet); + sent = true; + break; } catch (IOException | TimeoutException ex) { logger.warn("Could not broadcast packet " + packet.getClass().getSimpleName() + " through " + service.getName(), ex); + index = getNextService(cachedConfig); } - }); + } while (index != initialIndex); // This will be true if we've run through all of our services and wrapped around to the start again + + if (!sent) { + logger.error("Could not broadcast packet " + packet.getClass().getSimpleName() + " through any available messaging service."); + } } } + + private static int getNextService(CachedConfig cachedConfig) { + return currentIndex.updateAndGet(v -> { + if (v >= cachedConfig.getMessaging().size() - 1) { + return 0; + } + return v + 1; + }); + } } diff --git a/Common/src/main/resources/config.yml b/Common/src/main/resources/config.yml index c06296d9..3036556b 100644 --- a/Common/src/main/resources/config.yml +++ b/Common/src/main/resources/config.yml @@ -121,6 +121,16 @@ messaging: max-lifetime: '30minutes' # The maximum amount of time that the plugin will wait for a new connection from the pool before timing out timeout: '5seconds' + # The delay at which to load messaging services + # 0 is instant, anything else will allow other plugins to startup before messaging begins + delay: '1second' + # Whether or not the messaging systems should be redundant + # With redundancy on, backend packets will be passed to all messaging services + # With redundancy off, backend packets will be rotated through each messaging system in a round-robin manner + # Note: If a backend packet fails on a non-redundant configuration, another messaging system will be chosen automatically + # However, a backend packet will not be re-broadcasted to any other messaging systems connected to this service + # This means that if redundancy is turned off you will need to ensure each server has exactly the same messaging systems + redundancy: true # Where VPN-checking sources are defined # Beware the more sources that are included (and fail) the worse the performance and the more the lag @@ -413,4 +423,4 @@ update: notify: true # Config version, no touchy plz -version: 5.2 \ No newline at end of file +version: 5.3 \ No newline at end of file diff --git a/Paper/src/main/java/me/egg82/antivpn/AntiVPN.java b/Paper/src/main/java/me/egg82/antivpn/AntiVPN.java index 4f2ddec8..93625f33 100644 --- a/Paper/src/main/java/me/egg82/antivpn/AntiVPN.java +++ b/Paper/src/main/java/me/egg82/antivpn/AntiVPN.java @@ -35,6 +35,8 @@ import me.egg82.antivpn.messaging.packets.MultiPacket; import me.egg82.antivpn.messaging.packets.Packet; import me.egg82.antivpn.messaging.packets.server.InitializationPacket; +import me.egg82.antivpn.messaging.packets.server.PacketVersionPacket; +import me.egg82.antivpn.messaging.packets.server.PacketVersionRequestPacket; import me.egg82.antivpn.messaging.packets.server.ShutdownPacket; import me.egg82.antivpn.messaging.packets.vpn.DeleteIPPacket; import me.egg82.antivpn.messaging.packets.vpn.DeletePlayerPacket; @@ -43,6 +45,7 @@ import me.egg82.antivpn.storage.StorageService; import me.egg82.antivpn.utils.EventUtil; import me.egg82.antivpn.utils.PacketUtil; +import me.egg82.antivpn.utils.TimeUtil; import me.egg82.antivpn.utils.VersionUtil; import me.egg82.antivpn.api.platform.AbstractPluginMetadata; import net.kyori.event.SimpleEventBus; @@ -56,6 +59,7 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.spongepowered.configurate.serialize.SerializationException; import java.util.ArrayList; import java.util.List; @@ -148,8 +152,10 @@ public void onDisable() { } private void loadPackets() { - PacketManager.register(InitializationPacket.class, InitializationPacket::new); // Ensure InitializationPacket always has a packet ID of 1 - PacketManager.register(MultiPacket.class, MultiPacket::new); // Ensure MultiPacket always has a packet ID of 2 + PacketManager.register(InitializationPacket.class, InitializationPacket::new); // Ensure InitializationPacket always has a packet ID of 1 (NOT 0) + PacketManager.register(PacketVersionPacket.class, PacketVersionPacket::new); // Ensure PacketVersionPacket always has a packet ID of 2 + PacketManager.register(MultiPacket.class, MultiPacket::new); // Ensure MultiPacket always has a packet ID of 3 + PacketManager.register(PacketVersionRequestPacket.class, PacketVersionRequestPacket::new); // Ensure PacketVersionRequestPacket always has a packet ID of 4 PacketManager.register(ShutdownPacket.class, ShutdownPacket::new); @@ -177,7 +183,26 @@ private void loadServices() { APIRegistrationUtil.register(api); EventUtil.post(new APILoadedEventImpl(api), api.getEventBus()); - PacketUtil.queuePacket(new InitializationPacket(ConfigUtil.getCachedConfig().getServerId(), Packet.VERSION)); + String delayString; + try { + delayString = ConfigUtil.getConfig().node("messaging", "settings", "delay").get(String.class); + } catch (SerializationException ignored) { + delayString = null; + } + TimeUtil.Time delay = delayString == null ? null : TimeUtil.getTime(delayString); + + if (delay != null && delay.getTime() > 0) { + new Thread(() -> { + try { + Thread.sleep(delay.getMillis() + 500L); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + PacketUtil.queuePacket(new InitializationPacket(cachedConfig.getServerId(), Packet.VERSION)); + }).start(); + } else { + PacketUtil.queuePacket(new InitializationPacket(cachedConfig.getServerId(), Packet.VERSION)); + } } private void loadCommands() { diff --git a/Paper/src/main/java/me/egg82/antivpn/commands/internal/ReloadCommand.java b/Paper/src/main/java/me/egg82/antivpn/commands/internal/ReloadCommand.java index 45c65448..4b2f4875 100644 --- a/Paper/src/main/java/me/egg82/antivpn/commands/internal/ReloadCommand.java +++ b/Paper/src/main/java/me/egg82/antivpn/commands/internal/ReloadCommand.java @@ -17,10 +17,15 @@ import me.egg82.antivpn.messaging.MessagingService; import me.egg82.antivpn.messaging.handler.MessagingHandler; import me.egg82.antivpn.messaging.handler.MessagingHandlerImpl; +import me.egg82.antivpn.messaging.packets.Packet; +import me.egg82.antivpn.messaging.packets.server.InitializationPacket; import me.egg82.antivpn.storage.StorageService; import me.egg82.antivpn.utils.EventUtil; +import me.egg82.antivpn.utils.PacketUtil; +import me.egg82.antivpn.utils.TimeUtil; import org.bukkit.plugin.Plugin; import org.jetbrains.annotations.NotNull; +import org.spongepowered.configurate.serialize.SerializationException; import java.io.File; @@ -65,6 +70,28 @@ public void execute(@NotNull CommandContext comman APIRegistrationUtil.register(api); EventUtil.post(new APILoadedEventImpl(api), api.getEventBus()); + String delayString; + try { + delayString = ConfigUtil.getConfig().node("messaging", "settings", "delay").get(String.class); + } catch (SerializationException ignored) { + delayString = null; + } + TimeUtil.Time delay = delayString == null ? null : TimeUtil.getTime(delayString); + + if (delay != null && delay.getTime() > 0) { + CachedConfig finalCachedConfig = cachedConfig; + new Thread(() -> { + try { + Thread.sleep(delay.getMillis() + 500L); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + PacketUtil.queuePacket(new InitializationPacket(finalCachedConfig.getServerId(), Packet.VERSION)); + }).start(); + } else { + PacketUtil.queuePacket(new InitializationPacket(cachedConfig.getServerId(), Packet.VERSION)); + } + c.getSender().sendMessage(MessageKey.COMMAND__RELOAD__END); }) .execute(); diff --git a/Spigot/src/main/java/me/egg82/antivpn/AntiVPN.java b/Spigot/src/main/java/me/egg82/antivpn/AntiVPN.java index 87cae2b0..ff6b4d4c 100644 --- a/Spigot/src/main/java/me/egg82/antivpn/AntiVPN.java +++ b/Spigot/src/main/java/me/egg82/antivpn/AntiVPN.java @@ -36,6 +36,8 @@ import me.egg82.antivpn.messaging.packets.MultiPacket; import me.egg82.antivpn.messaging.packets.Packet; import me.egg82.antivpn.messaging.packets.server.InitializationPacket; +import me.egg82.antivpn.messaging.packets.server.PacketVersionPacket; +import me.egg82.antivpn.messaging.packets.server.PacketVersionRequestPacket; import me.egg82.antivpn.messaging.packets.server.ShutdownPacket; import me.egg82.antivpn.messaging.packets.vpn.DeleteIPPacket; import me.egg82.antivpn.messaging.packets.vpn.DeletePlayerPacket; @@ -44,6 +46,7 @@ import me.egg82.antivpn.storage.StorageService; import me.egg82.antivpn.utils.EventUtil; import me.egg82.antivpn.utils.PacketUtil; +import me.egg82.antivpn.utils.TimeUtil; import me.egg82.antivpn.utils.VersionUtil; import me.egg82.antivpn.api.platform.AbstractPluginMetadata; import net.kyori.event.SimpleEventBus; @@ -57,6 +60,7 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.spongepowered.configurate.serialize.SerializationException; import java.util.ArrayList; import java.util.List; @@ -150,8 +154,10 @@ public void onDisable() { } private void loadPackets() { - PacketManager.register(InitializationPacket.class, InitializationPacket::new); // Ensure InitializationPacket always has a packet ID of 1 - PacketManager.register(MultiPacket.class, MultiPacket::new); // Ensure MultiPacket always has a packet ID of 2 + PacketManager.register(InitializationPacket.class, InitializationPacket::new); // Ensure InitializationPacket always has a packet ID of 1 (NOT 0) + PacketManager.register(PacketVersionPacket.class, PacketVersionPacket::new); // Ensure PacketVersionPacket always has a packet ID of 2 + PacketManager.register(MultiPacket.class, MultiPacket::new); // Ensure MultiPacket always has a packet ID of 3 + PacketManager.register(PacketVersionRequestPacket.class, PacketVersionRequestPacket::new); // Ensure PacketVersionRequestPacket always has a packet ID of 4 PacketManager.register(ShutdownPacket.class, ShutdownPacket::new); @@ -179,7 +185,26 @@ private void loadServices() { APIRegistrationUtil.register(api); EventUtil.post(new APILoadedEventImpl(api), api.getEventBus()); - PacketUtil.queuePacket(new InitializationPacket(ConfigUtil.getCachedConfig().getServerId(), Packet.VERSION)); + String delayString; + try { + delayString = ConfigUtil.getConfig().node("messaging", "settings", "delay").get(String.class); + } catch (SerializationException ignored) { + delayString = null; + } + TimeUtil.Time delay = delayString == null ? null : TimeUtil.getTime(delayString); + + if (delay != null && delay.getTime() > 0) { + new Thread(() -> { + try { + Thread.sleep(delay.getMillis() + 500L); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + PacketUtil.queuePacket(new InitializationPacket(cachedConfig.getServerId(), Packet.VERSION)); + }).start(); + } else { + PacketUtil.queuePacket(new InitializationPacket(cachedConfig.getServerId(), Packet.VERSION)); + } } private void loadCommands() {