Skip to content

Commit

Permalink
Readme added.
Browse files Browse the repository at this point in the history
  • Loading branch information
DamirDenis-Tudor committed Dec 14, 2024
1 parent 0a64f46 commit 98f5a82
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 41 deletions.
217 changes: 191 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,40 +1,205 @@
# kabbitmq
# ![Ktor](https://avatars.githubusercontent.com/u/28214161?s=40&v=4.svg) KabbitMQ

This project was created using the [Ktor Project Generator](https://start.ktor.io).
## Overview

Here are some useful links to get you started:
- `kabbitmq` is a Ktor plugin for RabbitMQ that provides access to all the core functionalities of the Java RabbitMQ Client library. It integrates seamlessly with Ktor's DSL, offering readable, maintainable, and easy-to-use functionalities.

- [Ktor Documentation](https://ktor.io/docs/home.html)
- [Ktor GitHub page](https://github.com/ktorio/ktor)
- The [Ktor Slack chat](https://app.slack.com/client/T09229ZC6/C0A974TJ9). You'll need
to [request an invite](https://surveys.jetbrains.com/s3/kotlin-slack-sign-up) to join.

## Features

Here's a list of features included in this project:
### Gradle (Kotlin DSL) - dependencies

| Name | Description |
| ----------------------------------------------------|------------------------------------------------------------- |
| [Routing](https://start.ktor.io/p/routing-default) | Allows to define structured routes and associated handlers. |
```kotlin
dependencies {
implementation("io.github.damirdenis-tudor:kabbitmq:0.2.0")
}
```

### Installation

```kotlin
install(KabbitMQ) {
uri = "amqp://guest:guest@localhost:5678"
connectionName = "guest"
}
```

### Features
- DSL wrapper for almost all the functionalities with parameter validation.
- Channel & Connection management mechanism.
- Built in message serialization and deserialization.
- Possibility to interact directly with Java RabbitMQ Client Library.

### Samples

#### Channel management sample
```kotlin
/* default connection with default channel */
consumerCount { queue = "test-queue" }

/* default connection with new channel */
channel(
id = "intensive",
autoClose = false /* can be reused by id */
){
consumerCount { queue = "test-queue" }
}
```

#### Connection management sample
```kotlin
/* new connection with new channels */
connection(
id = "intensive",
autoClose = false /* can be reused by id */
) { /* channels will be terminated after task completion */
channel {
messageCount { queue = "test-queue" }
}
channel {
basicConsume {
queue = "test-queue"
autoAck = true
deliverCallback<Message> { tag, message ->
println("Message: $message with $tag")
}
}
}
}
```

#### Strongly type code style sample

## Building & Running

To build or run the project, use one of the following tasks:
```kotlin
/*
* In RabbitMQ client you have overloaded functions and in order to maintain compatibility
* with DSL style a mechanism for parameter validation is required.
* The core of this mechanism is a custom state delegator mechanism.
*/

| Task | Description |
| -------------------------------|---------------------------------------------------------------------- |
| `./gradlew test` | Run the tests |
| `./gradlew build` | Build everything |
| `buildFatJar` | Build an executable JAR of the server with all dependencies included |
| `buildImage` | Build the docker image to use with the fat JAR |
| `publishImageToLocalRegistry` | Publish the docker image locally |
| `run` | Run the server |
| `runDocker` | Run using the local docker image |
basicConsume {
queue = "test-queue"
/* autoAck = true */ // let's say that autoAck is omited
deliverCallback<Message> { tag, message ->
println("Message: $message with $tag")
}
}
```

If the server starts successfully, you'll see the following output:
```log
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <arguments>, initialized: <true>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <autoAck>, initialized: <false>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <cancelCallback>, initialized: <true>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <channel>, initialized: <false>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <consumerTag>, initialized: <false>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <deliverCallback>, initialized: <true>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <exclusive>, initialized: <true>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <noLocal>, initialized: <true>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <queue>, initialized: <true>
2024-12-14 18:29:26.015 [DefaultDispatcher-worker-1] WARN KabbitMQBasicConsumeBuilder - <shutdownSignalCallback>, initialized: <false>
java.lang.IllegalStateException: Unsupported combination of parameters for basicConsume.
```
2024-12-04 14:32:45.584 [main] INFO Application - Application started in 0.303 seconds.
2024-12-04 14:32:45.682 [main] INFO Application - Responding at http://0.0.0.0:8080

#### Direct interaction with libray sample


```kotlin
channel("direct-calls"){
basicPublish("test", "test-routing-key", null, "fdsf".toByteArray())

val consumer = object : DefaultConsumer(channel) {
override fun handleDelivery(
consumerTag: String?,
envelope: Envelope?,
properties: AMQP.BasicProperties?,
body: ByteArray?
) {
println("Received message: ${body?.let { String(it) }}")
}
}
basicConsume(queueName, true, consumer)
}
```


## Dead Letter Queue Example

```kotlin
@Serializable
data class Message(
var content: String
)

fun Application.queueBinding() {
install(KabbitMQ) {
uri = "amqp://guest:guest@localhost:5678"
connectionName = "guest"
}

// declare dead letter queue
queueBind {
queue = "dlq"
exchange = "dlx"
routingKey = "dlq-dlx"
queueDeclare {
queue = "dlq"
durable = true
}
exchangeDeclare {
exchange = "dlx"
type = BuiltinExchangeType.DIRECT
}
}

// declare queue configured with dead letter queue
queueBind {
queue = "test-queue"
exchange = "test-exchange"
queueDeclare {
queue = "test-queue"
arguments = mapOf(
"x-dead-letter-exchange" to "dlx",
"x-dead-letter-routing-key" to "dlq-dlx"
)
}
exchangeDeclare {
exchange = "test-exchange"
type = BuiltinExchangeType.FANOUT
}
}

repeat(10) {
basicPublish {
exchange = "test-exchange"
message {
Message(content = "Hello world!")
}
}
}

basicConsume {
queue = "test-queue"
autoAck = false
deliverCallback<Message> { tag, message ->
/* process message */
// ...

/* simulate something went wrong */
basicReject {
deliveryTag = tag
requeue = false
}
}
}

basicConsume {
queue = "dlq"
autoAck = true
deliverCallback<Message> { _, message ->
/* process message */
println("Message in DLQ: $message")
}
}
}
```
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ publishing {
}

signing {
//sign(publishing.publications["kotlin"])
sign(publishing.publications["kotlin"])
}

tasks.register<Zip>("zipBuildFolder") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.mesh.kabbitMq.delegator.Delegator.Companion.initialized
import com.mesh.kabbitMq.delegator.Delegator.Companion.stateTrace
import com.mesh.kabbitMq.delegator.Delegator.Companion.withThisRef
import com.rabbitmq.client.*
import io.ktor.util.logging.*
import kotlinx.serialization.json.Json

@KabbitMQDslMarker
Expand Down Expand Up @@ -64,7 +65,7 @@ class KabbitMQBasicConsumeBuilder(

fun build(): String = withThisRef(this@KabbitMQBasicConsumeBuilder) {
return@withThisRef when {
initialized(::consumerTag, ::deliverCallback, ::cancelCallback) -> {
initialized(::consumerTag, ::deliverCallback, ::cancelCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -74,7 +75,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::consumerTag, ::deliverCallback, ::cancelCallback, ::shutdownSignalCallback) -> {
initialized(::consumerTag, ::deliverCallback, ::cancelCallback, ::shutdownSignalCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -88,7 +89,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::cancelCallback, ::shutdownSignalCallback) -> {
initialized(::deliverCallback, ::cancelCallback, ::shutdownSignalCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -99,7 +100,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::consumerTag, ::deliverCallback, ::shutdownSignalCallback) -> {
initialized(::consumerTag, ::deliverCallback, ::shutdownSignalCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -109,7 +110,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::shutdownSignalCallback) -> {
initialized(::deliverCallback, ::shutdownSignalCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -119,7 +120,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::shutdownSignalCallback) -> {
initialized(::deliverCallback, ::shutdownSignalCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -128,7 +129,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::cancelCallback) -> {
initialized(::deliverCallback, ::cancelCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -137,7 +138,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::cancelCallback) -> {
initialized(::deliverCallback, ::cancelCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -146,7 +147,7 @@ class KabbitMQBasicConsumeBuilder(
)
}

initialized(::deliverCallback, ::cancelCallback) -> {
initialized(::deliverCallback, ::cancelCallback, ::autoAck) -> {
channel.basicConsume(
queue,
autoAck,
Expand All @@ -158,7 +159,7 @@ class KabbitMQBasicConsumeBuilder(


else -> {
stateTrace().forEach { println(it) }
stateTrace().forEach { KtorSimpleLogger("KabbitMQBasicConsumeBuilder").warn(it) }
error("Unsupported combination of parameters for basicConsume.")
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/com/mesh/kabbitMq/delegator/Delegator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal class Delegator<T : Any>{
fun stateTrace(thisRef: Any = ref): List<String> {
return thisRef::class.memberProperties.map {
val initialized = (stateMap[thisRef.javaClass.simpleName to it.name] is State.Initialized)
"${thisRef.javaClass.simpleName}: <${it.name}>, initialized: <$initialized>"
"<${it.name}>, initialized: <$initialized>"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ inline fun Application.channel(id: String, autoClose: Boolean = true, block: Cha

@KabbitMQDslMarker
inline fun Connection.channel(block: Channel.() -> Unit): Channel {
return this.createChannel().also(block)
return this.createChannel().also(block).apply{ this.close() }
}

@KabbitMQDslMarker
inline fun Connection.channel(id: String, block: Channel.() -> Unit): Channel {
return this.createChannel().also(block)
return this.createChannel().also(block).apply{ this.close() }
}
Loading

0 comments on commit 98f5a82

Please sign in to comment.