Skip to content

Commit

Permalink
test: minimal consumer tests added.
Browse files Browse the repository at this point in the history
  • Loading branch information
DamirDenis-Tudor committed Jan 20, 2025
1 parent eef50f2 commit 46a78ed
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,6 @@ open class ConnectionManager(
channelCache[id]?.close()
channelCache.remove(id)

logger.debug("Channel with id: <$channelId>, closed")
logger.debug("Channel with id: <$channelId> for connection with id <$connectionId>, closed")
}
}
45 changes: 11 additions & 34 deletions src/test/kotlin/TestContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.ChannelContext
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.ConnectionContext
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.PluginContext
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.RabbitDslMarker
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.getChannelContext
import io.github.damir.denis.tudor.ktor.server.rabbitmq.rabbitMQ
import io.ktor.server.application.Application
import io.ktor.server.application.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.random.Random

fun Application.rabbitmqTest(block: suspend PluginContext.() -> Unit) = runBlocking {
with(attributes[ConnectionManagerKey]) {
Expand All @@ -25,7 +23,7 @@ fun PluginContext.channelTest(block: suspend PluginContext.() -> Unit) = runBloc
with(connectionManager) {
getChannel().also {
coroutineScope.launch(Dispatchers.rabbitMQ) {
getChannelContext(it).apply { block() }
ChannelContext(connectionManager, it).apply { block() }
}.join()
}
}
Expand All @@ -40,9 +38,7 @@ fun PluginContext.channelTest(
with(connectionManager) {
getChannel(id).also {
coroutineScope.launch(Dispatchers.rabbitMQ) {
getChannelContext(it).apply {
block()
}
ChannelContext(connectionManager, it).apply { block() }
}.also { job ->
if (autoClose) {
job.join()
Expand Down Expand Up @@ -82,7 +78,11 @@ fun PluginContext.connectionTest(
with(connectionManager) {
getConnection(id).also {
coroutineScope.launch(Dispatchers.rabbitMQ) {
ConnectionContext(connectionManager, it).apply { block() }
ConnectionContext(
connectionManager = connectionManager,
connection = it,
defaultChannel = getChannel(connectionId = id)
).apply { block() }
}.also { job ->
if (autoClose) {
job.join()
Expand Down Expand Up @@ -126,7 +126,7 @@ inline fun ConnectionContext.channelTest(
}.also { job ->
if (autoClose) {
job.join()
closeChannel(id)
closeChannel(id, getConnectionId(connection))
}
}.join()
}
Expand All @@ -139,36 +139,13 @@ inline fun ConnectionContext.channelTest(
crossinline block: suspend ChannelContext.() -> Unit
) = runBlocking {
with(connectionManager) {
val connectionId = getConnectionId(connection)
val channelId = Random.nextInt(1000, 5000)
connectionManager.getChannel(channelId, connectionId).also {
getChannel(connectionId = getConnectionId(connection)).also {
coroutineScope.launch(Dispatchers.rabbitMQ) {
it.also { ChannelContext(connectionManager, it).apply { block() } }
}.also { job ->
if (autoClose) {
job.join()
closeChannel(channelId)
}
}.join()
}
}
}

@RabbitDslMarker
inline fun ConnectionContext.libChannel(
autoClose: Boolean = false,
crossinline block: suspend Channel.() -> Unit
) = runBlocking {
with(connectionManager) {
val connectionId = getConnectionId(connection)
val channelId = Random.nextInt(1000, 5000)
getChannel(channelId, connectionId).also {
coroutineScope.launch(Dispatchers.rabbitMQ) {
it.apply { block() }
}.also { job ->
if (autoClose) {
job.join()
closeChannel(channelId)
closeChannel(connectionId = getConnectionId(connection))
}
}.join()
}
Expand Down
54 changes: 51 additions & 3 deletions src/test/kotlin/integration/ConnectionTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package integration
import channelTest
import connectionTest
import io.github.damir.denis.tudor.ktor.server.rabbitmq.RabbitMQ
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.channel
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.connection
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.rabbitmq
import io.ktor.server.application.install
import io.ktor.server.testing.testApplication
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -38,7 +41,7 @@ class ConnectionTests {
}

@Test
fun `test install with default connection`() = testApplication {
fun `test install with default channel`() = testApplication {
application {
install(RabbitMQ) {
connectionAttempts = 3
Expand All @@ -47,7 +50,7 @@ class ConnectionTests {
}

rabbitmqTest {

assert(channel.isOpen)
}
}
}
Expand Down Expand Up @@ -97,7 +100,30 @@ class ConnectionTests {
}

@Test
fun `test autoclose`() = testApplication {
fun `test channel reuse within a connection block`() = testApplication {
application {
install(RabbitMQ) {
connectionAttempts = 3
attemptDelay = 10
uri = rabbitMQContainer.amqpUrl
}
}

application {
runTest {
rabbitmqTest {
connectionTest(id = "test") {
val channelDefault = channelTest {}

Assertions.assertEquals(channel, channelDefault)
}
}
}
}
}

@Test
fun `test autoclose channel`() = testApplication {

application {
install(RabbitMQ) {
Expand All @@ -117,4 +143,26 @@ class ConnectionTests {
}
}
}

@Test
fun `test autoclose connection channel`() = testApplication {
application {
install(RabbitMQ) {
connectionAttempts = 3
attemptDelay = 10
uri = rabbitMQContainer.amqpUrl
}
}
application {
runTest {
rabbitmqTest {
connectionTest(id = "test") {
val channel1 = channelTest(id = 99, autoClose = true) {}
val channel2 = channelTest(id = 99) {}
assertNotEquals(channel1, channel2)
}
}
}
}
}
}
3 changes: 3 additions & 0 deletions src/test/kotlin/integration/InstallTests.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package integration

import io.github.damir.denis.tudor.ktor.server.rabbitmq.RabbitMQ
import io.ktor.server.application.*
import io.ktor.server.testing.*
import kotlinx.io.files.FileNotFoundException
import org.junit.jupiter.api.Test
import kotlin.test.assertFailsWith


class InstallTests {
@Test
fun `test install with default parameters`() = testApplication {
Expand Down
141 changes: 133 additions & 8 deletions src/test/kotlin/integration/OperationsTests.kt
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
package integration

import connectionTest
import io.github.damir.denis.tudor.ktor.server.rabbitmq.RabbitMQ
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.*
import io.github.damir.denis.tudor.ktor.server.rabbitmq.rabbitMQ
import kotlinx.serialization.Serializable
import io.ktor.server.application.*
import io.ktor.server.testing.*
import io.ktor.util.Digest
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.testcontainers.containers.RabbitMQContainer
import org.testcontainers.utility.DockerImageName
import kotlin.coroutines.coroutineContext
import rabbitmqTest
import java.lang.Thread.sleep
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith

class OperationsTests {

Expand Down Expand Up @@ -68,7 +73,7 @@ class OperationsTests {
exchange = "dlx"
type = "direct"
}
}.getOrThrow()
}
}
}
}
Expand Down Expand Up @@ -137,7 +142,7 @@ class OperationsTests {
}
}.getOrThrow()

Thread.sleep(2_000)
sleep(2_000)

assertEquals(messageCount { queue = "dlq" }.getOrNull(), 10)

Expand All @@ -149,10 +154,130 @@ class OperationsTests {
}
}

Thread.sleep(2_000)
sleep(2_000)

assertEquals(messageCount { queue = "dlq" }.getOrNull(), 0)
}
}
}

@Test
fun `consumer with coroutine poll (processing is not sequentially)`() = testApplication {
application {
install(RabbitMQ) {
connectionAttempts = 3
attemptDelay = 10
uri = rabbitMQContainer.amqpUrl
}
}

application {
rabbitmqTest {
queueBind {
queue = "demo-queue"
exchange = "demo-exchange"
routingKey = "demo-routing-key"
queueDeclare {
queue = "demo-queue"
}
exchangeDeclare {
exchange = "demo-exchange"
type = "direct"
}
}
}

rabbitmqTest {
repeat(1_000) {
basicPublish {
exchange = "demo-exchange"
routingKey = "demo-routing-key"
message { "Hello World!" }
}
}
}

rabbitmqTest {
val counter = AtomicInteger(0)
connectionTest(id = "consume") {
basicConsume {
autoAck = true
queue = "demo-queue"
dispatcher = Dispatchers.IO
coroutinePollSize = 100
deliverCallback<String> { tag, message ->
delay(30)
withContext(Dispatchers.IO.limitedParallelism(1)) {
counter.incrementAndGet()
}
}
}
}

sleep(2_000)
log.info(counter.toString())
assert(counter.get() == 1_000)
}
}
}

@Test
fun `consumer with default 1 coroutine (processing is sequentially)`() = testApplication {
application {
install(RabbitMQ) {
connectionAttempts = 3
attemptDelay = 10
uri = rabbitMQContainer.amqpUrl
}
}

application {
rabbitmqTest {
queueBind {
queue = "demo-queue"
exchange = "demo-exchange"
routingKey = "demo-routing-key"
queueDeclare {
queue = "demo-queue"
}
exchangeDeclare {
exchange = "demo-exchange"
type = "direct"
}
}
}

rabbitmqTest {
repeat(100) {
basicPublish {
exchange = "demo-exchange"
routingKey = "demo-routing-key"
message { "Hello World!" }
}
}
}

rabbitmqTest {
val counter = AtomicInteger(0)
connectionTest(id = "consume") {
basicConsume {
autoAck = true
queue = "demo-queue"
dispatcher = Dispatchers.IO
deliverCallback<String> { tag, message ->
log.info("Received message: $tag")
delay(5)
withContext(Dispatchers.IO.limitedParallelism(1)) {
counter.incrementAndGet()
}
}
}
}

sleep(2_000)
log.info(counter.toString())
assert(counter.get() == 100)
}
}
}
}

0 comments on commit 46a78ed

Please sign in to comment.