Camel DSL for FS2

Apache Camel endpoints can be integrated into FS2 applications with a DSL.

Dependencies

The DSL is provided by the streamz-camel-fs2 artifact, which is available for Scala 2.11 and 2.12:

resolvers += Resolver.bintrayRepo("krasserm", "maven")

libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.10-M2"

Configuration

The consumer receive timeout on Camel endpoints defaults to 500 ms. If you need to change that, you can do so in application.conf:

streamz.camel.consumer.receive.timeout = 10s
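The value is written as a duration string. As a minimal sketch, assuming the setting is read through Typesafe Config (the application.conf convention suggests this, but it is not stated here), the following shows how such a value is interpreted. The object name ReceiveTimeoutCheck is hypothetical, and the setting is parsed from an inline string rather than from a real application.conf:

```scala
import java.util.concurrent.TimeUnit

import com.typesafe.config.{ Config, ConfigFactory }

// Hypothetical helper, not part of streamz: parses the setting from an
// inline string to illustrate the duration format.
object ReceiveTimeoutCheck {
  val key = "streamz.camel.consumer.receive.timeout"

  val config: Config = ConfigFactory.parseString(s"$key = 10s")

  // Typesafe Config converts duration strings such as "10s" or "500ms"
  val timeoutMillis: Long = config.getDuration(key, TimeUnit.MILLISECONDS)
}
```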

DSL

The DSL can be imported with:

import streamz.camel.fs2.dsl._

Its usage requires an implicit StreamContext in scope. A StreamContext uses a CamelContext to manage the endpoints that are created and referenced by applications. A StreamContext with an internally managed CamelContext can be created with StreamContext():

import streamz.camel.StreamContext

// contains an internally managed CamelContext 
implicit val streamContext: StreamContext = StreamContext()

Applications that want to re-use an existing, externally managed CamelContext should create a StreamContext with StreamContext(camelContext: CamelContext):

import org.apache.camel.CamelContext
import streamz.camel.StreamContext

// externally managed CamelContext
val camelContext: CamelContext = ???

// re-uses the externally managed CamelContext
implicit val streamContext: StreamContext = StreamContext(camelContext)

A StreamContext internally manages an executorService for running blocking endpoint operations. Applications can configure a custom executor service by providing an executorServiceFactory when creating the StreamContext. See the API docs for details.

After usage, a StreamContext should be stopped with streamContext.stop().
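One way to make sure the context is always stopped is to wrap its usage in try/finally. The following is a minimal sketch using only StreamContext() and stop() as described above; the helper name withStreamContext is hypothetical and not part of streamz:

```scala
import streamz.camel.StreamContext

object StreamContextLifecycle {
  // Hypothetical helper: creates a StreamContext with an internally
  // managed CamelContext, runs `body`, and stops the context afterwards,
  // even if `body` throws.
  def withStreamContext[A](body: StreamContext => A): A = {
    val streamContext = StreamContext()
    try body(streamContext)
    finally streamContext.stop()
  }
}
```

A caller can then mark the provided context as implicit inside the block, for example `withStreamContext { implicit ctx => ... }`, so that the DSL picks it up.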

Receiving in-only message exchanges from an endpoint

An FS2 stream that emits messages consumed from a Camel endpoint can be created with receive. Endpoints are referenced by their endpoint URI. For example,

import cats.effect.IO
import fs2.Stream
import streamz.camel.StreamContext 
import streamz.camel.StreamMessage 
import streamz.camel.fs2.dsl._

val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1")

creates an FS2 stream that consumes messages from the SEDA endpoint seda:q1 and emits them as StreamMessage[String] values. A StreamMessage[A] contains a message body of type A and message headers. The String type parameter of receive causes consumed message bodies to be converted to String before they are emitted; the conversion internally uses a Camel type converter. An FS2 stream that emits only the converted message bodies can be created with receiveBody:

val s1b: Stream[IO, String] = receiveBody[IO, String]("seda:q1")

This is equivalent to receive[IO, String]("seda:q1").map(_.body).

receive and receiveBody can only be used with endpoints that create in-only message exchanges.
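To try receiveBody end to end, messages can be placed on the SEDA queue with a plain Camel ProducerTemplate. The following is a sketch under several assumptions: Camel 2.x (where DefaultCamelContext lives in org.apache.camel.impl), an externally managed CamelContext that the application starts itself, and a ContextShift[IO] built from the global execution context. The object name ReceiveBodyExample is hypothetical:

```scala
import cats.effect.{ ContextShift, IO }
import org.apache.camel.impl.DefaultCamelContext
import streamz.camel.StreamContext
import streamz.camel.fs2.dsl._

import scala.concurrent.ExecutionContext

object ReceiveBodyExample {
  def run(): List[String] = {
    // externally managed CamelContext, started by the application
    val camelContext = new DefaultCamelContext()
    camelContext.start()

    implicit val streamContext: StreamContext = StreamContext(camelContext)
    implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

    try {
      // enqueue two messages on the SEDA endpoint before consuming
      val producer = camelContext.createProducerTemplate()
      producer.sendBody("seda:q1", "hello")
      producer.sendBody("seda:q1", "world")

      // consume exactly two message bodies, then terminate the stream
      receiveBody[IO, String]("seda:q1").take(2).compile.toList.unsafeRunSync()
    } finally {
      streamContext.stop()
      camelContext.stop()
    }
  }
}
```

Without take(2), the stream would keep polling the endpoint and never complete.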

Receiving in-out message exchanges from an endpoint

...

Sending in-only message exchanges to an endpoint

For sending a StreamMessage to a Camel endpoint, the send combinator should be used:

val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2")

This initiates an in-only message exchange with an endpoint and continues the stream with the sent StreamMessage.

The send combinator is available not only for streams of type Stream[IO, StreamMessage[A]] but more generally for any Stream[F, A] where F has ContextShift and Async instances (F: ContextShift: Async):

val s2b: Stream[IO, String] = s1b.send("seda:q2")

If A is not a StreamMessage, send automatically wraps the message into a StreamMessage[A] before sending it to the endpoint and continues the stream with the unwrapped A.
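The forwarding behaviour of send can be observed by reading the target endpoint with a Camel ConsumerTemplate. This is a sketch, assuming Camel 2.x, an application-managed CamelContext, and a ContextShift[IO] built from the global execution context. The object name SendExample and the 5 second read timeout are illustrative choices:

```scala
import cats.effect.{ ContextShift, IO }
import org.apache.camel.impl.DefaultCamelContext
import streamz.camel.StreamContext
import streamz.camel.fs2.dsl._

import scala.concurrent.ExecutionContext

object SendExample {
  def run(): String = {
    val camelContext = new DefaultCamelContext()
    camelContext.start()

    implicit val streamContext: StreamContext = StreamContext(camelContext)
    implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

    try {
      // put one message on the source queue
      camelContext.createProducerTemplate().sendBody("seda:q1", "hello")

      // forward one message body from seda:q1 to seda:q2
      receiveBody[IO, String]("seda:q1")
        .send("seda:q2")
        .take(1)
        .compile
        .drain
        .unsafeRunSync()

      // read the forwarded message back from seda:q2 (waits up to 5 seconds)
      camelContext.createConsumerTemplate().receiveBody("seda:q2", 5000L, classOf[String])
    } finally {
      streamContext.stop()
      camelContext.stop()
    }
  }
}
```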

Sending in-out message exchanges to an endpoint

For sending a request StreamMessage to an endpoint and obtaining a reply, the sendRequest combinator should be used:

val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight")

This initiates an in-out message exchange with the endpoint and continues the stream with the output StreamMessage. Here, a Bean endpoint is used to call the weight(String): Int method on an object that is registered in the CamelContext under the name service. The input message body is passed as the argument of the weight call, and the return value becomes the output message body. The sendRequest type parameter (Int) specifies the expected output value type. The output message body can also be converted to another type (Double, for example), provided that an appropriate Camel type converter is available.

The sendRequest combinator is available not only for streams of type Stream[IO, StreamMessage[A]] but more generally for any Stream[F, A] where F has ContextShift and Async instances (F: ContextShift: Async):

val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight")

If A is not a StreamMessage, sendRequest automatically wraps the message into a StreamMessage[A] before sending it to the endpoint and continues the stream with the unwrapped message body B of the output StreamMessage[B].
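For the bean:service endpoint to resolve, an object must be registered under the name service. The following sketch shows one way to do that, assuming Camel 2.x, where SimpleRegistry lives in org.apache.camel.impl and can be passed to the DefaultCamelContext constructor. The WeightService class and its length-based weight implementation are hypothetical stand-ins for a real service:

```scala
import cats.effect.{ ContextShift, IO }
import fs2.Stream
import org.apache.camel.impl.{ DefaultCamelContext, SimpleRegistry }
import streamz.camel.StreamContext
import streamz.camel.fs2.dsl._

import scala.concurrent.ExecutionContext

// Hypothetical service: the "weight" of a string is its length
class WeightService {
  def weight(s: String): Int = s.length
}

object SendRequestExample {
  def run(): List[Int] = {
    // register the service under the name used in the endpoint URI
    val registry = new SimpleRegistry()
    registry.put("service", new WeightService)

    val camelContext = new DefaultCamelContext(registry)
    camelContext.start()

    implicit val streamContext: StreamContext = StreamContext(camelContext)
    implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

    try {
      // each element is sent as a request; the reply body continues the stream
      Stream("a", "bb", "ccc")
        .covary[IO]
        .sendRequest[Int]("bean:service?method=weight")
        .compile
        .toList
        .unsafeRunSync()
    } finally {
      streamContext.stop()
      camelContext.stop()
    }
  }
}
```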