Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the QueuePusher API #7

Merged
merged 5 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ def subscribeStream(subscriber: QueueSubscriber[IO, String]): Stream[IO, Nothing

def program(client: QueueClient[IO]): IO[Unit] = {
val queueName = "my-queue"
client.publisher[String](queueName).use { publisher =>
// subscribe and publish concurrently
subscribeStream(client.subscriber[String](queueName))
.concurrently(publishStream(publisher))
.compile
// runs forever
.drain
}
// subscribe and publish concurrently
subscribeStream(client.subscribe[String](queueName))
// concurrently publish messages
.concurrently(publishStream(client.publish[String](queueName)))
.compile
// runs forever
.drain
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext
override def administration: QueueAdministration[F] =
new SQSAdministration(client, getQueueUrl(_))

override def publisher[T: Serializer](name: String): Resource[F, QueuePublisher[F, T]] =
Resource.eval(getQueueUrl(name).map(new SQSPublisher(_, client)))
override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new SQSPublisher(client, getQueueUrl(name))

override def subscriber[T: Deserializer](name: String): QueueSubscriber[F, T] =
override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new SQSSubscriber[F, T](getQueueUrl(name), client)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,19 @@

package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.functor._
import com.commercetools.queue.{QueuePublisher, Serializer}
import cats.effect.{Async, Resource}
import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class SQSPublisher[F[_], T](queueUrl: String, client: SqsAsyncClient)(implicit F: Async[F], serializer: Serializer[T])
class SQSPublisher[F[_], T](
client: SqsAsyncClient,
getQueueUrl: F[String]
)(implicit
F: Async[F],
serializer: Serializer[T])
extends QueuePublisher[F, T] {

override def publish(message: T, delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
client.sendMessage(
SendMessageRequest
.builder()
.queueUrl(queueUrl)
.messageBody(serializer.serialize(message))
.delaySeconds(delay.fold(0)(_.toSeconds.toInt))
.build())
}
}.void

override def publish(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
val delaySeconds = delay.fold(0)(_.toSeconds.toInt)
client.sendMessageBatch(
SendMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(messages.map { message =>
SendMessageBatchRequestEntry
.builder()
.messageBody(serializer.serialize(message))
.delaySeconds(delaySeconds)
.build()
}.asJava)
.build())
}
}.void
override def pusher: Resource[F, QueuePusher[F, T]] =
Resource.eval(getQueueUrl).map(new SQSPusher(client, _))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.functor._
import com.commercetools.queue.{QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit serializer: Serializer[T], F: Async[F])
extends QueuePusher[F, T] {

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
client.sendMessage(
SendMessageRequest
.builder()
.queueUrl(queueUrl)
.messageBody(serializer.serialize(message))
.delaySeconds(delay.fold(0)(_.toSeconds.toInt))
.build())
}
}.void

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
val delaySeconds = delay.fold(0)(_.toSeconds.toInt)
client.sendMessageBatch(
SendMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(messages.map { message =>
SendMessageBatchRequestEntry
.builder()
.messageBody(serializer.serialize(message))
.delaySeconds(delaySeconds)
.build()
}.asJava)
.build())
}
}.void

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ class ServiceBusClient[F[_]] private (
override def administration: QueueAdministration[F] =
new ServiceBusAdministration(adminBuilder.buildClient())

override def publisher[T: Serializer](name: String): Resource[F, QueuePublisher[F, T]] =
for {
sender <- Resource.make(F.delay(clientBuilder.sender().queueName(name).buildClient()))(s => F.delay(s.close()))
} yield new ServiceBusQueuePublisher[F, T](sender)
override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new ServiceBusQueuePublisher[F, T](clientBuilder, name)

override def subscriber[T: Deserializer](name: String): QueueSubscriber[F, T] =
override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new ServiceBusQueueSubscriber[F, T](name, clientBuilder)

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import cats.syntax.all._
import com.azure.messaging.servicebus.{ServiceBusMessage, ServiceBusSenderClient}
import com.commercetools.queue.{QueuePusher, Serializer}

import java.time.ZoneOffset
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit serializer: Serializer[Data], F: Async[F])
extends QueuePusher[F, Data] {

override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = {
val sbMessage = new ServiceBusMessage(serializer.serialize(message))
delay.traverse_ { delay =>
F.realTimeInstant
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))
} *>
F.blocking(sender.sendMessage(sbMessage)).void
}

override def push(messages: List[Data], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map(msg => new ServiceBusMessage(serializer.serialize(msg)))
delay.traverse_ { delay =>
F.realTimeInstant.map { now =>
sbMessages.foreach { msg =>
msg.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC))
}
}
} *>
F.blocking(sender.sendMessages(sbMessages.asJava)).void
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,25 @@

package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import cats.syntax.all._
import com.azure.messaging.servicebus.{ServiceBusMessage, ServiceBusSenderClient}
import com.commercetools.queue.{QueuePublisher, Serializer}

import java.time.ZoneOffset
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import cats.effect.{Async, Resource}
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer}

class ServiceBusQueuePublisher[F[_], Data](
sender: ServiceBusSenderClient
clientBuilder: ServiceBusClientBuilder,
queueName: String
)(implicit
F: Async[F],
serializer: Serializer[Data])
extends QueuePublisher[F, Data] {

override def publish(message: Data, delay: Option[FiniteDuration]): F[Unit] = {
val sbMessage = new ServiceBusMessage(serializer.serialize(message))
delay.traverse_(delay =>
F.realTimeInstant
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))) *>
F.blocking(sender.sendMessage(sbMessage)).void
}

override def publish(messages: List[Data], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map(msg => new ServiceBusMessage(serializer.serialize(msg)))
F.blocking(sender.sendMessages(sbMessages.asJava)).void
}
override def pusher: Resource[F, QueuePusher[F, Data]] =
Resource
.make {
F.delay(clientBuilder.sender().queueName(queueName).buildClient())
} { s =>
F.delay(s.close())
}
.map(new ServiceBusPusher(_))

}
12 changes: 5 additions & 7 deletions core/src/main/scala/com/commercetools/queue/QueueClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.commercetools.queue

import cats.effect.Resource

/**
* The entry point to using queues.
* A client will manage connection pools and has knowledge of the underlying queue system.
Expand All @@ -26,18 +24,18 @@ import cats.effect.Resource
trait QueueClient[F[_]] {

/**
* Gives access to adminsitrative operations.
* Gives access to adminsitrative API.
*/
def administration: QueueAdministration[F]

/**
* Creates a publisher to the queue.
* Gives access to the publication API.
*/
def publisher[T: Serializer](name: String): Resource[F, QueuePublisher[F, T]]
def publish[T: Serializer](name: String): QueuePublisher[F, T]

/**
* Creates a subscriber of the queue.
* Gives access to the subscription API.
*/
def subscriber[T: Deserializer](name: String): QueueSubscriber[F, T]
def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T]

}
28 changes: 14 additions & 14 deletions core/src/main/scala/com/commercetools/queue/QueuePublisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,33 @@

package com.commercetools.queue

import fs2.Pipe

import scala.concurrent.duration.FiniteDuration
import cats.effect.{MonadCancel, Resource}
import fs2.Stream

/**
* The interface to publish to a queue.
*/
trait QueuePublisher[F[_], T] {

/**
* Publishes a single message to the queue, with an optional delay.
*/
def publish(message: T, delay: Option[FiniteDuration]): F[Unit]
abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) {

/**
* Publishes a bunch of messages to the queue, with an optional delay.
* Returns a way to bush messages into the queue.
* This is a low-level construct, mainly aiming at integrating existing
* code bases that require to push explicitly.
*
* '''Note:''' Prefer using the sinks below when possible.
*/
def publish(messages: List[T], delay: Option[FiniteDuration]): F[Unit]
def pusher: Resource[F, QueuePusher[F, T]]

/**
* Sink to pipe your [[fs2.Stream Stream]] into, in order to publish
* produced data to the queue. The messages are published in batches, according
* to the `batchSize` parameter.
*/
def sink(batchSize: Int = 10): Pipe[F, T, Nothing] =
_.chunkN(batchSize).foreach { chunk =>
publish(chunk.toList, None)
def sink(batchSize: Int = 10)(upstream: Stream[F, T]): Stream[F, Nothing] =
Stream.resource(pusher).flatMap { pusher =>
upstream.chunkN(batchSize).foreach { chunk =>
pusher.push(chunk.toList, None)
}
}

}
37 changes: 37 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueuePusher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

import scala.concurrent.duration.FiniteDuration

/**
* A queue pusher allows for pushing elements into a queue either on at a time
* or in batch.
*/
trait QueuePusher[F[_], T] {
JPisCoding marked this conversation as resolved.
Show resolved Hide resolved

/**
* Publishes a single message to the queue, with an optional delay.
*/
def push(message: T, delay: Option[FiniteDuration]): F[Unit]

/**
* Publishes a bunch of messages to the queue, with an optional delay.
*/
def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit]

}
Loading