diff --git a/.github/workflows/build-dev-release.yml b/.github/workflows/build-dev-release.yml
new file mode 100644
index 0000000..b438d11
--- /dev/null
+++ b/.github/workflows/build-dev-release.yml
@@ -0,0 +1,15 @@
+name: Build and publish dev release Docker image to Github Container Registry ghcr.io
+
+on: workflow_dispatch
+
+jobs:
+ build:
+ uses: th2-net/.github/.github/workflows/compound-java.yml@main
+ with:
+ build-target: 'Docker'
+ devRelease: true
+ createTag: true
+ docker-username: ${{ github.actor }}
+ secrets:
+ docker-password: ${{ secrets.GITHUB_TOKEN }}
+ nvd-api-key: ${{ secrets.NVD_APIKEY }}
\ No newline at end of file
diff --git a/.github/workflows/build-release.yml b/.github/workflows/build-release.yml
new file mode 100644
index 0000000..dcf70be
--- /dev/null
+++ b/.github/workflows/build-release.yml
@@ -0,0 +1,15 @@
+name: Build and publish release Docker image to Github Container Registry ghcr.io
+
+on: workflow_dispatch
+
+jobs:
+ build:
+ uses: th2-net/.github/.github/workflows/compound-java.yml@main
+ with:
+ build-target: 'Docker'
+ devRelease: false
+ createTag: true
+ docker-username: ${{ github.actor }}
+ secrets:
+ docker-password: ${{ secrets.GITHUB_TOKEN }}
+ nvd-api-key: ${{ secrets.NVD_APIKEY }}
\ No newline at end of file
diff --git a/.github/workflows/build-sanpshot.yml b/.github/workflows/build-sanpshot.yml
new file mode 100644
index 0000000..9366a44
--- /dev/null
+++ b/.github/workflows/build-sanpshot.yml
@@ -0,0 +1,20 @@
+name: Build and publish Docker image to Github Container Registry ghcr.io
+
+on:
+ push:
+ branches-ignore:
+ - master
+ - version-*
+ - dependabot**
+ paths-ignore:
+ - README.md
+
+jobs:
+ build-job:
+ uses: th2-net/.github/.github/workflows/compound-java-dev.yml@main
+ with:
+ build-target: 'Docker'
+ docker-username: ${{ github.actor }}
+ secrets:
+ docker-password: ${{ secrets.GITHUB_TOKEN }}
+ nvd-api-key: ${{ secrets.NVD_APIKEY }}
\ No newline at end of file
diff --git a/.github/workflows/ci-unwelcome-words.yml b/.github/workflows/ci-unwelcome-words.yml
index cd7adcf..4e5f3a6 100644
--- a/.github/workflows/ci-unwelcome-words.yml
+++ b/.github/workflows/ci-unwelcome-words.yml
@@ -7,11 +7,11 @@ jobs:
test:
runs-on: ubuntu-20.04
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v4
with:
ref: ${{ github.sha }}
- name: Checkout tool
- uses: actions/checkout@v2
+ uses: actions/checkout@v4
with:
repository: exactpro-th2/ci-github-action
ref: master
diff --git a/.github/workflows/dev-publish.yml b/.github/workflows/dev-publish.yml
deleted file mode 100644
index 181a8f8..0000000
--- a/.github/workflows/dev-publish.yml
+++ /dev/null
@@ -1,20 +0,0 @@
-name: Dev build and publish - Docker and Sonatype
-
-on:
- push:
- branches-ignore:
- - master
- - version-*
-
-jobs:
- build-job:
- uses: th2-net/.github/.github/workflows/compound-java-dev.yml@main
- with:
- build-target: 'Docker,Sonatype'
- docker-username: ${{ github.actor }}
- secrets:
- docker-password: ${{ secrets.GITHUB_TOKEN }}
- sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }}
- sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }}
- sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }}
- sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }}
diff --git a/.github/workflows/release-publish.yml b/.github/workflows/release-publish.yml
deleted file mode 100644
index 6e709f4..0000000
--- a/.github/workflows/release-publish.yml
+++ /dev/null
@@ -1,21 +0,0 @@
-name: Release build and publish - Docker and Sonatype
-on:
- push:
- branches:
- - master
- - version-*
- paths:
- - gradle.properties
-
-jobs:
- build-job:
- uses: th2-net/.github/.github/workflows/compound-java.yml@main
- with:
- build-target: 'Docker,Sonatype'
- docker-username: ${{ github.actor }}
- secrets:
- docker-password: ${{ secrets.GITHUB_TOKEN }}
- sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }}
- sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }}
- sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }}
- sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }}
diff --git a/.gitignore b/.gitignore
index 4420ec0..1c91b7c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+/out/
/build/
/.idea/
/.gradle/
diff --git a/.project b/.project
deleted file mode 100644
index b28f978..0000000
--- a/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-
-
- read-log
-
-
-
-
-
- org.eclipse.jdt.core.javabuilder
-
-
-
-
- org.eclipse.m2e.core.maven2Builder
-
-
-
-
-
- org.eclipse.jdt.core.javanature
- org.eclipse.m2e.core.maven2Nature
-
-
diff --git a/README.md b/README.md
index 3b9427c..9856845 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Log Reader User Manual 3.5.3
+# Log Reader User Manual 4.2.0
## Document Information
@@ -27,6 +27,8 @@ spec:
type: th2-read
custom-config:
logDirectory: "log/dir"
+ syncWithCradle: true
+ useTransport: true
aliases:
A:
regexp: ".*"
@@ -44,6 +46,7 @@ spec:
timestampRegexp: "^202.+?(?= QUICK)"
timestampFormat: "yyyy-MM-dd HH:mm:ss"
timestampZone: UTC
+ skipBefore: "2022-10-31T12:00:00Z"
D:
regexp: ".*"
pathFilter: "fileC.*\\.log"
@@ -88,6 +91,8 @@ spec:
##### Reader configuration
+ logDirectory - the directory to watch files
++ syncWithCradle - enables synchronization with Cradle for timestamps and sequences that correspond to the alias
++ useTransport - enables using th2 transport protocol (default value: `false`)
+ aliases - the mapping between alias and files that correspond to that alias
+ pathFilter - filter for files that correspond to that alias
+ regexp - the regular expression to extract data from the source lines
@@ -104,6 +109,9 @@ spec:
+ timestampFormat - the format for the timestamp extract from the log's line. **Works only with specified _timestampRegexp_ parameter**.
If _timestampFormat_ is specified the timestamp extract with _timestampRegexp_ will be parsed using this format and used as a message's timestamp.
+ timestampZone - the zone which should be used to process the timestamp from the log file
+ + skipBefore - the parameter defines the minimum timestamp in UTC (ISO format) for log messages.
+ If log message has timestamp less than the specified one it will be dropped.
+ **NOTE: the parameter only works if 'timestampRegexp' and 'timestampFormat' are specified**
+ joinGroups - enables joining groups into a message in CSV format. Can be used to extract generic data from the log. Disabled by default.
+ groupsJoinDelimiter - the delimiter that will be used to join groups from the _regexp_ parameter. **Works only if _joinGroups_ is enabled**. The default value is `,`.
+ headersFormat - the headers' definition. The reader uses the keys as headers. The value to the key will be converted to a value for each match in the current line.
@@ -148,6 +156,55 @@ spec:
attributes: ['raw', 'publish', 'store']
```
+##### Logging
+
+This block describes which classes can be used to get more information about read-log work.
+You can tweak their logging level and you will see extra information in the read-log logs.
+
+###### Log4j2 config
+
+Here is an example of log4j2 logging configuration that can be set in **loggingConfig** parameter for the component:
+
+```yaml
+loggingConfig: |
+ name=Th2Logger
+ # Console appender configuration
+ appender.console.type=Console
+ appender.console.name=consoleLogger
+ appender.console.layout.type=PatternLayout
+ appender.console.layout.pattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-6p [%-15t] %c - %m%n
+ logger.th2.name=com.exactpro.th2
+ logger.th2.level=TRACE
+ rootLogger.level=INFO
+ rootLogger.appenderRef.stdout.ref=consoleLogger
+```
+
+If you need to add extra logging you can do it by adding a line in the following format:
+```properties
+logger..name=
+logger..level=
+```
+
+**NOTE: the _logger_name_ must be unique**.
+
+###### Classes
+
+**com.exactpro.th2.readlog.RegexLogParser**
+
+You can use this class to see how the log line is parsed by the read-log.
+Use TRACE level to get the information.
+
+**com.exactpro.th2.readlog.impl.ProtoRegexpContentParser**
+**com.exactpro.th2.readlog.impl.TransportRegexpContentParser**
+
+You can use these classes to see the resulted lines produced by **_RegexLogParser_**.
+Produce Proto or Transport messages, respectively. Use TRACE level to get the information.
+
+**com.exactpro.th2.read.file.common.AbstractFileReader**
+
+You can use this class to get information about how the reader processes files.
+Use either DEBUG or TRACE level
+
### Examples
#### Example 1
@@ -170,6 +227,34 @@ Output: 8=FIXT.1.1\u00019=66\u000135=A\u000134=1\u000149=NFT2_FIX1\u000156=FGW\u
## Changes
+### 4.2.0
+
++ Migrate to th2 gradle plugin `0.0.6`
++ Updated bom: `4.6.1`
++ Updated common: `5.11.0-dev`
++ Updated read-file-common-core: `3.3.0-dev`
++ Updated jakarta.annotation-api: `3.0.0`
++ Updated opencsv: `5.9`
+
+### 4.1.1
+
++ Update core version to 3.1.0-dev
++ Update common to 5.7.1-dev
+
+### 4.1.0
+
++ Added support for th2 transport protocol
+
+### 4.0.1
+
++ Added dev-release GitHub workflow
+
+### 4.0.0
+
++ Updated common to 5.0.0
+ + Migration to books-pages
++ Updated read-file-common-core to 2.0.0
+
### 3.5.3
+ Updated common to 3.44.1
@@ -183,6 +268,16 @@ Output: 8=FIXT.1.1\u00019=66\u000135=A\u000134=1\u000149=NFT2_FIX1\u000156=FGW\u
+ Fixed release workflow
+
+### 3.5.0
+
++ Update dependencies with vulnerabilities
+ + log4j 1.2 is excluded
+ + kotlin updated to 1.6.21
++ Parameter `skipBefore` for filtering log messages by timestamp from the file
++ Parameter `syncWithCradle` for timestamp and sequence synchronization with Cradle.
+ Enabled by default.
+
### 3.5.0
+ Updated `kotlin` to 1.6.21
@@ -222,4 +317,4 @@ Output: 8=FIXT.1.1\u00019=66\u000135=A\u000134=1\u000149=NFT2_FIX1\u000156=FGW\u
### 3.0.0
-+ Migrate to a common read-core
++ Migrate to a common read-core
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 6b91606..3e19f9f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,35 +1,17 @@
-/*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
plugins {
- id 'java'
- id 'java-library'
id 'application'
- id 'maven-publish'
- id 'signing'
- id 'com.palantir.docker' version '0.25.0'
- id 'org.jetbrains.kotlin.jvm' version '1.6.21'
- id "org.owasp.dependencycheck" version "8.1.0"
- id "io.github.gradle-nexus.publish-plugin" version "1.0.0"
-
+ id 'org.jetbrains.kotlin.jvm' version '1.8.22'
+ id 'org.jetbrains.kotlin.kapt' version '1.8.22'
+ id 'com.exactpro.th2.gradle.component' version '0.0.6'
}
group 'com.exactpro.th2'
version release_version
+kotlin {
+ jvmToolchain(11)
+}
+
repositories {
mavenCentral()
maven {
@@ -42,178 +24,37 @@ repositories {
}
mavenLocal()
- configurations.all {
+ configurations.configureEach {
resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds'
}
}
dependencies {
- api platform('com.exactpro.th2:bom:4.2.0')
- api ('com.exactpro.th2:common:3.44.1')
- api 'com.exactpro.th2:read-file-common-core:1.5.1'
+ implementation "com.exactpro.th2:common:5.11.0-dev"
+ implementation "com.exactpro.th2:common-utils:2.2.3-dev"
+ implementation "com.exactpro.th2:read-file-common-core:3.3.0-dev"
- implementation('com.opencsv:opencsv:5.7.0') {
+ implementation ("com.opencsv:opencsv:5.9") {
because("we need to write a correct CSV in case of free pattern is used")
}
- implementation 'javax.annotation:javax.annotation-api:1.3.2'
- implementation "org.jetbrains.kotlin:kotlin-stdlib"
-
- implementation "org.slf4j:slf4j-api"
-
- implementation 'com.fasterxml.jackson.core:jackson-databind'
- implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
- implementation 'com.fasterxml.jackson.module:jackson-module-kotlin'
-
- testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
- testImplementation 'org.mockito:mockito-core:3.6.0'
-}
-
-test {
- useJUnitPlatform()
- testLogging.showStandardStreams = true
-}
-
-jar {
- manifest {
- attributes(
- 'Created-By': "${System.getProperty('java.version')} (${System.getProperty('java.vendor')})",
- 'Specification-Title': '',
- 'Specification-Vendor': 'Exactpro Systems LLC',
- 'Implementation-Title': project.archivesBaseName,
- 'Implementation-Vendor': 'Exactpro Systems LLC',
- 'Implementation-Vendor-Id': 'com.exactpro',
- 'Implementation-Version': project.version
- )
- }
-}
-
-description = 'DataReaderClient'
-sourceCompatibility = '11'
-applicationName = 'service'
-
-distTar {
- archiveName "${applicationName}.tar"
-}
-
-dockerPrepare {
- dependsOn distTar
-}
-
-docker {
- copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar"))
-}
-
-application {
- mainClassName = "com.exactpro.th2.readlog.Main"
-}
-
-dependencyCheck {
- formats=['SARIF']
-}
-
-dependencyLocking {
- lockAllConfigurations()
-}
-
-
-java {
- withJavadocJar()
- withSourcesJar()
-}
-
-// conditionals for publications
-tasks.withType(PublishToMavenRepository) {
- onlyIf {
- (repository == publishing.repositories.nexusRepository &&
- project.hasProperty('nexus_user') &&
- project.hasProperty('nexus_password') &&
- project.hasProperty('nexus_url')) ||
- (repository == publishing.repositories.sonatype &&
- project.hasProperty('sonatypeUsername') &&
- project.hasProperty('sonatypePassword'))
- }
-}
-tasks.withType(Sign) {
- onlyIf { project.hasProperty('signingKey') &&
- project.hasProperty('signingPassword')
- }
-}
-// disable running task 'initializeSonatypeStagingRepository' on a gitlab
-tasks.whenTaskAdded {task ->
- if(task.name.equals('initializeSonatypeStagingRepository') &&
- !(project.hasProperty('sonatypeUsername') && project.hasProperty('sonatypePassword'))
- ) {
- task.enabled = false
- }
-}
-
-publishing {
- publications {
- mavenJava(MavenPublication) {
- from(components.java)
- pom {
- name = rootProject.name
- packaging = 'jar'
- description = rootProject.description
- url = vcs_url
- scm {
- url = vcs_url
- }
- licenses {
- license {
- name = 'The Apache License, Version 2.0'
- url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
- }
- }
- developers {
- developer {
- id = 'developer'
- name = 'developer'
- email = 'developer@exactpro.com'
- }
- }
- scm {
- url = vcs_url
- }
- }
- }
- }
- repositories {
-//Nexus repo to publish from gitlab
- maven {
- name = 'nexusRepository'
- credentials {
- username = project.findProperty('nexus_user')
- password = project.findProperty('nexus_password')
- }
- url = project.findProperty('nexus_url')
- }
- }
-}
+ implementation 'jakarta.annotation:jakarta.annotation-api:3.0.0'
+ implementation "io.github.microutils:kotlin-logging:3.0.5"
-nexusPublishing {
- repositories {
- sonatype {
- nexusUrl.set(uri("https://s01.oss.sonatype.org/service/local/"))
- snapshotRepositoryUrl.set(uri("https://s01.oss.sonatype.org/content/repositories/snapshots/"))
- }
- }
-}
+ implementation "com.fasterxml.jackson.core:jackson-databind"
+ implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
+ implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
-
-signing {
- def signingKey = findProperty("signingKey")
- def signingPassword = findProperty("signingPassword")
- useInMemoryPgpKeys(signingKey, signingPassword)
- sign publishing.publications.mavenJava
+ testImplementation "org.junit.jupiter:junit-jupiter:5.10.2"
+ testImplementation "org.mockito:mockito-core:5.12.0"
}
test {
useJUnitPlatform {
excludeTags('integration-test')
}
+ testLogging.showStandardStreams = true
}
tasks.register('integrationTest', Test) {
@@ -223,13 +64,6 @@ tasks.register('integrationTest', Test) {
}
}
-dependencyCheck {
- formats=['SARIF', 'JSON', 'HTML']
- failBuildOnCVSS=5
-
- analyzers {
- assemblyEnabled = false
- nugetconfEnabled = false
- nodeEnabled = false
- }
+application {
+ mainClassName = "com.exactpro.th2.readlog.Main"
}
\ No newline at end of file
diff --git a/gradle.properties b/gradle.properties
index 04cb2a5..b81539e 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,2 +1,3 @@
-release_version=3.5.3
+release_version=4.2.0
vcs_url=https://github.com/th2-net/th2-read-log
+description=DataReaderClient
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/LogData.java b/src/main/java/com/exactpro/th2/readlog/LogData.java
index ebe362f..8a1eae6 100644
--- a/src/main/java/com/exactpro/th2/readlog/LogData.java
+++ b/src/main/java/com/exactpro/th2/readlog/LogData.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,20 +13,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.exactpro.th2.readlog;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction;
+
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-final public class LogData {
+public final class LogData {
+ public static LogData EMPTY = new LogData(List.of());
private List body;
private String rawTimestamp;
- private LocalDateTime parsedTimestamp;
+ private Instant parsedTimestamp;
+ private Direction direction;
- private ZoneId timestampZone;
+ public LogData() {
+ this(null);
+ }
+
+ private LogData(List body) {
+ this.body = body;
+ }
public void addBody(String item) {
initIfNeeded();
@@ -45,20 +55,20 @@ public void setRawTimestamp(String rawTimestamp) {
this.rawTimestamp = rawTimestamp;
}
- public LocalDateTime getParsedTimestamp() {
+ public Instant getParsedTimestamp() {
return parsedTimestamp;
}
- public void setParsedTimestamp(LocalDateTime localDateTime) {
+ public void setParsedTimestamp(Instant localDateTime) {
this.parsedTimestamp = localDateTime;
}
- public ZoneId getTimestampZone() {
- return timestampZone;
+ public Direction getDirection() {
+ return direction;
}
- public void setTimestampZone(ZoneId timestampZone) {
- this.timestampZone = timestampZone;
+ public void setDirection(Direction direction) {
+ this.direction = direction;
}
private void initIfNeeded() {
@@ -66,4 +76,4 @@ private void initIfNeeded() {
body = new ArrayList<>();
}
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/Main.java b/src/main/java/com/exactpro/th2/readlog/Main.java
index b027a17..1b58ff9 100644
--- a/src/main/java/com/exactpro/th2/readlog/Main.java
+++ b/src/main/java/com/exactpro/th2/readlog/Main.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,48 +16,42 @@
package com.exactpro.th2.readlog;
-import java.io.IOException;
-import java.io.LineNumberReader;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayDeque;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
import com.exactpro.th2.common.event.Event;
+import com.exactpro.th2.common.event.Event.Status;
import com.exactpro.th2.common.event.EventUtils;
import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.grpc.EventID;
-import com.exactpro.th2.common.grpc.RawMessage;
-import com.exactpro.th2.common.grpc.RawMessageBatch;
import com.exactpro.th2.common.metrics.CommonMetrics;
import com.exactpro.th2.common.schema.factory.CommonFactory;
import com.exactpro.th2.common.schema.message.MessageRouter;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage;
import com.exactpro.th2.read.file.common.AbstractFileReader;
-import com.exactpro.th2.read.file.common.DirectoryChecker;
-import com.exactpro.th2.read.file.common.FileSourceWrapper;
-import com.exactpro.th2.read.file.common.MovedFileTracker;
import com.exactpro.th2.read.file.common.StreamId;
-import com.exactpro.th2.read.file.common.impl.DefaultFileReader;
-import com.exactpro.th2.read.file.common.impl.RecoverableBufferedReaderWrapper;
import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState;
import com.exactpro.th2.readlog.cfg.LogReaderConfiguration;
-import com.exactpro.th2.readlog.impl.RegexpContentParser;
+import com.exactpro.th2.readlog.impl.CradleReaderState;
+import com.exactpro.th2.readlog.impl.LogFileReader;
import kotlin.Unit;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.Comparator.comparing;
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
public class Main {
@@ -69,11 +63,11 @@ public static void main(String[] args) {
var condition = lock.newCondition();
configureShutdownHook(toDispose, lock, condition);
- CommonMetrics.setLiveness(true);
+ CommonMetrics.LIVENESS_MONITOR.enable();
CommonFactory commonFactory = CommonFactory.createFromArguments(args);
+ var boxBookName = commonFactory.getBoxConfiguration().getBookName();
toDispose.add(commonFactory);
- MessageRouter rawMessageBatchRouter = commonFactory.getMessageRouterRawBatch();
MessageRouter eventBatchRouter = commonFactory.getEventBatchRouter();
LogReaderConfiguration configuration = commonFactory.getCustomConfiguration(LogReaderConfiguration.class, LogReaderConfiguration.MAPPER);
@@ -82,19 +76,6 @@ public static void main(String[] args) {
throw new IllegalArgumentException("Alias " + alias + " has parameter joinGroups = true but does not have any headers defined");
}
});
- Comparator pathComparator = comparing(it -> it.getFileName().toString(), String.CASE_INSENSITIVE_ORDER);
- var directoryChecker = new DirectoryChecker(
- configuration.getLogDirectory(),
- (Path path) -> configuration.getAliases().entrySet().stream()
- .filter(entry -> entry.getValue().getPathFilter().matcher(path.getFileName().toString()).matches())
- .flatMap(entry -> entry.getValue().getDirectionToPattern()
- .keySet().stream()
- .map(direction -> new StreamId(entry.getKey(), direction))
- ).collect(Collectors.toSet()),
- files -> files.sort(pathComparator),
- path -> true
- );
- RegexLogParser logParser = new RegexLogParser(configuration.getAliases());
if (configuration.getPullingInterval().isNegative()) {
throw new IllegalArgumentException("Pulling interval " + configuration.getPullingInterval() + " must not be negative");
@@ -103,28 +84,50 @@ public static void main(String[] args) {
try {
Event rootEvent = Event.start().endTimestamp()
.name("Log reader for " + String.join(",", configuration.getAliases().keySet()))
- .type("Microservice");
- var protoEvent = rootEvent.toProto(null);
+ .type("ReadLog")
+ .status(Status.PASSED);
+ EventID componentRootEvent = commonFactory.getRootEventId();
+ var protoEvent = rootEvent.toProto(componentRootEvent);
eventBatchRouter.sendAll(EventBatch.newBuilder().addEvents(protoEvent).build());
EventID rootId = protoEvent.getId();
- CommonMetrics.setReadiness(true);
- AbstractFileReader reader = new DefaultFileReader.Builder<>(
- configuration.getCommon(),
- directoryChecker,
- new RegexpContentParser(logParser),
- new MovedFileTracker(configuration.getLogDirectory()),
- new InMemoryReaderState(),
- Main::createSource
- )
- .readFileImmediately()
- .acceptNewerFiles()
- .onStreamData((streamId, builders) -> publishMessages(rawMessageBatchRouter, streamId, builders))
- .onError((streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId))
- .onSourceCorrupted((streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId))
- .build();
-
- toDispose.add(reader);
+ CommonMetrics.READINESS_MONITOR.enable();
+
+ final Runnable processUpdates;
+
+ if (configuration.isUseTransport()) {
+ AbstractFileReader reader
+ = LogFileReader.getTransportLogFileReader(
+ configuration,
+ configuration.isSyncWithCradle()
+ ? new CradleReaderState(commonFactory.getCradleManager().getStorage(),
+ streamId -> commonFactory.newMessageIDBuilder().getBookName(),
+ CradleReaderState.WRAP_TRANSPORT)
+ : new InMemoryReaderState(),
+ streamId -> MessageId.builder(),
+ (streamId, builders) -> publishTransportMessages(commonFactory.getTransportGroupBatchRouter(), streamId, builders, boxBookName),
+ (streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId),
+ (streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId)
+ );
+
+ processUpdates = reader::processUpdates;
+ toDispose.add(reader);
+ } else {
+ AbstractFileReader reader
+ = LogFileReader.getProtoLogFileReader(
+ configuration,
+ configuration.isSyncWithCradle()
+ ? new CradleReaderState(commonFactory.getCradleManager().getStorage(), streamId -> boxBookName, CradleReaderState.WRAP_PROTO)
+ : new InMemoryReaderState(),
+ streamId -> commonFactory.newMessageIDBuilder().build(),
+ (streamId, builders) -> publishProtoMessages(commonFactory.getMessageRouterRawBatch(), streamId, builders),
+ (streamId, message, ex) -> publishErrorEvent(eventBatchRouter, streamId, message, ex, rootId),
+ (streamId, path, e) -> publishSourceCorruptedEvent(eventBatchRouter, path, streamId, e, rootId)
+ );
+
+ processUpdates = reader::processUpdates;
+ toDispose.add(reader);
+ }
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
toDispose.add(() -> {
@@ -135,8 +138,7 @@ public static void main(String[] args) {
}
});
-
- ScheduledFuture> future = executorService.scheduleWithFixedDelay(reader::processUpdates, 0, configuration.getPullingInterval().toMillis(), TimeUnit.MILLISECONDS);
+ ScheduledFuture> future = executorService.scheduleWithFixedDelay(processUpdates, 0, configuration.getPullingInterval().toMillis(), TimeUnit.MILLISECONDS);
awaitShutdown(lock, condition);
future.cancel(true);
} catch (IOException | InterruptedException e) {
@@ -177,31 +179,43 @@ private static Unit publishError(MessageRouter eventBatchRouter, Str
}
@NotNull
- private static Unit publishMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List builders) {
+ private static Unit publishProtoMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List extends com.exactpro.th2.common.grpc.RawMessage.Builder> builders) {
try {
- RawMessageBatch.Builder builder = RawMessageBatch.newBuilder();
- for (RawMessage.Builder msg : builders) {
+ com.exactpro.th2.common.grpc.RawMessageBatch.Builder builder = com.exactpro.th2.common.grpc.RawMessageBatch.newBuilder();
+ for (com.exactpro.th2.common.grpc.RawMessage.Builder msg : builders) {
builder.addMessages(msg);
}
- rawMessageBatchRouter.sendAll(builder.build());
+ rawMessageBatchRouter.sendAll(builder.build(), "raw");
} catch (Exception e) {
LOGGER.error("Cannot publish batch for {}", streamId, e);
}
return Unit.INSTANCE;
}
- private static FileSourceWrapper createSource(StreamId streamId, Path path) {
+ @NotNull
+ private static Unit publishTransportMessages(MessageRouter rawMessageBatchRouter, StreamId streamId, List extends RawMessage.Builder> builders, String bookName) {
try {
- return new RecoverableBufferedReaderWrapper(new LineNumberReader(Files.newBufferedReader(path)));
- } catch (IOException e) {
- return ExceptionUtils.rethrow(e);
+ // messages are grouped by session aliases
+ String sessionGroup = builders.get(0).idBuilder().getSessionAlias();
+
+ List groups = new ArrayList<>(builders.size());
+ for (RawMessage.Builder msgBuilder : builders) {
+ groups.add(new MessageGroup(List.of(msgBuilder.build())));
+ }
+
+ var batch = new GroupBatch(bookName, sessionGroup, groups);
+ rawMessageBatchRouter.sendAll(batch, "transport-group");
+ } catch (Exception e) {
+ LOGGER.error("Cannot publish batch for {}", streamId, e);
}
+
+ return Unit.INSTANCE;
}
private static void configureShutdownHook(Deque resources, ReentrantLock lock, Condition condition) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("Shutdown start");
- CommonMetrics.setReadiness(false);
+ CommonMetrics.READINESS_MONITOR.disable();
try {
lock.lock();
condition.signalAll();
@@ -216,7 +230,7 @@ private static void configureShutdownHook(Deque resources, Reentr
}
});
- CommonMetrics.setLiveness(false);
+ CommonMetrics.LIVENESS_MONITOR.disable();
LOGGER.info("Shutdown end");
}, "Shutdown hook"));
}
@@ -231,5 +245,4 @@ private static void awaitShutdown(ReentrantLock lock, Condition condition) throw
lock.unlock();
}
}
-
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java b/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java
index 0a077b2..e0133aa 100644
--- a/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java
+++ b/src/main/java/com/exactpro/th2/readlog/RegexLogParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,23 +13,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.exactpro.th2.readlog;
import java.io.StringWriter;
import java.time.DateTimeException;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import com.exactpro.th2.common.grpc.Direction;
import com.exactpro.th2.read.file.common.StreamId;
import com.exactpro.th2.readlog.cfg.AliasConfiguration;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt;
import com.opencsv.CSVWriter;
import com.opencsv.ICSVWriter;
import org.apache.commons.text.StringSubstitutor;
@@ -62,16 +69,21 @@ public LogData parse(StreamId streamId, String raw) {
throw new IllegalArgumentException("Unknown alias '" + sessionAlias +"'. No configuration found" );
}
- LogData resultData = new LogData();
-
- Pattern directionPattern = Objects.requireNonNull(configuration.getDirectionToPattern().get(streamId.getDirection()),
- () -> "Pattern for direction " + streamId.getDirection() + " and session " + sessionAlias);
- Matcher matcher = directionPattern.matcher(raw);
- // check if the a string matches the direction from streamId
- // skip line if it is not ours direction
- if (!matcher.find()) {
- return resultData;
+ Direction direction = null;
+ for (Entry entry : configuration.getDirectionToPattern().entrySet()) {
+ if (entry.getValue().matcher(raw).find()) {
+ direction = entry.getKey();
+ break;
+ }
}
+ // check whether the line matches any direction regex
+ // if not it is not our line
+ if (direction == null) {
+ return LogData.EMPTY;
+ }
+
+ LogData resultData = new LogData();
+ resultData.setDirection(TransportUtilsKt.getTransport(direction));
List regexGroups = configuration.getGroups();
if (configuration.isJoinGroups()) {
@@ -81,7 +93,7 @@ public LogData parse(StreamId streamId, String raw) {
}
if (resultData.getBody().isEmpty()) {
- // fast way, nothing matches the regexp so we don't need to check for date pattern
+ // fast way, nothing matches the regexp, so we don't need to check for date pattern
return resultData;
}
@@ -96,17 +108,27 @@ public LogData parse(StreamId streamId, String raw) {
// DateTime from log
DateTimeFormatter timestampFormat = configuration.getTimestampFormat();
if (timestampFormat != null) {
- parseTimestamp(timestampFormat, resultData);
- resultData.setTimestampZone(configuration.getTimestampZone());
+ ZoneOffset offset = Objects.requireNonNullElse(configuration.getTimestampZone(), ZoneId.systemDefault())
+ .getRules().getOffset(Instant.now());
+ parseTimestamp(timestampFormat, resultData, offset);
+ }
+
+ if (resultData.getParsedTimestamp() != null && configuration.getSkipBefore() != null) {
+ if (resultData.getParsedTimestamp().isBefore(configuration.getSkipBefore())) {
+ logger.trace("Content dropped because of 'skipBefore' condition. Log timestamp: {}, Skip before: {}",
+ resultData.getParsedTimestamp(), configuration.getSkipBefore()
+ );
+ return LogData.EMPTY;
+ }
}
return resultData;
}
- private void parseTimestamp(DateTimeFormatter formatter, LogData data) {
+ private void parseTimestamp(DateTimeFormatter formatter, LogData data, ZoneOffset offset) {
String rawTimestamp = data.getRawTimestamp();
try {
- LocalDateTime dateTime = LocalDateTime.parse(rawTimestamp, formatter);
+ Instant dateTime = LocalDateTime.parse(rawTimestamp, formatter).toInstant(offset);
data.setParsedTimestamp(dateTime);
logger.trace("ParsedTimestamp: {}", dateTime);
} catch (DateTimeException e) {
@@ -167,7 +189,7 @@ private void parseBodyJoined(String raw, AliasConfiguration configuration, LogDa
private void addJoined(LogData data, List> values, char delimiter) {
var writer = new StringWriter();
- ICSVWriter csvPrinter = createCsvWriter(writer, delimiter); // we can ignore closing because there is not IO
+ ICSVWriter csvPrinter = createCsvWriter(writer, delimiter); // we can ignore closing because there is no IO
for (List value : values) {
csvPrinter.writeNext(value.toArray(String[]::new));
}
@@ -205,4 +227,4 @@ private Integer tryParse(String value) {
return null;
}
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/cfg/AliasConfiguration.java b/src/main/java/com/exactpro/th2/readlog/cfg/AliasConfiguration.java
index a2212cc..98b27ed 100644
--- a/src/main/java/com/exactpro/th2/readlog/cfg/AliasConfiguration.java
+++ b/src/main/java/com/exactpro/th2/readlog/cfg/AliasConfiguration.java
@@ -19,6 +19,7 @@
import com.exactpro.th2.common.grpc.Direction;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@@ -60,6 +61,12 @@ public class AliasConfiguration {
+ "It not set the time zone from the local machine will be taken")
private ZoneId timestampZone;
+ @JsonPropertyDescription(
+ "The date time in ISO format that will be used to filter log messages if 'timestampRegexp' and 'timestampFormat' are set."
+ + "Otherwise, the filtering won't be applied"
+ )
+ private Instant skipBefore;
+
@JsonCreator
public AliasConfiguration(
@JsonProperty(value = "regexp", required = true) String regexp,
@@ -149,4 +156,13 @@ public ZoneId getTimestampZone() {
public void setTimestampZone(ZoneOffset timestampZone) {
this.timestampZone = timestampZone;
}
+
+ @Nullable
+ public Instant getSkipBefore() {
+ return skipBefore;
+ }
+
+ public void setSkipBefore(@Nullable Instant skipBefore) {
+ this.skipBefore = skipBefore;
+ }
}
diff --git a/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java b/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java
index 6fe6410..6f0b5e7 100644
--- a/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java
+++ b/src/main/java/com/exactpro/th2/readlog/cfg/LogReaderConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,11 +28,14 @@
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.fasterxml.jackson.module.kotlin.KotlinFeature;
import com.fasterxml.jackson.module.kotlin.KotlinModule;
public class LogReaderConfiguration {
public static final ObjectMapper MAPPER = new ObjectMapper()
- .registerModule(new KotlinModule())
+ .registerModule(new KotlinModule.Builder()
+ .enable(KotlinFeature.NullIsSameAsDefault)
+ .build())
.registerModule(new JavaTimeModule());
@JsonProperty(required = true)
@@ -46,6 +49,12 @@ public class LogReaderConfiguration {
private Duration pullingInterval = Duration.ofSeconds(5);
+ @JsonPropertyDescription("Enables synchronization information about last timestamp and sequence for stream with Cradle")
+ private boolean syncWithCradle = true;
+
+ @JsonPropertyDescription("Enables using th2 transport protocol")
+ private boolean useTransport = false;
+
@JsonCreator
public LogReaderConfiguration(@JsonProperty("logDirectory") Path logDirectory) {
this.logDirectory = Objects.requireNonNull(logDirectory, "'Log directory' parameter");
@@ -78,4 +87,20 @@ public Duration getPullingInterval() {
public void setPullingInterval(Duration pullingInterval) {
this.pullingInterval = pullingInterval;
}
-}
+
+ public boolean isSyncWithCradle() {
+ return syncWithCradle;
+ }
+
+ public void setSyncWithCradle(boolean syncWithCradle) {
+ this.syncWithCradle = syncWithCradle;
+ }
+
+ public void setUseTransport(boolean useTransport) {
+ this.useTransport = useTransport;
+ }
+
+ public boolean isUseTransport() {
+ return useTransport;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java b/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java
new file mode 100644
index 0000000..4f4a7bb
--- /dev/null
+++ b/src/main/java/com/exactpro/th2/readlog/impl/LogFileReader.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.exactpro.th2.readlog.impl;
+
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage;
+import com.exactpro.th2.read.file.common.AbstractFileReader;
+import com.exactpro.th2.read.file.common.DirectoryChecker;
+import com.exactpro.th2.read.file.common.FileSourceWrapper;
+import com.exactpro.th2.read.file.common.MovedFileTracker;
+import com.exactpro.th2.read.file.common.StreamId;
+import com.exactpro.th2.read.file.common.impl.ProtoDefaultFileReader;
+import com.exactpro.th2.read.file.common.impl.RecoverableBufferedReaderWrapper;
+import com.exactpro.th2.read.file.common.impl.TransportDefaultFileReader;
+import com.exactpro.th2.read.file.common.state.ReaderState;
+import com.exactpro.th2.readlog.RegexLogParser;
+import com.exactpro.th2.readlog.cfg.LogReaderConfiguration;
+import com.exactpro.th2.readlog.impl.lambdas.ForOnError;
+import com.exactpro.th2.readlog.impl.lambdas.ForOnSourceCorrupted;
+import com.exactpro.th2.readlog.impl.lambdas.ProtoForOnStreamData;
+import com.exactpro.th2.readlog.impl.lambdas.TransportForOnStreamData;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Comparator.comparing;
+
+public class LogFileReader {
+
+ public static AbstractFileReader getProtoLogFileReader(
+ LogReaderConfiguration configuration,
+ ReaderState readerState,
+ Function initialMessageId,
+ ProtoForOnStreamData forStream,
+ ForOnError forError,
+ ForOnSourceCorrupted forCorrupted
+ ){
+ return new ProtoDefaultFileReader.Builder<>(
+ configuration.getCommon(),
+ getDirectoryChecker(configuration),
+ new ProtoRegexpContentParser(new RegexLogParser(configuration.getAliases())),
+ new MovedFileTracker(configuration.getLogDirectory()),
+ readerState,
+ initialMessageId::apply,
+ LogFileReader::createSource
+ )
+ .readFileImmediately()
+ .acceptNewerFiles()
+ .onStreamData(forStream::action)
+ .onError(forError::action)
+ .onSourceCorrupted(forCorrupted::action)
+ .build();
+ }
+
+ public static AbstractFileReader getTransportLogFileReader(
+ LogReaderConfiguration configuration,
+ ReaderState readerState,
+ Function initialMessageId,
+ TransportForOnStreamData forStream,
+ ForOnError forError,
+ ForOnSourceCorrupted forCorrupted
+ ){
+ return new TransportDefaultFileReader.Builder<>(
+ configuration.getCommon(),
+ getDirectoryChecker(configuration),
+ new TransportRegexpContentParser(new RegexLogParser(configuration.getAliases())),
+ new MovedFileTracker(configuration.getLogDirectory()),
+ readerState,
+ initialMessageId::apply,
+ LogFileReader::createSource
+ )
+ .readFileImmediately()
+ .acceptNewerFiles()
+ .onStreamData(forStream::action)
+ .onError(forError::action)
+ .onSourceCorrupted(forCorrupted::action)
+ .build();
+ }
+
+ private static DirectoryChecker getDirectoryChecker(LogReaderConfiguration configuration) {
+ Comparator pathComparator = comparing(it -> it.getFileName().toString(), String.CASE_INSENSITIVE_ORDER);
+ return new DirectoryChecker(
+ configuration.getLogDirectory(),
+ (Path path) -> configuration.getAliases().entrySet().stream()
+ .filter(entry -> entry.getValue().getPathFilter().matcher(path.getFileName().toString()).matches())
+ .map(entry -> new StreamId(entry.getKey()))
+ .collect(Collectors.toSet()),
+ files -> files.sort(pathComparator),
+ path -> true
+ );
+ }
+
+ private static FileSourceWrapper createSource(StreamId streamId, Path path) {
+ try {
+ return new RecoverableBufferedReaderWrapper(new LineNumberReader(Files.newBufferedReader(path)));
+ } catch (IOException e) {
+ return ExceptionUtils.rethrow(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java b/src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java
similarity index 75%
rename from src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java
rename to src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java
index a5f4a9e..1371093 100644
--- a/src/main/java/com/exactpro/th2/readlog/impl/RegexpContentParser.java
+++ b/src/main/java/com/exactpro/th2/readlog/impl/ProtoRegexpContentParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,36 +13,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.exactpro.th2.readlog.impl;
import com.exactpro.th2.common.grpc.RawMessageMetadata;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt;
import com.exactpro.th2.common.message.MessageUtils;
+import com.exactpro.th2.read.file.common.impl.LineParser;
import com.exactpro.th2.readlog.LogData;
import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.ZoneOffset;
import java.util.List;
-import java.util.Objects;
-
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import com.exactpro.th2.common.grpc.RawMessage;
import com.exactpro.th2.read.file.common.StreamId;
-import com.exactpro.th2.read.file.common.impl.LineParser;
import com.exactpro.th2.readlog.RegexLogParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RegexpContentParser extends LineParser {
- private static final Logger LOGGER = LoggerFactory.getLogger(RegexpContentParser.class);
+import static java.util.Objects.requireNonNull;
+
+public class ProtoRegexpContentParser extends LineParser {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProtoRegexpContentParser.class);
private final RegexLogParser parser;
- public RegexpContentParser(RegexLogParser parser) {
- this.parser = Objects.requireNonNull(parser, "'Parser' parameter");
+ public ProtoRegexpContentParser(RegexLogParser parser) {
+ super(LineParser.PROTO);
+ this.parser = requireNonNull(parser, "'Parser' parameter");
}
@Nonnull
@@ -59,16 +59,13 @@ protected List lineToMessages(@Nonnull StreamId streamId, @N
}
private void setupMetadata(RawMessageMetadata.Builder builder, LogData logData) {
+ builder.getIdBuilder().setDirection(TransportUtilsKt.getProto(requireNonNull(logData.getDirection(),
+ "direction is not set")));
if (logData.getParsedTimestamp() != null) {
- ZoneOffset currentOffsetForMyZone = Objects.requireNonNullElse(
- logData.getTimestampZone(),
- ZoneId.systemDefault()
- ).getRules().getOffset(Instant.now());
- builder.setTimestamp(MessageUtils.toTimestamp(logData.getParsedTimestamp(),currentOffsetForMyZone));
+ builder.getIdBuilder().setTimestamp(MessageUtils.toTimestamp(logData.getParsedTimestamp()));
}
if (logData.getRawTimestamp() != null) {
builder.putProperties("logTimestamp", logData.getRawTimestamp());
}
}
-
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java b/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java
new file mode 100644
index 0000000..d183ab4
--- /dev/null
+++ b/src/main/java/com/exactpro/th2/readlog/impl/TransportRegexpContentParser.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2023 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.exactpro.th2.readlog.impl;
+
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage;
+import com.exactpro.th2.read.file.common.StreamId;
+import com.exactpro.th2.read.file.common.impl.LineParser;
+import com.exactpro.th2.readlog.LogData;
+import com.exactpro.th2.readlog.RegexLogParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+public class TransportRegexpContentParser extends LineParser {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TransportRegexpContentParser.class);
+ private final RegexLogParser parser;
+
+ public TransportRegexpContentParser(RegexLogParser parser) {
+ super(LineParser.TRANSPORT);
+ this.parser = requireNonNull(parser, "'Parser' parameter");
+ }
+
+ @Nonnull
+ @Override
+ protected List lineToMessages(@Nonnull StreamId streamId, @Nonnull String readLine) {
+ LogData logData = parser.parse(streamId, readLine);
+ LOGGER.trace("{} line(s) extracted from {}: {}", logData.getBody().size(), readLine, logData.getBody());
+ return logData.getBody().stream().map(it -> {
+ RawMessage.Builder builder = RawMessage.builder();
+ setupMetadata(builder, logData);
+ builder.setBody(it.getBytes(StandardCharsets.UTF_8));
+ return builder;
+ }).collect(Collectors.toList());
+ }
+
+ private void setupMetadata(RawMessage.Builder builder, LogData logData) {
+ builder.idBuilder().setDirection(requireNonNull(logData.getDirection(), "direction is not set"));
+
+ if (logData.getParsedTimestamp() != null) {
+ builder.idBuilder().setTimestamp(logData.getParsedTimestamp());
+ }
+
+ if (logData.getRawTimestamp() != null) {
+ builder.addMetadataProperty("logTimestamp", logData.getRawTimestamp());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnError.java b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnError.java
new file mode 100644
index 0000000..dcdee15
--- /dev/null
+++ b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnError.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Copyright 2022 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package com.exactpro.th2.readlog.impl.lambdas;
+
+import com.exactpro.th2.read.file.common.StreamId;
+import kotlin.Unit;
+
+public interface ForOnError {
+ Unit action(StreamId id, String message, Exception ex);
+}
diff --git a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnSourceCorrupted.java b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnSourceCorrupted.java
new file mode 100644
index 0000000..2d6344b
--- /dev/null
+++ b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ForOnSourceCorrupted.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Copyright 2022 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package com.exactpro.th2.readlog.impl.lambdas;
+
+import com.exactpro.th2.read.file.common.StreamId;
+import kotlin.Unit;
+
+import java.nio.file.Path;
+
+public interface ForOnSourceCorrupted {
+ Unit action(StreamId id, Path path, Exception ex);
+}
diff --git a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java
new file mode 100644
index 0000000..b36fe48
--- /dev/null
+++ b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/ProtoForOnStreamData.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.exactpro.th2.readlog.impl.lambdas;
+
+import com.exactpro.th2.common.grpc.RawMessage;
+import com.exactpro.th2.read.file.common.StreamId;
+import kotlin.Unit;
+
+import java.util.List;
+
+public interface ProtoForOnStreamData {
+ Unit action(StreamId id, List extends RawMessage.Builder> builder);
+}
\ No newline at end of file
diff --git a/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java
new file mode 100644
index 0000000..40fb3c5
--- /dev/null
+++ b/src/main/java/com/exactpro/th2/readlog/impl/lambdas/TransportForOnStreamData.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2023 Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.exactpro.th2.readlog.impl.lambdas;
+
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage;
+import com.exactpro.th2.read.file.common.StreamId;
+import kotlin.Unit;
+
+import java.util.List;
+
+public interface TransportForOnStreamData {
+ Unit action(StreamId id, List extends RawMessage.Builder> builder);
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt b/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt
new file mode 100644
index 0000000..9423696
--- /dev/null
+++ b/src/main/kotlin/com/exactpro/th2/readlog/impl/CradleReaderState.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2022-2023. Exactpro (Exactpro Systems Limited)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.exactpro.th2.readlog.impl
+
+import com.exactpro.cradle.BookId
+import com.exactpro.cradle.CradleStorage
+import com.exactpro.cradle.Order
+import com.exactpro.cradle.messages.GroupedMessageFilter
+import com.exactpro.th2.read.file.common.StreamId
+import com.exactpro.th2.read.file.common.state.ReaderState
+import com.exactpro.th2.read.file.common.state.Content
+import com.exactpro.th2.read.file.common.state.ProtoContent
+import com.exactpro.th2.read.file.common.state.TransportContent
+import com.exactpro.th2.read.file.common.state.StreamData
+import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState
+import com.google.protobuf.ByteString
+import com.google.protobuf.UnsafeByteOperations
+import io.netty.buffer.Unpooled
+import java.time.Instant
+
+class CradleReaderState @JvmOverloads constructor(
+ private val cradleStorage: CradleStorage,
+ private val delegate: ReaderState = InMemoryReaderState(),
+ private val bookSupplier: (StreamId) -> String,
+ private val wrapContent: (ByteArray) -> Content
+): ReaderState by delegate {
+ override fun get(streamId: StreamId): StreamData? {
+ return delegate[streamId] ?: cradleStorage.getGroupedMessageBatches(
+ GroupedMessageFilter.builder()
+ .groupName(streamId.sessionAlias)
+ .bookId(BookId(bookSupplier(streamId)))
+ .timestampTo().isLessThanOrEqualTo(Instant.now())
+ .limit(1)
+ .order(Order.REVERSE)
+ .build()
+ ).asSequence().firstOrNull()?.lastMessage?.run {
+ StreamData(
+ timestamp,
+ sequence,
+ wrapContent(content)
+ )
+ }
+ }
+
+ companion object {
+ @JvmField
+ val WRAP_PROTO: (ByteArray?) -> Content = { ProtoContent(it?.let(UnsafeByteOperations::unsafeWrap) ?: ByteString.EMPTY) }
+
+ @JvmField
+ val WRAP_TRANSPORT: (ByteArray?) -> Content = { TransportContent(it?.let(Unpooled::wrappedBuffer) ?: Unpooled.EMPTY_BUFFER) }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/exactpro/th2/readlog/TestLogParser.java b/src/test/java/com/exactpro/th2/readlog/TestLogParser.java
index dc8db2d..9f9485a 100644
--- a/src/test/java/com/exactpro/th2/readlog/TestLogParser.java
+++ b/src/test/java/com/exactpro/th2/readlog/TestLogParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
+ * Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,8 +17,12 @@
package com.exactpro.th2.readlog;
import com.exactpro.th2.common.grpc.Direction;
+import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportUtilsKt;
import com.exactpro.th2.read.file.common.StreamId;
import com.exactpro.th2.readlog.cfg.AliasConfiguration;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -27,40 +31,68 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
public class TestLogParser {
static final String TEST_MESSAGE_ALIAS = "tma";
static final String TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT = "tma_wrong_format";
static final String TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN = "tma_wrong_pattern";
- static final String RAW_LOG = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - incoming fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}";
+ static final String RAW_LOG_IN = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - incoming fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}";
+ static final String RAW_LOG_OUT = "2021-03-23 13:21:37.991337479 QUICK.TEST INFO quicktest (Test.cpp:99) - outgoing fix message fix NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}";
- @Test
- void parser() {
+ @ParameterizedTest(name = "Has message: {1}, Skips before: {0}")
+ @MethodSource("skipMessageParams")
+ void skipMessageByTimestamp(Instant skipBefore, boolean hasMessages) {
+ AliasConfiguration cfg = new AliasConfiguration(
+ ".+",
+ ".*",
+ Map.of(),
+ "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3,9}",
+ "yyyy-MM-dd HH:mm:ss.SSSSSSSSS"
+ );
+ cfg.setGroups(List.of(0));
+ cfg.setTimestampZone(ZoneOffset.UTC);
+ cfg.setSkipBefore(skipBefore);
+ RegexLogParser parser = new RegexLogParser(Map.of(
+ "test", cfg
+ ));
+
+ LogData data = parser.parse(new StreamId("test"), "2021-03-23 13:21:37.991337479 some data");
+ if (hasMessages) {
+ assertEquals(Direction.FIRST, TransportUtilsKt.getProto(data.getDirection()), "unexpected direction");
+ }
+ assertEquals(hasMessages, !data.getBody().isEmpty(), () -> "unexpected data " + data.getBody());
+ if (hasMessages) {
+ assertEquals(List.of("2021-03-23 13:21:37.991337479 some data"), data.getBody());
+ }
+ }
+
+ static List skipMessageParams() {
+ return List.of(
+ arguments(Instant.parse("2021-03-23T13:21:38Z"), false),
+ arguments(Instant.parse("2021-03-23T13:21:36Z"), true)
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = Direction.class, mode = EnumSource.Mode.EXCLUDE, names = "UNRECOGNIZED")
+ void parser(Direction direction) {
RegexLogParser logParser = new RegexLogParser(getConfiguration());
- LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS, Direction.FIRST), RAW_LOG);
+ LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS), direction == Direction.FIRST ? RAW_LOG_IN : RAW_LOG_OUT);
+ assertEquals(direction, TransportUtilsKt.getProto(data.getDirection()), "unexpected direction");
assertEquals(1, data.getBody().size());
assertEquals("NewOrderSingle={ AuthenticationBlock={ UserID=\"qwrqwrq\" SessionKey=123456 } Header={ MsgTime=2021-Mar-21 21:21:21.210000000 CreationTime=2021-Mar-21 21:21:21.210000000 } NewOrder={ InstrumentBlock={ InstrSymbol=\"TEST_SYMBOL\" SecurityID=\"212121\" SecurityIDSource=TestSource SecurityExchange=\"test\" }}}", data.getBody().get(0));
assertEquals("2021-03-23 13:21:37.991337479", data.getRawTimestamp());
- assertEquals(2021, data.getParsedTimestamp().getYear());
- assertEquals(23, data.getParsedTimestamp().getDayOfMonth());
- assertEquals(3, data.getParsedTimestamp().getMonthValue());
- assertEquals(13, data.getParsedTimestamp().getHour());
- assertEquals(21, data.getParsedTimestamp().getMinute());
- assertEquals(37, data.getParsedTimestamp().getSecond());
- assertEquals(991337479, data.getParsedTimestamp().getNano());
- }
-
- @Test
- void skipIfDirectionPatternDoesNotMatch() {
- RegexLogParser logParser = new RegexLogParser(getConfiguration());
- LogData data = logParser.parse(new StreamId(TEST_MESSAGE_ALIAS, Direction.SECOND), RAW_LOG);
- assertTrue(data.getBody().isEmpty(), () -> "Unexpected data read: " + data.getBody());
+ assertEquals(Instant.parse("2021-03-23T13:21:37.991337479Z"), data.getParsedTimestamp());
}
@Test
@@ -70,21 +102,21 @@ void parserErrors() {
assertAll(
() -> {
var ex = assertThrows(IllegalStateException.class,
- () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT, Direction.FIRST), RAW_LOG));
+ () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT), RAW_LOG_IN));
Assertions.assertTrue(
ex.getMessage().startsWith("The timestamp '2021-03-23 13:21:37.991337479' cannot be parsed"),
() -> "Actual error: " + ex.getMessage());
},
() -> {
var ex = assertThrows(IllegalStateException.class,
- () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN, Direction.FIRST), RAW_LOG));
+ () -> logParser.parse(new StreamId(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN), RAW_LOG_IN));
Assertions.assertTrue(
ex.getMessage().startsWith("The pattern '3012.*' cannot extract the timestamp from the string"),
() -> "Actual error: " + ex.getMessage());
},
() -> {
var ex = assertThrows(IllegalArgumentException.class,
- () -> logParser.parse(new StreamId("wrong_alias", Direction.FIRST), RAW_LOG));
+ () -> logParser.parse(new StreamId("wrong_alias"), RAW_LOG_IN));
Assertions.assertTrue(
ex.getMessage().startsWith("Unknown alias 'wrong_alias'. No configuration found"),
() -> "Actual error: " + ex.getMessage());
@@ -98,9 +130,11 @@ private Map getConfiguration() {
String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSSSSSSSS";
Map result = new HashMap<>();
- result.put(TEST_MESSAGE_ALIAS, new AliasConfiguration(regexp, "",
+ AliasConfiguration cfg = new AliasConfiguration(regexp, "",
Map.of(Direction.FIRST, "incoming", Direction.SECOND, "outgoing"),
- timextampRegexp, timestampFormat));
+ timextampRegexp, timestampFormat);
+ cfg.setTimestampZone(ZoneOffset.UTC);
+ result.put(TEST_MESSAGE_ALIAS, cfg);
result.put(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_FORMAT, new AliasConfiguration(regexp, "", Collections.emptyMap(), timextampRegexp, "123"));
result.put(TEST_MESSAGE_ALIAS_WRONG_TIMESTAMP_PATTERN, new AliasConfiguration(regexp, "", Collections.emptyMap(), "3012.*", timestampFormat));
@@ -110,4 +144,4 @@ private Map getConfiguration() {
return result;
}
-}
+}
\ No newline at end of file
diff --git a/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java b/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java
index ddac41d..abcc08f 100644
--- a/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java
+++ b/src/test/java/com/exactpro/th2/readlog/TestRegexLogParserJoining.java
@@ -43,7 +43,7 @@ void joinsIfOneMatchFound() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));
- LogData data = parser.parse(new StreamId("test", Direction.FIRST), "this a test string, 123 and no more");
+ LogData data = parser.parse(new StreamId("test"), "this a test string, 123 and no more");
List body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
@@ -67,7 +67,7 @@ void doesNotTryToSubstituteInResultString() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));
- LogData data = parser.parse(new StreamId("test", Direction.FIRST), "[test] should not try to process ${3} and ${variable}");
+ LogData data = parser.parse(new StreamId("test"), "[test] should not try to process ${3} and ${variable}");
List body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
@@ -92,7 +92,7 @@ void joinsIfManyMatchesFound() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));
- LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53");
+ LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53");
List body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
@@ -119,7 +119,7 @@ void usesCorrectDelimiter() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));
- LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42");
+ LogData data = parser.parse(new StreamId("test"), "A, 42");
List body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
@@ -145,7 +145,7 @@ void usesConstantsFromMap() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));
- LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53");
+ LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53");
List body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
@@ -171,7 +171,7 @@ void correctlyAcceptsGroupsByName() {
));
RegexLogParser parser = new RegexLogParser(Map.of("test", configuration));
- LogData data = parser.parse(new StreamId("test", Direction.FIRST), "A, 42; B, 53");
+ LogData data = parser.parse(new StreamId("test"), "A, 42; B, 53");
List body = data.getBody();
Assertions.assertEquals(1, body.size(), () -> "Unexpected strings: " + body);
Assertions.assertEquals(
diff --git a/src/test/java/com/exactpro/th2/readlog/cfg/TestLogReaderConfiguration.java b/src/test/java/com/exactpro/th2/readlog/cfg/TestLogReaderConfiguration.java
index f0628dc..6a36e73 100644
--- a/src/test/java/com/exactpro/th2/readlog/cfg/TestLogReaderConfiguration.java
+++ b/src/test/java/com/exactpro/th2/readlog/cfg/TestLogReaderConfiguration.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
+import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
@@ -38,11 +39,18 @@ void deserializes() throws IOException {
LogReaderConfiguration cfg = LogReaderConfiguration.MAPPER.readValue(input, LogReaderConfiguration.class);
assertEquals(Duration.ofSeconds(5), cfg.getPullingInterval());
assertEquals(Set.of("A", "B"), cfg.getAliases().keySet());
- assertEquals(Set.of(Direction.FIRST, Direction.SECOND), cfg.getAliases().get("A").getDirectionToPattern().keySet());
- assertEquals(Set.of(Direction.FIRST), cfg.getAliases().get("B").getDirectionToPattern().keySet());
- assertEquals(".*", Objects.requireNonNull(cfg.getAliases().get("A").getRegexp()).pattern());
- assertEquals("202.*$", Objects.requireNonNull(cfg.getAliases().get("B").getTimestampRegexp()).pattern());
- assertEquals(DateTimeFormatter.ofPattern("yyyy.MM.dd").getResolverFields(), Objects.requireNonNull(cfg.getAliases().get("B").getTimestampFormat()).getResolverFields());
+ AliasConfiguration aliasA = cfg.getAliases().get("A");
+ AliasConfiguration aliasB = cfg.getAliases().get("B");
+ assertEquals(Set.of(Direction.FIRST, Direction.SECOND), aliasA.getDirectionToPattern().keySet());
+ assertEquals(Set.of(Direction.FIRST), aliasB.getDirectionToPattern().keySet());
+ assertEquals(".*", Objects.requireNonNull(aliasA.getRegexp()).pattern());
+ assertEquals("202.*$", Objects.requireNonNull(aliasB.getTimestampRegexp()).pattern());
+ assertEquals(DateTimeFormatter.ofPattern("yyyy.MM.dd").getResolverFields(), Objects.requireNonNull(aliasB.getTimestampFormat()).getResolverFields());
+ assertEquals(
+ Instant.parse("2022-10-31T10:35:00Z"),
+ aliasB.getSkipBefore(),
+ "unexpected 'skipBefore' value"
+ );
}
}
diff --git a/src/test/resources/test_cfg.json b/src/test/resources/test_cfg.json
index d49ad34..99d767b 100644
--- a/src/test/resources/test_cfg.json
+++ b/src/test/resources/test_cfg.json
@@ -13,7 +13,8 @@
"regexp": ".*",
"pathFilter": "fileB.*\\.log",
"timestampRegexp": "202.*$",
- "timestampFormat": "yyyy.MM.dd"
+ "timestampFormat": "yyyy.MM.dd",
+ "skipBefore": "2022-10-31T10:35:00Z"
}
},
"common": {