From 138ed7130e5f2c347b185205ee2e54c4ab775db4 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Fri, 12 Jul 2024 11:10:56 +0400 Subject: [PATCH 1/3] [TS-1928] Updated dependencies --- Dockerfile | 2 +- README.md | 5 +++++ build.gradle | 4 +--- gradle.properties | 2 +- gradle/wrapper/gradle-wrapper.properties | 2 +- 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3ee4f84..432f34f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gradle:7.6-jdk11 AS build +FROM gradle:8.7-jdk11 AS build ARG release_version COPY ./ . RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_version} diff --git a/README.md b/README.md index eeb37ac..e9f314a 100644 --- a/README.md +++ b/README.md @@ -335,6 +335,11 @@ spec: # Changelog +## 1.8.0 + +* Provided configurable session cache to handle server resend request +* Updated th2 gradle plugin `0.1.1` + ## 1.7.0 * Added support for th2 transport protocol * Added configuration option for non-default book per session. diff --git a/build.gradle b/build.gradle index 8c930be..37e0db1 100644 --- a/build.gradle +++ b/build.gradle @@ -1,8 +1,6 @@ -import org.jetbrains.kotlin.gradle.tasks.KotlinCompile - plugins { id "application" - id "com.exactpro.th2.gradle.component" version "0.0.8" + id "com.exactpro.th2.gradle.component" version "0.1.1" id 'org.jetbrains.kotlin.jvm' version '1.8.22' id "org.jetbrains.kotlin.kapt" version "1.8.22" } diff --git a/gradle.properties b/gradle.properties index 538334b..765adff 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=1.7.0 +release_version=1.8.0 description='Dirty-TCP client' vcs_url=https://github.com/th2-net/th2-conn-dirty-fix \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 070cb70..48c0a02 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists From aa4ef0f50d3fa775dca5f6d57c512d25ce7b47d4 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Fri, 12 Jul 2024 17:51:24 +0400 Subject: [PATCH 2/3] [TS-1928] added empty cache --- README.md | 6 +- .../java/com/exactpro/th2/FixHandler.java | 137 +++++++++++------- .../com/exactpro/th2/FixHandlerSettings.java | 7 + .../com/exactpro/th2/util/EmptyCache.java | 98 +++++++++++++ 4 files changed, 192 insertions(+), 56 deletions(-) create mode 100644 src/main/java/com/exactpro/th2/util/EmptyCache.java diff --git a/README.md b/README.md index e9f314a..08d3401 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2-conn-dirty-fix (1.7.0) +# th2-conn-dirty-fix (1.8.0) This microservice allows sending and receiving messages via FIX protocol @@ -56,6 +56,7 @@ This microservice allows sending and receiving messages via FIX protocol The timeout is reset to the original value after a successful sending attempt. If connection is not established within the specified timeout an error will be reported. + *minConnectionTimeoutOnSend* - minimum value for the sending message timeout in milliseconds. _Default value is 1000 mls._ ++ *messageCacheSize* - size of in memory message cache used for fast handling recovery. Cache disabled if value is zero or negative. _Default value is 100_ ### Security settings @@ -264,6 +265,7 @@ spec: testRequestDelay: 60 reconnectDelay": 5 disconnectRequestDelay: 5 + messageCacheSize: 100 mangler: rules: - name: rule-1 @@ -337,7 +339,7 @@ spec: ## 1.8.0 -* Provided configurable session cache to handle server resend request +* Provided configurable in memory message cache to handle server resend request * Updated th2 gradle plugin `0.1.1` ## 1.7.0 diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index cb3b06e..5ca2f01 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,8 @@ package com.exactpro.th2; import com.exactpro.th2.common.event.Event; -import com.exactpro.th2.common.grpc.EventID; import com.exactpro.th2.common.grpc.Direction; +import com.exactpro.th2.common.grpc.EventID; import com.exactpro.th2.common.grpc.MessageID; import com.exactpro.th2.common.grpc.RawMessage; import com.exactpro.th2.common.utils.event.transport.EventUtilsKt; @@ -31,8 +31,19 @@ import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; import com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil; import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; +import com.exactpro.th2.util.EmptyCache; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import kotlin.jvm.functions.Function1; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; @@ -43,7 +54,6 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -59,14 +69,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; -import kotlin.jvm.functions.Function1; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findLastField; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.firstField; @@ -144,6 +146,7 @@ public class FixHandler implements AutoCloseable, IHandler { private static final Logger LOGGER = LoggerFactory.getLogger(FixHandler.class); + private static final Cache EMPTY_MESSAGE_CACHE = EmptyCache.emptyCache(); private static final int DAY_SECONDS = 24 * 60 * 60; private static final String SOH = "\001"; @@ -171,6 +174,7 @@ public class FixHandler implements AutoCloseable, IHandler { private final SendingTimeoutHandler sendingTimeoutHandler; private Future reconnectRequestTimer = CompletableFuture.completedFuture(null); private volatile IChannel channel; + private final Cache messageCache; protected FixHandlerSettings settings; public FixHandler(IHandlerContext context) { @@ -186,6 +190,12 @@ public FixHandler(IHandlerContext context) { this.messageLoader = null; } + if (settings.getMessageCacheSize() > 0) { + this.messageCache = CacheBuilder.newBuilder().maximumSize(settings.getMessageCacheSize()).build(); + } else { + this.messageCache = EMPTY_MESSAGE_CACHE; + } + if(settings.getSessionStartTime() != null) { Objects.requireNonNull(settings.getSessionEndTime(), "Session end is required when session start is presented"); LocalTime resetTime = settings.getSessionStartTime(); @@ -299,11 +309,11 @@ private CompletableFuture send(@NotNull ByteBuf body, @NotNull Map send(@NotNull ByteBuf body, @NotNull Map deadline) { // The method should have checked exception in signature... - ExceptionUtils.rethrow(new TimeoutException(String.format("session was not established within %d mls", + ExceptionUtils.asRuntimeException(new TimeoutException(String.format("session was not established within %d mls", currentTimeout))); } } @@ -380,7 +390,7 @@ public ByteBuf onReceive(@NotNull IChannel channel, ByteBuf buffer) { @NotNull @Override - public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message) { + public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull MessageID messageId) { Map metadata = new HashMap<>(); int beginString = indexOf(message, "8=FIX"); @@ -516,16 +526,14 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu return metadata; } - private Map handleTestRequest(ByteBuf message, Map metadata) { + private void handleTestRequest(ByteBuf message, Map metadata) { FixField testReqId = findField(message, TEST_REQ_ID_TAG); if(testReqId == null || testReqId.getValue() == null) { metadata.put(REJECT_REASON, "Test Request message hasn't got TestReqId field."); - return metadata; + return; } sendHeartbeatTestReqId(testReqId.getValue()); - - return null; } private void handleLogout(@NotNull ByteBuf message) { @@ -627,58 +635,79 @@ private void recovery(int beginSeqNo, int endSeqNo) { AtomicInteger lastProcessedSequence = new AtomicInteger(beginSeqNo - 1); try { recoveryLock.lock(); - if (endSeqNo == 0) { endSeqNo = msgSeqNum.get() + 1; } - int endSeq = endSeqNo; - info("Loading messages from %d to %d", beginSeqNo, endSeqNo); - if(settings.isLoadMissedMessagesFromCradle()) { - Function1 processMessage = (buf) -> { - FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); - FixField msgTypeField = findField(buf, MSG_TYPE_TAG); - if(seqNum == null || seqNum.getValue() == null - || msgTypeField == null || msgTypeField.getValue() == null) { - return true; - } - int sequence = Integer.parseInt(seqNum.getValue()); - String msgType = msgTypeField.getValue(); + info("Recovery messages from %d to %d", beginSeqNo, endSeqNo); - if(sequence < beginSeqNo) return true; - if(sequence > endSeq) return false; + int beginSeq = beginSeqNo; + int endSeq = endSeqNo; - if(ADMIN_MESSAGES.contains(msgType)) return true; - FixField possDup = findField(buf, POSS_DUP_TAG); - if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; + Function1 processMessage = (buf) -> { + FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); + FixField msgTypeField = findField(buf, MSG_TYPE_TAG); + if(seqNum == null || seqNum.getValue() == null + || msgTypeField == null || msgTypeField.getValue() == null) { + return true; + } + int sequence = Integer.parseInt(seqNum.getValue()); + String msgType = msgTypeField.getValue(); - if(sequence - 1 != lastProcessedSequence.get() ) { - StringBuilder sequenceReset = - createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE); - resetHeartbeatTask(); - } + if(sequence < beginSeqNo) return true; + if(sequence > endSeq) return false; - setTime(buf); - setPossDup(buf); - updateLength(buf); - updateChecksum(buf); - channel.send(buf, createMetadataMap(), null, SendMode.MANGLE); + if(ADMIN_MESSAGES.contains(msgType)) return true; + FixField possDup = findField(buf, POSS_DUP_TAG); + if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; + if(sequence - 1 != lastProcessedSequence.get() ) { + StringBuilder sequenceReset = + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE); resetHeartbeatTask(); + } - lastProcessedSequence.set(sequence); - return true; - }; + setTime(buf); + setPossDup(buf); + updateLength(buf); + updateChecksum(buf); + channel.send(buf, createMetadataMap(), null, SendMode.MANGLE); + + resetHeartbeatTask(); + + lastProcessedSequence.set(sequence); + return true; + }; + + if (messageCache != EMPTY_MESSAGE_CACHE) { // check aren't references equal + info("Loading messages from %d to %d from message cache", beginSeq, endSeqNo); + for (int i = beginSeq; i < endSeqNo; i++) { + ByteBuf message = messageCache.getIfPresent(i); + if (message == null) { + info("Messages from %d included to %d excluded have been recovered from message cache", beginSeq, i); + beginSeq = i; + break; + } + if (!processMessage.invoke(message)) { + if (LOGGER.isWarnEnabled()) warn( + "Message from message cache has been rejected by process function, message: %s", + message.toString(US_ASCII)); + } + } + } + + if(settings.isLoadMissedMessagesFromCradle()) { + info("Loading messages from %d to %d from cradle", beginSeq, endSeqNo); messageLoader.processMessagesInRange( channel.getSessionGroup(), channel.getSessionAlias(), Direction.SECOND, - beginSeqNo, + beginSeq, processMessage ); if(lastProcessedSequence.get() < endSeq) { - String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); + String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeq), msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE @@ -686,7 +715,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { } } else { String seqReset = - createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); + createSequenceReset(beginSeq, msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index 4e7e0a2..f91d29d 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -51,6 +51,7 @@ public class FixHandlerSettings implements IHandlerSettings { * Value from Java Security Standard Algorithm Names */ private String passwordEncryptAlgorithm = "RSA"; + private int messageCacheSize = 100; private Boolean resetSeqNumFlag = false; private Boolean resetOnLogon = false; private Boolean useNextExpectedSeqNum = false; @@ -196,6 +197,10 @@ public String getPasswordEncryptAlgorithm() { return passwordEncryptAlgorithm; } + public int getMessageCacheSize() { + return messageCacheSize; + } + public Boolean getResetSeqNumFlag() { return resetSeqNumFlag; } public Boolean getResetOnLogon() { return resetOnLogon; } @@ -292,6 +297,8 @@ public void setSessionEndTime(LocalTime sessionEndTime) { this.sessionEndTime = sessionEndTime; } + public void setMessageCacheSize(int messageCacheSize) { this.messageCacheSize = messageCacheSize; } + public void setResetSeqNumFlag(Boolean resetSeqNumFlag) { this.resetSeqNumFlag = resetSeqNumFlag; } public void setResetOnLogon(Boolean resetOnLogon) { this.resetOnLogon = resetOnLogon; } diff --git a/src/main/java/com/exactpro/th2/util/EmptyCache.java b/src/main/java/com/exactpro/th2/util/EmptyCache.java new file mode 100644 index 0000000..1d5d9d0 --- /dev/null +++ b/src/main/java/com/exactpro/th2/util/EmptyCache.java @@ -0,0 +1,98 @@ +/* + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.util; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheStats; +import com.google.common.collect.ImmutableMap; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; + +public final class EmptyCache implements Cache { + @SuppressWarnings("rawtypes") + private static final Cache EMPTY_CACHE = new EmptyCache<>(); + private static final CacheStats EMPTY_CACHE_STATS = new CacheStats(0, 0, 0, 0, 0, 0); + + private EmptyCache() { } + + @SuppressWarnings("unchecked") + public static Cache emptyCache() { + return (Cache) EMPTY_CACHE; + } + + @Nullable + @Override + public V getIfPresent(@NotNull Object key) { + return null; + } + + @Override + public @NotNull V get(@NotNull K key, @NotNull Callable loader) throws ExecutionException { + try { + return loader.call(); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + + @Override + public @NotNull ImmutableMap getAllPresent(@NotNull Iterable keys) { + return ImmutableMap.of(); + } + + @Override + public void put(@NotNull K key, @NotNull V value) { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(@NotNull Map m) { + throw new UnsupportedOperationException(); + } + + @Override + public void invalidate(@NotNull Object key) { } + + @Override + public void invalidateAll(@NotNull Iterable keys) { } + + @Override + public void invalidateAll() { } + + @Override + public long size() { + return 0; + } + + @Override + public @NotNull CacheStats stats() { + return EMPTY_CACHE_STATS; + } + + @Override + public @NotNull ConcurrentMap asMap() { + throw new UnsupportedOperationException(); + } + + @Override + public void cleanUp() { } +} From 00767182a37c07d4942aba5e70328424762b3534 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Wed, 4 Sep 2024 18:49:39 +0400 Subject: [PATCH 3/3] [TS-1928] --- README.md | 3 +- build.gradle | 9 +- gradle/wrapper/gradle-wrapper.properties | 2 + gradlew | 282 ++++++++------ gradlew.bat | 35 +- .../java/com/exactpro/th2/FixHandler.java | 210 ++++++---- .../com/exactpro/th2/FixHandlerSettings.java | 2 +- .../com/exactpro/th2/constants/Constants.java | 14 +- .../com/exactpro/th2/util/EmptyCache.java | 98 ----- .../java/com/exactpro/th2/FixHandlerTest.java | 30 +- .../java/com/exactpro/th2/RecoveryTest.java | 281 ------------- .../kotlin/com/exactpro/th2/RecoveryTest.kt | 368 ++++++++++++++++++ src/test/kotlin/com/exactpro/th2/TestUtils.kt | 48 +++ 13 files changed, 771 insertions(+), 611 deletions(-) delete mode 100644 src/main/java/com/exactpro/th2/util/EmptyCache.java delete mode 100644 src/test/java/com/exactpro/th2/RecoveryTest.java create mode 100644 src/test/kotlin/com/exactpro/th2/RecoveryTest.kt create mode 100644 src/test/kotlin/com/exactpro/th2/TestUtils.kt diff --git a/README.md b/README.md index 08d3401..8ec235a 100644 --- a/README.md +++ b/README.md @@ -338,8 +338,7 @@ spec: # Changelog ## 1.8.0 - -* Provided configurable in memory message cache to handle server resend request +* Provided configurable in-memory message cache to handle server resend request * Updated th2 gradle plugin `0.1.1` ## 1.7.0 diff --git a/build.gradle b/build.gradle index 37e0db1..1756fe1 100644 --- a/build.gradle +++ b/build.gradle @@ -39,7 +39,7 @@ dependencies { } implementation "com.exactpro.th2:common-utils:2.2.3-dev" implementation 'com.exactpro.th2:netty-bytebuf-utils:0.0.1' - implementation'com.exactpro.th2:conn-dirty-tcp-core:3.6.0-dev' + implementation'com.exactpro.th2:conn-dirty-tcp-core:3.7.0-TS-1928-+' implementation 'com.exactpro.th2:grpc-lw-data-provider:2.3.1-dev' implementation 'org.slf4j:slf4j-api' @@ -53,8 +53,11 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.module:jackson-module-kotlin' - testImplementation 'org.mockito:mockito-core:5.12.0' - testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.8.22' + testImplementation 'org.mockito:mockito-core:5.13.0' + testImplementation 'org.mockito.kotlin:mockito-kotlin:5.4.0' + testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5' + testImplementation 'org.junit.jupiter:junit-jupiter-params:5.11.0' + annotationProcessor 'com.google.auto.service:auto-service:1.1.1' kapt 'com.google.auto.service:auto-service:1.1.1' diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 48c0a02..b82aa23 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip +networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 744e882..1aa94a4 100755 --- a/gradlew +++ b/gradlew @@ -1,7 +1,7 @@ -#!/usr/bin/env sh +#!/bin/sh # -# Copyright 2015 the original author or authors. +# Copyright © 2015-2021 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,67 +17,99 @@ # ############################################################################## -## -## Gradle start up script for UN*X -## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# ############################################################################## # Attempt to set APP_HOME + # Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null -APP_NAME="Gradle" -APP_BASE_NAME=`basename "$0"` - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD="maximum" +MAX_FD=maximum warn () { echo "$*" -} +} >&2 die () { echo echo "$*" echo exit 1 -} +} >&2 # OS specific support (must be 'true' or 'false'). cygwin=false msys=false darwin=false nonstop=false -case "`uname`" in - CYGWIN* ) - cygwin=true - ;; - Darwin* ) - darwin=true - ;; - MSYS* | MINGW* ) - msys=true - ;; - NONSTOP* ) - nonstop=true - ;; +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -87,9 +119,9 @@ CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then # IBM's JDK on AIX uses strange locations for the executables - JAVACMD="$JAVA_HOME/jre/sh/java" + JAVACMD=$JAVA_HOME/jre/sh/java else - JAVACMD="$JAVA_HOME/bin/java" + JAVACMD=$JAVA_HOME/bin/java fi if [ ! -x "$JAVACMD" ] ; then die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME @@ -98,88 +130,120 @@ Please set the JAVA_HOME variable in your environment to match the location of your Java installation." fi else - JAVACMD="java" - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then - MAX_FD_LIMIT=`ulimit -H -n` - if [ $? -eq 0 ] ; then - if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then - MAX_FD="$MAX_FD_LIMIT" - fi - ulimit -n $MAX_FD - if [ $? -ne 0 ] ; then - warn "Could not set maximum file descriptor limit: $MAX_FD" - fi - else - warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" - fi +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac fi -# For Darwin, add options to specify how the application appears in the dock -if $darwin; then - GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" -fi +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. # For Cygwin or MSYS, switch paths to Windows format before running java -if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then - APP_HOME=`cygpath --path --mixed "$APP_HOME"` - CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` - - JAVACMD=`cygpath --unix "$JAVACMD"` - - # We build the pattern for arguments to be converted via cygpath - ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` - SEP="" - for dir in $ROOTDIRSRAW ; do - ROOTDIRS="$ROOTDIRS$SEP$dir" - SEP="|" - done - OURCYGPATTERN="(^($ROOTDIRS))" - # Add a user-defined pattern to the cygpath arguments - if [ "$GRADLE_CYGPATTERN" != "" ] ; then - OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" - fi +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + # Now convert the arguments - kludge to limit ourselves to /bin/sh - i=0 - for arg in "$@" ; do - CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` - CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option - - if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition - eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` - else - eval `echo args$i`="\"$arg\"" + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) fi - i=`expr $i + 1` + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg done - case $i in - 0) set -- ;; - 1) set -- "$args0" ;; - 2) set -- "$args0" "$args1" ;; - 3) set -- "$args0" "$args1" "$args2" ;; - 4) set -- "$args0" "$args1" "$args2" "$args3" ;; - 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; - esac fi -# Escape application args -save () { - for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done - echo " " -} -APP_ARGS=`save "$@"` -# Collect all arguments for the java command, following the shell quoting and substitution rules -eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index ac1b06f..7101f8e 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -14,7 +14,7 @@ @rem limitations under the License. @rem -@if "%DEBUG%" == "" @echo off +@if "%DEBUG%"=="" @echo off @rem ########################################################################## @rem @rem Gradle startup script for Windows @@ -25,7 +25,8 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% @@ -40,13 +41,13 @@ if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto execute +if %ERRORLEVEL% equ 0 goto execute -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail @@ -56,11 +57,11 @@ set JAVA_EXE=%JAVA_HOME%/bin/java.exe if exist "%JAVA_EXE%" goto execute -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail @@ -75,13 +76,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar :end @rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd +if %ERRORLEVEL% equ 0 goto mainEnd :fail rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% :mainEnd if "%OS%"=="Windows_NT" endlocal diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index 5ca2f01..bd67a4b 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -29,9 +29,9 @@ import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel.SendMode; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandler; import com.exactpro.th2.conn.dirty.tcp.core.api.IHandlerContext; +import com.exactpro.th2.conn.dirty.tcp.core.api.IChannelListener; import com.exactpro.th2.conn.dirty.tcp.core.util.CommonUtil; import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; -import com.exactpro.th2.util.EmptyCache; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import io.netty.buffer.ByteBuf; @@ -54,7 +54,10 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -144,9 +147,8 @@ //todo ring buffer as cache //todo add events -public class FixHandler implements AutoCloseable, IHandler { +public class FixHandler implements AutoCloseable, IHandler, IChannelListener { private static final Logger LOGGER = LoggerFactory.getLogger(FixHandler.class); - private static final Cache EMPTY_MESSAGE_CACHE = EmptyCache.emptyCache(); private static final int DAY_SECONDS = 24 * 60 * 60; private static final String SOH = "\001"; @@ -193,7 +195,7 @@ public FixHandler(IHandlerContext context) { if (settings.getMessageCacheSize() > 0) { this.messageCache = CacheBuilder.newBuilder().maximumSize(settings.getMessageCacheSize()).build(); } else { - this.messageCache = EMPTY_MESSAGE_CACHE; + this.messageCache = null; } if(settings.getSessionStartTime() != null) { @@ -403,14 +405,18 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu FixField msgSeqNumValue = findField(message, MSG_SEQ_NUM_TAG); if (msgSeqNumValue == null) { metadata.put(REJECT_REASON, "No msgSeqNum Field"); - if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgSeqNum in message: %s", null, message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) { + error("Invalid message. No MsgSeqNum in message: " + message.toString(US_ASCII), null); + } return metadata; } FixField msgType = findField(message, MSG_TYPE_TAG); if (msgType == null) { metadata.put(REJECT_REASON, "No msgType Field"); - if(LOGGER.isErrorEnabled()) error("Invalid message. No MsgType in message: %s", null, message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) { + error("Invalid message. No MsgType in message: " + message.toString(US_ASCII), null); + } return metadata; } @@ -441,7 +447,10 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Sending logout with text: MsgSeqNum is too low...", receivedMsgSeqNum, serverMsgSeqNum.get()))); sendLogout(String.format("MsgSeqNum too low, expecting %d but received %d", serverMsgSeqNum.get() + 1, receivedMsgSeqNum)); reconnectRequestTimer = executorService.schedule(this::sendLogon, settings.getReconnectDelay(), TimeUnit.SECONDS); - if (LOGGER.isErrorEnabled()) error("Invalid message. SeqNum is less than expected %d: %s", null, serverMsgSeqNum.get(), message.toString(US_ASCII)); + if (LOGGER.isErrorEnabled()) { + error("Invalid message. SeqNum is less than expected %d: " + message.toString(US_ASCII), + null, serverMsgSeqNum.get()); + } } else { context.send(CommonUtil.toEvent(String.format("Received server sequence %d but expected %d. Correcting server sequence.", receivedMsgSeqNum, serverMsgSeqNum.get() + 1))); serverMsgSeqNum.set(receivedMsgSeqNum - 1); @@ -459,18 +468,20 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu switch (msgTypeValue) { case MSG_TYPE_HEARTBEAT: - if(LOGGER.isInfoEnabled()) info("Heartbeat received - %s", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Heartbeat received - " + message.toString(US_ASCII)); checkHeartbeat(message); break; case MSG_TYPE_LOGON: - if(LOGGER.isInfoEnabled()) info("Logon received - %s", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Logon received - " + message.toString(US_ASCII)); boolean connectionSuccessful = checkLogon(message); if (connectionSuccessful) { if(settings.useNextExpectedSeqNum()) { FixField nextExpectedSeqField = findField(message, NEXT_EXPECTED_SEQ_NUMBER_TAG); if(nextExpectedSeqField == null) { metadata.put(REJECT_REASON, "No NextExpectedSeqNum field"); - if(LOGGER.isErrorEnabled()) error("Invalid message. No NextExpectedSeqNum in message: %s", null, message.toString(US_ASCII)); + if(LOGGER.isErrorEnabled()) { + error("Invalid message. No NextExpectedSeqNum in message: " + message.toString(US_ASCII), null); + } return metadata; } @@ -506,15 +517,15 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu break; //extract logout reason case MSG_TYPE_RESEND_REQUEST: - if(LOGGER.isInfoEnabled()) info("Resend request received - %s", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Resend request received - " + message.toString(US_ASCII)); handleResendRequest(message); break; case MSG_TYPE_SEQUENCE_RESET: //gap fill - if(LOGGER.isInfoEnabled()) info("Sequence reset received - %s", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Sequence reset received - " + message.toString(US_ASCII)); resetSequence(message); break; case MSG_TYPE_TEST_REQUEST: - if (LOGGER.isInfoEnabled()) LOGGER.info("Test request received - {}", message.toString(US_ASCII)); + if (LOGGER.isInfoEnabled()) info("Test request received - " + message.toString(US_ASCII)); handleTestRequest(message, metadata); break; } @@ -526,6 +537,11 @@ public Map onIncoming(@NotNull IChannel channel, @NotNull ByteBu return metadata; } + @Override + public void postOutgoingMqPublish(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull MessageID messageId, @NotNull Map metadata, @Nullable EventID eventId) { + putIntoCache(message); + } + private void handleTestRequest(ByteBuf message, Map metadata) { FixField testReqId = findField(message, TEST_REQ_ID_TAG); if(testReqId == null || testReqId.getValue() == null) { @@ -537,7 +553,7 @@ private void handleTestRequest(ByteBuf message, Map metadata) { } private void handleLogout(@NotNull ByteBuf message) { - if(LOGGER.isInfoEnabled()) info("Logout received - %s", message.toString(US_ASCII)); + if(LOGGER.isInfoEnabled()) info("Logout received - " + message.toString(US_ASCII)); boolean isSequenceChanged = false; FixField text = findField(message, TEXT_TAG); if (text != null) { @@ -574,7 +590,9 @@ private void resetSequence(ByteBuf message) { serverMsgSeqNum.set(Integer.parseInt(requireNonNull(seqNumValue.getValue())) - 1); } } else { - if(LOGGER.isWarnEnabled()) warn("Failed to reset servers MsgSeqNum. No such tag in message: %s", message.toString(US_ASCII)); + if(LOGGER.isWarnEnabled()) { + warn("Failed to reset servers MsgSeqNum. No such tag in message: " + message.toString(US_ASCII)); + } } } @@ -635,79 +653,71 @@ private void recovery(int beginSeqNo, int endSeqNo) { AtomicInteger lastProcessedSequence = new AtomicInteger(beginSeqNo - 1); try { recoveryLock.lock(); + if (endSeqNo == 0) { endSeqNo = msgSeqNum.get() + 1; } + int endSeq = endSeqNo; info("Recovery messages from %d to %d", beginSeqNo, endSeqNo); + if(settings.isLoadMissedMessagesFromCradle()) { + Function1 processMessage = (buf) -> { + FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); + FixField msgTypeField = findField(buf, MSG_TYPE_TAG); + if(seqNum == null || seqNum.getValue() == null + || msgTypeField == null || msgTypeField.getValue() == null) { + return true; + } + int sequence = Integer.parseInt(seqNum.getValue()); + String msgType = msgTypeField.getValue(); - int beginSeq = beginSeqNo; - int endSeq = endSeqNo; + if(sequence < beginSeqNo) return true; + if(sequence > endSeq) return false; - Function1 processMessage = (buf) -> { - FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); - FixField msgTypeField = findField(buf, MSG_TYPE_TAG); - if(seqNum == null || seqNum.getValue() == null - || msgTypeField == null || msgTypeField.getValue() == null) { - return true; - } - int sequence = Integer.parseInt(seqNum.getValue()); - String msgType = msgTypeField.getValue(); + if(ADMIN_MESSAGES.contains(msgType)) return true; + FixField possDup = findField(buf, POSS_DUP_TAG); + if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; - if(sequence < beginSeqNo) return true; - if(sequence > endSeq) return false; + if(sequence - 1 != lastProcessedSequence.get() ) { + StringBuilder sequenceReset = + createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); + channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE); + resetHeartbeatTask(); + } - if(ADMIN_MESSAGES.contains(msgType)) return true; - FixField possDup = findField(buf, POSS_DUP_TAG); - if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; + setTime(buf); + setPossDup(buf); + updateLength(buf); + updateChecksum(buf); + channel.send(buf, createMetadataMap(), null, SendMode.MANGLE); - if(sequence - 1 != lastProcessedSequence.get() ) { - StringBuilder sequenceReset = - createSequenceReset(Math.max(beginSeqNo, lastProcessedSequence.get() + 1), sequence); - channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE); resetHeartbeatTask(); - } - setTime(buf); - setPossDup(buf); - updateLength(buf); - updateChecksum(buf); - channel.send(buf, createMetadataMap(), null, SendMode.MANGLE); - - resetHeartbeatTask(); - - lastProcessedSequence.set(sequence); - return true; - }; - - if (messageCache != EMPTY_MESSAGE_CACHE) { // check aren't references equal - info("Loading messages from %d to %d from message cache", beginSeq, endSeqNo); - for (int i = beginSeq; i < endSeqNo; i++) { - ByteBuf message = messageCache.getIfPresent(i); - if (message == null) { - info("Messages from %d included to %d excluded have been recovered from message cache", beginSeq, i); - beginSeq = i; - break; - } + lastProcessedSequence.set(sequence); + return true; + }; - if (!processMessage.invoke(message)) { - if (LOGGER.isWarnEnabled()) warn( - "Message from message cache has been rejected by process function, message: %s", - message.toString(US_ASCII)); + List cachedMessages = getFromCache(beginSeqNo, endSeq); + + if (cachedMessages.isEmpty()) { + info("Loading messages from %d to %d from cradle", beginSeqNo, endSeqNo); + messageLoader.processMessagesInRange( + channel.getSessionGroup(), channel.getSessionAlias(), Direction.SECOND, + beginSeqNo, + processMessage + ); + } else { + for (ByteBuf message : cachedMessages) { + if (!processMessage.invoke(message)) { + if (LOGGER.isWarnEnabled()) warn( + "Message from message cache has been rejected by process function, message: " + + message.toString(US_ASCII)); + } } } - } - - if(settings.isLoadMissedMessagesFromCradle()) { - info("Loading messages from %d to %d from cradle", beginSeq, endSeqNo); - messageLoader.processMessagesInRange( - channel.getSessionGroup(), channel.getSessionAlias(), Direction.SECOND, - beginSeq, - processMessage - ); if(lastProcessedSequence.get() < endSeq) { - String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeq), msgSeqNum.get() + 1).toString(); + String seqReset = createSequenceReset(Math.max(lastProcessedSequence.get() + 1, beginSeqNo), msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE @@ -715,7 +725,7 @@ private void recovery(int beginSeqNo, int endSeqNo) { } } else { String seqReset = - createSequenceReset(beginSeq, msgSeqNum.get() + 1).toString(); + createSequenceReset(beginSeqNo, msgSeqNum.get() + 1).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), createMetadataMap(), null, SendMode.MANGLE @@ -736,6 +746,44 @@ private void recovery(int beginSeqNo, int endSeqNo) { } } + private void putIntoCache(@NotNull ByteBuf message) { + if (messageCache != null) { + FixField possDupField = findField(message, POSS_DUP_TAG); + if (possDupField == null || !IS_POSS_DUP.equals(possDupField.getValue())) { + FixField msgSeqNumField = findField(message, MSG_SEQ_NUM_TAG); + if (msgSeqNumField != null && msgSeqNumField.getValue() != null) { + try { + Integer seqNum = Integer.valueOf(msgSeqNumField.getValue()); + messageCache.put(seqNum, message.copy()); + } catch (NumberFormatException e) { + if (LOGGER.isWarnEnabled()) { + warn("Message can't be put into messages cache " + + "because MsgSeqNum isn't integer, message: " + message.toString(US_ASCII)); + } + } + } + } + } + } + + @NotNull + private List getFromCache(int beginSeqNo, int endSeq) { + if (messageCache != null) { + info("Try to get messages from %d to %d from message cache", beginSeqNo, endSeq); + List cachedMessages = new ArrayList<>(endSeq - beginSeqNo); + for (int i = beginSeqNo; i <= endSeq; i++) { + ByteBuf message = messageCache.getIfPresent(i); + if (message == null) { + info("Messages from %d included to %d excluded have been recovered from message cache", beginSeqNo, i); + return Collections.emptyList(); + } + cachedMessages.add(message); + } + return cachedMessages; + } + return Collections.emptyList(); + } + private void sendSequenceReset() { StringBuilder sequenceReset = new StringBuilder(); String time = getTime(); @@ -781,7 +829,7 @@ private boolean checkLogon(ByteBuf message) { public void onOutgoing(@NotNull IChannel channel, @NotNull ByteBuf message, @NotNull Map metadata) { onOutgoingUpdateTag(message, metadata); - if(LOGGER.isDebugEnabled()) debug("Outgoing message: %s", message.toString(US_ASCII)); + if(LOGGER.isDebugEnabled()) debug("Outgoing message: " + message.toString(US_ASCII)); if(enabled.get()) resetHeartbeatTask(); } @@ -807,7 +855,9 @@ public void onOutgoingUpdateTag(@NotNull ByteBuf message, @NotNull Map> future) { private void info(String message, Object... args) { if(LOGGER.isInfoEnabled()) { - LOGGER.info("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + String comment = args.length == 0 ? message : String.format(message, args); + LOGGER.info("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), comment); } } private void error(String message, Throwable throwable, Object... args) { if(LOGGER.isErrorEnabled()) { - LOGGER.error("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args), throwable); + String comment = args.length == 0 ? message : String.format(message, args); + LOGGER.error("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), comment, throwable); } } private void warn(String message, Object... args) { if(LOGGER.isWarnEnabled()) { - LOGGER.warn("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + String comment = args.length == 0 ? message : String.format(message, args); + LOGGER.warn("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), comment); } } private void debug(String message, Object... args) { if(LOGGER.isDebugEnabled()) { - LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), String.format(message, args)); + String comment = args.length == 0 ? message : String.format(message, args); + LOGGER.debug("{} - {}: {}", channel.getSessionGroup(), channel.getSessionAlias(), comment); } } diff --git a/src/main/java/com/exactpro/th2/FixHandlerSettings.java b/src/main/java/com/exactpro/th2/FixHandlerSettings.java index f91d29d..ad44e03 100644 --- a/src/main/java/com/exactpro/th2/FixHandlerSettings.java +++ b/src/main/java/com/exactpro/th2/FixHandlerSettings.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/exactpro/th2/constants/Constants.java b/src/main/java/com/exactpro/th2/constants/Constants.java index de4beed..9eb09da 100644 --- a/src/main/java/com/exactpro/th2/constants/Constants.java +++ b/src/main/java/com/exactpro/th2/constants/Constants.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,6 @@ package com.exactpro.th2.constants; -import java.time.format.DateTimeFormatter; -import java.util.Collections; import java.util.Set; public class Constants { @@ -92,12 +90,10 @@ public class Constants { public static final String MSG_TYPE_RESEND_REQUEST = "2"; public static final String MSG_TYPE_SEQUENCE_RESET = "4"; - public static final Set ADMIN_MESSAGES = Collections.unmodifiableSet( - Set.of( - MSG_TYPE_LOGON, MSG_TYPE_LOGOUT, - MSG_TYPE_HEARTBEAT, MSG_TYPE_RESEND_REQUEST, - MSG_TYPE_SEQUENCE_RESET, MSG_TYPE_TEST_REQUEST - ) + public static final Set ADMIN_MESSAGES = Set.of( + MSG_TYPE_LOGON, MSG_TYPE_LOGOUT, + MSG_TYPE_HEARTBEAT, MSG_TYPE_RESEND_REQUEST, + MSG_TYPE_SEQUENCE_RESET, MSG_TYPE_TEST_REQUEST ); public static final String IS_POSS_DUP = "Y"; diff --git a/src/main/java/com/exactpro/th2/util/EmptyCache.java b/src/main/java/com/exactpro/th2/util/EmptyCache.java deleted file mode 100644 index 1d5d9d0..0000000 --- a/src/main/java/com/exactpro/th2/util/EmptyCache.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.util; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheStats; -import com.google.common.collect.ImmutableMap; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; - -public final class EmptyCache implements Cache { - @SuppressWarnings("rawtypes") - private static final Cache EMPTY_CACHE = new EmptyCache<>(); - private static final CacheStats EMPTY_CACHE_STATS = new CacheStats(0, 0, 0, 0, 0, 0); - - private EmptyCache() { } - - @SuppressWarnings("unchecked") - public static Cache emptyCache() { - return (Cache) EMPTY_CACHE; - } - - @Nullable - @Override - public V getIfPresent(@NotNull Object key) { - return null; - } - - @Override - public @NotNull V get(@NotNull K key, @NotNull Callable loader) throws ExecutionException { - try { - return loader.call(); - } catch (Exception e) { - throw new ExecutionException(e); - } - } - - @Override - public @NotNull ImmutableMap getAllPresent(@NotNull Iterable keys) { - return ImmutableMap.of(); - } - - @Override - public void put(@NotNull K key, @NotNull V value) { - throw new UnsupportedOperationException(); - } - - @Override - public void putAll(@NotNull Map m) { - throw new UnsupportedOperationException(); - } - - @Override - public void invalidate(@NotNull Object key) { } - - @Override - public void invalidateAll(@NotNull Iterable keys) { } - - @Override - public void invalidateAll() { } - - @Override - public long size() { - return 0; - } - - @Override - public @NotNull CacheStats stats() { - return EMPTY_CACHE_STATS; - } - - @Override - public @NotNull ConcurrentMap asMap() { - throw new UnsupportedOperationException(); - } - - @Override - public void cleanUp() { } -} diff --git a/src/test/java/com/exactpro/th2/FixHandlerTest.java b/src/test/java/com/exactpro/th2/FixHandlerTest.java index 0a61f8a..0e9e521 100644 --- a/src/test/java/com/exactpro/th2/FixHandlerTest.java +++ b/src/test/java/com/exactpro/th2/FixHandlerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,14 @@ import com.exactpro.th2.util.MessageUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import kotlin.Unit; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.time.Clock; @@ -36,14 +44,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import kotlin.Unit; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; +import static com.exactpro.th2.TestUtilsKt.generateMessageID; import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; import static com.exactpro.th2.constants.Constants.BEGIN_STRING_TAG; import static com.exactpro.th2.constants.Constants.BODY_LENGTH_TAG; @@ -58,7 +60,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; class FixHandlerTest { - private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); private Channel channel; private FixHandler fixHandler; @@ -79,7 +80,7 @@ void beforeEach() { channel = new Channel(createHandlerSettings(), null); fixHandler = channel.getFixHandler(); fixHandler.onOpen(channel); - fixHandler.onIncoming(channel, logonResponse); + fixHandler.onIncoming(channel, logonResponse, generateMessageID()); } @AfterAll @@ -148,7 +149,7 @@ void sendResendRequestTest() { channel.clearQueue(); fixHandler.sendLogon(); - fixHandler.onIncoming(channel, logonResponse); + fixHandler.onIncoming(channel, logonResponse, generateMessageID()); fixHandler.sendResendRequest(1); assertEquals(expectedLogon, new String(channel.getQueue().get(0).array())); //assertEquals(expectedHeartbeat, new String(client.getQueue().get(1).array())); @@ -215,7 +216,7 @@ void onConnectionTest() { channel.clearQueue(); fixHandler.onOpen(channel); ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); - fixHandler.onIncoming(channel, logonResponse); + fixHandler.onIncoming(channel, logonResponse, generateMessageID()); try { Thread.sleep(10000); } catch (InterruptedException e) { @@ -325,7 +326,7 @@ void handleResendRequestTest() { } ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=70\u000135=2\u000134=2\u00017=1\u000116=0\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u000110=101\u0001".getBytes(StandardCharsets.US_ASCII)); channel.clearQueue(); - fixHandler.onIncoming(channel, resendRequest); + fixHandler.onIncoming(channel, resendRequest, generateMessageID()); ByteBuf sequenceReset = channel.getQueue().get(0); assertEquals("8=FIXT.1.1\u00019=105\u000135=4\u000134=1\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u0001122=2014-12-22T10:15:30Z\u000143=Y\u0001123=Y\u000136=5\u000110=162\u0001", new String(sequenceReset.array())); channel.clearQueue(); @@ -434,6 +435,7 @@ public CompletableFuture open() { @Override public CompletableFuture send(@NotNull ByteBuf byteBuf, @NotNull Map map, EventID eventId, @NotNull IChannel.SendMode sendMode) { queue.add(byteBuf); + this.fixHandler.postOutgoingMqPublish(this, byteBuf, MessageID.getDefaultInstance(), map, eventId); return CompletableFuture.completedFuture(MessageID.getDefaultInstance()); } @@ -443,7 +445,7 @@ public boolean isOpen() { } @Override - public CompletableFuture close() { + public @NotNull CompletableFuture close() { return CompletableFuture.completedFuture(Unit.INSTANCE); } diff --git a/src/test/java/com/exactpro/th2/RecoveryTest.java b/src/test/java/com/exactpro/th2/RecoveryTest.java deleted file mode 100644 index 9399681..0000000 --- a/src/test/java/com/exactpro/th2/RecoveryTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2; - -import com.exactpro.th2.conn.dirty.fix.MessageSearcher; -import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService; -import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse; -import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest; -import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse; -import com.google.protobuf.ByteString; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; - -import static com.exactpro.th2.FixHandlerTest.createHandlerSettings; -import static com.exactpro.th2.conn.dirty.fix.FixByteBufUtilKt.findField; -import static com.exactpro.th2.constants.Constants.MSG_SEQ_NUM_TAG; -import static com.exactpro.th2.constants.Constants.MSG_TYPE_SEQUENCE_RESET; -import static com.exactpro.th2.constants.Constants.MSG_TYPE_TAG; -import static com.exactpro.th2.constants.Constants.NEW_SEQ_NO_TAG; -import static com.exactpro.th2.constants.Constants.POSS_DUP_TAG; -import static org.junit.jupiter.api.Assertions.assertEquals; - -@SuppressWarnings("DataFlowIssue") -public class RecoveryTest { - - private static final ByteBuf logonResponse = Unpooled.wrappedBuffer("8=FIXT.1.1\0019=105\00135=A\00134=1\00149=server\00156=client\00150=system\00152=2014-12-22T10:15:30Z\00198=0\001108=30\0011137=9\0011409=0\00110=203\001".getBytes(StandardCharsets.US_ASCII)); - private Channel channel; - private FixHandler fixHandler; - - @Test - void testSequenceResetInRange() { - FixHandlerSettings settings = createHandlerSettings(); - settings.setLoadMissedMessagesFromCradle(true); - DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - MessageSearcher ms = new MessageSearcher( - List.of( - messageSearchResponse(2), - messageSearchResponse(3), - messageSearchResponse(4), - messageSearchResponse(5) - ) - ); - Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( - x -> ms.searchMessages(x.getArgument(0, MessageGroupsSearchRequest.class)) - ); - channel = new Channel(settings, dataProviderService); - fixHandler = channel.getFixHandler(); - fixHandler.onOpen(channel); - fixHandler.onIncoming(channel, logonResponse); - // requesting resend from 2 to 5 - ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); - fixHandler.onIncoming(channel, resendRequest); - assertEquals(5, channel.getQueue().size()); - - for(int i = 1; i <= 4; i++) { - ByteBuf buf = channel.getQueue().get(i); - assertEquals(findField(buf, MSG_TYPE_TAG).getValue(), "C"); - assertEquals(Integer.parseInt(findField(buf, MSG_SEQ_NUM_TAG).getValue()), i + 1); - assertEquals(findField(buf, POSS_DUP_TAG).getValue(), "Y"); - } - } - - @Test - void testSequenceResetInsideRange() { - FixHandlerSettings settings = createHandlerSettings(); - settings.setLoadMissedMessagesFromCradle(true); - DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - MessageSearcher ms = new MessageSearcher( - List.of( - messageSearchResponse(4), - messageSearchResponse(5) - ) - ); - Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( - x -> ms.searchMessages(x.getArgument(0, MessageGroupsSearchRequest.class)) - ); - channel = new Channel(settings, dataProviderService); - fixHandler = channel.getFixHandler(); - fixHandler.onOpen(channel); - fixHandler.onIncoming(channel, logonResponse); - // handler sequence after loop is 22 - for(int i = 0; i <= 20; i++) { - fixHandler.onOutgoing( - channel, - Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), - new HashMap<>() - ); - } - // requesting resend from 2 to 8 - ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=8\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); - fixHandler.onIncoming(channel, resendRequest); - assertEquals(channel.getQueue().size(), 5); - - // for missed messages after beginSeqNo to 4 - ByteBuf firstSequenceReset = channel.getQueue().get(1); - assertEquals(findField(firstSequenceReset, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); - assertEquals(Integer.parseInt(findField(firstSequenceReset, MSG_SEQ_NUM_TAG).getValue()), 2); - assertEquals(Integer.parseInt(findField(firstSequenceReset, NEW_SEQ_NO_TAG).getValue()), 4); - - ByteBuf message4 = channel.getQueue().get(2); - - assertEquals(findField(message4, MSG_TYPE_TAG).getValue(), "C"); - assertEquals(Integer.parseInt(findField(message4, MSG_SEQ_NUM_TAG).getValue()), 4); - assertEquals(findField(message4, POSS_DUP_TAG).getValue(), "Y"); - - ByteBuf message5 = channel.getQueue().get(3); - - assertEquals(findField(message5, MSG_TYPE_TAG).getValue(), "C"); - assertEquals(Integer.parseInt(findField(message5, MSG_SEQ_NUM_TAG).getValue()), 5); - assertEquals(findField(message5, POSS_DUP_TAG).getValue(), "Y"); - - // For missed messages after 4 - ByteBuf seqReset2 = channel.getQueue().get(4); - assertEquals(findField(seqReset2, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); - assertEquals(Integer.parseInt(findField(seqReset2, MSG_SEQ_NUM_TAG).getValue()), 6); - assertEquals(Integer.parseInt(findField(seqReset2, NEW_SEQ_NO_TAG).getValue()), 23); - } - - @Test - void testSequenceResetOutOfRange() { - FixHandlerSettings settings = createHandlerSettings(); - settings.setLoadMissedMessagesFromCradle(true); - DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - MessageSearcher ms = new MessageSearcher( - List.of( - messageSearchResponse(1), - messageSearchResponse(2), - messageSearchResponse(3), - messageSearchResponse(4), - messageSearchResponse(5), - messageSearchResponse(6) - ) - ); - Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( - x -> ms.searchMessages(x.getArgument(0, MessageGroupsSearchRequest.class)) - ); - channel = new Channel(settings, dataProviderService); - fixHandler = channel.getFixHandler(); - fixHandler.onOpen(channel); - fixHandler.onIncoming(channel, logonResponse); - // requesting resend from 2 to 5 - ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); - fixHandler.onIncoming(channel, resendRequest); - assertEquals(5, channel.getQueue().size()); - for(int i = 1; i <= 4; i++) { - ByteBuf buf = channel.getQueue().get(i); - assertEquals(findField(buf, MSG_TYPE_TAG).getValue(), "C"); - assertEquals(Integer.parseInt(findField(buf, MSG_SEQ_NUM_TAG).getValue()), i + 1); - assertEquals(findField(buf, POSS_DUP_TAG).getValue(), "Y"); - } - } - - @Test - void testSequenceResetAdminMessages() { - FixHandlerSettings settings = createHandlerSettings(); - settings.setLoadMissedMessagesFromCradle(true); - DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - MessageSearcher ms = new MessageSearcher( - List.of( - messageSearchResponseAdmin(2), - messageSearchResponse(4), - messageSearchResponseAdmin(5), - messageSearchResponseAdmin(6) - ) - ); - Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenAnswer( - x -> ms.searchMessages(x.getArgument(0, MessageGroupsSearchRequest.class)) - ); - channel = new Channel(settings, dataProviderService); - fixHandler = channel.getFixHandler(); - fixHandler.onOpen(channel); - fixHandler.onIncoming(channel, logonResponse); - // handler sequence after loop is 22 - for(int i = 0; i <= 20; i++) { - fixHandler.onOutgoing( - channel, - Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), - new HashMap<>() - ); - } - // requesting resend from 1 to 5 - ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); - fixHandler.onIncoming(channel, resendRequest); - - // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) - ByteBuf seqReset1 = channel.getQueue().get(1); - assertEquals(findField(seqReset1, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); - assertEquals(Integer.parseInt(findField(seqReset1, MSG_SEQ_NUM_TAG).getValue()), 1); - assertEquals(Integer.parseInt(findField(seqReset1, NEW_SEQ_NO_TAG).getValue()), 4); - - ByteBuf message = channel.getQueue().get(2); - assertEquals(findField(message, MSG_TYPE_TAG).getValue(), "C"); - assertEquals(Integer.parseInt(findField(message, MSG_SEQ_NUM_TAG).getValue()), 4); - assertEquals(findField(message, POSS_DUP_TAG).getValue(), "Y"); - - // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) - ByteBuf seqReset2 = channel.getQueue().get(3); - assertEquals(findField(seqReset2, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); - assertEquals(Integer.parseInt(findField(seqReset2, MSG_SEQ_NUM_TAG).getValue()), 5); - assertEquals(Integer.parseInt(findField(seqReset2, NEW_SEQ_NO_TAG).getValue()), 23); - - } - - @Test - void allMessagesMissed() { - FixHandlerSettings settings = createHandlerSettings(); - settings.setLoadMissedMessagesFromCradle(true); - DataProviderService dataProviderService = Mockito.mock(DataProviderService.class); - Mockito.when(dataProviderService.searchMessageGroups(Mockito.any())).thenReturn(Collections.emptyIterator()); - channel = new Channel(settings, dataProviderService); - fixHandler = channel.getFixHandler(); - fixHandler.onOpen(channel); - fixHandler.onIncoming(channel, logonResponse); - // handler sequence after loop is 22 - for(int i = 0; i <= 20; i++) { - fixHandler.onOutgoing( - channel, - Unpooled.buffer().writeBytes(messageWithoutSeqNum().getBytes(StandardCharsets.UTF_8)), - new HashMap<>() - ); - } - // requesting resend from 1 to 5 - ByteBuf resendRequest = Unpooled.wrappedBuffer("8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".getBytes(StandardCharsets.UTF_8)); - fixHandler.onIncoming(channel, resendRequest); - - // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) - ByteBuf seqReset = channel.getQueue().get(1); - assertEquals(findField(seqReset, MSG_TYPE_TAG).getValue(), MSG_TYPE_SEQUENCE_RESET); - assertEquals(Integer.parseInt(findField(seqReset, MSG_SEQ_NUM_TAG).getValue()), 1); - assertEquals(Integer.parseInt(findField(seqReset, NEW_SEQ_NO_TAG).getValue()), 23); - } - - private MessageSearchResponse messageSearchResponse(Integer sequence) { - return MessageSearchResponse.newBuilder() - .setMessage( - MessageGroupResponse.newBuilder() - .setBodyRaw(ByteString.copyFromUtf8(message(sequence))) - ).build(); - } - - private MessageSearchResponse messageSearchResponseAdmin(Integer sequence) { - return MessageSearchResponse.newBuilder() - .setMessage( - MessageGroupResponse.newBuilder() - .setBodyRaw(ByteString.copyFromUtf8(adminMessage(sequence))) - ).build(); - } - - private String message(Integer sequence) { - return String.format("8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000134=%d\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001", sequence); - } - - private String messageWithoutSeqNum() { - return "8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001"; - } - - private String adminMessage(Integer sequence) { - return String.format("8=FIXT.1.1\u00019=70\u000135=4\u0001552=1\u000149=client\u000134=%d\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001", sequence); - } -} diff --git a/src/test/kotlin/com/exactpro/th2/RecoveryTest.kt b/src/test/kotlin/com/exactpro/th2/RecoveryTest.kt new file mode 100644 index 0000000..80fa2ab --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/RecoveryTest.kt @@ -0,0 +1,368 @@ +/* + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2 + +import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.conn.dirty.fix.MessageSearcher +import com.exactpro.th2.conn.dirty.fix.findField +import com.exactpro.th2.constants.Constants +import com.exactpro.th2.constants.Constants.IS_POSS_DUP +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupResponse +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import org.mockito.kotlin.any +import org.mockito.kotlin.mock +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoInteractions +import org.mockito.kotlin.verifyNoMoreInteractions +import java.nio.charset.StandardCharsets +import java.util.Collections.emptyIterator +import kotlin.test.Test + +class RecoveryTest { + private lateinit var channel: Channel + private lateinit var fixHandler: FixHandler + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun testSequenceResetInRange(useCache: Boolean) { + val settings = FixHandlerTest.createHandlerSettings().apply { + isLoadMissedMessagesFromCradle = true + messageCacheSize = if(useCache) 100 else 0 + } + val messages: List = (3 .. 5).map { message(it, it.toString()) } + val ms = MessageSearcher(messages.map(::toMessageSearchResponse)) + val dataProviderService: DataProviderService = mock { + on { searchMessageGroups(any()) }.thenAnswer { + ms.searchMessages(it.getArgument(0)) + } + } + channel = Channel(settings, dataProviderService) + fixHandler = channel.fixHandler + fixHandler.onOpen(channel) + fixHandler.onIncoming(channel, logonResponse, generateMessageID()) + messages.forEach { + val byteBuf = Unpooled.buffer().writeBytes(it.toByteArray(StandardCharsets.UTF_8)) + fixHandler.onOutgoing(channel, byteBuf, HashMap()) + fixHandler.postOutgoingMqPublish(channel, byteBuf, MessageID.getDefaultInstance(), HashMap(), null) + } + // requesting resend from 2 to 5 + val resendRequest = Unpooled.wrappedBuffer( + "8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=5\u000110=226\u0001".toByteArray( + StandardCharsets.UTF_8 + ) + ) + fixHandler.onIncoming(channel, resendRequest, generateMessageID()) + assertEquals(5, channel.queue.size) + + channel.queue[0].assertLogon(1) + channel.queue[1].assertSequenceReset(2, 3) + channel.queue[2].assertMessage("C", 3, IS_POSS_DUP, "3") + channel.queue[3].assertMessage("C", 4, IS_POSS_DUP, "4") + channel.queue[4].assertMessage("C", 5, IS_POSS_DUP, "5") + + verify(dataProviderService, times(2)).searchMessageGroups(any()) + verifyNoMoreInteractions(dataProviderService) + } + + @Test + fun testSequenceResetInsideRange() { + val settings = FixHandlerTest.createHandlerSettings().apply { + isLoadMissedMessagesFromCradle = true + messageCacheSize = 0 + } + val ms = MessageSearcher( + listOf( + messageSearchResponse(4), + messageSearchResponse(5) + ) + ) + val dataProviderService: DataProviderService = mock { + on { searchMessageGroups(any()) }.thenAnswer { + ms.searchMessages(it.getArgument(0)) + } + } + channel = Channel(settings, dataProviderService) + fixHandler = channel.fixHandler + fixHandler.onOpen(channel) + fixHandler.onIncoming(channel, logonResponse, generateMessageID()) + // handler sequence after loop is 22 + for (i in 0..20) { + val message = Unpooled.buffer().writeBytes(messageWithoutSeqNum().toByteArray(StandardCharsets.UTF_8)) + fixHandler.onOutgoing(channel, message, HashMap()) + fixHandler.postOutgoingMqPublish(channel, message, MessageID.getDefaultInstance(), HashMap(), null) + } + // requesting resend from 2 to 8 + val resendRequest = Unpooled.wrappedBuffer( + "8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=8\u000110=226\u0001".toByteArray( + StandardCharsets.UTF_8 + ) + ) + fixHandler.onIncoming(channel, resendRequest, generateMessageID()) + assertEquals(5, channel.queue.size, channel.queue.asSequence() + .map { it.findField(Constants.MSG_SEQ_NUM_TAG)?.value } + .joinToString() + ) + + channel.queue[0].assertLogon(1) + // for missed messages after beginSeqNo to 4 + channel.queue[1].assertSequenceReset(2, 4) + channel.queue[2].assertMessage("C", 4, IS_POSS_DUP) + channel.queue[3].assertMessage("C", 5, IS_POSS_DUP) + // For missed messages after 4 + channel.queue[4].assertSequenceReset(6, 23) + + verify(dataProviderService, times(2)).searchMessageGroups(any()) + verifyNoMoreInteractions(dataProviderService) + } + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun testSequenceResetOutOfRange(useCache: Boolean) { + val settings = FixHandlerTest.createHandlerSettings().apply { + isLoadMissedMessagesFromCradle = true + messageCacheSize = if(useCache) 100 else 0 + } + val ms = MessageSearcher((2 .. 4).map { messageSearchResponse(it, it.toString()) }) + val dataProviderService: DataProviderService = mock { + on { searchMessageGroups(any()) }.thenAnswer { + ms.searchMessages(it.getArgument(0)) + } + } + channel = Channel(settings, dataProviderService) + fixHandler = channel.fixHandler + fixHandler.onOpen(channel) + fixHandler.onIncoming(channel, logonResponse, generateMessageID()) + for (i in 2..4) { + val byteBuf = Unpooled.buffer().writeBytes(messageWithoutSeqNum(i.toString()).toByteArray(StandardCharsets.UTF_8)) + fixHandler.onOutgoing(channel, byteBuf, HashMap()) + fixHandler.postOutgoingMqPublish(channel, byteBuf, MessageID.getDefaultInstance(), HashMap(), null) + } + // requesting resend from 2 to 6 + val resendRequest = Unpooled.wrappedBuffer( + "8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=2\u000116=6\u000110=226\u0001".toByteArray( + StandardCharsets.UTF_8 + ) + ) + fixHandler.onIncoming(channel, resendRequest, generateMessageID()) + assertEquals(5, channel.queue.size) + + channel.queue[0].assertLogon(1) + channel.queue[1].assertMessage("C", 2, IS_POSS_DUP, "2") + channel.queue[2].assertMessage("C", 3, IS_POSS_DUP, "3") + channel.queue[3].assertMessage("C", 4, IS_POSS_DUP, "4") + channel.queue[4].assertSequenceReset(5, 5) + + verify(dataProviderService, times(2)).searchMessageGroups(any()) + verifyNoMoreInteractions(dataProviderService) + } + + @Test + fun testSequenceResetAdminMessages() { + val settings = FixHandlerTest.createHandlerSettings().apply { + isLoadMissedMessagesFromCradle = true + messageCacheSize = 0 + } + val ms = MessageSearcher( + listOf( + messageSearchResponseAdmin(2), + messageSearchResponse(4), + messageSearchResponseAdmin(5), + messageSearchResponseAdmin(6) + ) + ) + val dataProviderService: DataProviderService = mock { + on { searchMessageGroups(any()) }.thenAnswer { + ms.searchMessages(it.getArgument(0)) + } + } + channel = Channel(settings, dataProviderService) + fixHandler = channel.fixHandler + fixHandler.onOpen(channel) + fixHandler.onIncoming(channel, logonResponse, generateMessageID()) + // handler sequence after loop is 22 + for (i in 0..20) { + val message = Unpooled.buffer().writeBytes(messageWithoutSeqNum().toByteArray(StandardCharsets.UTF_8)) + fixHandler.onOutgoing(channel, message, HashMap()) + fixHandler.postOutgoingMqPublish(channel, message, MessageID.getDefaultInstance(), HashMap(), null) + } + // requesting resend from 1 to 5 + val resendRequest = Unpooled.wrappedBuffer( + "8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".toByteArray( + StandardCharsets.UTF_8 + ) + ) + fixHandler.onIncoming(channel, resendRequest, generateMessageID()) + + // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + channel.queue[1].assertSequenceReset(1, 4) + channel.queue[2].assertMessage("C", 4, IS_POSS_DUP) + // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + channel.queue[3].assertSequenceReset(5, 23) + + verify(dataProviderService, times(2)).searchMessageGroups(any()) + verifyNoMoreInteractions(dataProviderService) + } + + @Test + fun allMessagesMissed() { + val settings = FixHandlerTest.createHandlerSettings().apply { + isLoadMissedMessagesFromCradle = true + messageCacheSize = 0 + } + val dataProviderService: DataProviderService = mock { + on { searchMessageGroups(any()) }.thenReturn(emptyIterator()) + } + channel = Channel(settings, dataProviderService) + fixHandler = channel.fixHandler + fixHandler.onOpen(channel) + fixHandler.onIncoming(channel, logonResponse, generateMessageID()) + // handler sequence after loop is 22 + for (i in 0..20) { + val message = Unpooled.buffer().writeBytes(messageWithoutSeqNum().toByteArray(StandardCharsets.UTF_8)) + fixHandler.onOutgoing(channel, message, HashMap()) + fixHandler.postOutgoingMqPublish(channel, message, MessageID.getDefaultInstance(), HashMap(), null) + } + // requesting resend from 1 to 5 + val resendRequest = Unpooled.wrappedBuffer( + "8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".toByteArray( + StandardCharsets.UTF_8 + ) + ) + fixHandler.onIncoming(channel, resendRequest, generateMessageID()) + assertEquals(2, channel.queue.size) + + // sequence reset for messages from 1 to 3 ( 1, 2 - missing, 3 - admin ) + channel.queue[0].assertLogon(1) + channel.queue[1].assertSequenceReset(1, 23) + + verify(dataProviderService).searchMessageGroups(any()) + verifyNoMoreInteractions(dataProviderService) + } + + @Test + fun recoverFromCache() { + val settings = FixHandlerTest.createHandlerSettings().apply { + isLoadMissedMessagesFromCradle = true + messageCacheSize = 100 + } + val dataProviderService: DataProviderService = mock { } + channel = Channel(settings, dataProviderService) + fixHandler = channel.fixHandler + fixHandler.onOpen(channel) + fixHandler.onIncoming(channel, logonResponse, generateMessageID()) + repeat(20) { + val message = Unpooled.buffer().writeBytes(messageWithoutSeqNum(it.toString()).toByteArray(StandardCharsets.UTF_8)) + fixHandler.onOutgoing(channel, message, HashMap()) + fixHandler.postOutgoingMqPublish(channel, message, MessageID.getDefaultInstance(), HashMap(), null) + } + // requesting resend from 1 to 5 + val resendRequest = Unpooled.wrappedBuffer( + "8=FIXT.1.1\u00019=73\u000135=2\u000134=2\u000149=client\u000156=server\u000150=trader\u000152=2014-12-22T10:15:30Z\u00017=1\u000116=5\u000110=226\u0001".toByteArray( + StandardCharsets.UTF_8 + ) + ) + fixHandler.onIncoming(channel, resendRequest, generateMessageID()) + + assertEquals(6, channel.queue.size) + channel.queue[0].assertLogon(1) + channel.queue[1].assertSequenceReset(1, 2) + channel.queue[2].assertMessage("C", 2, IS_POSS_DUP, "0") + channel.queue[3].assertMessage("C", 3, IS_POSS_DUP, "1") + channel.queue[4].assertMessage("C", 4, IS_POSS_DUP, "2") + channel.queue[5].assertMessage("C", 5, IS_POSS_DUP, "3") + + verifyNoInteractions(dataProviderService) + } + + companion object { + private val logonResponse: ByteBuf = Unpooled.wrappedBuffer( + "8=FIXT.1.1\u00019=105\u000135=A\u000134=1\u000149=server\u000156=client\u000150=system\u000152=2014-12-22T10:15:30Z\u000198=0\u0001108=30\u00011137=9\u00011409=0\u000110=203\u0001".toByteArray( + StandardCharsets.US_ASCII + ) + ) + + private fun messageSearchResponse(sequence: Int, test: String = "test"): MessageSearchResponse = MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(message(sequence, test))) + ).build() + + private fun messageSearchResponseAdmin(sequence: Int): MessageSearchResponse = + MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(adminMessage(sequence))) + ).build() + + private fun toMessageSearchResponse(message: String) = MessageSearchResponse.newBuilder() + .setMessage( + MessageGroupResponse.newBuilder() + .setBodyRaw(ByteString.copyFromUtf8(message)) + ).build() + + private fun ByteBuf.assertMessage( + msgType: String, + msgSeqNum: Int, + possDupFlag: String, + text: String? = "test", + ) { + assertAll( + { assertEquals(msgType, findField(Constants.MSG_TYPE_TAG)?.value, "MsgType mismatch") }, + { assertEquals(msgSeqNum, findField(Constants.MSG_SEQ_NUM_TAG)?.value?.toInt(), "SeqNum mismatch") }, + { assertEquals(possDupFlag, findField(Constants.POSS_DUP_TAG)?.value ?: "N", "PostDupFlag mismatch") }, + { assertEquals(text, findField(Constants.TEXT_TAG)?.value, "Text mismatch") }, + ) + } + + private fun ByteBuf.assertLogon( + msgSeqNum: Int, + ) { + assertAll( + { assertEquals(Constants.MSG_TYPE_LOGON, findField(Constants.MSG_TYPE_TAG)?.value, "MsgType mismatch") }, + { assertEquals(msgSeqNum, findField(Constants.MSG_SEQ_NUM_TAG)?.value?.toInt(), "SeqNum mismatch") }, + ) + } + + private fun ByteBuf.assertSequenceReset( + msgSeqNum: Int, + newSeqNum: Int, + ) { + assertAll( + { assertEquals(Constants.MSG_TYPE_SEQUENCE_RESET, findField(Constants.MSG_TYPE_TAG)?.value, "MsgType mismatch") }, + { assertEquals(msgSeqNum, findField(Constants.MSG_SEQ_NUM_TAG)?.value?.toInt(), "SeqNum mismatch") }, + { assertEquals(newSeqNum, findField(Constants.NEW_SEQ_NO_TAG)?.value?.toInt(), "NewSeqNum mismatch") }, + ) + } + + private fun message(sequence: Int, test: String = "test"): String = + "8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000134=${sequence}\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000158=${test}\u000110=132\u0001" + + private fun messageWithoutSeqNum(test: String = "test"): String = + "8=FIXT.1.1\u00019=70\u000135=C\u0001552=1\u000149=client\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000158=${test}\u000110=132\u0001" + + private fun adminMessage(sequence: Int): String = + "8=FIXT.1.1\u00019=70\u000135=4\u0001552=1\u000149=client\u000134=${sequence}\u000156=server\u000152=2014-12-22T10:15:30Z\u000150=trader\u000110=132\u0001" + } +} diff --git a/src/test/kotlin/com/exactpro/th2/TestUtils.kt b/src/test/kotlin/com/exactpro/th2/TestUtils.kt new file mode 100644 index 0000000..ddb62e3 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/TestUtils.kt @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2 + +import com.exactpro.th2.common.grpc.ConnectionID +import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.grpc.MessageID +import com.google.protobuf.Timestamp +import com.google.protobuf.util.Timestamps +import java.util.concurrent.atomic.AtomicLong + +private val SEQUENCE_COUNTER = AtomicLong() + +@JvmOverloads +fun generateMessageID( + direction: Direction = Direction.FIRST, + sequence: Long = SEQUENCE_COUNTER.incrementAndGet(), + timestamp: Timestamp = Timestamps.now(), + alias: String = "test-session-alias", + group: String = "test-session-group", + book: String = "test-book", +): MessageID { + return MessageID.newBuilder() + .setBookName(book) + .setConnectionId( + ConnectionID.newBuilder() + .setSessionGroup(group) + .setSessionAlias(alias) + .build() + ) + .setTimestamp(timestamp) + .setSequence(sequence) + .setDirection(direction) + .build() +} \ No newline at end of file