Skip to content

Commit

Permalink
[TH2-4951] Set direction in the message. Group by session alias only
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed Jun 13, 2023
1 parent 0ea5317 commit dc456d0
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 42 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ repositories {
dependencies {
api platform('com.exactpro.th2:bom:4.2.0')
api 'com.exactpro.th2:common:5.2.0'
api 'com.exactpro.th2:read-file-common-core:2.0.0-dev-version-2-4315008828-1ed0f94-SNAPSHOT'
api 'com.exactpro.th2:read-file-common-core:2.0.0-th2-4951-5256417699-8614fb0-SNAPSHOT'

implementation('com.opencsv:opencsv:5.7.0') {
because("we need to write a correct CSV in case of free pattern is used")
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/exactpro/th2/readlog/LogData.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.exactpro.th2.readlog;

import com.exactpro.th2.common.grpc.Direction;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -27,6 +29,7 @@ public final class LogData {
private List<String> body;
private String rawTimestamp;
private Instant parsedTimestamp;
private Direction direction;

public LogData() {
this(null);
Expand Down Expand Up @@ -61,6 +64,14 @@ public void setParsedTimestamp(Instant localDateTime) {
this.parsedTimestamp = localDateTime;
}

public Direction getDirection() {
return direction;
}

public void setDirection(Direction direction) {
this.direction = direction;
}

private void initIfNeeded() {
if (body == null) {
body = new ArrayList<>();
Expand Down
25 changes: 16 additions & 9 deletions src/main/java/com/exactpro/th2/readlog/RegexLogParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.exactpro.th2.common.grpc.Direction;
import com.exactpro.th2.read.file.common.StreamId;
import com.exactpro.th2.readlog.cfg.AliasConfiguration;
import com.opencsv.CSVWriter;
Expand Down Expand Up @@ -65,17 +67,22 @@ public LogData parse(StreamId streamId, String raw) {
throw new IllegalArgumentException("Unknown alias '" + sessionAlias +"'. No configuration found" );
}

LogData resultData = new LogData();

Pattern directionPattern = Objects.requireNonNull(configuration.getDirectionToPattern().get(streamId.getDirection()),
() -> "Pattern for direction " + streamId.getDirection() + " and session " + sessionAlias);
Matcher matcher = directionPattern.matcher(raw);
// check if the a string matches the direction from streamId
// skip line if it is not ours direction
if (!matcher.find()) {
return resultData;
Direction direction = null;
for (Entry<Direction, Pattern> entry : configuration.getDirectionToPattern().entrySet()) {
if (entry.getValue().matcher(raw).find()) {
direction = entry.getKey();
break;
}
}
// check whether the line matches any direction regex
// if not it is not our line
if (direction == null) {
return LogData.EMPTY;
}

LogData resultData = new LogData();
resultData.setDirection(direction);

List<Integer> regexGroups = configuration.getGroups();
if (configuration.isJoinGroups()) {
parseBodyJoined(raw, configuration, resultData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ private static DirectoryChecker getDirectoryChecker(LogReaderConfiguration confi
configuration.getLogDirectory(),
(Path path) -> configuration.getAliases().entrySet().stream()
.filter(entry -> entry.getValue().getPathFilter().matcher(path.getFileName().toString()).matches())
.flatMap(entry -> entry.getValue().getDirectionToPattern()
.keySet().stream()
.map(direction -> new StreamId(entry.getKey(), direction))
).collect(Collectors.toSet()),
.map(entry -> new StreamId(entry.getKey()))
.collect(Collectors.toSet()),
files -> files.sort(pathComparator),
path -> true
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;

public class RegexpContentParser extends LineParser {
private static final Logger LOGGER = LoggerFactory.getLogger(RegexpContentParser.class);
private final RegexLogParser parser;

public RegexpContentParser(RegexLogParser parser) {
this.parser = Objects.requireNonNull(parser, "'Parser' parameter");
this.parser = requireNonNull(parser, "'Parser' parameter");
}

@Nonnull
Expand All @@ -59,6 +61,8 @@ protected List<RawMessage.Builder> lineToMessages(@Nonnull StreamId streamId, @N
}

private void setupMetadata(RawMessageMetadata.Builder builder, LogData logData) {
builder.getIdBuilder().setDirection(requireNonNull(logData.getDirection(),
"direction is not set"));
if (logData.getParsedTimestamp() != null) {
builder.getIdBuilder().setTimestamp(MessageUtils.toTimestamp(logData.getParsedTimestamp()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.exactpro.th2.readlog.impl
import com.exactpro.cradle.BookId
import com.exactpro.cradle.CradleStorage
import com.exactpro.cradle.Order
import com.exactpro.cradle.messages.GroupedMessageFilter
import com.exactpro.cradle.messages.MessageFilter
import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.common.grpc.RawMessage
Expand All @@ -30,6 +31,7 @@ import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState
import com.google.protobuf.ByteString
import com.google.protobuf.UnsafeByteOperations
import java.time.Instant
import java.time.temporal.ChronoUnit

class CradleReaderState private constructor(
private val cradleStorage: CradleStorage,
Expand All @@ -40,16 +42,15 @@ class CradleReaderState private constructor(
: this(cradleStorage, InMemoryReaderState(), bookSupplier)

override fun get(streamId: StreamId): StreamData? {
return delegate[streamId] ?: cradleStorage.getMessages(
MessageFilter.builder()
.sessionAlias(streamId.sessionAlias)
.direction(streamId.direction.toCradleDirection())
return delegate[streamId] ?: cradleStorage.getGroupedMessageBatches(
GroupedMessageFilter.builder()
.groupName(streamId.sessionAlias)
.bookId(BookId(bookSupplier(streamId)))
.timestampTo().isLessThanOrEqualTo(Instant.now())
.limit(1)
.order(Order.REVERSE)
.build()
).asSequence().firstOrNull()?.run {
).asSequence().firstOrNull()?.lastMessage?.run {
StreamData(
timestamp,
sequence,
Expand Down
31 changes: 15 additions & 16 deletions src/test/java/com/exactpro/th2/readlog/TestLogParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -46,7 +46,8 @@ public class TestLogParser {
static final String TEST_MESSAGE_ALIAS = "tma";
static final String TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT = "tma_wrong_format";
static final String TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN = "tma_wrong_pattern";
static final String RAW_LOG = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - incoming fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}";
static final String RAW_LOG_IN = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - incoming fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}";
static final String RAW_LOG_OUT = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - outgoing fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}";

@ParameterizedTest(name = "Has message: {1}, Skips before: {0}")
@MethodSource("skipMessageParams")
Expand All @@ -65,7 +66,10 @@ void skipMessageByTimestamp(Instant skipBefore, boolean hasMessages) {
"test", cfg
));

LogData data = parser.parse(new StreamId("test", Direction.FIRST), "2021-03-23 13:21:37.991337479 some data");
LogData data = parser.parse(new StreamId("test"), "2021-03-23 13:21:37.991337479 some data");
if (hasMessages) {
assertEquals(Direction.FIRST, data.getDirection(), "unexpected direction");
}
assertEquals(hasMessages, !data.getBody().isEmpty(), () -> "unexpected data " + data.getBody());
if (hasMessages) {
assertEquals(List.of("2021-03-23 13:21:37.991337479 some data"), data.getBody());
Expand All @@ -79,45 +83,40 @@ static List<Arguments> skipMessageParams() {
);
}

@Test
void parser() {
@ParameterizedTest
@EnumSource(value = Direction.class, mode = EnumSource.Mode.EXCLUDE, names = "UNRECOGNIZED")
void parser(Direction direction) {
RegexLogParser logParser = new RegexLogParser(getConfiguration());
LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS, Direction.FIRST), RAW_LOG);
LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS), direction == Direction.FIRST ? RAW_LOG_IN : RAW_LOG_OUT);
assertEquals(direction, data.getDirection(), "unexpected direction");
assertEquals(1, data.getBody().size());
assertEquals("NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}", data.getBody().get(0));
assertEquals("2021-03-23 13:21:37.991337479", data.getRawTimestamp());
assertEquals(Instant.parse("2021-03-23T13:21:37.991337479Z"), data.getParsedTimestamp());
}

@Test
void skipIfDirectionPatternDoesNotMatch() {
RegexLogParser logParser = new RegexLogParser(getConfiguration());
LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS, Direction.SECOND), RAW_LOG);
assertTrue(data.getBody().isEmpty(), () -> "Unexpected data read: " + data.getBody());
}

@Test
void parserErrors() {
RegexLogParser logParser = new RegexLogParser(getConfiguration());

assertAll(
() -> {
var ex = assertThrows(IllegalStateException.class,
() -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT, Direction.FIRST), RAW_LOG));
() -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT), RAW_LOG_IN));
Assertions.assertTrue(
ex.getMessage().startsWith("The timestamp '2021-03-23 13:21:37.991337479' cannot be parsed"),
() -> "Actual error: " + ex.getMessage());
},
() -> {
var ex = assertThrows(IllegalStateException.class,
() -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN, Direction.FIRST), RAW_LOG));
() -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN), RAW_LOG_IN));
Assertions.assertTrue(
ex.getMessage().startsWith("The pattern '3012.*' cannot extract the timestamp from the string"),
() -> "Actual error: " + ex.getMessage());
},
() -> {
var ex = assertThrows(IllegalArgumentException.class,
() -> logParser.parse(new StreamId("wrong_alias", Direction.FIRST), RAW_LOG));
() -> logParser.parse(new StreamId("wrong_alias"), RAW_LOG_IN));
Assertions.assertTrue(
ex.getMessage().startsWith("Unknown alias 'wrong_alias'. No configuration found"),
() -> "Actual error: " + ex.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void joinsIfOneMatchFound() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));

LogData data = parser.parse(new StreamId("test", Direction.FIRST), "this a test string, 123 and no more");
LogData data = parser.parse(new StreamId("test"), "this a test string, 123 and no more");
List<String> body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
Expand All @@ -67,7 +67,7 @@ void doesNotTryToSubstituteInResultString() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));

LogData data = parser.parse(new StreamId("test", Direction.FIRST), "[test] should not try to process ${3} and ${variable}");
LogData data = parser.parse(new StreamId("test"), "[test] should not try to process ${3} and ${variable}");
List<String> body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
Expand All @@ -92,7 +92,7 @@ void joinsIfManyMatchesFound() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));

LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53");
LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53");
List<String> body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
Expand All @@ -119,7 +119,7 @@ void usesCorrectDelimiter() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));

LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42");
LogData data = parser.parse(new StreamId("test"), "A, 42");
List<String> body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
Expand All @@ -145,7 +145,7 @@ void usesConstantsFromMap() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));

LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53");
LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53");
List<String> body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
Expand All @@ -171,7 +171,7 @@ void correctlyAcceptsGroupsByName() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));

LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53");
LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53");
List<String> body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
Expand Down

0 comments on commit dc456d0

Please sign in to comment.