Skip to content

Commit

Permalink
Merge pull request #5 from DamirDenis-Tudor/features
Browse files Browse the repository at this point in the history
Features
  • Loading branch information
DamirDenis-Tudor authored Dec 15, 2024
2 parents 1215176 + e0c3877 commit 1ed92e3
Show file tree
Hide file tree
Showing 19 changed files with 255 additions and 61 deletions.
19 changes: 10 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# ![Ktor](https://avatars.githubusercontent.com/u/28214161?s=40&v=4.svg) KabbitMQ
# ![KabbitMQ](https://github.com/user-attachments/assets/1fcc4641-1c1f-44c7-a929-6de86210303c)abbitMQ

## Overview

Expand All @@ -9,7 +9,7 @@

```kotlin
dependencies {
implementation("io.github.damirdenis-tudor:kabbitmq:0.2.0")
implementation("io.github.damirdenis-tudor:kabbitmq:<version>")
}
```

Expand All @@ -23,10 +23,10 @@ install(KabbitMQ) {
```

### Features
- DSL wrapper for almost all the functionalities with parameter validation.
- Channel & Connection management mechanism.
- Built in message serialization and deserialization.
- Possibility to interact directly with `com.rabbitmq:amqp-client`.
- DSL wrapper for most functionalities with built-in parameter validation.
- Robust channel and connection management mechanisms.
- Integrated message serialization and deserialization.
- Option to interact directly with `com.rabbitmq:amqp-client`.

### Samples

Expand Down Expand Up @@ -70,9 +70,10 @@ connection(


```kotlin
/*
* In com.rabbitmq:amqp-client you have overloaded functions and in order to maintain compatibility with DSL style
* a mechanism for parameter validation is required. The core of this mechanism is a custom state delegator mechanism.
/*
* `com.rabbitmq:amqp-client` offers overloaded functions, therefore to ensure
* compatibility with the DSL style, a robust parameter validation mechanism is necessary.
* At the heart of this mechanism is a custom state delegator.
*/

basicConsume {
Expand Down
19 changes: 10 additions & 9 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,16 @@ publishing {
name.set("Ktor rabbitMQ plugin")
packaging = "jar"
description.set(
"KabbitMQ is a Ktor plugin for RabbitMQ that provides access to all the core functionalities of the com.rabbitmq:amqp-client.\n" +
"It integrates seamlessly with Ktor's DSL, offering readable, maintainable, and easy-to-use functionalities.\n")
"KabbitMQ is a Ktor plugin for RabbitMQ that provides access to all the core functionalities of the com.rabbitmq:amqp-client.\n" +
"It integrates seamlessly with Ktor's DSL, offering readable, maintainable, and easy-to-use functionalities.\n"
)

url.set("https://github.com/DamirDenis-Tudor/kabbitmq")
url.set("https://github.com/DamirDenis-Tudor/KabbitMQ")

scm {
connection.set("scm:git:https://github.com/DamirDenis-Tudor/kabbitmq.git")
developerConnection.set("scm:git:[email protected]:DamirDenis-Tudor/kabbitmq.git")
url.set("https://github.com/DamirDenis-Tudor/kabbitmq")
connection.set("scm:git:https://github.com/DamirDenis-Tudor/KabbitMQ.git")
developerConnection.set("scm:git:[email protected]:DamirDenis-Tudor/KabbitMQ.git")
url.set("https://github.com/DamirDenis-Tudor/KabbitMQ")
}

licenses {
Expand All @@ -88,7 +89,7 @@ publishing {
}

repositories {
maven{
maven {
url = uri("$projectDir/build/publish")
}
}
Expand Down Expand Up @@ -155,9 +156,9 @@ tasks.register("uploadArtifact") {
}
}

with(this@with.toString(Charsets.UTF_8).apply(::println)) {
with(this@with.toString(Charsets.UTF_8).apply(::println)) {
when {
contains("PUBLISHED") -> Thread.sleep(1_000).apply { return@doLast }
contains("PUBLISHED") -> Thread.sleep(1_000).apply { return@doLast }
contains("FAILED") -> Thread.sleep(1_000).apply { exitProcess(-1) }
else -> Thread.sleep(60_000)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package com.mesh.kabbitMq.builders
import com.mesh.kabbitMq.dsl.KabbitMQDslMarker
import com.mesh.kabbitMq.delegator.State
import com.mesh.kabbitMq.delegator.Delegator
import com.mesh.kabbitMq.delegator.Delegator.Companion.initialized
import com.mesh.kabbitMq.delegator.Delegator.Companion.stateTrace
import com.mesh.kabbitMq.delegator.Delegator.Companion.withThisRef
import com.rabbitmq.client.Channel
import io.ktor.util.logging.*


@KabbitMQDslMarker
Expand All @@ -15,7 +19,16 @@ class KabbitMQBasicAckBuilder(private val channel: Channel) {
multiple = false
}

fun build() {
channel.basicAck(deliveryTag, multiple)
fun build() = withThisRef(this@KabbitMQBasicAckBuilder) {
return@withThisRef when {
initialized(::deliveryTag, ::multiple) -> {
channel.basicAck(deliveryTag, multiple)
}

else -> {
stateTrace().forEach { KtorSimpleLogger("KabbitMQBasicAckBuilder").warn(it) }
error("Unsupported combination of parameters for basicConsume.")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.mesh.kabbitMq.delegator.Delegator.Companion.withThisRef
import com.rabbitmq.client.*
import io.ktor.util.logging.*
import kotlinx.serialization.json.Json
import org.jetbrains.annotations.NotNull
import kotlin.properties.Delegates

@KabbitMQDslMarker
class KabbitMQBasicConsumeBuilder(
Expand Down Expand Up @@ -65,7 +67,7 @@ class KabbitMQBasicConsumeBuilder(

fun build(): String = withThisRef(this@KabbitMQBasicConsumeBuilder) {
return@withThisRef when {
initialized(::consumerTag, ::deliverCallback, ::cancelCallback, ::autoAck) -> {
initialized(::queue, ::autoAck, ::consumerTag, ::deliverCallback, ::cancelCallback ) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -75,7 +77,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::consumerTag, ::deliverCallback, ::cancelCallback, ::shutdownSignalCallback, ::autoAck) -> {
initialized(::queue, ::autoAck, ::consumerTag, ::noLocal, ::exclusive, ::arguments, ::deliverCallback, ::cancelCallback, ::shutdownSignalCallback) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -89,7 +91,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::cancelCallback, ::shutdownSignalCallback, ::autoAck) -> {
initialized(::queue, ::autoAck, ::arguments, ::deliverCallback, ::cancelCallback, ::shutdownSignalCallback) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -100,7 +102,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::consumerTag, ::deliverCallback, ::shutdownSignalCallback, ::autoAck) -> {
initialized(::queue, ::autoAck, ::consumerTag, ::deliverCallback, ::shutdownSignalCallback) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -110,7 +112,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::shutdownSignalCallback, ::autoAck) -> {
initialized(::queue, ::autoAck, ::arguments, ::deliverCallback, ::shutdownSignalCallback) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -120,7 +122,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::shutdownSignalCallback, ::autoAck) -> {
initialized(::queue, ::autoAck, ::deliverCallback, ::shutdownSignalCallback) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -129,7 +131,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::cancelCallback, ::autoAck) -> {
initialized(::queue, ::autoAck, ::deliverCallback, ::cancelCallback) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -138,16 +140,16 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::cancelCallback, ::autoAck) -> {
channel.basicConsume(
initialized(::queue, ::autoAck, ::deliverCallback, ::cancelCallback) -> {
channel.basicConsume(
queue,
autoAck,
deliverCallback,
cancelCallback
)
}

initialized(::deliverCallback, ::cancelCallback, ::autoAck) -> {
initialized(::queue, ::autoAck, ::arguments, ::deliverCallback, ::cancelCallback) -> {
channel.basicConsume(
queue,
autoAck,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,28 @@ package com.mesh.kabbitMq.builders

import com.mesh.kabbitMq.dsl.KabbitMQDslMarker
import com.mesh.kabbitMq.delegator.Delegator
import com.mesh.kabbitMq.delegator.Delegator.Companion.initialized
import com.mesh.kabbitMq.delegator.Delegator.Companion.stateTrace
import com.mesh.kabbitMq.delegator.Delegator.Companion.withThisRef
import com.rabbitmq.client.Channel
import com.rabbitmq.client.GetResponse
import io.ktor.util.logging.*

@KabbitMQDslMarker
class KabbitMQBasicGetBuilder(private val channel: Channel) {
var queue: String by Delegator()
var autoAck: Boolean by Delegator()

fun build(): GetResponse = channel.basicGet(queue, autoAck)
fun build(): GetResponse = withThisRef(this@KabbitMQBasicGetBuilder) {
return@withThisRef when {
initialized(::queue, ::autoAck) -> {
channel.basicGet(queue, autoAck)
}

else -> {
stateTrace().forEach { KtorSimpleLogger("KabbitMQBasicGetBuilder").warn(it) }
error("Unsupported combination of parameters for basicConsume.")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package com.mesh.kabbitMq.builders

import com.mesh.kabbitMq.dsl.KabbitMQDslMarker
import com.mesh.kabbitMq.delegator.Delegator
import com.mesh.kabbitMq.delegator.Delegator.Companion.initialized
import com.mesh.kabbitMq.delegator.Delegator.Companion.stateTrace
import com.mesh.kabbitMq.delegator.Delegator.Companion.withThisRef
import com.rabbitmq.client.Channel
import io.ktor.util.logging.*

@KabbitMQDslMarker
class KabbitMQBasicNackBuilder(private val channel: Channel) {
Expand All @@ -15,6 +19,16 @@ class KabbitMQBasicNackBuilder(private val channel: Channel) {
requeue = false
}

fun build() = channel.basicNack(deliveryTag, multiple, requeue)
fun build() = withThisRef(this@KabbitMQBasicNackBuilder) {
return@withThisRef when {
initialized(::deliveryTag, ::multiple, ::requeue) -> {
channel.basicNack(deliveryTag, multiple, requeue)
}

else -> {
stateTrace().forEach { KtorSimpleLogger("KabbitMQBasicNackBuilder").warn(it) }
error("Unsupported combination of parameters for basicConsume.")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import com.mesh.kabbitMq.dsl.KabbitMQDslMarker
import com.mesh.kabbitMq.delegator.State
import com.mesh.kabbitMq.delegator.Delegator
import com.mesh.kabbitMq.delegator.Delegator.Companion.initialized
import com.mesh.kabbitMq.delegator.Delegator.Companion.stateTrace
import com.mesh.kabbitMq.delegator.Delegator.Companion.withThisRef
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.Channel
import io.ktor.util.logging.*
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import java.util.*
Expand Down Expand Up @@ -42,7 +44,7 @@ class KabbitMQBasicPublishBuilder(

fun build() = withThisRef(this@KabbitMQBasicPublishBuilder) {
return@withThisRef when {
initialized(::mandatory, ::immediate) -> {
initialized(::exchange, ::routingKey, ::message, ::mandatory, ::immediate, ::properties) -> {
channel.basicPublish(
exchange,
routingKey,
Expand All @@ -53,7 +55,7 @@ class KabbitMQBasicPublishBuilder(
)
}

initialized(::immediate) -> {
initialized(::exchange, ::routingKey, ::message, ::immediate, ::properties) -> {
channel.basicPublish(
exchange,
routingKey,
Expand All @@ -63,7 +65,7 @@ class KabbitMQBasicPublishBuilder(
)
}

initialized(::mandatory) -> {
initialized(::exchange, ::routingKey, ::message, ::mandatory, ::properties) -> {
channel.basicPublish(
exchange,
routingKey,
Expand All @@ -73,14 +75,19 @@ class KabbitMQBasicPublishBuilder(
)
}

else -> {
initialized(::exchange, ::routingKey, ::message, ::properties) -> {
channel.basicPublish(
exchange,
routingKey,
properties,
message
)
}

else -> {
stateTrace().forEach { KtorSimpleLogger("KabbitMQBasicPublishBuilder").warn(it) }
error("Unsupported combination of parameters for basicConsume.")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package com.mesh.kabbitMq.builders
import com.mesh.kabbitMq.dsl.KabbitMQDslMarker
import com.mesh.kabbitMq.delegator.Delegator
import com.mesh.kabbitMq.delegator.Delegator.Companion.initialized
import com.mesh.kabbitMq.delegator.Delegator.Companion.stateTrace
import com.mesh.kabbitMq.delegator.Delegator.Companion.withThisRef
import com.rabbitmq.client.Channel
import io.ktor.util.logging.*


@KabbitMQDslMarker
Expand All @@ -15,17 +17,22 @@ class KabbitMQBasicQosBuilder(private val channel: Channel) {

fun build() = withThisRef(this@KabbitMQBasicQosBuilder) {
return@withThisRef when {
initialized(::prefetchCount, ::global) -> {
initialized(::prefetchCount, ::prefetchCount, ::global) -> {
channel.basicQos(prefetchSize, prefetchCount, global)
}

initialized(::prefetchCount) -> {
initialized(::prefetchCount, ::global) -> {
channel.basicQos(prefetchCount, global)
}

else -> {
initialized(::prefetchCount) -> {
channel.basicQos(prefetchCount)
}

else -> {
stateTrace().forEach { KtorSimpleLogger("KabbitMQBasicQosBuilder").warn(it) }
error("Unsupported combination of parameters for basicConsume.")
}
}
}
}
Loading

0 comments on commit 1ed92e3

Please sign in to comment.