Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-1988] separate connections for publishing and subscribing #318

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,15 @@ dependencies {

## Release notes

### 5.14.0-dev

+ Separate connections for publisher and consumer
+ Updated cradle `5.4.1-dev`
+ Updated kubernetes-client: `6.13.1`

### 5.13.1-dev

+ Provided ability to set either of raw body of several dody data to `Event` builder
+ Provided ability to set either of raw body of several body data to `Event` builder
+ Updated th2 gradle plugin `0.0.8`

### 5.13.0-dev
Expand Down
14 changes: 7 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ ext {
protobufVersion = '3.25.3'
serviceGeneratorVersion = '3.6.1'

cradleVersion = '5.3.0-dev'
junitVersion = '5.10.2'
cradleVersion = '5.4.1-dev'
junitVersion = '5.10.3'

jmhVersion = '1.37'
autoValueVersion = '1.10.4'
Expand Down Expand Up @@ -128,7 +128,7 @@ dependencies {
implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor'

implementation "com.fasterxml.uuid:java-uuid-generator:5.0.0"
implementation "com.fasterxml.uuid:java-uuid-generator:5.1.0"

implementation 'org.apache.logging.log4j:log4j-slf4j2-impl'
implementation 'org.apache.logging.log4j:log4j-core'
Expand All @@ -151,18 +151,18 @@ dependencies {
'Note: kotlin:1.9 is transitive dependency since 4.12.0')
}

implementation("io.fabric8:kubernetes-client:6.12.1") {
implementation("io.fabric8:kubernetes-client:6.13.1") {
exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-yaml'
}

implementation "io.github.microutils:kotlin-logging:3.0.5"

testImplementation 'javax.annotation:javax.annotation-api:1.3.2'
testImplementation "org.junit.jupiter:junit-jupiter:$junitVersion"
testImplementation "org.mockito.kotlin:mockito-kotlin:5.2.1"
testImplementation "org.mockito.kotlin:mockito-kotlin:5.3.1"
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5'
testImplementation "org.testcontainers:testcontainers:1.19.6"
testImplementation "org.testcontainers:rabbitmq:1.19.6"
testImplementation "org.testcontainers:testcontainers:1.19.8"
testImplementation "org.testcontainers:rabbitmq:1.19.8"
testImplementation("org.junit-pioneer:junit-pioneer:2.2.0") {
because("system property tests")
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
release_version=5.13.1
release_version=5.14.0
kotlin_version=1.8.22
description='th2 common library (Java)'
vcs_url=https://github.com/th2-net/th2-common-j
Expand Down

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -51,8 +51,12 @@ private val ALL_READINESS = CopyOnWriteArrayList(listOf(RABBITMQ_READINESS, GRPC
fun registerLiveness(name: String) = LIVENESS_ARBITER.createMonitor(name)
fun registerReadiness(name: String) = READINESS_ARBITER.createMonitor(name)

fun registerLiveness(obj: Any) = LIVENESS_ARBITER.createMonitor("${obj::class.simpleName}_liveness_${obj.hashCode()}")
fun registerReadiness(obj: Any) = READINESS_ARBITER.createMonitor("${obj::class.simpleName}_readiness_${obj.hashCode()}")
@JvmOverloads
fun registerLiveness(obj: Any, suffix: String = "") = LIVENESS_ARBITER.createMonitor(getMonitorName(obj, "liveness", suffix))
@JvmOverloads
fun registerReadiness(obj: Any, suffix: String = "") = READINESS_ARBITER.createMonitor(getMonitorName(obj, "readiness", suffix))

private fun getMonitorName(obj: Any, infix: String, suffix: String) = "${obj::class.simpleName}_${infix}_${obj.hashCode()}" + if (suffix.isEmpty()) "" else "_$suffix"

@JvmField
val LIVENESS_MONITOR = registerLiveness("user_liveness")
Expand Down Expand Up @@ -131,6 +135,7 @@ class HealthMetrics @JvmOverloads constructor(
) {
@JvmOverloads
constructor(parent: Any, attempts: Int = 10) : this(registerLiveness(parent), registerReadiness(parent), attempts)
constructor(parent: Any, suffix: String = "") : this(registerLiveness(parent, suffix), registerReadiness(parent, suffix))

private val attempts = AtomicInteger(0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.testcontainers.utility.DockerImageName
import java.time.Duration

object ContainerConstants {
@JvmField val RABBITMQ_IMAGE_NAME: DockerImageName = DockerImageName.parse("rabbitmq:3.13.0-management-alpine")
@JvmField val RABBITMQ_IMAGE_NAME: DockerImageName = DockerImageName.parse("rabbitmq:3.13.4-management-alpine")
const val ROUTING_KEY = "routingKey"
const val QUEUE_NAME = "queue"
const val EXCHANGE = "test-exchange"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
Expand All @@ -56,6 +57,7 @@ import java.time.Duration
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
Expand Down Expand Up @@ -1190,6 +1192,102 @@ class TestConnectionManager {
)
)

@Test
fun `connection manager receives messages when publishing is blocked`() {
val routingKey = "routingKey1"
val queueName = "queue1"
val exchange = "test-exchange1"
rabbit.let { rabbit ->
declareQueue(rabbit, queueName)
declareFanoutExchangeWithBinding(rabbit, exchange, queueName)

LOGGER.info { "Started with port ${rabbit.amqpPort}" }
LOGGER.info { "Started with port ${rabbit.amqpPort}" }
val messagesCount = 10
val blockAfter = 3
val countDown = CountDownLatch(messagesCount)
val messageSizeBytes = 7
createConnectionManager(
rabbit, ConnectionManagerConfiguration(
subscriberName = "test",
prefetchCount = DEFAULT_PREFETCH_COUNT,
confirmationTimeout = DEFAULT_CONFIRMATION_TIMEOUT,
enablePublisherConfirmation = true,
maxInflightPublicationsBytes = messagesCount * messageSizeBytes,
heartbeatIntervalSeconds = 1,
minConnectionRecoveryTimeout = 2000,
maxConnectionRecoveryTimeout = 2000,
// to avoid unexpected delays before recovery
retryTimeDeviationPercent = 0
)
).use { manager ->
repeat(messagesCount) { index ->

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messagesCount is greater than blockAfter, it means that publisher not blocking in this test because check logic is after this circle

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We send 3 messages, then block publishers, then send 7 more. Subscribe, receive 3, unblock, receive 7. This allows to ensure that publishers were actually blocked. We can simplify the scenario: just send a few messages, block publishers, try to read messages.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then send 7 more -> If publisher blocked we could able to publish additional messages

if (index == blockAfter) {
assertFalse(manager.isPublishingBlocked)

// blocks all publishers ( https://www.rabbitmq.com/docs/memory )
rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0")
}

manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
LOGGER.info("Published $index")

if (index == blockAfter) {
// wait for blocking of publishing connection
Awaitility.await("publishing blocked")
.pollInterval(10L, TimeUnit.MILLISECONDS)
.atMost(100L, TimeUnit.MILLISECONDS)
.until { manager.isPublishingBlocked }
}
}

val receivedMessages = linkedSetOf<String>()
LOGGER.info { "creating consumer" }

val subscribeFuture = Executors.newSingleThreadExecutor().submit {
manager.basicConsume(queueName, { _, delivery, ack ->
val message = delivery.body.toString(Charsets.UTF_8)
LOGGER.info { "Received $message from ${delivery.envelope.routingKey}" }
if (receivedMessages.add(message)) {
// decrement only unique messages
countDown.countDown()
} else {
LOGGER.warn { "Duplicated $message for ${delivery.envelope.routingKey}" }
}
ack.confirm()
}) {
LOGGER.info { "Canceled $it" }
}
}

assertDoesNotThrow("Failed to subscribe to queue") {
// if subscription connection is blocked generates TimeoutException
subscribeFuture.get(1, TimeUnit.SECONDS)
subscribeFuture.cancel(true)
}

Awaitility.await("receive messages sent before blocking")
.pollInterval(10L, TimeUnit.MILLISECONDS)
.atMost(100L, TimeUnit.MILLISECONDS)
.until { blockAfter.toLong() == messagesCount - countDown.count }

Thread.sleep(100) // ensure no more messages received
assertEquals(blockAfter.toLong(), messagesCount - countDown.count)
assertTrue(manager.isPublishingBlocked)

// unblocks publishers
rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we unlock the publisher in this test?

assertFalse(manager.isPublishingBlocked)

// delay receiving all messages
Awaitility.await("all messages received")
.pollInterval(10L, TimeUnit.MILLISECONDS)
.atMost(100L, TimeUnit.MILLISECONDS)
.until { countDown.count == 0L }
}
}
}

@AfterEach
fun cleanupRabbitMq() {
// cleanup is done to prevent queue name collision during test
Expand Down
Loading