diff --git a/.github/workflows/dev-release-docker-publish.yml b/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml similarity index 73% rename from .github/workflows/dev-release-docker-publish.yml rename to .github/workflows/dev-release-java-publish-sonatype-and-docker.yml index 1f878bf..d943c63 100644 --- a/.github/workflows/dev-release-docker-publish.yml +++ b/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml @@ -1,18 +1,17 @@ -name: Build and publish Docker distributions to Github Container Registry ghcr.io +name: Dev release build and publish - Docker and Sonatype on: - workflow_dispatch: push: - branches: - - dev-version-* - paths: - - gradle.properties + tags: + - \d+.\d+.\d+-dev jobs: - build-job: + build: uses: th2-net/.github/.github/workflows/compound-java.yml@main with: build-target: 'Docker,Sonatype' + runsOn: ubuntu-latest + gradleVersion: '7' docker-username: ${{ github.actor }} devRelease: true secrets: diff --git a/README.md b/README.md index b73bd93..e35c6bf 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Log Reader User Manual 4.0.1 +# Log Reader User Manual 4.1.0 ## Document Information @@ -28,6 +28,7 @@ spec: custom-config: logDirectory: "log/dir" syncWithCradle: true + useTransport: true aliases: A: regexp: ".*" @@ -91,6 +92,7 @@ spec: + logDirectory - the directory to watch files + syncWithCradle - enables synchronization with Cradle for timestamps and sequences that correspond to the alias ++ useTransport - enables using th2 transport protocol (default value: `false`) + aliases - the mapping between alias and files that correspond to that alias + pathFilter - filter for files that correspond to that alias + regexp - the regular expression to extract data from the source lines @@ -192,10 +194,11 @@ logger..level= You can use this class to see how the log line is parsed by the read-log. Use TRACE level to get the information. -**com.exactpro.th2.readlog.impl.RegexpContentParser** +**com.exactpro.th2.readlog.impl.ProtoRegexpContentParser** +**com.exactpro.th2.readlog.impl.TransportRegexpContentParser** -You can use this class to see the resulted lines produced by **_RegexLogParser_**. -Use TRACE level to get the information. +You can use these classes to see the resulted lines produced by **_RegexLogParser_**. +Produce Proto or Transport messages, respectively. Use TRACE level to get the information. **com.exactpro.th2.read.file.common.AbstractFileReader** @@ -224,6 +227,10 @@ Output: 8=FIXT.1.1\u00019=66\u000135=A\u000134=1\u000149=NFT2_FIX1\u000156=FGW\u ## Changes +### 4.1.0 + ++ Added support for th2 transport protocol + ### 4.0.1 + Added dev-release GitHub workflow @@ -296,4 +303,4 @@ Output: 8=FIXT.1.1\u00019=66\u000135=A\u000134=1\u000149=NFT2_FIX1\u000156=FGW\u ### 3.0.0 -+ Migrate to a common read-core ++ Migrate to a common read-core \ No newline at end of file diff --git a/build.gradle b/build.gradle index 2a0061e..fccc4b1 100644 --- a/build.gradle +++ b/build.gradle @@ -1,225 +1,42 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -plugins { - id 'java' - id 'java-library' - id 'application' - id 'maven-publish' - id 'signing' - id 'com.palantir.docker' version '0.25.0' - id 'org.jetbrains.kotlin.jvm' version '1.6.21' - id "org.owasp.dependencycheck" version "8.1.2" - id "io.github.gradle-nexus.publish-plugin" version "1.0.0" - -} - -group 'com.exactpro.th2' -version release_version - -repositories { - mavenCentral() - maven { - name 'Sonatype_snapshots' - url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' - } - maven { - name 'Sonatype_releases' - url 'https://s01.oss.sonatype.org/content/repositories/releases/' +buildscript { + repositories { + gradlePluginPortal() + maven { + url = "https://s01.oss.sonatype.org/content/repositories/snapshots/" + } } - mavenLocal() - - configurations.all { - resolutionStrategy.cacheChangingModulesFor 0, 'seconds' - resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' + dependencies { + classpath "com.exactpro.th2:th2-gradle-plugin:0.0.1-dev-5915968839-41381e5-SNAPSHOT" } } +apply plugin: "com.exactpro.th2.common-conventions" +apply plugin: "com.exactpro.th2.docker-conventions" +apply plugin: 'kotlin-kapt' + dependencies { - api platform('com.exactpro.th2:bom:4.2.0') - api 'com.exactpro.th2:common:5.2.0' - api 'com.exactpro.th2:read-file-common-core:2.0.0-dev-version-2-4315008828-1ed0f94-SNAPSHOT' + api platform("com.exactpro.th2:bom:4.5.0") + implementation "com.exactpro.th2:common:5.4.2-dev" + implementation "com.exactpro.th2:common-utils:2.2.0-dev" + implementation "com.exactpro.th2:read-file-common-core:3.0.0-dev" - implementation('com.opencsv:opencsv:5.7.0') { + implementation ("com.opencsv:opencsv:5.8") { because("we need to write a correct CSV in case of free pattern is used") } - implementation 'javax.annotation:javax.annotation-api:1.3.2' + implementation "javax.annotation:javax.annotation-api:1.3.2" implementation "org.jetbrains.kotlin:kotlin-stdlib" - implementation "org.slf4j:slf4j-api" - - implementation 'com.fasterxml.jackson.core:jackson-databind' - implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' - implementation 'com.fasterxml.jackson.module:jackson-module-kotlin' - - testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' - testImplementation 'org.mockito:mockito-core:3.6.0' -} - -test { - useJUnitPlatform() - testLogging.showStandardStreams = true -} - -jar { - manifest { - attributes( - 'Created-By': "${System.getProperty('java.version')} (${System.getProperty('java.vendor')})", - 'Specification-Title': '', - 'Specification-Vendor': 'Exactpro Systems LLC', - 'Implementation-Title': project.archivesBaseName, - 'Implementation-Vendor': 'Exactpro Systems LLC', - 'Implementation-Vendor-Id': 'com.exactpro', - 'Implementation-Version': project.version - ) - } -} - -description = 'DataReaderClient' -sourceCompatibility = JavaVersion.VERSION_11 -applicationName = 'service' - -tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile) { - kotlinOptions { - jvmTarget = "11" - } -} - -distTar { - archiveFileName.set("${applicationName}.tar") -} - -dockerPrepare { - dependsOn distTar -} - -docker { - copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) -} - -application { - mainClass.set("com.exactpro.th2.readlog.Main") -} - -dependencyCheck { - formats=['SARIF'] -} - -dependencyLocking { - lockAllConfigurations() -} - - -java { - withJavadocJar() - withSourcesJar() -} - -// conditionals for publications -tasks.withType(PublishToMavenRepository) { - onlyIf { - (repository == publishing.repositories.nexusRepository && - project.hasProperty('nexus_user') && - project.hasProperty('nexus_password') && - project.hasProperty('nexus_url')) || - (repository == publishing.repositories.sonatype && - project.hasProperty('sonatypeUsername') && - project.hasProperty('sonatypePassword')) - } -} -tasks.withType(Sign) { - onlyIf { project.hasProperty('signingKey') && - project.hasProperty('signingPassword') - } -} -// disable running task 'initializeSonatypeStagingRepository' on a gitlab -tasks.whenTaskAdded {task -> - if(task.name.equals('initializeSonatypeStagingRepository') && - !(project.hasProperty('sonatypeUsername') && project.hasProperty('sonatypePassword')) - ) { - task.enabled = false - } -} - -publishing { - publications { - mavenJava(MavenPublication) { - from(components.java) - pom { - name = rootProject.name - packaging = 'jar' - description = rootProject.description - url = vcs_url - scm { - url = vcs_url - } - licenses { - license { - name = 'The Apache License, Version 2.0' - url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' - } - } - developers { - developer { - id = 'developer' - name = 'developer' - email = 'developer@exactpro.com' - } - } - scm { - url = vcs_url - } - } - } - } - repositories { -//Nexus repo to publish from gitlab - maven { - name = 'nexusRepository' - credentials { - username = project.findProperty('nexus_user') - password = project.findProperty('nexus_password') - } - url = project.findProperty('nexus_url') - } - } -} - -nexusPublishing { - repositories { - sonatype { - nexusUrl.set(uri("https://s01.oss.sonatype.org/service/local/")) - snapshotRepositoryUrl.set(uri("https://s01.oss.sonatype.org/content/repositories/snapshots/")) - } - } -} - - -signing { - def signingKey = findProperty("signingKey") - def signingPassword = findProperty("signingPassword") - useInMemoryPgpKeys(signingKey, signingPassword) - sign publishing.publications.mavenJava + implementation "com.fasterxml.jackson.core:jackson-databind" + implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" + implementation "com.fasterxml.jackson.module:jackson-module-kotlin" } test { useJUnitPlatform { excludeTags('integration-test') } + testLogging.showStandardStreams = true } tasks.register('integrationTest', Test) { @@ -229,13 +46,6 @@ tasks.register('integrationTest', Test) { } } -dependencyCheck { - formats=['SARIF', 'JSON', 'HTML'] - failBuildOnCVSS=5 - - analyzers { - assemblyEnabled = false - nugetconfEnabled = false - nodeEnabled = false - } +dependencyLocking { + lockAllConfigurations() } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 524ce60..e874dbd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,4 @@ -release_version=4.0.1 -vcs_url=https://github.com/th2-net/th2-read-log \ No newline at end of file +release_version=4.1.0 +vcs_url=https://github.com/th2-net/th2-read-log +description=DataReaderClient +app_main_class=com.exactpro.th2.readlog.Main \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/LogData.java b/src/main/java/com/exactpro/th2/readlog/LogData.java index 839d7e4..8a1eae6 100644 --- a/src/main/java/com/exactpro/th2/readlog/LogData.java +++ b/src/main/java/com/exactpro/th2/readlog/LogData.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,11 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.readlog; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction; + import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -27,6 +28,7 @@ public final class LogData { private List body; private String rawTimestamp; private Instant parsedTimestamp; + private Direction direction; public LogData() { this(null); @@ -61,9 +63,17 @@ public void setParsedTimestamp(Instant localDateTime) { this.parsedTimestamp = localDateTime; } + public Direction getDirection() { + return direction; + } + + public void setDirection(Direction direction) { + this.direction = direction; + } + private void initIfNeeded() { if (body == null) { body = new ArrayList<>(); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/Main.java b/src/main/java/com/exactpro/th2/readlog/Main.java index ba95d7c..1b58ff9 100644 --- a/src/main/java/com/exactpro/th2/readlog/Main.java +++ b/src/main/java/com/exactpro/th2/readlog/Main.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,11 +21,13 @@ import com.exactpro.th2.common.event.EventUtils; import com.exactpro.th2.common.grpc.EventBatch; import com.exactpro.th2.common.grpc.EventID; -import com.exactpro.th2.common.grpc.RawMessage; -import com.exactpro.th2.common.grpc.RawMessageBatch; import com.exactpro.th2.common.metrics.CommonMetrics; import com.exactpro.th2.common.schema.factory.CommonFactory; import com.exactpro.th2.common.schema.message.MessageRouter; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; import com.exactpro.th2.read.file.common.AbstractFileReader; import com.exactpro.th2.read.file.common.StreamId; import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState; @@ -41,6 +43,7 @@ import java.io.LineNumberReader; import java.nio.file.Path; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.concurrent.Executors; @@ -65,7 +68,6 @@ public static void main(String[] args) { var boxBookName = commonFactory.getBoxConfiguration().getBookName(); toDispose.add(commonFactory); - MessageRouter rawMessageBatchRouter = commonFactory.getMessageRouterRawBatch(); MessageRouter eventBatchRouter = commonFactory.getEventBatchRouter(); LogReaderConfiguration configuration = commonFactory.getCustomConfiguration(LogReaderConfiguration.class, LogReaderConfiguration.MAPPER); @@ -91,20 +93,41 @@ public static void main(String[] args) { CommonMetrics.READINESS_MONITOR.enable(); - AbstractFileReader reader - = LogFileReader.getLogFileReader( - configuration, - configuration.isSyncWithCradle() - ? new CradleReaderState(commonFactory.getCradleManager().getStorage(), - streamId -> commonFactory.newMessageIDBuilder().getBookName()) - : new InMemoryReaderState(), - streamId -> commonFactory.newMessageIDBuilder().build(), - (streamId, builders) -> publishMessages(rawMessageBatchRouter, streamId, builders), - (streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId), - (streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId) - ); - - toDispose.add(reader); + final Runnable processUpdates; + + if (configuration.isUseTransport()) { + AbstractFileReader reader + = LogFileReader.getTransportLogFileReader( + configuration, + configuration.isSyncWithCradle() + ? new CradleReaderState(commonFactory.getCradleManager().getStorage(), + streamId -> commonFactory.newMessageIDBuilder().getBookName(), + CradleReaderState.WRAP_TRANSPORT) + : new InMemoryReaderState(), + streamId -> MessageId.builder(), + (streamId, builders) -> publishTransportMessages(commonFactory.getTransportGroupBatchRouter(), streamId, builders, boxBookName), + (streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId), + (streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId) + ); + + processUpdates = reader::processUpdates; + toDispose.add(reader); + } else { + AbstractFileReader reader + = LogFileReader.getProtoLogFileReader( + configuration, + configuration.isSyncWithCradle() + ? new CradleReaderState(commonFactory.getCradleManager().getStorage(), streamId -> boxBookName, CradleReaderState.WRAP_PROTO) + : new InMemoryReaderState(), + streamId -> commonFactory.newMessageIDBuilder().build(), + (streamId, builders) -> publishProtoMessages(commonFactory.getMessageRouterRawBatch(), streamId, builders), + (streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId), + (streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId) + ); + + processUpdates = reader::processUpdates; + toDispose.add(reader); + } ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); toDispose.add(() -> { @@ -115,8 +138,7 @@ public static void main(String[] args) { } }); - - ScheduledFuture future = executorService.scheduleWithFixedDelay(reader::processUpdates, 0, configuration.getPullingInterval().toMillis(), TimeUnit.MILLISECONDS); + ScheduledFuture future = executorService.scheduleWithFixedDelay(processUpdates, 0, configuration.getPullingInterval().toMillis(), TimeUnit.MILLISECONDS); awaitShutdown(lock, condition); future.cancel(true); } catch (IOException | InterruptedException e) { @@ -157,19 +179,39 @@ private static Unit publishError(MessageRouter eventBatchRouter, Str } @NotNull - private static Unit publishMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List builders) { + private static Unit publishProtoMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List builders) { try { - RawMessageBatch.Builder builder = RawMessageBatch.newBuilder(); - for (RawMessage.Builder msg : builders) { + com.exactpro.th2.common.grpc.RawMessageBatch.Builder builder = com.exactpro.th2.common.grpc.RawMessageBatch.newBuilder(); + for (com.exactpro.th2.common.grpc.RawMessage.Builder msg : builders) { builder.addMessages(msg); } - rawMessageBatchRouter.sendAll(builder.build()); + rawMessageBatchRouter.sendAll(builder.build(), "raw"); } catch (Exception e) { LOGGER.error("Cannot publish batch for {}", streamId, e); } return Unit.INSTANCE; } + @NotNull + private static Unit publishTransportMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List builders, String bookName) { + try { + // messages are grouped by session aliases + String sessionGroup = builders.get(0).idBuilder().getSessionAlias(); + + List groups = new ArrayList<>(builders.size()); + for (RawMessage.Builder msgBuilder : builders) { + groups.add(new MessageGroup(List.of(msgBuilder.build()))); + } + + var batch = new GroupBatch(bookName, sessionGroup, groups); + rawMessageBatchRouter.sendAll(batch, "transport-group"); + } catch (Exception e) { + LOGGER.error("Cannot publish batch for {}", streamId, e); + } + + return Unit.INSTANCE; + } + private static void configureShutdownHook(Deque resources, ReentrantLock lock, Condition condition) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { LOGGER.info("Shutdown start"); @@ -203,5 +245,4 @@ private static void awaitShutdown(ReentrantLock lock, Condition condition) throw lock.unlock(); } } - -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java b/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java index 85b870f..e0133aa 100644 --- a/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java +++ b/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.readlog; import java.io.StringWriter; @@ -25,14 +26,17 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import com.exactpro.th2.common.grpc.Direction; import com.exactpro.th2.read.file.common.StreamId; import com.exactpro.th2.readlog.cfg.AliasConfiguration; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt; import com.opencsv.CSVWriter; import com.opencsv.ICSVWriter; import org.apache.commons.text.StringSubstitutor; @@ -65,17 +69,22 @@ public LogData parse(StreamId streamId, String raw) { throw new IllegalArgumentException("Unknown alias '" + sessionAlias +"'. No configuration found" ); } - LogData resultData = new LogData(); - - Pattern directionPattern = Objects.requireNonNull(configuration.getDirectionToPattern().get(streamId.getDirection()), - () -> "Pattern for direction " + streamId.getDirection() + " and session " + sessionAlias); - Matcher matcher = directionPattern.matcher(raw); - // check if the a string matches the direction from streamId - // skip line if it is not ours direction - if (!matcher.find()) { - return resultData; + Direction direction = null; + for (Entry entry : configuration.getDirectionToPattern().entrySet()) { + if (entry.getValue().matcher(raw).find()) { + direction = entry.getKey(); + break; + } + } + // check whether the line matches any direction regex + // if not it is not our line + if (direction == null) { + return LogData.EMPTY; } + LogData resultData = new LogData(); + resultData.setDirection(TransportUtilsKt.getTransport(direction)); + List regexGroups = configuration.getGroups(); if (configuration.isJoinGroups()) { parseBodyJoined(raw, configuration, resultData); @@ -84,7 +93,7 @@ public LogData parse(StreamId streamId, String raw) { } if (resultData.getBody().isEmpty()) { - // fast way, nothing matches the regexp so we don't need to check for date pattern + // fast way, nothing matches the regexp, so we don't need to check for date pattern return resultData; } @@ -180,7 +189,7 @@ private void parseBodyJoined(String raw, AliasConfiguration configuration, LogDa private void addJoined(LogData data, List> values, char delimiter) { var writer = new StringWriter(); - ICSVWriter csvPrinter = createCsvWriter(writer, delimiter); // we can ignore closing because there is not IO + ICSVWriter csvPrinter = createCsvWriter(writer, delimiter); // we can ignore closing because there is no IO for (List value : values) { csvPrinter.writeNext(value.toArray(String[]::new)); } @@ -218,4 +227,4 @@ private Integer tryParse(String value) { return null; } } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java b/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java index 7ddf5be..6f0b5e7 100644 --- a/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java +++ b/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,6 +52,9 @@ public class LogReaderConfiguration { @JsonPropertyDescription("Enables synchronization information about last timestamp and sequence for stream with Cradle") private boolean syncWithCradle = true; + @JsonPropertyDescription("Enables using th2 transport protocol") + private boolean useTransport = false; + @JsonCreator public LogReaderConfiguration(@JsonProperty("logDirectory") Path logDirectory) { this.logDirectory = Objects.requireNonNull(logDirectory, "'Log directory' parameter"); @@ -92,4 +95,12 @@ public boolean isSyncWithCradle() { public void setSyncWithCradle(boolean syncWithCradle) { this.syncWithCradle = syncWithCradle; } -} + + public void setUseTransport(boolean useTransport) { + this.useTransport = useTransport; + } + + public boolean isUseTransport() { + return useTransport; + } +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java b/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java index 04c997b..4f4a7bb 100644 --- a/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java +++ b/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java @@ -1,5 +1,5 @@ -/******************************************************************************* - * Copyright 2022 Exactpro (Exactpro Systems Limited) +/* + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,24 +12,27 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - ******************************************************************************/ + */ package com.exactpro.th2.readlog.impl; -import com.exactpro.th2.common.grpc.MessageID; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; import com.exactpro.th2.read.file.common.AbstractFileReader; import com.exactpro.th2.read.file.common.DirectoryChecker; import com.exactpro.th2.read.file.common.FileSourceWrapper; import com.exactpro.th2.read.file.common.MovedFileTracker; import com.exactpro.th2.read.file.common.StreamId; -import com.exactpro.th2.read.file.common.impl.DefaultFileReader.Builder; +import com.exactpro.th2.read.file.common.impl.ProtoDefaultFileReader; import com.exactpro.th2.read.file.common.impl.RecoverableBufferedReaderWrapper; +import com.exactpro.th2.read.file.common.impl.TransportDefaultFileReader; import com.exactpro.th2.read.file.common.state.ReaderState; import com.exactpro.th2.readlog.RegexLogParser; import com.exactpro.th2.readlog.cfg.LogReaderConfiguration; import com.exactpro.th2.readlog.impl.lambdas.ForOnError; import com.exactpro.th2.readlog.impl.lambdas.ForOnSourceCorrupted; -import com.exactpro.th2.readlog.impl.lambdas.ForOnStreamData; +import com.exactpro.th2.readlog.impl.lambdas.ProtoForOnStreamData; +import com.exactpro.th2.readlog.impl.lambdas.TransportForOnStreamData; import org.apache.commons.lang3.exception.ExceptionUtils; import java.io.IOException; @@ -44,18 +47,18 @@ public class LogFileReader { - public static AbstractFileReader getLogFileReader( + public static AbstractFileReader getProtoLogFileReader( LogReaderConfiguration configuration, ReaderState readerState, - Function initialMessageId, - ForOnStreamData forStream, + Function initialMessageId, + ProtoForOnStreamData forStream, ForOnError forError, ForOnSourceCorrupted forCorrupted ){ - return new Builder<>( + return new ProtoDefaultFileReader.Builder<>( configuration.getCommon(), getDirectoryChecker(configuration), - new RegexpContentParser(new RegexLogParser(configuration.getAliases())), + new ProtoRegexpContentParser(new RegexLogParser(configuration.getAliases())), new MovedFileTracker(configuration.getLogDirectory()), readerState, initialMessageId::apply, @@ -67,7 +70,31 @@ public static AbstractFileReader getLogFileReader( .onError(forError::action) .onSourceCorrupted(forCorrupted::action) .build(); + } + public static AbstractFileReader getTransportLogFileReader( + LogReaderConfiguration configuration, + ReaderState readerState, + Function initialMessageId, + TransportForOnStreamData forStream, + ForOnError forError, + ForOnSourceCorrupted forCorrupted + ){ + return new TransportDefaultFileReader.Builder<>( + configuration.getCommon(), + getDirectoryChecker(configuration), + new TransportRegexpContentParser(new RegexLogParser(configuration.getAliases())), + new MovedFileTracker(configuration.getLogDirectory()), + readerState, + initialMessageId::apply, + LogFileReader::createSource + ) + .readFileImmediately() + .acceptNewerFiles() + .onStreamData(forStream::action) + .onError(forError::action) + .onSourceCorrupted(forCorrupted::action) + .build(); } private static DirectoryChecker getDirectoryChecker(LogReaderConfiguration configuration) { @@ -76,10 +103,8 @@ private static DirectoryChecker getDirectoryChecker(LogReaderConfiguration confi configuration.getLogDirectory(), (Path path) -> configuration.getAliases().entrySet().stream() .filter(entry -> entry.getValue().getPathFilter().matcher(path.getFileName().toString()).matches()) - .flatMap(entry -> entry.getValue().getDirectionToPattern() - .keySet().stream() - .map(direction -> new StreamId(entry.getKey(), direction)) - ).collect(Collectors.toSet()), + .map(entry -> new StreamId(entry.getKey())) + .collect(Collectors.toSet()), files -> files.sort(pathComparator), path -> true ); @@ -92,4 +117,4 @@ private static FileSourceWrapper createSource(StreamId streamI return ExceptionUtils.rethrow(e); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java b/src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java similarity index 79% rename from src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java rename to src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java index 60c655b..1371093 100644 --- a/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java +++ b/src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,36 +13,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.readlog.impl; import com.exactpro.th2.common.grpc.RawMessageMetadata; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt; import com.exactpro.th2.common.message.MessageUtils; +import com.exactpro.th2.read.file.common.impl.LineParser; import com.exactpro.th2.readlog.LogData; import com.google.protobuf.ByteString; import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.List; -import java.util.Objects; - import java.util.stream.Collectors; import javax.annotation.Nonnull; import com.exactpro.th2.common.grpc.RawMessage; import com.exactpro.th2.read.file.common.StreamId; -import com.exactpro.th2.read.file.common.impl.LineParser; import com.exactpro.th2.readlog.RegexLogParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RegexpContentParser extends LineParser { - private static final Logger LOGGER = LoggerFactory.getLogger(RegexpContentParser.class); +import static java.util.Objects.requireNonNull; + +public class ProtoRegexpContentParser extends LineParser { + private static final Logger LOGGER = LoggerFactory.getLogger(ProtoRegexpContentParser.class); private final RegexLogParser parser; - public RegexpContentParser(RegexLogParser parser) { - this.parser = Objects.requireNonNull(parser, "'Parser' parameter"); + public ProtoRegexpContentParser(RegexLogParser parser) { + super(LineParser.PROTO); + this.parser = requireNonNull(parser, "'Parser' parameter"); } @Nonnull @@ -59,6 +59,8 @@ protected List lineToMessages(@Nonnull StreamId streamId, @N } private void setupMetadata(RawMessageMetadata.Builder builder, LogData logData) { + builder.getIdBuilder().setDirection(TransportUtilsKt.getProto(requireNonNull(logData.getDirection(), + "direction is not set"))); if (logData.getParsedTimestamp() != null) { builder.getIdBuilder().setTimestamp(MessageUtils.toTimestamp(logData.getParsedTimestamp())); } @@ -66,5 +68,4 @@ private void setupMetadata(RawMessageMetadata.Builder builder, LogData logData) builder.putProperties("logTimestamp", logData.getRawTimestamp()); } } - -} +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java b/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java new file mode 100644 index 0000000..d183ab4 --- /dev/null +++ b/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java @@ -0,0 +1,67 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.readlog.impl; + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; +import com.exactpro.th2.read.file.common.StreamId; +import com.exactpro.th2.read.file.common.impl.LineParser; +import com.exactpro.th2.readlog.LogData; +import com.exactpro.th2.readlog.RegexLogParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +public class TransportRegexpContentParser extends LineParser { + private static final Logger LOGGER = LoggerFactory.getLogger(TransportRegexpContentParser.class); + private final RegexLogParser parser; + + public TransportRegexpContentParser(RegexLogParser parser) { + super(LineParser.TRANSPORT); + this.parser = requireNonNull(parser, "'Parser' parameter"); + } + + @Nonnull + @Override + protected List lineToMessages(@Nonnull StreamId streamId, @Nonnull String readLine) { + LogData logData = parser.parse(streamId, readLine); + LOGGER.trace("{} line(s) extracted from {}: {}", logData.getBody().size(), readLine, logData.getBody()); + return logData.getBody().stream().map(it -> { + RawMessage.Builder builder = RawMessage.builder(); + setupMetadata(builder, logData); + builder.setBody(it.getBytes(StandardCharsets.UTF_8)); + return builder; + }).collect(Collectors.toList()); + } + + private void setupMetadata(RawMessage.Builder builder, LogData logData) { + builder.idBuilder().setDirection(requireNonNull(logData.getDirection(), "direction is not set")); + + if (logData.getParsedTimestamp() != null) { + builder.idBuilder().setTimestamp(logData.getParsedTimestamp()); + } + + if (logData.getRawTimestamp() != null) { + builder.addMetadataProperty("logTimestamp", logData.getRawTimestamp()); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnStreamData.java b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java similarity index 70% rename from src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnStreamData.java rename to src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java index c35a662..b36fe48 100644 --- a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnStreamData.java +++ b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java @@ -1,5 +1,5 @@ -/******************************************************************************* - * Copyright 2022 Exactpro (Exactpro Systems Limited) +/* + * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - ******************************************************************************/ + */ package com.exactpro.th2.readlog.impl.lambdas; @@ -22,6 +22,6 @@ import java.util.List; -public interface ForOnStreamData { - Unit action(StreamId id, List builder); -} +public interface ProtoForOnStreamData { + Unit action(StreamId id, List builder); +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java new file mode 100644 index 0000000..40fb3c5 --- /dev/null +++ b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.readlog.impl.lambdas; + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage; +import com.exactpro.th2.read.file.common.StreamId; +import kotlin.Unit; + +import java.util.List; + +public interface TransportForOnStreamData { + Unit action(StreamId id, List builder); +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt b/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt index 7b13472..9423696 100644 --- a/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt +++ b/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022. Exactpro (Exactpro Systems Limited) + * Copyright 2022-2023. Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,42 +19,48 @@ package com.exactpro.th2.readlog.impl import com.exactpro.cradle.BookId import com.exactpro.cradle.CradleStorage import com.exactpro.cradle.Order -import com.exactpro.cradle.messages.MessageFilter -import com.exactpro.cradle.messages.StoredMessageId -import com.exactpro.th2.common.grpc.RawMessage -import com.exactpro.th2.common.util.toCradleDirection +import com.exactpro.cradle.messages.GroupedMessageFilter import com.exactpro.th2.read.file.common.StreamId import com.exactpro.th2.read.file.common.state.ReaderState +import com.exactpro.th2.read.file.common.state.Content +import com.exactpro.th2.read.file.common.state.ProtoContent +import com.exactpro.th2.read.file.common.state.TransportContent import com.exactpro.th2.read.file.common.state.StreamData import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState import com.google.protobuf.ByteString import com.google.protobuf.UnsafeByteOperations +import io.netty.buffer.Unpooled import java.time.Instant -class CradleReaderState private constructor( +class CradleReaderState @JvmOverloads constructor( private val cradleStorage: CradleStorage, - private val delegate: ReaderState, + private val delegate: ReaderState = InMemoryReaderState(), private val bookSupplier: (StreamId) -> String, + private val wrapContent: (ByteArray) -> Content ): ReaderState by delegate { - constructor(cradleStorage: CradleStorage, bookSupplier: (StreamId) -> String) - : this(cradleStorage, InMemoryReaderState(), bookSupplier) - override fun get(streamId: StreamId): StreamData? { - return delegate[streamId] ?: cradleStorage.getMessages( - MessageFilter.builder() - .sessionAlias(streamId.sessionAlias) - .direction(streamId.direction.toCradleDirection()) + return delegate[streamId] ?: cradleStorage.getGroupedMessageBatches( + GroupedMessageFilter.builder() + .groupName(streamId.sessionAlias) .bookId(BookId(bookSupplier(streamId))) .timestampTo().isLessThanOrEqualTo(Instant.now()) .limit(1) .order(Order.REVERSE) .build() - ).asSequence().firstOrNull()?.run { + ).asSequence().firstOrNull()?.lastMessage?.run { StreamData( timestamp, sequence, - content?.let(UnsafeByteOperations::unsafeWrap) ?: ByteString.EMPTY, + wrapContent(content) ) } } + + companion object { + @JvmField + val WRAP_PROTO: (ByteArray?) -> Content = { ProtoContent(it?.let(UnsafeByteOperations::unsafeWrap) ?: ByteString.EMPTY) } + + @JvmField + val WRAP_TRANSPORT: (ByteArray?) -> Content = { TransportContent(it?.let(Unpooled::wrappedBuffer) ?: Unpooled.EMPTY_BUFFER) } + } } \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/readlog/TestLogParser.java b/src/test/java/com/exactpro/th2/readlog/TestLogParser.java index 652dda3..9f9485a 100644 --- a/src/test/java/com/exactpro/th2/readlog/TestLogParser.java +++ b/src/test/java/com/exactpro/th2/readlog/TestLogParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package com.exactpro.th2.readlog; import com.exactpro.th2.common.grpc.Direction; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt; import com.exactpro.th2.read.file.common.StreamId; import com.exactpro.th2.readlog.cfg.AliasConfiguration; @@ -32,13 +33,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; public class TestLogParser { @@ -46,7 +46,8 @@ public class TestLogParser { static final String TEST_MESSAGE_ALIAS = "tma"; static final String TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT = "tma_wrong_format"; static final String TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN = "tma_wrong_pattern"; - static final String RAW_LOG = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - incoming fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}"; + static final String RAW_LOG_IN = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - incoming fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}"; + static final String RAW_LOG_OUT = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - outgoing fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}"; @ParameterizedTest(name = "Has message: {1}, Skips before: {0}") @MethodSource("skipMessageParams") @@ -65,7 +66,10 @@ void skipMessageByTimestamp(Instant skipBefore, boolean hasMessages) { "test", cfg )); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "2021-03-23 13:21:37.991337479 some data"); + LogData data = parser.parse(new StreamId("test"), "2021-03-23 13:21:37.991337479 some data"); + if (hasMessages) { + assertEquals(Direction.FIRST, TransportUtilsKt.getProto(data.getDirection()), "unexpected direction"); + } assertEquals(hasMessages, !data.getBody().isEmpty(), () -> "unexpected data " + data.getBody()); if (hasMessages) { assertEquals(List.of("2021-03-23 13:21:37.991337479 some data"), data.getBody()); @@ -79,23 +83,18 @@ static List skipMessageParams() { ); } - @Test - void parser() { + @ParameterizedTest + @EnumSource(value = Direction.class, mode = EnumSource.Mode.EXCLUDE, names = "UNRECOGNIZED") + void parser(Direction direction) { RegexLogParser logParser = new RegexLogParser(getConfiguration()); - LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS, Direction.FIRST), RAW_LOG); + LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS), direction == Direction.FIRST ? RAW_LOG_IN : RAW_LOG_OUT); + assertEquals(direction, TransportUtilsKt.getProto(data.getDirection()), "unexpected direction"); assertEquals(1, data.getBody().size()); assertEquals("NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}", data.getBody().get(0)); assertEquals("2021-03-23 13:21:37.991337479", data.getRawTimestamp()); assertEquals(Instant.parse("2021-03-23T13:21:37.991337479Z"), data.getParsedTimestamp()); } - @Test - void skipIfDirectionPatternDoesNotMatch() { - RegexLogParser logParser = new RegexLogParser(getConfiguration()); - LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS, Direction.SECOND), RAW_LOG); - assertTrue(data.getBody().isEmpty(), () -> "Unexpected data read: " + data.getBody()); - } - @Test void parserErrors() { RegexLogParser logParser = new RegexLogParser(getConfiguration()); @@ -103,21 +102,21 @@ void parserErrors() { assertAll( () -> { var ex = assertThrows(IllegalStateException.class, - () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT, Direction.FIRST), RAW_LOG)); + () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT), RAW_LOG_IN)); Assertions.assertTrue( ex.getMessage().startsWith("The timestamp '2021-03-23 13:21:37.991337479' cannot be parsed"), () -> "Actual error: " + ex.getMessage()); }, () -> { var ex = assertThrows(IllegalStateException.class, - () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN, Direction.FIRST), RAW_LOG)); + () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN), RAW_LOG_IN)); Assertions.assertTrue( ex.getMessage().startsWith("The pattern '3012.*' cannot extract the timestamp from the string"), () -> "Actual error: " + ex.getMessage()); }, () -> { var ex = assertThrows(IllegalArgumentException.class, - () -> logParser.parse(new StreamId("wrong_alias", Direction.FIRST), RAW_LOG)); + () -> logParser.parse(new StreamId("wrong_alias"), RAW_LOG_IN)); Assertions.assertTrue( ex.getMessage().startsWith("Unknown alias 'wrong_alias'. No configuration found"), () -> "Actual error: " + ex.getMessage()); @@ -145,4 +144,4 @@ private Map getConfiguration() { return result; } -} +} \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java b/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java index ddac41d..abcc08f 100644 --- a/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java +++ b/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java @@ -43,7 +43,7 @@ void joinsIfOneMatchFound() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "this a test string, 123 and no more"); + LogData data = parser.parse(new StreamId("test"), "this a test string, 123 and no more"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -67,7 +67,7 @@ void doesNotTryToSubstituteInResultString() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "[test] should not try to process ${3} and ${variable}"); + LogData data = parser.parse(new StreamId("test"), "[test] should not try to process ${3} and ${variable}"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -92,7 +92,7 @@ void joinsIfManyMatchesFound() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53"); + LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -119,7 +119,7 @@ void usesCorrectDelimiter() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42"); + LogData data = parser.parse(new StreamId("test"), "A, 42"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -145,7 +145,7 @@ void usesConstantsFromMap() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53"); + LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals( @@ -171,7 +171,7 @@ void correctlyAcceptsGroupsByName() { )); RegexLogParser parser = new RegexLogParser(Map.of("test", configuration)); - LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53"); + LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53"); List body = data.getBody(); Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body); Assertions.assertEquals(