From dc456d04b99842130374af19348b78c2a02c9ed2 Mon Sep 17 00:00:00 2001 From: Oleg Date: Tue, 13 Jun 2023 18:25:21 +0400 Subject: [PATCH 1/2] [TH2-4951] Set direction in the message. Group by session alias only --- build.gradle | 2 +- .../com/exactpro/th2/readlog/LogData.java | 11 +++++++ .../exactpro/th2/readlog/RegexLogParser.java | 25 +++++++++------ .../th2/readlog/impl/LogFileReader.java | 6 ++-- .../th2/readlog/impl/RegexpContentParser.java | 6 +++- .../th2/readlog/impl/CradleReaderState.kt | 11 ++++--- .../exactpro/th2/readlog/TestLogParser.java | 31 +++++++++---------- .../readlog/TestRegexLogParserJoining.java | 12 +++---- 8 files changed, 62 insertions(+), 42 deletions(-) diff --git a/build.gradle b/build.gradle index 2a0061e..214e4ed 100644 --- a/build.gradle +++ b/build.gradle @@ -51,7 +51,7 @@ repositories { dependencies { api platform('com.exactpro.th2:bom:4.2.0') api 'com.exactpro.th2:common:5.2.0' - api 'com.exactpro.th2:read-file-common-core:2.0.0-dev-version-2-4315008828-1ed0f94-SNAPSHOT' + api 'com.exactpro.th2:read-file-common-core:2.0.0-th2-4951-5256417699-8614fb0-SNAPSHOT' implementation('com.opencsv:opencsv:5.7.0') { because("we need to write a correct CSV in case of free pattern is used") diff --git a/src/main/java/com/exactpro/th2/readlog/LogData.java b/src/main/java/com/exactpro/th2/readlog/LogData.java index 839d7e4..137ae37 100644 --- a/src/main/java/com/exactpro/th2/readlog/LogData.java +++ b/src/main/java/com/exactpro/th2/readlog/LogData.java @@ -15,6 +15,8 @@ */ package com.exactpro.th2.readlog; +import com.exactpro.th2.common.grpc.Direction; + import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -27,6 +29,7 @@ public final class LogData { private List body; private String rawTimestamp; private Instant parsedTimestamp; + private Direction direction; public LogData() { this(null); @@ -61,6 +64,14 @@ public void setParsedTimestamp(Instant localDateTime) { this.parsedTimestamp = localDateTime; } + public Direction getDirection() { + return direction; + } + + public void setDirection(Direction direction) { + this.direction = direction; + } + private void initIfNeeded() { if (body == null) { body = new ArrayList<>(); diff --git a/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java b/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java index 85b870f..d0f2f1c 100644 --- a/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java +++ b/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java @@ -25,12 +25,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import com.exactpro.th2.common.grpc.Direction; import com.exactpro.th2.read.file.common.StreamId; import com.exactpro.th2.readlog.cfg.AliasConfiguration; import com.opencsv.CSVWriter; @@ -65,17 +67,22 @@ public LogData parse(StreamId streamId, String raw) { throw new IllegalArgumentException("Unknown alias '" + sessionAlias +"'. No configuration found" ); } - LogData resultData = new LogData(); - - Pattern directionPattern = Objects.requireNonNull(configuration.getDirectionToPattern().get(streamId.getDirection()), - () -> "Pattern for direction " + streamId.getDirection() + " and session " + sessionAlias); - Matcher matcher = directionPattern.matcher(raw); - // check if the a string matches the direction from streamId - // skip line if it is not ours direction - if (!matcher.find()) { - return resultData; + Direction direction = null; + for (Entry entry : configuration.getDirectionToPattern().entrySet()) { + if (entry.getValue().matcher(raw).find()) { + direction = entry.getKey(); + break; + } + } + // check whether the line matches any direction regex + // if not it is not our line + if (direction == null) { + return LogData.EMPTY; } + LogData resultData = new LogData(); + resultData.setDirection(direction); + List regexGroups = configuration.getGroups(); if (configuration.isJoinGroups()) { parseBodyJoined(raw, configuration, resultData); diff --git a/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java b/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java index 04c997b..a83537a 100644 --- a/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java +++ b/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java @@ -76,10 +76,8 @@ private static DirectoryChecker getDirectoryChecker(LogReaderConfiguration confi configuration.getLogDirectory(), (Path path) -> configuration.getAliases().entrySet().stream() .filter(entry -> entry.getValue().getPathFilter().matcher(path.getFileName().toString()).matches()) - .flatMap(entry -> entry.getValue().getDirectionToPattern() - .keySet().stream() - .map(direction -> new StreamId(entry.getKey(), direction)) - ).collect(Collectors.toSet()), + .map(entry -> new StreamId(entry.getKey())) + .collect(Collectors.toSet()), files -> files.sort(pathComparator), path -> true ); diff --git a/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java b/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java index 60c655b..494a10e 100644 --- a/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java +++ b/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java @@ -37,12 +37,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; + public class RegexpContentParser extends LineParser { private static final Logger LOGGER = LoggerFactory.getLogger(RegexpContentParser.class); private final RegexLogParser parser; public RegexpContentParser(RegexLogParser parser) { - this.parser = Objects.requireNonNull(parser, "'Parser' parameter"); + this.parser = requireNonNull(parser, "'Parser' parameter"); } @Nonnull @@ -59,6 +61,8 @@ protected List lineToMessages(@Nonnull StreamId streamId, @N } private void setupMetadata(RawMessageMetadata.Builder builder, LogData logData) { + builder.getIdBuilder().setDirection(requireNonNull(logData.getDirection(), + "direction is not set")); if (logData.getParsedTimestamp() != null) { builder.getIdBuilder().setTimestamp(MessageUtils.toTimestamp(logData.getParsedTimestamp())); } diff --git a/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt b/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt index 7b13472..2b31972 100644 --- a/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt +++ b/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt @@ -19,6 +19,7 @@ package com.exactpro.th2.readlog.impl import com.exactpro.cradle.BookId import com.exactpro.cradle.CradleStorage import com.exactpro.cradle.Order +import com.exactpro.cradle.messages.GroupedMessageFilter import com.exactpro.cradle.messages.MessageFilter import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.common.grpc.RawMessage @@ -30,6 +31,7 @@ import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState import com.google.protobuf.ByteString import com.google.protobuf.UnsafeByteOperations import java.time.Instant +import java.time.temporal.ChronoUnit class CradleReaderState private constructor( private val cradleStorage: CradleStorage, @@ -40,16 +42,15 @@ class CradleReaderState private constructor( : this(cradleStorage, InMemoryReaderState(), bookSupplier) override fun get(streamId: StreamId): StreamData? { - return delegate[streamId] ?: cradleStorage.getMessages( - MessageFilter.builder() - .sessionAlias(streamId.sessionAlias) - .direction(streamId.direction.toCradleDirection()) + return delegate[streamId] ?: cradleStorage.getGroupedMessageBatches( + GroupedMessageFilter.builder() + .groupName(streamId.sessionAlias) .bookId(BookId(bookSupplier(streamId))) .timestampTo().isLessThanOrEqualTo(Instant.now()) .limit(1) .order(Order.REVERSE) .build() - ).asSequence().firstOrNull()?.run { + ).asSequence().firstOrNull()?.lastMessage?.run { StreamData( timestamp, sequence, diff --git a/src/test/java/com/exactpro/th2/readlog/TestLogParser.java b/src/test/java/com/exactpro/th2/readlog/TestLogParser.java index 652dda3..4bcb646 100644 --- a/src/test/java/com/exactpro/th2/readlog/TestLogParser.java +++ b/src/test/java/com/exactpro/th2/readlog/TestLogParser.java @@ -32,8 +32,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -46,7 +46,8 @@ public class TestLogParser { static final String TEST_MESSAGE_ALIAS = "tma"; static final String TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT = "tma_wrong_format"; static final String TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN = "tma_wrong_pattern"; - static final String RAW_LOG = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - incoming fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}"; + static final String RAW_LOG_IN = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - incoming fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}"; + static final String RAW_LOG_OUT = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - outgoing fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}"; @ParameterizedTest(name = "Has message: {1}, Skips before: {0}") @MethodSource("skipMessageParams") @@ -65,7 +66,10 @@ void skipMessageByTimestamp(Instant skipBefore, boolean hasMessages) { "test", cfg )); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "2021-03-23 13:21:37.991337479 some data"); + LogData data = parser.parse(new StreamId("test"), "2021-03-23 13:21:37.991337479 some data"); + if (hasMessages) { + assertEquals(Direction.FIRST, data.getDirection(), "unexpected direction"); + } assertEquals(hasMessages, !data.getBody().isEmpty(), () -> "unexpected data " + data.getBody()); if (hasMessages) { assertEquals(List.of("2021-03-23 13:21:37.991337479 some data"), data.getBody()); @@ -79,23 +83,18 @@ static List skipMessageParams() { ); } - @Test - void parser() { + @ParameterizedTest + @EnumSource(value = Direction.class, mode = EnumSource.Mode.EXCLUDE, names = "UNRECOGNIZED") + void parser(Direction direction) { RegexLogParser logParser = new RegexLogParser(getConfiguration()); - LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS, Direction.FIRST), RAW_LOG); + LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS), direction == Direction.FIRST ? RAW_LOG_IN : RAW_LOG_OUT); + assertEquals(direction, data.getDirection(), "unexpected direction"); assertEquals(1, data.getBody().size()); assertEquals("NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}", data.getBody().get(0)); assertEquals("2021-03-23 13:21:37.991337479", data.getRawTimestamp()); assertEquals(Instant.parse("2021-03-23T13:21:37.991337479Z"), data.getParsedTimestamp()); } - @Test - void skipIfDirectionPatternDoesNotMatch() { - RegexLogParser logParser = new RegexLogParser(getConfiguration()); - LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS, Direction.SECOND), RAW_LOG); - assertTrue(data.getBody().isEmpty(), () -> "Unexpected data read: " + data.getBody()); - } - @Test void parserErrors() { RegexLogParser logParser = new RegexLogParser(getConfiguration()); @@ -103,21 +102,21 @@ void parserErrors() { assertAll( () -> { var ex = assertThrows(IllegalStateException.class, - () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT, Direction.FIRST), RAW_LOG)); + () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT), RAW_LOG_IN)); Assertions.assertTrue( ex.getMessage().startsWith("The timestamp '2021-03-23 13:21:37.991337479' cannot be parsed"), () -> "Actual error: " + ex.getMessage()); }, () -> { var ex = assertThrows(IllegalStateException.class, - () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN, Direction.FIRST), RAW_LOG)); + () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN), RAW_LOG_IN)); Assertions.assertTrue( ex.getMessage().startsWith("The pattern '3012.*' cannot extract the timestamp from the string"), () -> "Actual error: " + ex.getMessage()); }, () -> { var ex = assertThrows(IllegalArgumentException.class, - () -> logParser.parse(new StreamId("wrong_alias", Direction.FIRST), RAW_LOG)); + () -> logParser.parse(new StreamId("wrong_alias"), RAW_LOG_IN)); Assertions.assertTrue( ex.getMessage().startsWith("Unknown alias 'wrong_alias'. No configuration found"), () -> "Actual error: " + ex.getMessage()); diff --git a/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java b/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java index ddac41d..abcc08f 100644 --- a/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java +++ b/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java @@ -43,7 +43,7 @@ void joinsIfOneMatchFound() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "this a test string, 123 and no more"); + LogData data = parser.parse(new StreamId("test"), "this a test string, 123 and no more"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -67,7 +67,7 @@ void doesNotTryToSubstituteInResultString() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "[test] should not try to process ${3} and ${variable}"); + LogData data = parser.parse(new StreamId("test"), "[test] should not try to process ${3} and ${variable}"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -92,7 +92,7 @@ void joinsIfManyMatchesFound() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53"); + LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -119,7 +119,7 @@ void usesCorrectDelimiter() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42"); + LogData data = parser.parse(new StreamId("test"), "A, 42"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -145,7 +145,7 @@ void usesConstantsFromMap() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53"); + LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -171,7 +171,7 @@ void correctlyAcceptsGroupsByName() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53"); + LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( From 70c22d2f709deb643772dc44fa59cd364ee8aa94 Mon Sep 17 00:00:00 2001 From: Oleg Smelov <45400511+lumber1000@users.noreply.github.com> Date: Tue, 21 Nov 2023 10:48:26 +0400 Subject: [PATCH 2/2] [TH2-4954] th2 transport protocol support (#48) * th2 transport protocol support * alias group equals to session alias * dependencies * [th2-4954] Updated dependencies * dependencies, dev release workflow, README.md --------- Co-authored-by: Oleg Smelov Co-authored-by: nikita.smirnov --- ...ease-java-publish-sonatype-and-docker.yml} | 13 +- README.md | 17 +- build.gradle | 238 ++---------------- gradle.properties | 6 +- .../com/exactpro/th2/readlog/LogData.java | 9 +- .../java/com/exactpro/th2/readlog/Main.java | 93 +++++-- .../exactpro/th2/readlog/RegexLogParser.java | 12 +- .../readlog/cfg/LogReaderConfiguration.java | 15 +- .../th2/readlog/impl/LogFileReader.java | 51 +++- ...ser.java => ProtoRegexpContentParser.java} | 25 +- .../impl/TransportRegexpContentParser.java | 67 +++++ ...eamData.java => ProtoForOnStreamData.java} | 12 +- .../lambdas/TransportForOnStreamData.java | 27 ++ .../th2/readlog/impl/CradleReaderState.kt | 29 ++- .../exactpro/th2/readlog/TestLogParser.java | 10 +- 15 files changed, 309 insertions(+), 315 deletions(-) rename .github/workflows/{dev-release-docker-publish.yml => dev-release-java-publish-sonatype-and-docker.yml} (73%) rename src/main/java/com/exactpro/th2/readlog/impl/{RegexpContentParser.java => ProtoRegexpContentParser.java} (83%) create mode 100644 src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java rename src/main/java/com/exactpro/th2/readlog/impl/lambdas/{ForOnStreamData.java => ProtoForOnStreamData.java} (70%) create mode 100644 src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java diff --git a/.github/workflows/dev-release-docker-publish.yml b/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml similarity index 73% rename from .github/workflows/dev-release-docker-publish.yml rename to .github/workflows/dev-release-java-publish-sonatype-and-docker.yml index 1f878bf..d943c63 100644 --- a/.github/workflows/dev-release-docker-publish.yml +++ b/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml @@ -1,18 +1,17 @@ -name: Build and publish Docker distributions to Github Container Registry ghcr.io +name: Dev release build and publish - Docker and Sonatype on: - workflow_dispatch: push: - branches: - - dev-version-* - paths: - - gradle.properties + tags: + - \d+.\d+.\d+-dev jobs: - build-job: + build: uses: th2-net/.github/.github/workflows/compound-java.yml@main with: build-target: 'Docker,Sonatype' + runsOn: ubuntu-latest + gradleVersion: '7' docker-username: ${{ github.actor }} devRelease: true secrets: diff --git a/README.md b/README.md index b73bd93..e35c6bf 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Log Reader User Manual 4.0.1 +# Log Reader User Manual 4.1.0 ## Document Information @@ -28,6 +28,7 @@ spec: custom-config: logDirectory: "log/dir" syncWithCradle: true + useTransport: true aliases: A: regexp: ".*" @@ -91,6 +92,7 @@ spec: + logDirectory - the directory to watch files + syncWithCradle - enables synchronization with Cradle for timestamps and sequences that correspond to the alias ++ useTransport - enables using th2 transport protocol (default value: `false`) + aliases - the mapping between alias and files that correspond to that alias + pathFilter - filter for files that correspond to that alias + regexp - the regular expression to extract data from the source lines @@ -192,10 +194,11 @@ logger..level= You can use this class to see how the log line is parsed by the read-log. Use TRACE level to get the information. -**com.exactpro.th2.readlog.impl.RegexpContentParser** +**com.exactpro.th2.readlog.impl.ProtoRegexpContentParser** +**com.exactpro.th2.readlog.impl.TransportRegexpContentParser** -You can use this class to see the resulted lines produced by **_RegexLogParser_**. -Use TRACE level to get the information. +You can use these classes to see the resulted lines produced by **_RegexLogParser_**. +Produce Proto or Transport messages, respectively. Use TRACE level to get the information. **com.exactpro.th2.read.file.common.AbstractFileReader** @@ -224,6 +227,10 @@ Output: 8=FIXT.1.1\u00019=66\u000135=A\u000134=1\u000149=NFT2_FIX1\u000156=FGW\u ## Changes +### 4.1.0 + ++ Added support for th2 transport protocol + ### 4.0.1 + Added dev-release GitHub workflow @@ -296,4 +303,4 @@ Output: 8=FIXT.1.1\u00019=66\u000135=A\u000134=1\u000149=NFT2_FIX1\u000156=FGW\u ### 3.0.0 -+ Migrate to a common read-core ++ Migrate to a common read-core \ No newline at end of file diff --git a/build.gradle b/build.gradle index 214e4ed..fccc4b1 100644 --- a/build.gradle +++ b/build.gradle @@ -1,225 +1,42 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -plugins { - id 'java' - id 'java-library' - id 'application' - id 'maven-publish' - id 'signing' - id 'com.palantir.docker' version '0.25.0' - id 'org.jetbrains.kotlin.jvm' version '1.6.21' - id "org.owasp.dependencycheck" version "8.1.2" - id "io.github.gradle-nexus.publish-plugin" version "1.0.0" - -} - -group 'com.exactpro.th2' -version release_version - -repositories { - mavenCentral() - maven { - name 'Sonatype_snapshots' - url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' - } - maven { - name 'Sonatype_releases' - url 'https://s01.oss.sonatype.org/content/repositories/releases/' +buildscript { + repositories { + gradlePluginPortal() + maven { + url = "https://s01.oss.sonatype.org/content/repositories/snapshots/" + } } - mavenLocal() - - configurations.all { - resolutionStrategy.cacheChangingModulesFor 0, 'seconds' - resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' + dependencies { + classpath "com.exactpro.th2:th2-gradle-plugin:0.0.1-dev-5915968839-41381e5-SNAPSHOT" } } +apply plugin: "com.exactpro.th2.common-conventions" +apply plugin: "com.exactpro.th2.docker-conventions" +apply plugin: 'kotlin-kapt' + dependencies { - api platform('com.exactpro.th2:bom:4.2.0') - api 'com.exactpro.th2:common:5.2.0' - api 'com.exactpro.th2:read-file-common-core:2.0.0-th2-4951-5256417699-8614fb0-SNAPSHOT' + api platform("com.exactpro.th2:bom:4.5.0") + implementation "com.exactpro.th2:common:5.4.2-dev" + implementation "com.exactpro.th2:common-utils:2.2.0-dev" + implementation "com.exactpro.th2:read-file-common-core:3.0.0-dev" - implementation('com.opencsv:opencsv:5.7.0') { + implementation ("com.opencsv:opencsv:5.8") { because("we need to write a correct CSV in case of free pattern is used") } - implementation 'javax.annotation:javax.annotation-api:1.3.2' + implementation "javax.annotation:javax.annotation-api:1.3.2" implementation "org.jetbrains.kotlin:kotlin-stdlib" - implementation "org.slf4j:slf4j-api" - - implementation 'com.fasterxml.jackson.core:jackson-databind' - implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' - implementation 'com.fasterxml.jackson.module:jackson-module-kotlin' - - testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' - testImplementation 'org.mockito:mockito-core:3.6.0' -} - -test { - useJUnitPlatform() - testLogging.showStandardStreams = true -} - -jar { - manifest { - attributes( - 'Created-By': "${System.getProperty('java.version')} (${System.getProperty('java.vendor')})", - 'Specification-Title': '', - 'Specification-Vendor': 'Exactpro Systems LLC', - 'Implementation-Title': project.archivesBaseName, - 'Implementation-Vendor': 'Exactpro Systems LLC', - 'Implementation-Vendor-Id': 'com.exactpro', - 'Implementation-Version': project.version - ) - } -} - -description = 'DataReaderClient' -sourceCompatibility = JavaVersion.VERSION_11 -applicationName = 'service' - -tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile) { - kotlinOptions { - jvmTarget = "11" - } -} - -distTar { - archiveFileName.set("${applicationName}.tar") -} - -dockerPrepare { - dependsOn distTar -} - -docker { - copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) -} - -application { - mainClass.set("com.exactpro.th2.readlog.Main") -} - -dependencyCheck { - formats=['SARIF'] -} - -dependencyLocking { - lockAllConfigurations() -} - - -java { - withJavadocJar() - withSourcesJar() -} - -// conditionals for publications -tasks.withType(PublishToMavenRepository) { - onlyIf { - (repository == publishing.repositories.nexusRepository && - project.hasProperty('nexus_user') && - project.hasProperty('nexus_password') && - project.hasProperty('nexus_url')) || - (repository == publishing.repositories.sonatype && - project.hasProperty('sonatypeUsername') && - project.hasProperty('sonatypePassword')) - } -} -tasks.withType(Sign) { - onlyIf { project.hasProperty('signingKey') && - project.hasProperty('signingPassword') - } -} -// disable running task 'initializeSonatypeStagingRepository' on a gitlab -tasks.whenTaskAdded {task -> - if(task.name.equals('initializeSonatypeStagingRepository') && - !(project.hasProperty('sonatypeUsername') && project.hasProperty('sonatypePassword')) - ) { - task.enabled = false - } -} - -publishing { - publications { - mavenJava(MavenPublication) { - from(components.java) - pom { - name = rootProject.name - packaging = 'jar' - description = rootProject.description - url = vcs_url - scm { - url = vcs_url - } - licenses { - license { - name = 'The Apache License, Version 2.0' - url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' - } - } - developers { - developer { - id = 'developer' - name = 'developer' - email = 'developer@exactpro.com' - } - } - scm { - url = vcs_url - } - } - } - } - repositories { -//Nexus repo to publish from gitlab - maven { - name = 'nexusRepository' - credentials { - username = project.findProperty('nexus_user') - password = project.findProperty('nexus_password') - } - url = project.findProperty('nexus_url') - } - } -} - -nexusPublishing { - repositories { - sonatype { - nexusUrl.set(uri("https://s01.oss.sonatype.org/service/local/")) - snapshotRepositoryUrl.set(uri("https://s01.oss.sonatype.org/content/repositories/snapshots/")) - } - } -} - - -signing { - def signingKey = findProperty("signingKey") - def signingPassword = findProperty("signingPassword") - useInMemoryPgpKeys(signingKey, signingPassword) - sign publishing.publications.mavenJava + implementation "com.fasterxml.jackson.core:jackson-databind" + implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" + implementation "com.fasterxml.jackson.module:jackson-module-kotlin" } test { useJUnitPlatform { excludeTags('integration-test') } + testLogging.showStandardStreams = true } tasks.register('integrationTest', Test) { @@ -229,13 +46,6 @@ tasks.register('integrationTest', Test) { } } -dependencyCheck { - formats=['SARIF', 'JSON', 'HTML'] - failBuildOnCVSS=5 - - analyzers { - assemblyEnabled = false - nugetconfEnabled = false - nodeEnabled = false - } +dependencyLocking { + lockAllConfigurations() } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 524ce60..e874dbd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,4 @@ -release_version=4.0.1 -vcs_url=https://github.com/th2-net/th2-read-log \ No newline at end of file +release_version=4.1.0 +vcs_url=https://github.com/th2-net/th2-read-log +description=DataReaderClient +app_main_class=com.exactpro.th2.readlog.Main \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/LogData.java b/src/main/java/com/exactpro/th2/readlog/LogData.java index 137ae37..8a1eae6 100644 --- a/src/main/java/com/exactpro/th2/readlog/LogData.java +++ b/src/main/java/com/exactpro/th2/readlog/LogData.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,13 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.readlog; -import com.exactpro.th2.common.grpc.Direction; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction; import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -77,4 +76,4 @@ private void initIfNeeded() { body = new ArrayList<>(); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/Main.java b/src/main/java/com/exactpro/th2/readlog/Main.java index ba95d7c..1b58ff9 100644 --- a/src/main/java/com/exactpro/th2/readlog/Main.java +++ b/src/main/java/com/exactpro/th2/readlog/Main.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +21,13 @@ import com.exactpro.th2.common.event.EventUtils; import com.exactpro.th2.common.grpc.EventBatch; import com.exactpro.th2.common.grpc.EventID; -import com.exactpro.th2.common.grpc.RawMessage; -import com.exactpro.th2.common.grpc.RawMessageBatch; import com.exactpro.th2.common.metrics.CommonMetrics; import com.exactpro.th2.common.schema.factory.CommonFactory; import com.exactpro.th2.common.schema.message.MessageRouter; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; import com.exactpro.th2.read.file.common.AbstractFileReader; import com.exactpro.th2.read.file.common.StreamId; import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState; @@ -41,6 +43,7 @@ import java.io.LineNumberReader; import java.nio.file.Path; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.concurrent.Executors; @@ -65,7 +68,6 @@ public static void main(String[] args) { var boxBookName = commonFactory.getBoxConfiguration().getBookName(); toDispose.add(commonFactory); - MessageRouter rawMessageBatchRouter = commonFactory.getMessageRouterRawBatch(); MessageRouter eventBatchRouter = commonFactory.getEventBatchRouter(); LogReaderConfiguration configuration = commonFactory.getCustomConfiguration(LogReaderConfiguration.class, LogReaderConfiguration.MAPPER); @@ -91,20 +93,41 @@ public static void main(String[] args) { CommonMetrics.READINESS_MONITOR.enable(); - AbstractFileReader reader - = LogFileReader.getLogFileReader( - configuration, - configuration.isSyncWithCradle() - ? new CradleReaderState(commonFactory.getCradleManager().getStorage(), - streamId -> commonFactory.newMessageIDBuilder().getBookName()) - : new InMemoryReaderState(), - streamId -> commonFactory.newMessageIDBuilder().build(), - (streamId, builders) -> publishMessages(rawMessageBatchRouter, streamId, builders), - (streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId), - (streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId) - ); - - toDispose.add(reader); + final Runnable processUpdates; + + if (configuration.isUseTransport()) { + AbstractFileReader reader + = LogFileReader.getTransportLogFileReader( + configuration, + configuration.isSyncWithCradle() + ? new CradleReaderState(commonFactory.getCradleManager().getStorage(), + streamId -> commonFactory.newMessageIDBuilder().getBookName(), + CradleReaderState.WRAP_TRANSPORT) + : new InMemoryReaderState(), + streamId -> MessageId.builder(), + (streamId, builders) -> publishTransportMessages(commonFactory.getTransportGroupBatchRouter(), streamId, builders, boxBookName), + (streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId), + (streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId) + ); + + processUpdates = reader::processUpdates; + toDispose.add(reader); + } else { + AbstractFileReader reader + = LogFileReader.getProtoLogFileReader( + configuration, + configuration.isSyncWithCradle() + ? new CradleReaderState(commonFactory.getCradleManager().getStorage(), streamId -> boxBookName, CradleReaderState.WRAP_PROTO) + : new InMemoryReaderState(), + streamId -> commonFactory.newMessageIDBuilder().build(), + (streamId, builders) -> publishProtoMessages(commonFactory.getMessageRouterRawBatch(), streamId, builders), + (streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId), + (streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId) + ); + + processUpdates = reader::processUpdates; + toDispose.add(reader); + } ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); toDispose.add(() -> { @@ -115,8 +138,7 @@ public static void main(String[] args) { } }); - - ScheduledFuture future = executorService.scheduleWithFixedDelay(reader::processUpdates, 0, configuration.getPullingInterval().toMillis(), TimeUnit.MILLISECONDS); + ScheduledFuture future = executorService.scheduleWithFixedDelay(processUpdates, 0, configuration.getPullingInterval().toMillis(), TimeUnit.MILLISECONDS); awaitShutdown(lock, condition); future.cancel(true); } catch (IOException | InterruptedException e) { @@ -157,19 +179,39 @@ private static Unit publishError(MessageRouter eventBatchRouter, Str } @NotNull - private static Unit publishMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List builders) { + private static Unit publishProtoMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List builders) { try { - RawMessageBatch.Builder builder = RawMessageBatch.newBuilder(); - for (RawMessage.Builder msg : builders) { + com.exactpro.th2.common.grpc.RawMessageBatch.Builder builder = com.exactpro.th2.common.grpc.RawMessageBatch.newBuilder(); + for (com.exactpro.th2.common.grpc.RawMessage.Builder msg : builders) { builder.addMessages(msg); } - rawMessageBatchRouter.sendAll(builder.build()); + rawMessageBatchRouter.sendAll(builder.build(), "raw"); } catch (Exception e) { LOGGER.error("Cannot publish batch for {}", streamId, e); } return Unit.INSTANCE; } + @NotNull + private static Unit publishTransportMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List builders, String bookName) { + try { + // messages are grouped by session aliases + String sessionGroup = builders.get(0).idBuilder().getSessionAlias(); + + List groups = new ArrayList<>(builders.size()); + for (RawMessage.Builder msgBuilder : builders) { + groups.add(new MessageGroup(List.of(msgBuilder.build()))); + } + + var batch = new GroupBatch(bookName, sessionGroup, groups); + rawMessageBatchRouter.sendAll(batch, "transport-group"); + } catch (Exception e) { + LOGGER.error("Cannot publish batch for {}", streamId, e); + } + + return Unit.INSTANCE; + } + private static void configureShutdownHook(Deque resources, ReentrantLock lock, Condition condition) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { LOGGER.info("Shutdown start"); @@ -203,5 +245,4 @@ private static void awaitShutdown(ReentrantLock lock, Condition condition) throw lock.unlock(); } } - -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java b/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java index d0f2f1c..e0133aa 100644 --- a/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java +++ b/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.readlog; import java.io.StringWriter; @@ -35,6 +36,7 @@ import com.exactpro.th2.common.grpc.Direction; import com.exactpro.th2.read.file.common.StreamId; import com.exactpro.th2.readlog.cfg.AliasConfiguration; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt; import com.opencsv.CSVWriter; import com.opencsv.ICSVWriter; import org.apache.commons.text.StringSubstitutor; @@ -81,7 +83,7 @@ public LogData parse(StreamId streamId, String raw) { } LogData resultData = new LogData(); - resultData.setDirection(direction); + resultData.setDirection(TransportUtilsKt.getTransport(direction)); List regexGroups = configuration.getGroups(); if (configuration.isJoinGroups()) { @@ -91,7 +93,7 @@ public LogData parse(StreamId streamId, String raw) { } if (resultData.getBody().isEmpty()) { - // fast way, nothing matches the regexp so we don't need to check for date pattern + // fast way, nothing matches the regexp, so we don't need to check for date pattern return resultData; } @@ -187,7 +189,7 @@ private void parseBodyJoined(String raw, AliasConfiguration configuration, LogDa private void addJoined(LogData data, List> values, char delimiter) { var writer = new StringWriter(); - ICSVWriter csvPrinter = createCsvWriter(writer, delimiter); // we can ignore closing because there is not IO + ICSVWriter csvPrinter = createCsvWriter(writer, delimiter); // we can ignore closing because there is no IO for (List value : values) { csvPrinter.writeNext(value.toArray(String[]::new)); } @@ -225,4 +227,4 @@ private Integer tryParse(String value) { return null; } } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java b/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java index 7ddf5be..6f0b5e7 100644 --- a/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java +++ b/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,6 +52,9 @@ public class LogReaderConfiguration { @JsonPropertyDescription("Enables synchronization information about last timestamp and sequence for stream with Cradle") private boolean syncWithCradle = true; + @JsonPropertyDescription("Enables using th2 transport protocol") + private boolean useTransport = false; + @JsonCreator public LogReaderConfiguration(@JsonProperty("logDirectory") Path logDirectory) { this.logDirectory = Objects.requireNonNull(logDirectory, "'Log directory' parameter"); @@ -92,4 +95,12 @@ public boolean isSyncWithCradle() { public void setSyncWithCradle(boolean syncWithCradle) { this.syncWithCradle = syncWithCradle; } -} + + public void setUseTransport(boolean useTransport) { + this.useTransport = useTransport; + } + + public boolean isUseTransport() { + return useTransport; + } +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java b/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java index a83537a..4f4a7bb 100644 --- a/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java +++ b/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java @@ -1,5 +1,5 @@ -/******************************************************************************* - * Copyright 2022 Exactpro (Exactpro Systems Limited) +/* + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,24 +12,27 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - ******************************************************************************/ + */ package com.exactpro.th2.readlog.impl; -import com.exactpro.th2.common.grpc.MessageID; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; import com.exactpro.th2.read.file.common.AbstractFileReader; import com.exactpro.th2.read.file.common.DirectoryChecker; import com.exactpro.th2.read.file.common.FileSourceWrapper; import com.exactpro.th2.read.file.common.MovedFileTracker; import com.exactpro.th2.read.file.common.StreamId; -import com.exactpro.th2.read.file.common.impl.DefaultFileReader.Builder; +import com.exactpro.th2.read.file.common.impl.ProtoDefaultFileReader; import com.exactpro.th2.read.file.common.impl.RecoverableBufferedReaderWrapper; +import com.exactpro.th2.read.file.common.impl.TransportDefaultFileReader; import com.exactpro.th2.read.file.common.state.ReaderState; import com.exactpro.th2.readlog.RegexLogParser; import com.exactpro.th2.readlog.cfg.LogReaderConfiguration; import com.exactpro.th2.readlog.impl.lambdas.ForOnError; import com.exactpro.th2.readlog.impl.lambdas.ForOnSourceCorrupted; -import com.exactpro.th2.readlog.impl.lambdas.ForOnStreamData; +import com.exactpro.th2.readlog.impl.lambdas.ProtoForOnStreamData; +import com.exactpro.th2.readlog.impl.lambdas.TransportForOnStreamData; import org.apache.commons.lang3.exception.ExceptionUtils; import java.io.IOException; @@ -44,18 +47,18 @@ public class LogFileReader { - public static AbstractFileReader getLogFileReader( + public static AbstractFileReader getProtoLogFileReader( LogReaderConfiguration configuration, ReaderState readerState, - Function initialMessageId, - ForOnStreamData forStream, + Function initialMessageId, + ProtoForOnStreamData forStream, ForOnError forError, ForOnSourceCorrupted forCorrupted ){ - return new Builder<>( + return new ProtoDefaultFileReader.Builder<>( configuration.getCommon(), getDirectoryChecker(configuration), - new RegexpContentParser(new RegexLogParser(configuration.getAliases())), + new ProtoRegexpContentParser(new RegexLogParser(configuration.getAliases())), new MovedFileTracker(configuration.getLogDirectory()), readerState, initialMessageId::apply, @@ -67,7 +70,31 @@ public static AbstractFileReader getLogFileReader( .onError(forError::action) .onSourceCorrupted(forCorrupted::action) .build(); + } + public static AbstractFileReader getTransportLogFileReader( + LogReaderConfiguration configuration, + ReaderState readerState, + Function initialMessageId, + TransportForOnStreamData forStream, + ForOnError forError, + ForOnSourceCorrupted forCorrupted + ){ + return new TransportDefaultFileReader.Builder<>( + configuration.getCommon(), + getDirectoryChecker(configuration), + new TransportRegexpContentParser(new RegexLogParser(configuration.getAliases())), + new MovedFileTracker(configuration.getLogDirectory()), + readerState, + initialMessageId::apply, + LogFileReader::createSource + ) + .readFileImmediately() + .acceptNewerFiles() + .onStreamData(forStream::action) + .onError(forError::action) + .onSourceCorrupted(forCorrupted::action) + .build(); } private static DirectoryChecker getDirectoryChecker(LogReaderConfiguration configuration) { @@ -90,4 +117,4 @@ private static FileSourceWrapper createSource(StreamId streamI return ExceptionUtils.rethrow(e); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java b/src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java similarity index 83% rename from src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java rename to src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java index 494a10e..1371093 100644 --- a/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java +++ b/src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,25 +13,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.readlog.impl; import com.exactpro.th2.common.grpc.RawMessageMetadata; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt; import com.exactpro.th2.common.message.MessageUtils; +import com.exactpro.th2.read.file.common.impl.LineParser; import com.exactpro.th2.readlog.LogData; import com.google.protobuf.ByteString; import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.List; -import java.util.Objects; - import java.util.stream.Collectors; import javax.annotation.Nonnull; import com.exactpro.th2.common.grpc.RawMessage; import com.exactpro.th2.read.file.common.StreamId; -import com.exactpro.th2.read.file.common.impl.LineParser; import com.exactpro.th2.readlog.RegexLogParser; import org.slf4j.Logger; @@ -39,11 +36,12 @@ import static java.util.Objects.requireNonNull; -public class RegexpContentParser extends LineParser { - private static final Logger LOGGER = LoggerFactory.getLogger(RegexpContentParser.class); +public class ProtoRegexpContentParser extends LineParser { + private static final Logger LOGGER = LoggerFactory.getLogger(ProtoRegexpContentParser.class); private final RegexLogParser parser; - public RegexpContentParser(RegexLogParser parser) { + public ProtoRegexpContentParser(RegexLogParser parser) { + super(LineParser.PROTO); this.parser = requireNonNull(parser, "'Parser' parameter"); } @@ -61,8 +59,8 @@ protected List lineToMessages(@Nonnull StreamId streamId, @N } private void setupMetadata(RawMessageMetadata.Builder builder, LogData logData) { - builder.getIdBuilder().setDirection(requireNonNull(logData.getDirection(), - "direction is not set")); + builder.getIdBuilder().setDirection(TransportUtilsKt.getProto(requireNonNull(logData.getDirection(), + "direction is not set"))); if (logData.getParsedTimestamp() != null) { builder.getIdBuilder().setTimestamp(MessageUtils.toTimestamp(logData.getParsedTimestamp())); } @@ -70,5 +68,4 @@ private void setupMetadata(RawMessageMetadata.Builder builder, LogData logData) builder.putProperties("logTimestamp", logData.getRawTimestamp()); } } - -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java b/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java new file mode 100644 index 0000000..d183ab4 --- /dev/null +++ b/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java @@ -0,0 +1,67 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.readlog.impl; + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; +import com.exactpro.th2.read.file.common.StreamId; +import com.exactpro.th2.read.file.common.impl.LineParser; +import com.exactpro.th2.readlog.LogData; +import com.exactpro.th2.readlog.RegexLogParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +public class TransportRegexpContentParser extends LineParser { + private static final Logger LOGGER = LoggerFactory.getLogger(TransportRegexpContentParser.class); + private final RegexLogParser parser; + + public TransportRegexpContentParser(RegexLogParser parser) { + super(LineParser.TRANSPORT); + this.parser = requireNonNull(parser, "'Parser' parameter"); + } + + @Nonnull + @Override + protected List lineToMessages(@Nonnull StreamId streamId, @Nonnull String readLine) { + LogData logData = parser.parse(streamId, readLine); + LOGGER.trace("{} line(s) extracted from {}: {}", logData.getBody().size(), readLine, logData.getBody()); + return logData.getBody().stream().map(it -> { + RawMessage.Builder builder = RawMessage.builder(); + setupMetadata(builder, logData); + builder.setBody(it.getBytes(StandardCharsets.UTF_8)); + return builder; + }).collect(Collectors.toList()); + } + + private void setupMetadata(RawMessage.Builder builder, LogData logData) { + builder.idBuilder().setDirection(requireNonNull(logData.getDirection(), "direction is not set")); + + if (logData.getParsedTimestamp() != null) { + builder.idBuilder().setTimestamp(logData.getParsedTimestamp()); + } + + if (logData.getRawTimestamp() != null) { + builder.addMetadataProperty("logTimestamp", logData.getRawTimestamp()); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnStreamData.java b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java similarity index 70% rename from src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnStreamData.java rename to src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java index c35a662..b36fe48 100644 --- a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnStreamData.java +++ b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java @@ -1,5 +1,5 @@ -/******************************************************************************* - * Copyright 2022 Exactpro (Exactpro Systems Limited) +/* + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - ******************************************************************************/ + */ package com.exactpro.th2.readlog.impl.lambdas; @@ -22,6 +22,6 @@ import java.util.List; -public interface ForOnStreamData { - Unit action(StreamId id, List builder); -} +public interface ProtoForOnStreamData { + Unit action(StreamId id, List builder); +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java new file mode 100644 index 0000000..40fb3c5 --- /dev/null +++ b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.readlog.impl.lambdas; + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; +import com.exactpro.th2.read.file.common.StreamId; +import kotlin.Unit; + +import java.util.List; + +public interface TransportForOnStreamData { + Unit action(StreamId id, List builder); +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt b/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt index 2b31972..9423696 100644 --- a/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt +++ b/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022. Exactpro (Exactpro Systems Limited) + * Copyright 2022-2023. Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,27 +20,24 @@ import com.exactpro.cradle.BookId import com.exactpro.cradle.CradleStorage import com.exactpro.cradle.Order import com.exactpro.cradle.messages.GroupedMessageFilter -import com.exactpro.cradle.messages.MessageFilter -import com.exactpro.cradle.messages.StoredMessageId -import com.exactpro.th2.common.grpc.RawMessage -import com.exactpro.th2.common.util.toCradleDirection import com.exactpro.th2.read.file.common.StreamId import com.exactpro.th2.read.file.common.state.ReaderState +import com.exactpro.th2.read.file.common.state.Content +import com.exactpro.th2.read.file.common.state.ProtoContent +import com.exactpro.th2.read.file.common.state.TransportContent import com.exactpro.th2.read.file.common.state.StreamData import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState import com.google.protobuf.ByteString import com.google.protobuf.UnsafeByteOperations +import io.netty.buffer.Unpooled import java.time.Instant -import java.time.temporal.ChronoUnit -class CradleReaderState private constructor( +class CradleReaderState @JvmOverloads constructor( private val cradleStorage: CradleStorage, - private val delegate: ReaderState, + private val delegate: ReaderState = InMemoryReaderState(), private val bookSupplier: (StreamId) -> String, + private val wrapContent: (ByteArray) -> Content ): ReaderState by delegate { - constructor(cradleStorage: CradleStorage, bookSupplier: (StreamId) -> String) - : this(cradleStorage, InMemoryReaderState(), bookSupplier) - override fun get(streamId: StreamId): StreamData? { return delegate[streamId] ?: cradleStorage.getGroupedMessageBatches( GroupedMessageFilter.builder() @@ -54,8 +51,16 @@ class CradleReaderState private constructor( StreamData( timestamp, sequence, - content?.let(UnsafeByteOperations::unsafeWrap) ?: ByteString.EMPTY, + wrapContent(content) ) } } + + companion object { + @JvmField + val WRAP_PROTO: (ByteArray?) -> Content = { ProtoContent(it?.let(UnsafeByteOperations::unsafeWrap) ?: ByteString.EMPTY) } + + @JvmField + val WRAP_TRANSPORT: (ByteArray?) -> Content = { TransportContent(it?.let(Unpooled::wrappedBuffer) ?: Unpooled.EMPTY_BUFFER) } + } } \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/readlog/TestLogParser.java b/src/test/java/com/exactpro/th2/readlog/TestLogParser.java index 4bcb646..9f9485a 100644 --- a/src/test/java/com/exactpro/th2/readlog/TestLogParser.java +++ b/src/test/java/com/exactpro/th2/readlog/TestLogParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package com.exactpro.th2.readlog; import com.exactpro.th2.common.grpc.Direction; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt; import com.exactpro.th2.read.file.common.StreamId; import com.exactpro.th2.readlog.cfg.AliasConfiguration; @@ -38,7 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; public class TestLogParser { @@ -68,7 +68,7 @@ void skipMessageByTimestamp(Instant skipBefore, boolean hasMessages) { LogData data = parser.parse(new StreamId("test"), "2021-03-23 13:21:37.991337479 some data"); if (hasMessages) { - assertEquals(Direction.FIRST, data.getDirection(), "unexpected direction"); + assertEquals(Direction.FIRST, TransportUtilsKt.getProto(data.getDirection()), "unexpected direction"); } assertEquals(hasMessages, !data.getBody().isEmpty(), () -> "unexpected data " + data.getBody()); if (hasMessages) { @@ -88,7 +88,7 @@ static List skipMessageParams() { void parser(Direction direction) { RegexLogParser logParser = new RegexLogParser(getConfiguration()); LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS), direction == Direction.FIRST ? RAW_LOG_IN : RAW_LOG_OUT); - assertEquals(direction, data.getDirection(), "unexpected direction"); + assertEquals(direction, TransportUtilsKt.getProto(data.getDirection()), "unexpected direction"); assertEquals(1, data.getBody().size()); assertEquals("NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}", data.getBody().get(0)); assertEquals("2021-03-23 13:21:37.991337479", data.getRawTimestamp()); @@ -144,4 +144,4 @@ private Map getConfiguration() { return result; } -} +} \ No newline at end of file