From bb6b448c0fe8d1e623388a702cdc07f9e076d4a3 Mon Sep 17 00:00:00 2001 From: Nikita Smirnov <46124551+Nikita-Smirnov-Exactpro@users.noreply.github.com> Date: Thu, 28 Dec 2023 13:41:16 +0400 Subject: [PATCH] Added integration test (#29) --- build.gradle | 15 + .../exactpro/th2/http/client/Application.kt | 333 +++++++++++++++++ .../com/exactpro/th2/http/client/Main.kt | 339 +----------------- .../http/client/ApplicationIntegrationTest.kt | 326 +++++++++++++++++ .../client/annotations/IntegrationTest.kt | 22 ++ src/test/resources/log4j2.properties | 8 + 6 files changed, 715 insertions(+), 328 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/http/client/Application.kt create mode 100644 src/test/kotlin/com/exactpro/th2/http/client/ApplicationIntegrationTest.kt create mode 100644 src/test/kotlin/com/exactpro/th2/http/client/annotations/IntegrationTest.kt create mode 100644 src/test/resources/log4j2.properties diff --git a/build.gradle b/build.gradle index c03df2d..cea3bec 100644 --- a/build.gradle +++ b/build.gradle @@ -30,6 +30,21 @@ dependencies { compileOnly "com.google.auto.service:auto-service:1.1.1" annotationProcessor "com.google.auto.service:auto-service:1.1.1" kapt "com.google.auto.service:auto-service:1.1.1" + + testImplementation 'com.exactpro.th2:junit-jupiter-integration:0.0.1-master-6956603819-5241ee5-SNAPSHOT' +} + +test { + useJUnitPlatform { + excludeTags('integration-test') + } +} + +tasks.register('integrationTest', Test) { + group = 'verification' + useJUnitPlatform { + includeTags('integration-test') + } } dependencyLocking { diff --git a/src/main/kotlin/com/exactpro/th2/http/client/Application.kt b/src/main/kotlin/com/exactpro/th2/http/client/Application.kt new file mode 100644 index 0000000..085cf1a --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/http/client/Application.kt @@ -0,0 +1,333 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.http.client + +import com.exactpro.th2.common.grpc.EventBatch +import com.exactpro.th2.common.grpc.EventID +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.schema.factory.CommonFactory +import com.exactpro.th2.common.schema.message.MessageListener +import com.exactpro.th2.common.schema.message.MessageRouter +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch +import com.exactpro.th2.common.utils.event.EventBatcher +import com.exactpro.th2.common.utils.event.storeEvent +import com.exactpro.th2.common.utils.event.transport.toProto +import com.exactpro.th2.common.utils.message.RAW_GROUP_SELECTOR +import com.exactpro.th2.common.utils.message.RawMessageBatcher +import com.exactpro.th2.common.utils.message.parentEventIds +import com.exactpro.th2.common.utils.message.transport.MessageBatcher +import com.exactpro.th2.common.utils.message.transport.MessageBatcher.Companion.GROUP_SELECTOR +import com.exactpro.th2.common.utils.message.transport.eventIds +import com.exactpro.th2.common.utils.shutdownGracefully +import com.exactpro.th2.http.client.api.IAuthSettings +import com.exactpro.th2.http.client.api.IAuthSettingsTypeProvider +import com.exactpro.th2.http.client.api.IRequestHandler +import com.exactpro.th2.http.client.api.IRequestHandler.RequestHandlerContext +import com.exactpro.th2.http.client.api.IStateManager +import com.exactpro.th2.http.client.api.IStateManager.StateManagerContext +import com.exactpro.th2.http.client.api.impl.AuthSettingsDeserializer +import com.exactpro.th2.http.client.api.impl.BasicAuthSettingsTypeProvider +import com.exactpro.th2.http.client.api.impl.BasicRequestHandler +import com.exactpro.th2.http.client.api.impl.BasicStateManager +import com.exactpro.th2.http.client.util.Certificate +import com.exactpro.th2.http.client.util.CertificateConverter +import com.exactpro.th2.http.client.util.PrivateKeyConverter +import com.exactpro.th2.http.client.util.toPrettyString +import com.exactpro.th2.http.client.util.toProtoMessage +import com.exactpro.th2.http.client.util.toTransportMessage +import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.fasterxml.jackson.databind.json.JsonMapper +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.module.kotlin.KotlinFeature +import com.fasterxml.jackson.module.kotlin.KotlinModule +import com.google.common.util.concurrent.ThreadFactoryBuilder +import mu.KotlinLogging +import rawhttp.core.RawHttpRequest +import rawhttp.core.RawHttpResponse +import java.security.PrivateKey +import java.security.cert.X509Certificate +import java.time.Instant +import java.util.ServiceLoader +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit.SECONDS +import java.util.concurrent.atomic.AtomicLong + +private const val SEND_PIN_ATTRIBUTE = "send" +internal const val INPUT_QUEUE_TRANSPORT_ATTRIBUTE = SEND_PIN_ATTRIBUTE +private val INPUT_QUEUE_PROTO_ATTRIBUTES = arrayOf(SEND_PIN_ATTRIBUTE, "group") + +class Application( + factory: CommonFactory, + private val registerResource: (name: String, destructor: () -> Unit) -> Unit, +) { + private val stateManager = load(BasicStateManager::class.java) + private val requestHandler = load(BasicRequestHandler::class.java) + private val authSettingsType = load(BasicAuthSettingsTypeProvider::class.java).type + + private val settings: Settings + private val eventRouter: MessageRouter = factory.eventBatchRouter + private val protoMR: MessageRouter = factory.messageRouterMessageGroupBatch + private val transportMR: MessageRouter = factory.transportGroupBatchRouter + private val rootEventId: EventID = factory.rootEventId + + init { + val mapper = JsonMapper.builder() + .addModule( + KotlinModule.Builder() + .withReflectionCacheSize(512) + .configure(KotlinFeature.NullToEmptyCollection, false) + .configure(KotlinFeature.NullToEmptyMap, false) + .configure(KotlinFeature.NullIsSameAsDefault, true) + .configure(KotlinFeature.SingletonSupport, true) + .configure(KotlinFeature.StrictNullChecks, false) + .build() + ) + .addModule( + SimpleModule().addDeserializer( + IAuthSettings::class.java, + AuthSettingsDeserializer(authSettingsType) + ) + ) + .build() + + settings = factory.getCustomConfiguration(Settings::class.java, mapper) + } + + fun start() { + val incomingSequence = createSequence() + val outgoingSequence = createSequence() + + val onRequest: (RawHttpRequest) -> Unit + val onResponse: (RawHttpRequest, RawHttpResponse<*>) -> Unit + + val executor = Executors.newSingleThreadScheduledExecutor() + registerResource("message batch executor") { executor.shutdownGracefully() } + + with(settings) { + val book = rootEventId.bookName + val sessionGroup = sessionAlias + + val eventBatcher = EventBatcher( + maxBatchSizeInItems = maxBatchSize, + executor = executor, + maxFlushTime = maxFlushTime, + onBatch = eventRouter::send + ).also { registerResource("event batcher", it::close) } + + val onError: (Throwable) -> Unit = { + eventBatcher.storeEvent(rootEventId, "Batching problem: ${it.message}", "Message batching problem", it) + } + + if (useTransport) { + val messageBatcher = + MessageBatcher(maxBatchSize, maxFlushTime, book, GROUP_SELECTOR, executor, onError, transportMR::send) + .also { registerResource("transport message batcher", it::close) } + + onRequest = { request: RawHttpRequest -> + val rawMessage = request.toTransportMessage(sessionAlias, outgoingSequence()) + + messageBatcher.onMessage(rawMessage, sessionGroup) + eventBatcher.storeEvent( + rawMessage.eventId?.toProto() ?: rootEventId, + "Sent HTTP request", + "Send message" + ) + } + onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> -> + messageBatcher.onMessage( + response.toTransportMessage(sessionAlias, incomingSequence(), request), + sessionGroup + ) + stateManager.onResponse(response) + } + } else { + val connectionId = com.exactpro.th2.common.grpc.ConnectionID.newBuilder() + .setSessionAlias(sessionAlias) + .setSessionGroup(sessionGroup) + .build() + + val messageBatcher = RawMessageBatcher(maxBatchSize, maxFlushTime, RAW_GROUP_SELECTOR, executor, onError) { + protoMR.send(it, com.exactpro.th2.common.schema.message.QueueAttribute.RAW.value) + }.also { registerResource("proto message batcher", it::close) } + + onRequest = { request: RawHttpRequest -> + val rawMessage = request.toProtoMessage(connectionId, outgoingSequence()) + + messageBatcher.onMessage(rawMessage) + eventBatcher.storeEvent( + if (rawMessage.hasParentEventId()) rawMessage.parentEventId else rootEventId, + "Sent HTTP request", + "Send message" + ) + } + onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> -> + messageBatcher.onMessage(response.toProtoMessage(connectionId, incomingSequence(), request)) + stateManager.onResponse(response) + } + } + val client = HttpClient( + https, + host, + port, + readTimeout, + keepAliveTimeout, + maxParallelRequests, + defaultHeaders, + stateManager::prepareRequest, + onRequest, + onResponse, + stateManager::onStart, + stateManager::onStop, + validateCertificates, + certificate + ).apply { registerResource("client", ::close) } + + stateManager.runCatching { + registerResource("state-manager", ::close) + init(StateManagerContext(client, auth)) + }.onFailure { + LOGGER.error(it) { "Failed to init state manager" } + eventBatcher.storeEvent(rootEventId, "Failed to init state manager", "Error", it) + throw it + } + + requestHandler.runCatching { + registerResource("request-handler", ::close) + init(RequestHandlerContext(client)) + }.onFailure { + LOGGER.error(it) { "Failed to init request handler" } + eventBatcher.storeEvent(rootEventId, "Failed to init request handler", "Error", it) + throw it + } + + val sendService: ExecutorService = createExecutorService(maxParallelRequests) + + val proto = runCatching { + val listener = MessageListener { _, message -> + message.groupsList.forEach { group -> + sendService.submit { + group.runCatching(requestHandler::onRequest).recoverCatching { error -> + LOGGER.error(error) { "Failed to handle protobuf message group: ${group.toPrettyString()}" } + group.parentEventIds.ifEmpty { sequenceOf(rootEventId) }.forEach { + eventBatcher.storeEvent( + it, + "Failed to handle protobuf message group", + "Error", + error + ) + } + } + } + } + } + checkNotNull(protoMR.subscribe(listener, *INPUT_QUEUE_PROTO_ATTRIBUTES)) + }.onSuccess { monitor -> + registerResource("proto-raw-monitor", monitor::unsubscribe) + }.onFailure { + LOGGER.warn(it) { "Failed to subscribe to input protobuf queue" } + } + + val transport = runCatching { + val listener = MessageListener { _, message -> + message.groups.forEach { group -> + sendService.submit { + group.runCatching(requestHandler::onRequest).recoverCatching { error -> + LOGGER.error(error) { "Failed to handle transport message group: $group" } + group.eventIds.map(com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.EventId::toProto).ifEmpty { sequenceOf(rootEventId) }.forEach { + eventBatcher.storeEvent( + it, + "Failed to handle transport message group", + "Error", + error + ) + } + } + } + } + } + checkNotNull(transportMR.subscribe(listener, INPUT_QUEUE_TRANSPORT_ATTRIBUTE)) + }.onSuccess { monitor -> + registerResource("transport-raw-monitor", monitor::unsubscribe) + }.onFailure { + LOGGER.warn(it) { "Failed to subscribe to input transport queue" } + } + + if (proto.isFailure && transport.isFailure) { + error("Subscribe pin should be declared at least one of protobuf or transport protocols") + } + + client.runCatching(HttpClient::start).onFailure { + throw IllegalStateException("Failed to start client", it) + } + } + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + } +} + +data class Settings( + val https: Boolean = false, + val host: String, + val port: Int = if (https) 443 else 80, + val readTimeout: Int = 5000, + val maxParallelRequests: Int = 5, + val keepAliveTimeout: Long = 15000, + val defaultHeaders: Map> = emptyMap(), + val sessionAlias: String, + val auth: IAuthSettings? = null, + val validateCertificates: Boolean = true, + val useTransport: Boolean = false, + val batcherThreads: Int = 2, + val maxBatchSize: Int = 1000, + val maxFlushTime: Long = 1000, + @JsonDeserialize(converter = CertificateConverter::class) val clientCertificate: X509Certificate? = null, + @JsonDeserialize(converter = PrivateKeyConverter::class) val certificatePrivateKey: PrivateKey? = null, +) { + @JsonIgnore + val certificate: Certificate? = clientCertificate?.run { + requireNotNull(certificatePrivateKey) { + "'${::clientCertificate.name}' setting requires '${::certificatePrivateKey.name}' setting to be set" + } + + Certificate(clientCertificate, certificatePrivateKey) + } +} + +private inline fun load(defaultImpl: Class): T { + val instances = ServiceLoader.load(T::class.java).toList() + + return when (instances.size) { + 0 -> error("No instances of ${T::class.simpleName}") + 1 -> instances.first() + 2 -> instances.first { !defaultImpl.isInstance(it) } + else -> error("More than 1 non-default instance of ${T::class.simpleName} has been found: $instances") + } +} + +private fun createSequence(): () -> Long = Instant.now().run { + AtomicLong(epochSecond * SECONDS.toNanos(1) + nano) +}::incrementAndGet + +private fun createExecutorService(maxCount: Int): ExecutorService = + Executors.newFixedThreadPool(maxCount, ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("th2-http-client-%d") + .build()) \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/http/client/Main.kt b/src/main/kotlin/com/exactpro/th2/http/client/Main.kt index 9d5d0f2..e51b3c8 100644 --- a/src/main/kotlin/com/exactpro/th2/http/client/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/http/client/Main.kt @@ -18,70 +18,15 @@ package com.exactpro.th2.http.client -import com.exactpro.th2.common.grpc.ConnectionID -import com.exactpro.th2.common.grpc.EventBatch -import com.exactpro.th2.common.grpc.EventID -import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.schema.factory.CommonFactory -import com.exactpro.th2.common.schema.message.MessageListener -import com.exactpro.th2.common.schema.message.MessageRouter -import com.exactpro.th2.common.schema.message.QueueAttribute.RAW -import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.EventId -import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch -import com.exactpro.th2.common.utils.event.EventBatcher -import com.exactpro.th2.common.utils.event.storeEvent -import com.exactpro.th2.common.utils.event.transport.toProto -import com.exactpro.th2.common.utils.message.RAW_GROUP_SELECTOR -import com.exactpro.th2.common.utils.message.RawMessageBatcher -import com.exactpro.th2.common.utils.message.parentEventIds -import com.exactpro.th2.common.utils.message.transport.MessageBatcher -import com.exactpro.th2.common.utils.message.transport.MessageBatcher.Companion.GROUP_SELECTOR -import com.exactpro.th2.common.utils.message.transport.eventIds -import com.exactpro.th2.common.utils.shutdownGracefully -import com.exactpro.th2.http.client.api.IAuthSettings -import com.exactpro.th2.http.client.api.IAuthSettingsTypeProvider -import com.exactpro.th2.http.client.api.IRequestHandler -import com.exactpro.th2.http.client.api.IRequestHandler.RequestHandlerContext -import com.exactpro.th2.http.client.api.IStateManager -import com.exactpro.th2.http.client.api.IStateManager.StateManagerContext -import com.exactpro.th2.http.client.api.impl.AuthSettingsDeserializer -import com.exactpro.th2.http.client.api.impl.BasicAuthSettingsTypeProvider -import com.exactpro.th2.http.client.api.impl.BasicRequestHandler -import com.exactpro.th2.http.client.api.impl.BasicStateManager -import com.exactpro.th2.http.client.util.Certificate -import com.exactpro.th2.http.client.util.CertificateConverter -import com.exactpro.th2.http.client.util.PrivateKeyConverter -import com.exactpro.th2.http.client.util.toPrettyString -import com.exactpro.th2.http.client.util.toProtoMessage -import com.exactpro.th2.http.client.util.toTransportMessage -import com.fasterxml.jackson.annotation.JsonIgnore -import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import com.fasterxml.jackson.databind.json.JsonMapper -import com.fasterxml.jackson.databind.module.SimpleModule -import com.fasterxml.jackson.module.kotlin.KotlinFeature -import com.fasterxml.jackson.module.kotlin.KotlinModule import mu.KotlinLogging -import rawhttp.core.RawHttpRequest -import rawhttp.core.RawHttpResponse -import java.security.PrivateKey -import java.security.cert.X509Certificate -import java.time.Instant -import java.util.ServiceLoader import java.util.concurrent.ConcurrentLinkedDeque -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit.SECONDS -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.thread import kotlin.concurrent.withLock import kotlin.system.exitProcess private val LOGGER = KotlinLogging.logger { } -private const val SEND_PIN_ATTRIBUTE = "send" -private const val INPUT_QUEUE_TRANSPORT_ATTRIBUTE = SEND_PIN_ATTRIBUTE -private val INPUT_QUEUE_PROTO_ATTRIBUTES = arrayOf(SEND_PIN_ATTRIBUTE, "group") fun main(args: Array) = try { val resources = ConcurrentLinkedDeque Unit>>() @@ -96,10 +41,6 @@ fun main(args: Array) = try { } }) - val stateManager = load(BasicStateManager::class.java) - val requestHandler = load(BasicRequestHandler::class.java) - val authSettingsType = load(BasicAuthSettingsTypeProvider::class.java).type - val factory = runCatching { CommonFactory.createFromArguments(*args) }.getOrElse { @@ -107,278 +48,20 @@ fun main(args: Array) = try { CommonFactory() }.apply { resources += "factory" to ::close } - val mapper = JsonMapper.builder() - .addModule( - KotlinModule.Builder() - .withReflectionCacheSize(512) - .configure(KotlinFeature.NullToEmptyCollection, false) - .configure(KotlinFeature.NullToEmptyMap, false) - .configure(KotlinFeature.NullIsSameAsDefault, true) - .configure(KotlinFeature.SingletonSupport, false) - .configure(KotlinFeature.StrictNullChecks, false) - .build() - ) - .addModule( - SimpleModule().addDeserializer( - IAuthSettings::class.java, - AuthSettingsDeserializer(authSettingsType) - ) - ) - .build() - - run( - factory.getCustomConfiguration(Settings::class.java, mapper), - factory.eventBatchRouter, - factory.messageRouterMessageGroupBatch, - factory.transportGroupBatchRouter, - stateManager, - requestHandler, - factory.rootEventId - ) { resource, destructor -> + Application(factory) { resource, destructor -> resources += resource to destructor - } -} catch (e: Exception) { - LOGGER.error(e) { "Uncaught exception. Shutting down" } - exitProcess(1) -} - -fun run( - settings: Settings, - eventRouter: MessageRouter, - protoMR: MessageRouter, - transportMR: MessageRouter, - stateManager: IStateManager, - requestHandler: IRequestHandler, - rootEventId: EventID, - registerResource: (name: String, destructor: () -> Unit) -> Unit, -) { - val incomingSequence = createSequence() - val outgoingSequence = createSequence() - - val onRequest: (RawHttpRequest) -> Unit - val onResponse: (RawHttpRequest, RawHttpResponse<*>) -> Unit - - val executor = Executors.newSingleThreadScheduledExecutor() - registerResource("message batch executor") { executor.shutdownGracefully() } - - with(settings) { - val book = rootEventId.bookName - val sessionGroup = sessionAlias - - val eventBatcher = EventBatcher( - maxBatchSizeInItems = maxBatchSize, - executor = executor, - maxFlushTime = maxFlushTime, - onBatch = eventRouter::send - ).also { registerResource("event batcher", it::close) } - - val onError: (Throwable) -> Unit = { - eventBatcher.storeEvent(rootEventId, "Batching problem: ${it.message}", "Message batching problem", it) - } - - if (useTransport) { - val messageBatcher = - MessageBatcher(maxBatchSize, maxFlushTime, book, GROUP_SELECTOR, executor, onError, transportMR::send) - .also { registerResource("transport message batcher", it::close) } - - onRequest = { request: RawHttpRequest -> - val rawMessage = request.toTransportMessage(sessionAlias, outgoingSequence()) - - messageBatcher.onMessage(rawMessage, sessionGroup) - eventBatcher.storeEvent( - rawMessage.eventId?.toProto() ?: rootEventId, - "Sent HTTP request", - "Send message" - ) - } - onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> -> - messageBatcher.onMessage( - response.toTransportMessage(sessionAlias, incomingSequence(), request), - sessionGroup - ) - stateManager.onResponse(response) - } - } else { - val connectionId = ConnectionID.newBuilder() - .setSessionAlias(sessionAlias) - .setSessionGroup(sessionGroup) - .build() - - val messageBatcher = RawMessageBatcher(maxBatchSize, maxFlushTime, RAW_GROUP_SELECTOR, executor, onError) { - protoMR.send(it, RAW.value) - }.also { registerResource("proto message batcher", it::close) } - - onRequest = { request: RawHttpRequest -> - val rawMessage = request.toProtoMessage(connectionId, outgoingSequence()) - - messageBatcher.onMessage(rawMessage) - eventBatcher.storeEvent( - if (rawMessage.hasParentEventId()) rawMessage.parentEventId else rootEventId, - "Sent HTTP request", - "Send message" - ) - } - onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> -> - messageBatcher.onMessage(response.toProtoMessage(connectionId, incomingSequence(), request)) - stateManager.onResponse(response) - } - } - val client = HttpClient( - https, - host, - port, - readTimeout, - keepAliveTimeout, - maxParallelRequests, - defaultHeaders, - stateManager::prepareRequest, - onRequest, - onResponse, - stateManager::onStart, - stateManager::onStop, - validateCertificates, - certificate - ).apply { registerResource("client", ::close) } - - stateManager.runCatching { - registerResource("state-manager", ::close) - init(StateManagerContext(client, auth)) - }.onFailure { - LOGGER.error(it) { "Failed to init state manager" } - eventBatcher.storeEvent(rootEventId, "Failed to init state manager", "Error", it) - throw it - } - - requestHandler.runCatching { - registerResource("request-handler", ::close) - init(RequestHandlerContext(client)) - }.onFailure { - LOGGER.error(it) { "Failed to init request handler" } - eventBatcher.storeEvent(rootEventId, "Failed to init request handler", "Error", it) - throw it - } - - val sendService: ExecutorService = createExecutorService(maxParallelRequests) - - val proto = runCatching { - val listener = MessageListener { _, message -> - message.groupsList.forEach { group -> - sendService.submit { - group.runCatching(requestHandler::onRequest).recoverCatching { error -> - LOGGER.error(error) { "Failed to handle protobuf message group: ${group.toPrettyString()}" } - group.parentEventIds.ifEmpty { sequenceOf(rootEventId) }.forEach { - eventBatcher.storeEvent( - it, - "Failed to handle protobuf message group", - "Error", - error - ) - } - } - } - } - } - checkNotNull(protoMR.subscribe(listener, *INPUT_QUEUE_PROTO_ATTRIBUTES)) - }.onSuccess { monitor -> - registerResource("proto-raw-monitor", monitor::unsubscribe) - }.onFailure { - LOGGER.warn(it) { "Failed to subscribe to input protobuf queue" } - } - - val transport = runCatching { - val listener = MessageListener { _, message -> - message.groups.forEach { group -> - sendService.submit { - group.runCatching(requestHandler::onRequest).recoverCatching { error -> - LOGGER.error(error) { "Failed to handle transport message group: $group" } - group.eventIds.map(EventId::toProto).ifEmpty { sequenceOf(rootEventId) }.forEach { - eventBatcher.storeEvent( - it, - "Failed to handle transport message group", - "Error", - error - ) - } - } - } - } - } - checkNotNull(transportMR.subscribe(listener, INPUT_QUEUE_TRANSPORT_ATTRIBUTE)) - }.onSuccess { monitor -> - registerResource("transport-raw-monitor", monitor::unsubscribe) - }.onFailure { - LOGGER.warn(it) { "Failed to subscribe to input transport queue" } - } - - if (proto.isFailure && transport.isFailure) { - error("Subscribe pin should be declared at least one of protobuf or transport protocols") - } + }.start() - client.runCatching(HttpClient::start).onFailure { - throw IllegalStateException("Failed to start client", it) - } - - LOGGER.info { "Successfully started" } - - ReentrantLock().run { - val condition = newCondition() - registerResource("await-shutdown") { withLock(condition::signalAll) } - withLock(condition::await) - } + LOGGER.info { "Successfully started" } - LOGGER.info { "Finished running" } + ReentrantLock().run { + val condition = newCondition() + resources += "await-shutdown" to { withLock(condition::signalAll) } + withLock(condition::await) } -} -data class Settings( - val https: Boolean = false, - val host: String, - val port: Int = if (https) 443 else 80, - val readTimeout: Int = 5000, - val maxParallelRequests: Int = 5, - val keepAliveTimeout: Long = 15000, - val defaultHeaders: Map> = emptyMap(), - val sessionAlias: String, - val auth: IAuthSettings? = null, - val validateCertificates: Boolean = true, - val useTransport: Boolean = false, - val batcherThreads: Int = 2, - val maxBatchSize: Int = 1000, - val maxFlushTime: Long = 1000, - @JsonDeserialize(converter = CertificateConverter::class) val clientCertificate: X509Certificate? = null, - @JsonDeserialize(converter = PrivateKeyConverter::class) val certificatePrivateKey: PrivateKey? = null, -) { - @JsonIgnore - val certificate: Certificate? = clientCertificate?.run { - requireNotNull(certificatePrivateKey) { - "'${::clientCertificate.name}' setting requires '${::certificatePrivateKey.name}' setting to be set" - } - - Certificate(clientCertificate, certificatePrivateKey) - } -} - -private inline fun load(defaultImpl: Class): T { - val instances = ServiceLoader.load(T::class.java).toList() - - return when (instances.size) { - 0 -> error("No instances of ${T::class.simpleName}") - 1 -> instances.first() - 2 -> instances.first { !defaultImpl.isInstance(it) } - else -> error("More than 1 non-default instance of ${T::class.simpleName} has been found: $instances") - } -} - -private fun createSequence(): () -> Long = Instant.now().run { - AtomicLong(epochSecond * SECONDS.toNanos(1) + nano) -}::incrementAndGet - -private fun createExecutorService(maxCount: Int): ExecutorService { - val threadCount = AtomicInteger(1) - return Executors.newFixedThreadPool(maxCount) { runnable: Runnable? -> - Thread(runnable).apply { - isDaemon = true - name = "th2-http-client-${threadCount.incrementAndGet()}" - } - } + LOGGER.info { "Finished running" } +} catch (e: Exception) { + LOGGER.error(e) { "Uncaught exception. Shutting down" } + exitProcess(1) } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/http/client/ApplicationIntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/http/client/ApplicationIntegrationTest.kt new file mode 100644 index 0000000..d54dccc --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/http/client/ApplicationIntegrationTest.kt @@ -0,0 +1,326 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.http.client + +import com.exactpro.th2.common.grpc.Event +import com.exactpro.th2.common.grpc.EventBatch +import com.exactpro.th2.common.grpc.EventID +import com.exactpro.th2.common.grpc.EventStatus +import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration.DEFAULT_BOOK_NAME +import com.exactpro.th2.common.schema.factory.CommonFactory +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.EventId +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Message +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.TransportGroupBatchRouter.Companion.TRANSPORT_GROUP_ATTRIBUTE +import com.exactpro.th2.common.utils.event.transport.toProto +import com.exactpro.th2.common.utils.message.transport.toGroup +import com.exactpro.th2.http.client.annotations.IntegrationTest +import com.exactpro.th2.test.annotations.Th2AppFactory +import com.exactpro.th2.test.annotations.Th2IntegrationTest +import com.exactpro.th2.test.annotations.Th2TestFactory +import com.exactpro.th2.test.extension.CleanupExtension +import com.exactpro.th2.test.queue.CollectorMessageListener +import com.exactpro.th2.test.spec.CustomConfigSpec +import com.exactpro.th2.test.spec.RabbitMqSpec +import com.exactpro.th2.test.spec.RabbitMqSpec.Companion.EVENTS_PIN_NAME +import com.exactpro.th2.test.spec.filter +import com.exactpro.th2.test.spec.message +import com.exactpro.th2.test.spec.pin +import com.exactpro.th2.test.spec.pins +import com.exactpro.th2.test.spec.publishers +import com.exactpro.th2.test.spec.subscribers +import org.junit.jupiter.api.Test +import strikt.api.Assertion +import strikt.api.expectThat +import strikt.assertions.all +import strikt.assertions.isEmpty +import strikt.assertions.isEqualTo +import strikt.assertions.isFalse +import strikt.assertions.isNotNull +import strikt.assertions.matches +import strikt.assertions.single +import strikt.assertions.withElementAt +import java.time.Duration.ofSeconds +import java.time.Instant +import kotlin.test.assertNotNull + +@IntegrationTest +@Th2IntegrationTest +class ApplicationIntegrationTest { + @JvmField + @Suppress("unused") + internal val customConfig = CustomConfigSpec.fromString( + """ + { + "host": "127.0.0.1", + "port": 8080, + "sessionAlias": "some_api", + "validateCertificates": false, + "sessionAlias": "test-session-alias", + "useTransport": true + } + """.trimIndent() + ) + + @JvmField + @Suppress("unused") + internal val mq = RabbitMqSpec.create() + .pins { + subscribers { + pin("sub") { + attributes(INPUT_QUEUE_TRANSPORT_ATTRIBUTE, TRANSPORT_GROUP_ATTRIBUTE) + filter { + message { + field("test") shouldBeEqualTo "a" + } + } + } + } + + publishers { + pin("pub") { + attributes(TRANSPORT_GROUP_ATTRIBUTE) + } + } + } + + @Test + fun `failed connection when process message without parent event id test`( + @Th2AppFactory appFactory: CommonFactory, + @Th2TestFactory testFactory: CommonFactory, + resources: CleanupExtension.Registry, + ) { + val eventListener = CollectorMessageListener.createWithCapacity(1) + testFactory.eventBatchRouter.subscribe(eventListener, EVENTS_PIN_NAME) + + val application = Application(appFactory) { resource, destructor -> + resources.add(resource, destructor) + } + + val rootEventId: EventID = eventListener.assertRootEvent().id + + application.start() + + testFactory.sendMessages(RawMessage.builder().apply { + idBuilder() + .setSessionAlias("test-session-alias") + .setTimestamp(Instant.now()) + .setDirection(Direction.OUTGOING) + .setSequence(1) + }.build()) + + expectThat(eventListener.poll(ofSeconds(2))).isNotNull() + .get { eventsList }.single().and { + get { name }.isEqualTo("Failed to handle transport message group") + get { type }.isEqualTo("Error") + get { status }.isEqualTo(EventStatus.FAILED) + get { id }.and { + get { bookName }.isEqualTo(rootEventId.bookName) + get { scope }.isEqualTo(rootEventId.scope) + } + get { parentId }.isEqualTo(rootEventId) + get { attachedMessageIdsList }.isEmpty() + get { body.toString(Charsets.UTF_8) }.isEqualTo( + """ + [{"data":"java.net.ConnectException: Connection refused (Connection refused)","type":"message"}] + """.trimIndent() + ) + } + } + + @Test + fun `failed connection when process message with parent event id test`( + @Th2AppFactory appFactory: CommonFactory, + @Th2TestFactory testFactory: CommonFactory, + resources: CleanupExtension.Registry, + ) { + val eventListener = CollectorMessageListener.createWithCapacity(1) + testFactory.eventBatchRouter.subscribe(eventListener, EVENTS_PIN_NAME) + + val application = Application(appFactory) { resource, destructor -> + resources.add(resource, destructor) + } + + eventListener.assertRootEvent() + + application.start() + + val eventId = EventId.builder() + .setBook(BOOK_TEST) + .setScope(SCOPE_TEST_A) + .setTimestamp(Instant.now()) + .setId("test-id") + .build() + + testFactory.sendMessages(RawMessage.builder().apply { + idBuilder() + .setSessionAlias("test-session-alias") + .setTimestamp(Instant.now()) + .setDirection(Direction.OUTGOING) + .setSequence(1) + setEventId(eventId) + }.build()) + + expectThat(eventListener.poll(ofSeconds(2))).isNotNull() + .get { eventsList }.single().and { + get { name }.isEqualTo("Failed to handle transport message group") + get { type }.isEqualTo("Error") + get { status }.isEqualTo(EventStatus.FAILED) + get { id }.and { + get { bookName }.isEqualTo(eventId.book) + get { scope }.isEqualTo(eventId.scope) + } + get { parentId }.isEqualTo(eventId.toProto()) + get { attachedMessageIdsList }.isEmpty() + get { body.toString(Charsets.UTF_8) }.isEqualTo( + """ + [{"data":"java.net.ConnectException: Connection refused (Connection refused)","type":"message"}] + """.trimIndent() + ) + } + } + + @Test + fun `failed connection when process messages with parent event id test`( + @Th2AppFactory appFactory: CommonFactory, + @Th2TestFactory testFactory: CommonFactory, + resources: CleanupExtension.Registry, + ) { + val eventListener = CollectorMessageListener.createWithCapacity(1) + testFactory.eventBatchRouter.subscribe(eventListener, EVENTS_PIN_NAME) + + val application = Application(appFactory) { resource, destructor -> + resources.add(resource, destructor) + } + + eventListener.assertRootEvent() + + application.start() + + val eventIdA = EventId.builder() + .setBook(BOOK_TEST) + .setScope(SCOPE_TEST_A) + .setTimestamp(Instant.now()) + .setId("test-id") + .build() + + val eventIdB = EventId.builder() + .setBook(BOOK_TEST) + .setScope(SCOPE_TEST_B) + .setTimestamp(Instant.now()) + .setId("test-id") + .build() + + testFactory.sendMessages( + RawMessage.builder().apply { + idBuilder() + .setSessionAlias("test-session-alias") + .setTimestamp(Instant.now()) + .setDirection(Direction.OUTGOING) + .setSequence(1) + setEventId(eventIdA) + }.build(), + RawMessage.builder().apply { + idBuilder() + .setSessionAlias("test-session-alias") + .setTimestamp(Instant.now()) + .setDirection(Direction.OUTGOING) + .setSequence(1) + setEventId(eventIdB) + }.build(), + ) + + val events = listOf( + assertNotNull(eventListener.poll(ofSeconds(2))), + assertNotNull(eventListener.poll(ofSeconds(2))), + ) + + expectThat(events) { + all { + get { eventsList }.single().and { + get { name }.isEqualTo("Failed to handle transport message group") + get { type }.isEqualTo("Error") + get { status }.isEqualTo(EventStatus.FAILED) + get { id }.and { + get { bookName }.isEqualTo(BOOK_TEST) + } + get { attachedMessageIdsList }.isEmpty() + get { body.toString(Charsets.UTF_8) }.isEqualTo( + """ + [{"data":"java.net.ConnectException: Connection refused (Connection refused)","type":"message"}] + """.trimIndent() + ) + } + } + withElementAt(0) { + get { getEvents(0) }.and { + get { id }.get { scope }.isEqualTo(eventIdA.scope) + get { parentId }.isEqualTo(eventIdA.toProto()) + } + } + withElementAt(1) { + get { getEvents(0) }.and { + get { id }.get { scope }.isEqualTo(eventIdB.scope) + get { parentId }.isEqualTo(eventIdB.toProto()) + } + } + } + } + + private fun CommonFactory.sendMessages( + vararg messages: Message<*> + ) { + transportGroupBatchRouter.send( + GroupBatch.builder().apply { + setBook(BOOK_TEST) + setSessionGroup(SESSION_GROUP_TEST) + + messages.asSequence() + .map(Message<*>::toGroup) + .forEach(this::addGroup) + }.build(), + "sub" + ) + } + + private fun CollectorMessageListener.assertRootEvent() = + assertNotNull(poll(ofSeconds(1))).also { + expectThat(it) { + get { eventsList }.single().isRootEvent(DEFAULT_BOOK_NAME, "app") + } + }.getEvents(0) + + companion object { + private const val BOOK_TEST = "test-book-A" + private const val SCOPE_TEST_A = "test-scope-A" + private const val SCOPE_TEST_B = "test-scope-B" + private const val SESSION_GROUP_TEST = "test-session-group" + + fun Assertion.Builder.isRootEvent(book: String, scope: String) { + get { id }.and { + get { getBookName() }.isEqualTo(book) + get { getScope() }.isEqualTo(scope) + } + get { hasParentId() }.isFalse() + get { name }.matches(Regex("$scope \\d{4}-[01]\\d-[0-3]\\dT[0-2]\\d:[0-5]\\d:[0-5]\\d\\.\\d+([+-][0-2]\\d:[0-5]\\d|Z) - Root event")) + get { type }.isEqualTo("Microservice") + get { status }.isEqualTo(EventStatus.SUCCESS) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/http/client/annotations/IntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/http/client/annotations/IntegrationTest.kt new file mode 100644 index 0000000..9dfea47 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/http/client/annotations/IntegrationTest.kt @@ -0,0 +1,22 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.http.client.annotations + +import org.junit.jupiter.api.Tag + +@Tag("integration-test") +annotation class IntegrationTest \ No newline at end of file diff --git a/src/test/resources/log4j2.properties b/src/test/resources/log4j2.properties new file mode 100644 index 0000000..232679a --- /dev/null +++ b/src/test/resources/log4j2.properties @@ -0,0 +1,8 @@ +# Console appender configuration +appender.console.type = Console +appender.console.name = ConsoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{dd MMM yyyy HH:mm:ss,SSS} %-6p [%-15t] %c - %m%n + +rootLogger.level = INFO +rootLogger.appenderRef.stdout.ref = ConsoleLogger \ No newline at end of file