Skip to content

Commit

Permalink
Correct baton class: guarantee that one thread cannot put and get the…
Browse files Browse the repository at this point in the history
… same permit
  • Loading branch information
OptimumCode committed May 30, 2024
1 parent 5d0ab4d commit c3acdfe
Showing 1 changed file with 58 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ import io.grpc.stub.StreamObserver
import mu.KotlinLogging
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.api.extension.AfterTestExecutionCallback
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.extension.ExtensionContext
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.timeout
Expand All @@ -50,23 +55,40 @@ import org.mockito.kotlin.verifyNoMoreInteractions
import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue

private const val CANCEL_REASON = "test request is canceled"

@ExtendWith(DefaultGrpcRouterTest.ExecutionListener::class)
@IntegrationTest
internal class DefaultGrpcRouterTest {
/**
* Listener adds additional logging to help understanding from the stdout where test starts and finishes
*/
internal class ExecutionListener : BeforeTestExecutionCallback, AfterTestExecutionCallback {
private val logger = KotlinLogging.logger { }
override fun beforeTestExecution(ctx: ExtensionContext) {
logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' started" }
}

override fun afterTestExecution(ctx: ExtensionContext) {
logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' is finished" }
}

}

@IntegrationTest
abstract inner class AbstractGrpcRouterTest {
private val grpcRouterClient = DefaultGrpcRouter()
Expand Down Expand Up @@ -415,6 +437,7 @@ internal class DefaultGrpcRouterTest {
)
}

@Disabled("this test isn't relevant for async request")
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
Expand Down Expand Up @@ -751,6 +774,7 @@ internal class DefaultGrpcRouterTest {
)
}

@Disabled("this test isn't relevant for async request")
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
Expand Down Expand Up @@ -915,7 +939,7 @@ internal class DefaultGrpcRouterTest {
shutdownNow()
} else {
shutdown()
if (!awaitTermination(5, TimeUnit.SECONDS)) {
if (!awaitTermination(60, TimeUnit.SECONDS)) {
shutdownNow()
error("'Server' can't be closed")
}
Expand Down Expand Up @@ -974,10 +998,25 @@ internal class DefaultGrpcRouterTest {
val suspended: List<ExceptionMetadata>? = null
)

/**
* Baton class can help to synchronize two threads (only **two**).
*
* Baton class was migrated from using queue with size 1 to lock and conditions for synchronization.
*
* The implementation with queue did not provide guarantees that the same thread won't get the permit and put it back
* while another thread was still waiting for a free space in the queue.
*
* Using lock and conditions guarantees that the permit won't be given unless somebody is waiting for that permit.
* And vise-versa, nobody can get a permit unless somebody tries to put the permit
*/
internal class Baton(
private val name: String
) {
private val queue = ArrayBlockingQueue<Any>(1).apply { put(Any()) }
@Volatile
private var permits = 0
private val lock = ReentrantLock()
private val givenCondition = lock.newCondition()
private val getCondition = lock.newCondition()

fun giveAndGet(giveComment: String = "", getComment: String = "") {
give(giveComment)
Expand All @@ -986,13 +1025,25 @@ internal class DefaultGrpcRouterTest {

fun give(comment: String = "") {
K_LOGGER.info { "'$name' baton is giving by [${Thread.currentThread().name}] - $comment" }
queue.put(Any())
lock.withLock {
if (permits == 0) {
getCondition.await()
}
permits += 1
givenCondition.signal()
}
K_LOGGER.info { "'$name' baton is given by [${Thread.currentThread().name}] - $comment" }
}

fun get(comment: String = "") {
K_LOGGER.info { "'$name' baton is getting by [${Thread.currentThread().name}] - $comment" }
queue.poll()
lock.withLock {
getCondition.signal()
permits -= 1
if (permits < 0) {
givenCondition.await()
}
}
K_LOGGER.info { "'$name' baton is got by [${Thread.currentThread().name}] - $comment" }
}
}
Expand All @@ -1009,8 +1060,8 @@ internal class DefaultGrpcRouterTest {
}.build())

responseBaton?.let {
Thread.sleep(1_000)
it.give("response sent")
Thread.sleep(1_000)
}

if (complete) {
Expand All @@ -1027,8 +1078,8 @@ internal class DefaultGrpcRouterTest {
}

responseBaton?.let {
Thread.sleep(1_000)
it.give("response sent")
Thread.sleep(1_000)
}

if (complete) {
Expand Down

0 comments on commit c3acdfe

Please sign in to comment.