Skip to content

Commit

Permalink
[Ts-2095] Fixed problem - connect reorders sequence numbers when send…
Browse files Browse the repository at this point in the history
…ing messages in parallel mode by one session alias. (#30)

* refactored logging
  • Loading branch information
Nikita-Smirnov-Exactpro authored Feb 14, 2024
1 parent bb6b448 commit 859f99c
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 31 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: "Run integration tests"

on:
push:
branches:
- '*'

jobs:
tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 'zulu' '11'
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: '11'
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Build with Gradle
run: ./gradlew --info clean integrationTest
- uses: actions/upload-artifact@v3
if: failure()
with:
name: integration-test-results
path: build/reports/tests/integrationTest/
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# HTTP Client v2.2.0
# HTTP Client v2.2.1

This microservice allows performing HTTP requests and receive HTTP responses. It also can perform basic authentication

Expand Down Expand Up @@ -177,6 +177,10 @@ spec:

## Changelog

### v2.2.1

* Fixed problem - connect reorders sequence numbers when sending messages in parallel mode by one session alias.

### v2.2.0

* Puts unique `th2-request-id` property to metadata of request/response messages
Expand All @@ -186,7 +190,8 @@ spec:

* Supports th2 transport protocol
* Use event / message batcher
* Support batching by session group or session alias + direction
* messages are batched by session group
* events are batched by session scope

* owasp upgrade to `8.2.1`
* th2-common upgrade to `5.3.0`
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
kotlin.code.style=official
release_version=2.2.0
release_version=2.2.1
description='HTTP Client'
vcs_url=https://github.com/th2-net/th2-conn-http-client
app_main_class=com.exactpro.th2.http.client.Main
39 changes: 27 additions & 12 deletions src/main/kotlin/com/exactpro/th2/http/client/Application.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,6 +67,8 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

private const val SEND_PIN_ATTRIBUTE = "send"
internal const val INPUT_QUEUE_TRANSPORT_ATTRIBUTE = SEND_PIN_ATTRIBUTE
Expand Down Expand Up @@ -110,6 +112,10 @@ class Application(
}

fun start() {
// component supported multithreading sending via single http client.
// increment sequence and putting into message batcher should be executed atomically.
val incomingLock = ReentrantLock()
val outgoingLock = ReentrantLock()
val incomingSequence = createSequence()
val outgoingSequence = createSequence()

Expand Down Expand Up @@ -140,20 +146,24 @@ class Application(
.also { registerResource("transport message batcher", it::close) }

onRequest = { request: RawHttpRequest ->
val rawMessage = request.toTransportMessage(sessionAlias, outgoingSequence())

messageBatcher.onMessage(rawMessage, sessionGroup)
val rawMessage = outgoingLock.withLock {
request.toTransportMessage(sessionAlias, outgoingSequence()).also {
messageBatcher.onMessage(it, sessionGroup)
}
}
eventBatcher.storeEvent(
rawMessage.eventId?.toProto() ?: rootEventId,
"Sent HTTP request",
"Send message"
)
}
onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> ->
messageBatcher.onMessage(
response.toTransportMessage(sessionAlias, incomingSequence(), request),
sessionGroup
)
incomingLock.withLock {
messageBatcher.onMessage(
response.toTransportMessage(sessionAlias, incomingSequence(), request),
sessionGroup
)
}
stateManager.onResponse(response)
}
} else {
Expand All @@ -167,17 +177,22 @@ class Application(
}.also { registerResource("proto message batcher", it::close) }

onRequest = { request: RawHttpRequest ->
val rawMessage = request.toProtoMessage(connectionId, outgoingSequence())

messageBatcher.onMessage(rawMessage)
val rawMessage = outgoingLock.withLock {
request.toProtoMessage(connectionId, outgoingSequence())
.also(messageBatcher::onMessage)
}
eventBatcher.storeEvent(
if (rawMessage.hasParentEventId()) rawMessage.parentEventId else rootEventId,
"Sent HTTP request",
"Send message"
)
}
onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> ->
messageBatcher.onMessage(response.toProtoMessage(connectionId, incomingSequence(), request))
incomingLock.withLock {
messageBatcher.onMessage(
response.toProtoMessage(connectionId, incomingSequence(), request)
)
}
stateManager.onResponse(response)
}
}
Expand Down
27 changes: 24 additions & 3 deletions src/main/kotlin/com/exactpro/th2/http/client/ClientOptions.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package com.exactpro.th2.http.client

import mu.KLogger
import mu.KotlinLogging
import rawhttp.core.EagerHttpResponse
import rawhttp.core.RawHttpRequest
Expand Down Expand Up @@ -71,13 +72,21 @@ internal class ClientOptions(
}

override fun onRequest(httpRequest: RawHttpRequest): RawHttpRequest {
logger.info { "Sending request: $httpRequest" }
logger.log(
infoMsg = { "Sending request to URL: ${httpRequest.uri}" },
debugMsg = { "Sending request: $httpRequest" },
)
httpRequest.runCatching(onRequest).onFailure { logger.error(it) { "Failed to execute onRequest hook" } }
return httpRequest
}

override fun onResponse(socket: Socket, uri: URI, httpResponse: RawHttpResponse<Void>): EagerHttpResponse<Void> = try {
httpResponse.eagerly().also { logger.info { "Received response on socket '$socket': $it" } }
httpResponse.eagerly().also {
logger.log(
infoMsg = { "Received response on local port '${socket.localPort}' from URL: $uri" },
debugMsg = { "Received response on socket '$socket': $it" },
)
}
} catch (e: Throwable) {
throw IllegalStateException("Cannot read http response eagerly during onResponse call", e)
} finally {
Expand Down Expand Up @@ -163,4 +172,16 @@ internal class ClientOptions(
logger.warn(error) { "Cannot close socket: $this" }
}
}

companion object {
private fun KLogger.log(
infoMsg: (() -> Any?)? = null,
debugMsg: (() -> Any?)? = null,
) {
when {
debugMsg != null && isDebugEnabled -> debug(debugMsg)
infoMsg != null && isInfoEnabled -> info(infoMsg)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,20 @@ import org.junit.jupiter.api.Test
import strikt.api.Assertion
import strikt.api.expectThat
import strikt.assertions.all
import strikt.assertions.any
import strikt.assertions.isEmpty
import strikt.assertions.isEqualTo
import strikt.assertions.isFalse
import strikt.assertions.isNotNull
import strikt.assertions.matches
import strikt.assertions.single
import strikt.assertions.withElementAt
import java.time.Duration.ofSeconds
import java.time.Instant
import kotlin.test.assertNotNull

@IntegrationTest
@Th2IntegrationTest
class ApplicationIntegrationTest {
class InvalidApplicationIntegrationTest {
@JvmField
@Suppress("unused")
internal val customConfig = CustomConfigSpec.fromString(
Expand Down Expand Up @@ -268,17 +268,13 @@ class ApplicationIntegrationTest {
)
}
}
withElementAt(0) {
get { getEvents(0) }.and {
get { id }.get { scope }.isEqualTo(eventIdA.scope)
get { parentId }.isEqualTo(eventIdA.toProto())
}

// conn processes message in different thread and events can be reordered
any {
isIdEqualTo(eventIdA)
}
withElementAt(1) {
get { getEvents(0) }.and {
get { id }.get { scope }.isEqualTo(eventIdB.scope)
get { parentId }.isEqualTo(eventIdB.toProto())
}
any {
isIdEqualTo(eventIdB)
}
}
}
Expand Down Expand Up @@ -322,5 +318,12 @@ class ApplicationIntegrationTest {
get { type }.isEqualTo("Microservice")
get { status }.isEqualTo(EventStatus.SUCCESS)
}

fun Assertion.Builder<EventBatch>.isIdEqualTo(eventId: EventId) {
get { eventsList }.single().and {
get { id }.get { scope }.isEqualTo(eventId.scope)
get { parentId }.isEqualTo(eventId.toProto())
}
}
}
}
Loading

0 comments on commit 859f99c

Please sign in to comment.