Skip to content

Commit

Permalink
Merge pull request #3 from commercetools/auto-ack-rework
Browse files Browse the repository at this point in the history
Make auto-ack streams access the message
  • Loading branch information
satabin authored Feb 20, 2024
2 parents 2260776 + 7ae1046 commit 55f1077
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 36 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.effect.IO
import cats.effect.std.Random
import scala.concurrent.duration._

import de.commercetools.queue._
import com.commercetools.queue._

def publishStream(publisher: QueuePublisher[String]): Stream[IO, Nothing] =
Stream.eval(Random.scalaUtilRandom[IO]).flatMap { random =>
Expand All @@ -41,7 +41,7 @@ def subscribeStream(subscriber: QueueSubscriber[String]): Stream[IO, Nothing] =
// waiting max for 20 seconds
// print every received message,
// and ack automatically
.processWithAutoAck(5, 20.seconds)(IO.println(_))
.processWithAutoAck(5, 20.seconds)(msg => IO.println(msg.payload))
// results are non important
.drain

Expand All @@ -61,7 +61,7 @@ def program(client: QueueClient): IO[Unit] = {
## Working with Azure Service Bus queues

```scala
import de.commercetools.queue.azure.servicebus._
import com.commercetools.queue.azure.servicebus._
import com.azure.identity.DefaultAzureCredentialBuilder

val namespace = "{namespace}.servicebus.windows.net" // your namespace
Expand All @@ -74,7 +74,7 @@ ServiceBusClient(namespace, credentials).use(program(_))


```scala
import de.commercetools.queue.aws.sqs._
import com.commercetools.queue.aws.sqs._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

Expand Down
30 changes: 30 additions & 0 deletions core/src/main/scala/com/commercetools/queue/Message.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.commercetools.queue

import java.time.Instant

/**
* Interface to access message data received from a queue.
*/
trait Message[T] {

/**
* Unique message identifier
*/
def messageId: String

/**
* The message payload
*/
def payload: T

/**
* When the message was put into the queue.
*/
def enqueuedAt: Instant

/**
* Raw message metadata (depending on the underlying queue system).
*/
def metadata: Map[String, String]

}
23 changes: 1 addition & 22 deletions core/src/main/scala/com/commercetools/queue/MessageContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,13 @@ package com.commercetools.queue
import cats.effect.IO

import java.time
import java.time.Instant
import scala.concurrent.duration._

/**
* Interface to interact with a message received from a queue.
* The messages must be explicitly aknowledged after having been processed.
*/
trait MessageContext[T] {

/**
* Unique message identifier
*/
def messageId: String

/**
* The message payload
*/
def payload: T

/**
* When the message was put into the queue.
*/
def enqueuedAt: Instant

/**
* Raw message metadata (depending on the underlying queue system).
*/
def metadata: Map[String, String]
trait MessageContext[T] extends Message[T] {

/**
* Acknowledges the message. It will be removed from the queue, so that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ trait QueueSubscriber[T] {
* Messages in a batch are processed sequentially, stopping at the first error.
* All results up to the error will be emitted downstream before failing.
*/
def processWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: T => IO[Res]): Stream[IO, Res] = {
def processWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[T] => IO[Res])
: Stream[IO, Res] = {
// to have full control over nacking things in time after a failure, and emitting
// results up to the error, we resort to a `Pull`, which allows this fine graind control
// over pulling/emitting/failing
Expand All @@ -53,7 +54,7 @@ trait QueueSubscriber[T] {
} else {
val ctx = chunk(idx)
Pull
.eval(f(ctx.payload).guaranteeCase {
.eval(f(ctx).guaranteeCase {
case Outcome.Succeeded(_) => ctx.ack()
case _ =>
// if it was cancelled or errored, let's nack this and up to the end of the chunk
Expand Down Expand Up @@ -86,10 +87,10 @@ trait QueueSubscriber[T] {
* Messages in a batch are processed in parallel but result is emitted in
* order the messages were received.
*/
def attemptProcessWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: T => IO[Res])
def attemptProcessWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[T] => IO[Res])
: Stream[IO, Either[Throwable, Res]] =
messages(batchSize, waitingTime).parEvalMap(batchSize)(ctx =>
f(ctx.payload).attempt.flatTap {
f(ctx).attempt.flatTap {
case Right(_) => ctx.ack()
case Left(_) => ctx.nack()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ class SubscriberSuite extends CatsEffectSuite {
result <- subscriber
// take all messages in one big batch
.processWithAutoAck(batchSize = 100, waitingTime = 40.millis)(m =>
IO.raiseWhen(m == "message-43")(new Exception("BOOM")).as(m))
IO.raiseWhen(m.payload == "message-43")(new Exception("BOOM")).as(m))
.attempt
.compile
.toList
} yield (messages, result))
.flatMap { case (originals, result) =>
for {
// check that all messages were consumed up to message #43
_ <- assertIO(IO.pure(result.init), originals.take(43).map(m => Right(m.payload)))
_ <- assertIO(IO.pure(result.init.map(_.map(_.payload))), originals.take(43).map(m => Right(m.payload)))
_ <- assertIO(IO.pure(result.last.leftMap(_.getMessage())), Left("BOOM"))
_ <- assertIO(queue.getAvailableMessages, originals.drop(43))
_ <- assertIO(queue.getLockedMessages, Nil)
Expand Down
8 changes: 4 additions & 4 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.effect.IO
import cats.effect.std.Random
import scala.concurrent.duration._

import de.commercetools.queue._
import com.commercetools.queue._

def publishStream(publisher: QueuePublisher[String]): Stream[IO, Nothing] =
Stream.eval(Random.scalaUtilRandom[IO]).flatMap { random =>
Expand All @@ -41,7 +41,7 @@ def subscribeStream(subscriber: QueueSubscriber[String]): Stream[IO, Nothing] =
// waiting max for 20 seconds
// print every received message,
// and ack automatically
.processWithAutoAck(5, 20.seconds)(IO.println(_))
.processWithAutoAck(5, 20.seconds)(msg => IO.println(msg.payload))
// results are non important
.drain

Expand All @@ -61,7 +61,7 @@ def program(client: QueueClient): IO[Unit] = {
## Working with Azure Service Bus queues

```scala mdoc:compile-only
import de.commercetools.queue.azure.servicebus._
import com.commercetools.queue.azure.servicebus._
import com.azure.identity.DefaultAzureCredentialBuilder

val namespace = "{namespace}.servicebus.windows.net" // your namespace
Expand All @@ -74,7 +74,7 @@ ServiceBusClient(namespace, credentials).use(program(_))


```scala mdoc:compile-only
import de.commercetools.queue.aws.sqs._
import com.commercetools.queue.aws.sqs._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

Expand Down

0 comments on commit 55f1077

Please sign in to comment.