From c3acdfe682d0ee87b8aa123b7cd47c9312133bb0 Mon Sep 17 00:00:00 2001 From: Oleg Date: Thu, 30 May 2024 11:45:07 +0400 Subject: [PATCH] Correct baton class: guarantee that one thread cannot put and get the same permit --- .../grpc/router/impl/DefaultGrpcRouterTest.kt | 65 +++++++++++++++++-- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt index b4da67ad..17de2cd6 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/grpc/router/impl/DefaultGrpcRouterTest.kt @@ -38,9 +38,14 @@ import io.grpc.stub.StreamObserver import mu.KotlinLogging import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.extension.AfterTestExecutionCallback +import org.junit.jupiter.api.extension.BeforeTestExecutionCallback +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.extension.ExtensionContext import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.mock import org.mockito.kotlin.timeout @@ -50,7 +55,6 @@ import org.mockito.kotlin.verifyNoMoreInteractions import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder import java.time.Duration import java.time.Instant -import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -58,6 +62,8 @@ import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertNotNull @@ -65,8 +71,24 @@ import kotlin.test.assertTrue private const val CANCEL_REASON = "test request is canceled" +@ExtendWith(DefaultGrpcRouterTest.ExecutionListener::class) @IntegrationTest internal class DefaultGrpcRouterTest { + /** + * Listener adds additional logging to help understanding from the stdout where test starts and finishes + */ + internal class ExecutionListener : BeforeTestExecutionCallback, AfterTestExecutionCallback { + private val logger = KotlinLogging.logger { } + override fun beforeTestExecution(ctx: ExtensionContext) { + logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' started" } + } + + override fun afterTestExecution(ctx: ExtensionContext) { + logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' is finished" } + } + + } + @IntegrationTest abstract inner class AbstractGrpcRouterTest { private val grpcRouterClient = DefaultGrpcRouter() @@ -415,6 +437,7 @@ internal class DefaultGrpcRouterTest { ) } + @Disabled("this test isn't relevant for async request") @Test override fun `interrupt thread during retry request`() { // this test isn't relevant for async request @@ -751,6 +774,7 @@ internal class DefaultGrpcRouterTest { ) } + @Disabled("this test isn't relevant for async request") @Test override fun `interrupt thread during retry request`() { // this test isn't relevant for async request @@ -915,7 +939,7 @@ internal class DefaultGrpcRouterTest { shutdownNow() } else { shutdown() - if (!awaitTermination(5, TimeUnit.SECONDS)) { + if (!awaitTermination(60, TimeUnit.SECONDS)) { shutdownNow() error("'Server' can't be closed") } @@ -974,10 +998,25 @@ internal class DefaultGrpcRouterTest { val suspended: List? = null ) + /** + * Baton class can help to synchronize two threads (only **two**). + * + * Baton class was migrated from using queue with size 1 to lock and conditions for synchronization. + * + * The implementation with queue did not provide guarantees that the same thread won't get the permit and put it back + * while another thread was still waiting for a free space in the queue. + * + * Using lock and conditions guarantees that the permit won't be given unless somebody is waiting for that permit. + * And vise-versa, nobody can get a permit unless somebody tries to put the permit + */ internal class Baton( private val name: String ) { - private val queue = ArrayBlockingQueue(1).apply { put(Any()) } + @Volatile + private var permits = 0 + private val lock = ReentrantLock() + private val givenCondition = lock.newCondition() + private val getCondition = lock.newCondition() fun giveAndGet(giveComment: String = "", getComment: String = "") { give(giveComment) @@ -986,13 +1025,25 @@ internal class DefaultGrpcRouterTest { fun give(comment: String = "") { K_LOGGER.info { "'$name' baton is giving by [${Thread.currentThread().name}] - $comment" } - queue.put(Any()) + lock.withLock { + if (permits == 0) { + getCondition.await() + } + permits += 1 + givenCondition.signal() + } K_LOGGER.info { "'$name' baton is given by [${Thread.currentThread().name}] - $comment" } } fun get(comment: String = "") { K_LOGGER.info { "'$name' baton is getting by [${Thread.currentThread().name}] - $comment" } - queue.poll() + lock.withLock { + getCondition.signal() + permits -= 1 + if (permits < 0) { + givenCondition.await() + } + } K_LOGGER.info { "'$name' baton is got by [${Thread.currentThread().name}] - $comment" } } } @@ -1009,8 +1060,8 @@ internal class DefaultGrpcRouterTest { }.build()) responseBaton?.let { - Thread.sleep(1_000) it.give("response sent") + Thread.sleep(1_000) } if (complete) { @@ -1027,8 +1078,8 @@ internal class DefaultGrpcRouterTest { } responseBaton?.let { - Thread.sleep(1_000) it.give("response sent") + Thread.sleep(1_000) } if (complete) {