diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml new file mode 100644 index 0000000..1e13745 --- /dev/null +++ b/.github/workflows/integration-tests.yml @@ -0,0 +1,26 @@ +name: "Run integration tests" + +on: + push: + branches: + - '*' + +jobs: + tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up JDK 'zulu' '11' + uses: actions/setup-java@v3 + with: + distribution: 'zulu' + java-version: '11' + - name: Setup Gradle + uses: gradle/gradle-build-action@v2 + - name: Build with Gradle + run: ./gradlew --info clean integrationTest + - uses: actions/upload-artifact@v3 + if: failure() + with: + name: integration-test-results + path: build/reports/tests/integrationTest/ diff --git a/README.md b/README.md index 542c703..1d4cc16 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# HTTP Client v2.2.0 +# HTTP Client v2.2.1 This microservice allows performing HTTP requests and receive HTTP responses. It also can perform basic authentication @@ -177,6 +177,10 @@ spec: ## Changelog +### v2.2.1 + +* Fixed problem - connect reorders sequence numbers when sending messages in parallel mode by one session alias. + ### v2.2.0 * Puts unique `th2-request-id` property to metadata of request/response messages @@ -186,7 +190,8 @@ spec: * Supports th2 transport protocol * Use event / message batcher - * Support batching by session group or session alias + direction + * messages are batched by session group + * events are batched by session scope * owasp upgrade to `8.2.1` * th2-common upgrade to `5.3.0` diff --git a/gradle.properties b/gradle.properties index e31f7dd..e69dbcd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ kotlin.code.style=official -release_version=2.2.0 +release_version=2.2.1 description='HTTP Client' vcs_url=https://github.com/th2-net/th2-conn-http-client app_main_class=com.exactpro.th2.http.client.Main \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/http/client/Application.kt b/src/main/kotlin/com/exactpro/th2/http/client/Application.kt index 085cf1a..3eff5d3 100644 --- a/src/main/kotlin/com/exactpro/th2/http/client/Application.kt +++ b/src/main/kotlin/com/exactpro/th2/http/client/Application.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,6 +67,8 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock private const val SEND_PIN_ATTRIBUTE = "send" internal const val INPUT_QUEUE_TRANSPORT_ATTRIBUTE = SEND_PIN_ATTRIBUTE @@ -110,6 +112,10 @@ class Application( } fun start() { + // component supported multithreading sending via single http client. + // increment sequence and putting into message batcher should be executed atomically. + val incomingLock = ReentrantLock() + val outgoingLock = ReentrantLock() val incomingSequence = createSequence() val outgoingSequence = createSequence() @@ -140,9 +146,11 @@ class Application( .also { registerResource("transport message batcher", it::close) } onRequest = { request: RawHttpRequest -> - val rawMessage = request.toTransportMessage(sessionAlias, outgoingSequence()) - - messageBatcher.onMessage(rawMessage, sessionGroup) + val rawMessage = outgoingLock.withLock { + request.toTransportMessage(sessionAlias, outgoingSequence()).also { + messageBatcher.onMessage(it, sessionGroup) + } + } eventBatcher.storeEvent( rawMessage.eventId?.toProto() ?: rootEventId, "Sent HTTP request", @@ -150,10 +158,12 @@ class Application( ) } onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> -> - messageBatcher.onMessage( - response.toTransportMessage(sessionAlias, incomingSequence(), request), - sessionGroup - ) + incomingLock.withLock { + messageBatcher.onMessage( + response.toTransportMessage(sessionAlias, incomingSequence(), request), + sessionGroup + ) + } stateManager.onResponse(response) } } else { @@ -167,9 +177,10 @@ class Application( }.also { registerResource("proto message batcher", it::close) } onRequest = { request: RawHttpRequest -> - val rawMessage = request.toProtoMessage(connectionId, outgoingSequence()) - - messageBatcher.onMessage(rawMessage) + val rawMessage = outgoingLock.withLock { + request.toProtoMessage(connectionId, outgoingSequence()) + .also(messageBatcher::onMessage) + } eventBatcher.storeEvent( if (rawMessage.hasParentEventId()) rawMessage.parentEventId else rootEventId, "Sent HTTP request", @@ -177,7 +188,11 @@ class Application( ) } onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> -> - messageBatcher.onMessage(response.toProtoMessage(connectionId, incomingSequence(), request)) + incomingLock.withLock { + messageBatcher.onMessage( + response.toProtoMessage(connectionId, incomingSequence(), request) + ) + } stateManager.onResponse(response) } } diff --git a/src/main/kotlin/com/exactpro/th2/http/client/ClientOptions.kt b/src/main/kotlin/com/exactpro/th2/http/client/ClientOptions.kt index 912a7d5..dc1f46b 100644 --- a/src/main/kotlin/com/exactpro/th2/http/client/ClientOptions.kt +++ b/src/main/kotlin/com/exactpro/th2/http/client/ClientOptions.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package com.exactpro.th2.http.client +import mu.KLogger import mu.KotlinLogging import rawhttp.core.EagerHttpResponse import rawhttp.core.RawHttpRequest @@ -71,13 +72,21 @@ internal class ClientOptions( } override fun onRequest(httpRequest: RawHttpRequest): RawHttpRequest { - logger.info { "Sending request: $httpRequest" } + logger.log( + infoMsg = { "Sending request to URL: ${httpRequest.uri}" }, + debugMsg = { "Sending request: $httpRequest" }, + ) httpRequest.runCatching(onRequest).onFailure { logger.error(it) { "Failed to execute onRequest hook" } } return httpRequest } override fun onResponse(socket: Socket, uri: URI, httpResponse: RawHttpResponse): EagerHttpResponse = try { - httpResponse.eagerly().also { logger.info { "Received response on socket '$socket': $it" } } + httpResponse.eagerly().also { + logger.log( + infoMsg = { "Received response on local port '${socket.localPort}' from URL: $uri" }, + debugMsg = { "Received response on socket '$socket': $it" }, + ) + } } catch (e: Throwable) { throw IllegalStateException("Cannot read http response eagerly during onResponse call", e) } finally { @@ -163,4 +172,16 @@ internal class ClientOptions( logger.warn(error) { "Cannot close socket: $this" } } } + + companion object { + private fun KLogger.log( + infoMsg: (() -> Any?)? = null, + debugMsg: (() -> Any?)? = null, + ) { + when { + debugMsg != null && isDebugEnabled -> debug(debugMsg) + infoMsg != null && isInfoEnabled -> info(infoMsg) + } + } + } } diff --git a/src/test/kotlin/com/exactpro/th2/http/client/ApplicationIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/http/client/InvalidApplicationIntegrationTest.kt similarity index 95% rename from src/test/kotlin/com/exactpro/th2/http/client/ApplicationIntegrationTest.kt rename to src/test/kotlin/com/exactpro/th2/http/client/InvalidApplicationIntegrationTest.kt index d54dccc..4448e95 100644 --- a/src/test/kotlin/com/exactpro/th2/http/client/ApplicationIntegrationTest.kt +++ b/src/test/kotlin/com/exactpro/th2/http/client/InvalidApplicationIntegrationTest.kt @@ -49,20 +49,20 @@ import org.junit.jupiter.api.Test import strikt.api.Assertion import strikt.api.expectThat import strikt.assertions.all +import strikt.assertions.any import strikt.assertions.isEmpty import strikt.assertions.isEqualTo import strikt.assertions.isFalse import strikt.assertions.isNotNull import strikt.assertions.matches import strikt.assertions.single -import strikt.assertions.withElementAt import java.time.Duration.ofSeconds import java.time.Instant import kotlin.test.assertNotNull @IntegrationTest @Th2IntegrationTest -class ApplicationIntegrationTest { +class InvalidApplicationIntegrationTest { @JvmField @Suppress("unused") internal val customConfig = CustomConfigSpec.fromString( @@ -268,17 +268,13 @@ class ApplicationIntegrationTest { ) } } - withElementAt(0) { - get { getEvents(0) }.and { - get { id }.get { scope }.isEqualTo(eventIdA.scope) - get { parentId }.isEqualTo(eventIdA.toProto()) - } + + // conn processes message in different thread and events can be reordered + any { + isIdEqualTo(eventIdA) } - withElementAt(1) { - get { getEvents(0) }.and { - get { id }.get { scope }.isEqualTo(eventIdB.scope) - get { parentId }.isEqualTo(eventIdB.toProto()) - } + any { + isIdEqualTo(eventIdB) } } } @@ -322,5 +318,12 @@ class ApplicationIntegrationTest { get { type }.isEqualTo("Microservice") get { status }.isEqualTo(EventStatus.SUCCESS) } + + fun Assertion.Builder.isIdEqualTo(eventId: EventId) { + get { eventsList }.single().and { + get { id }.get { scope }.isEqualTo(eventId.scope) + get { parentId }.isEqualTo(eventId.toProto()) + } + } } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/http/client/ValidApplicationIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/http/client/ValidApplicationIntegrationTest.kt new file mode 100644 index 0000000..8d78795 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/http/client/ValidApplicationIntegrationTest.kt @@ -0,0 +1,193 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.http.client + +import com.exactpro.th2.common.schema.factory.CommonFactory +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction.INCOMING +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction.OUTGOING +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportGroupBatchRouter.Companion.TRANSPORT_GROUP_ATTRIBUTE +import com.exactpro.th2.common.utils.message.transport.toGroup +import com.exactpro.th2.http.client.annotations.IntegrationTest +import com.exactpro.th2.test.annotations.Th2AppFactory +import com.exactpro.th2.test.annotations.Th2IntegrationTest +import com.exactpro.th2.test.annotations.Th2TestFactory +import com.exactpro.th2.test.extension.CleanupExtension +import com.exactpro.th2.test.queue.CollectorMessageListener +import com.exactpro.th2.test.spec.CustomConfigSpec +import com.exactpro.th2.test.spec.RabbitMqSpec +import com.exactpro.th2.test.spec.pin +import com.exactpro.th2.test.spec.pins +import com.exactpro.th2.test.spec.publishers +import com.exactpro.th2.test.spec.subscribers +import mu.KotlinLogging +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout +import rawhttp.core.RawHttp +import rawhttp.core.server.TcpRawHttpServer +import java.time.Duration.ofSeconds +import java.time.Instant +import java.util.EnumMap +import java.util.Optional +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +@IntegrationTest +@Th2IntegrationTest +class ValidApplicationIntegrationTest { + @JvmField + @Suppress("unused") + internal val customConfig = CustomConfigSpec.fromString( + """ + { + "host": "127.0.0.1", + "port": $SERVER_PORT, + "validateCertificates": false, + "sessionAlias": "test-session-alias", + "maxParallelRequests": $MAX_PARALLEL_REQUESTS, + "useTransport": true + } + """.trimIndent() + ) + + @JvmField + @Suppress("unused") + internal val mq = RabbitMqSpec.create() + .pins { + subscribers { + pin("sub") { + attributes(INPUT_QUEUE_TRANSPORT_ATTRIBUTE, TRANSPORT_GROUP_ATTRIBUTE) + } + } + publishers { + pin("pub") { + attributes(TRANSPORT_GROUP_ATTRIBUTE) + } + } + } + + @Test + @Timeout(30) + fun `sequence order test`( + @Th2AppFactory appFactory: CommonFactory, + @Th2TestFactory testFactory: CommonFactory, + resources: CleanupExtension.Registry, + ) { + val iterations = 1_000 + val messageListener = CollectorMessageListener.createWithCapacity(iterations) + testFactory.transportGroupBatchRouter.subscribe(messageListener, "pub") + + val application = Application(appFactory) { resource, destructor -> + resources.add(resource, destructor) + } + + application.start() + + val messageGroup = RawMessage.builder().apply { + idBuilder() + .setSessionAlias(SESSION_ALIAS_TEST) + .setTimestamp(Instant.now()) + .setDirection(OUTGOING) + .setSequence(1) + }.build().toGroup() + + val groupBatch = GroupBatch.builder().apply { + setBook(BOOK_TEST) + setSessionGroup(SESSION_GROUP_TEST) + groupsBuilder().apply { + repeat(iterations) { + add(messageGroup) + } + } + }.build() + + testFactory.transportGroupBatchRouter.send(groupBatch, "sub") + + val messageCounter = EnumMap(Direction::class.java) + val sequences = EnumMap(Direction::class.java) + while (messageCounter[INCOMING] != iterations && messageCounter[OUTGOING] != iterations) { + val batch = assertNotNull( + messageListener.poll(ofSeconds(2)), + "Batch not null, messages: $messageCounter" + ) + + batch.groups.forEach { group -> + group.messages.forEach { message -> + with(message.id) { + messageCounter.merge(direction, 1, Int::plus) + val previous = sequences.put(direction, sequence) ?: 0 + assertTrue( + previous < sequence, + """ + Decrease sequence + direction: $direction + previous: $previous + current: $sequence + messages: $messageCounter + """.trimIndent() + ) + } + } + } + } + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + + private const val BOOK_TEST = "test-book" + private const val SESSION_ALIAS_TEST = "test-session-alias" + private const val SESSION_GROUP_TEST = "test-session-group" + + private const val MAX_PARALLEL_REQUESTS = 5 + private const val SERVER_PORT = 8086 + private const val BODY = + """{ "id" : 901, "name" : { "first":"Tom", "middle":"and", "last":"Jerry" }, "phones" : [ {"type" : "home", "number" : "1233333" }, {"type" : "work", "number" : "264444" }], "lazy" : false, "married" : null }""" + private val RESPONSE_DATA = """ + HTTP/1.1 200 OK + Content-Type: plain/text + Content-Length: ${BODY.length} + + $BODY + """.trimIndent() + + private val SERVER_RESPONSE_COUNTER = AtomicInteger(0) + + private val SERVER = TcpRawHttpServer(SERVER_PORT) + + @BeforeAll + @JvmStatic + fun setUp() { + SERVER.start { + LOGGER.debug { "Received request: ${it.eagerly().startLine}" } + SERVER_RESPONSE_COUNTER.incrementAndGet() + Optional.of(RawHttp().parseResponse(RESPONSE_DATA)) + } + } + + @AfterAll + @JvmStatic + fun finish() { + SERVER.stop() + } + } +} \ No newline at end of file diff --git a/src/test/resources/log4j2.properties b/src/test/resources/log4j2.properties index 232679a..4d7c3a7 100644 --- a/src/test/resources/log4j2.properties +++ b/src/test/resources/log4j2.properties @@ -5,4 +5,7 @@ appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{dd MMM yyyy HH:mm:ss,SSS} %-6p [%-15t] %c - %m%n rootLogger.level = INFO -rootLogger.appenderRef.stdout.ref = ConsoleLogger \ No newline at end of file +rootLogger.appenderRef.stdout.ref = ConsoleLogger + +logger.client.name=com.exactpro.th2.http.client.ClientOptions +logger.client.level=DEBUG \ No newline at end of file