Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dependency updates (Clojure and Spring) as well as adding request-reply functionality and providing access to headers #8

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
14 changes: 13 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
*/target
target
*/.nrepl-port
.lein-repl-history
.lein-failures
.lein-repl-history
.nrepl-port
pom.xml
<<<<<<< HEAD
=======
*jar
lib
classes
Expand All @@ -7,4 +16,7 @@ classes
/clamq-core/.lein-failures
/clamq-jms/.lein-failures
/clamq-rabbitmq/.lein-failures
/clamq-runner/.lein-failures
/clamq-runner/.lein-failures
*/.lein-repl-history
*/.nrepl-port
>>>>>>> gdp-update
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,34 @@ The configuration map currently supports the following keys:

* **:pubSub** : valid only for JMS brokers, defines (true/false) if produced messages are for publish/subscribe, that is, must be sent to a topic.

###### Publish

Once defined, the producer can be used to send messages as follows:

(publish producer endpoint message attributes)
(publish producer endpoint message headers)

Where:

* **producer** is a producer obtained as previously described.
* **endpoint** is the definition of a message queue endpoint.
* **message** is the message to be sent, of type text or object.
* **attributes** is only for JMS brokers, and defines an optional map of attributes to set into the message.
* **headers** is only for JMS brokers. It allows for setting JMS header attributes as well as for providing general purpose key/value properties.

`TODO: give example of headers map`

The **endpoint** definition depends on the actual broker:
for JMS brokers, it is just the queue/topic name, for AMQP brokers it is a map containing two entries, *:exchange* and *:routing-key*.


###### Request-Reply

The Producer also has the ability to perform a synchronous request-reply. Under the covers, this leverages a temporary destination as the return channel where a blocking **receive** is performed on that channel for a given amount of time (as defined by **receive-timeout**. defaults to 10 seconds). If no response is received within that time, *nil* is returned.


(request-reply producer endpoint message headers)



### Message consumers

Message consumers work by asynchronously pulling messages from the queue they listen to, and processing them through a user-defined function.
Expand Down Expand Up @@ -303,4 +317,4 @@ Clamq tests work good as examples too:

## Feedback

Feel free to open issues on the project tracker, and/or contact me on twitter: [sbtourist](http://twitter.com/sbtourist)
Feel free to open issues on the project tracker, and/or contact me on twitter: [sbtourist](http://twitter.com/sbtourist)
7 changes: 4 additions & 3 deletions clamq-activemq/project.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(defproject clamq/clamq-activemq "0.5-SNAPSHOT"
:description "Clojure APIs for Message Queues"
:dependencies [[clamq/clamq-jms "0.5-SNAPSHOT"]
[org.slf4j/slf4j-api "1.6.1"]
[org.apache.activemq/activemq-core "5.5.0"]]
:dev-dependencies [[org.slf4j/slf4j-simple "1.6.1"]])
[org.clojure/clojure "1.8.0"]
[org.slf4j/slf4j-api "1.7.21"]
[org.apache.activemq/activemq-core "5.7.0"]]
:dev-dependencies [[org.slf4j/slf4j-simple "1.6.1"]])
14 changes: 9 additions & 5 deletions clamq-activemq/src/clamq/activemq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
[org.apache.activemq ActiveMQConnectionFactory]
[org.springframework.jms.connection CachingConnectionFactory]))

(defn activemq-connection [broker & {username :username password :password max-connections :max-connections :or {max-connections 1}}]
"Returns an ActiveMQ javax.jms.ConnectionFactory pointing to the given broker url.
It currently supports the following optional named arguments (refer to ActiveMQ docs for more details about them):
:username, :password"
(defn activemq-connection
"Returns an ActiveMQ javax.jms.ConnectionFactory pointing to the given broker url.
It currently supports the following optional named arguments (refer to ActiveMQ docs for more details about them):
:username, :password"
[broker & {:keys [username password max-connections]
:or {max-connections 1}}]
(when (nil? broker) (throw (IllegalArgumentException. "No value specified for broker URL!")))
(let [factory (doto (ActiveMQConnectionFactory. broker) (.setUserName username) (.setPassword password))
(let [factory (doto (ActiveMQConnectionFactory. broker)
(.setUserName username)
(.setPassword password))
pool (CachingConnectionFactory. factory)]
(jms/jms-connection pool #(.destroy pool))))
3 changes: 2 additions & 1 deletion clamq-activemq/test/clamq/test/activemq_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
(setup-connection-and-test producer-consumer-test)
(setup-connection-and-test producer-consumer-topic-test)
(setup-connection-and-test producer-consumer-limit-test)
(setup-connection-and-test producer-consumer-with-properties-test)
(setup-connection-and-test on-failure-test)
(setup-connection-and-test transacted-test)
(setup-connection-and-test seqable-consumer-test)
Expand All @@ -21,4 +22,4 @@
(setup-connection-and-test multi-pipe-limit-test)
(setup-connection-and-test router-pipe-test)
(setup-connection-and-test router-pipe-topic-test)
(setup-connection-and-test router-pipe-limit-test))
(setup-connection-and-test router-pipe-limit-test))
4 changes: 2 additions & 2 deletions clamq-core/project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject clamq/clamq-core "0.5-SNAPSHOT"
:description "Clojure APIs for Message Queues"
:dependencies [[org.clojure/clojure "1.3.0"]
[org.springframework/spring-context "3.0.5.RELEASE"]])
:dependencies [[org.clojure/clojure "1.8.0"]
[org.springframework/spring-context "4.3.2.RELEASE"]])
27 changes: 18 additions & 9 deletions clamq-core/src/clamq/internal/macros.clj
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
(ns clamq.internal.macros)

(defmacro validate
"Validates value is not nil. If it is, throw IllegalArgumentException with failed-msg"
[value failed-msg]
`(when (nil? ~value)
(throw (IllegalArgumentException. ~failed-msg))))

(defmacro non-blocking-listener [listener-class listener-method converter handler-fn failure-fn limit container]
`(let [~'counter (atom 0)]
(proxy [~listener-class] []
(~listener-method [~'message]
(let [~'converted (.fromMessage ~converter ~'message)]
(swap! ~'counter inc)
(try
(~handler-fn ~'converted)
(catch Exception ~'ex
(~failure-fn {:message ~'converted :exception ~'ex}))
(finally
(if (= ~limit ~'@counter) (do (.stop ~container) (future (.shutdown ~container)))))))))))
(try
(let [~'converted (.fromMessage ~converter ~'message)]
(swap! ~'counter inc)
(try
(~handler-fn ~'converted)
(catch Exception ~'ex
(~failure-fn {:message ~'converted :exception ~'ex}))
(finally
(if (= ~limit ~'@counter) (do (.stop ~container) (future (.shutdown ~container)))))))
(catch Exception ~'ex
(~failure-fn {:message ~'message :exception ~'ex})))))))

(defmacro blocking-listener [listener-class listener-method converter request-queue reply-queue container]
`(proxy [~listener-class] []
Expand All @@ -20,7 +29,7 @@
(loop []
; Is spinning really the better option?
(let [~'m (.poll ~reply-queue 1000 java.util.concurrent.TimeUnit/MILLISECONDS)]
(cond
(cond
(and (nil? ~'m) (.isRunning ~container)) (recur)
(and (nil? ~'m) (not (.isRunning ~container))) (throw (RuntimeException.))
(= :rollback ~'m) (throw (RuntimeException.))
Expand Down
2 changes: 1 addition & 1 deletion clamq-core/src/clamq/protocol/connection.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
(producer [self] [self conf])
(consumer [self conf])
(seqable [self conf])
(close [self]))
(close [self]))
3 changes: 2 additions & 1 deletion clamq-core/src/clamq/protocol/producer.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
(ns clamq.protocol.producer)

(defprotocol Producer
(publish [self destination message] [self destination message attributes]))
(publish [self destination message] [self destination message attributes])
(request-reply [self destination message attributes]))
5 changes: 3 additions & 2 deletions clamq-jms/project.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(defproject clamq/clamq-jms "0.5-SNAPSHOT"
:description "Clojure APIs for Message Queues"
:dependencies [[clamq/clamq-core "0.5-SNAPSHOT"]
[org.slf4j/slf4j-api "1.6.1"]
[org.springframework/spring-jms "3.0.5.RELEASE"]])
[org.slf4j/slf4j-api "1.7.21"]
[org.springframework/spring-jms "4.3.2.RELEASE"]
[camel-snake-kebab "0.4.0"]])
70 changes: 70 additions & 0 deletions clamq-jms/src/clamq/converters.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
(ns clamq.converters
(:require [camel-snake-kebab.core :refer [->PascalCase]])
(:import [org.springframework.jms.support.converter MessageConverter SimpleMessageConverter]
[org.springframework.jms.core JmsTemplate MessageCreator]
[javax.jms Session TextMessage DeliveryMode]))

(defn- jms-message->headers [jms-message]
{:correlation-id (.getJMSCorrelationID jms-message)
:correlation-id-as-bytes (.getJMSCorrelationIDAsBytes jms-message)
:delivery-mode (.getJMSDeliveryMode jms-message)
:destination (.getJMSDestination jms-message)
:expiration (.getJMSExpiration jms-message)
:message-id (.getJMSMessageID jms-message)
:priority (.getJMSPriority jms-message)
:redelivered (.getJMSRedelivered jms-message)
:reply-to (.getJMSReplyTo jms-message)
:timestamp (.getJMSTimestamp jms-message)
:type (.getJMSType jms-message)})

(defn uuid []
(.toString (java.util.UUID/randomUUID)))

(defn- headers->jms-message [jms-message headers]
(.setJMSCorrelationID jms-message (:correlation-id headers (uuid)))
;; (.setJMSCorrelationIDAsBytes jms-message (:correlation-id-as-bytes headers))
(.setJMSDeliveryMode jms-message (:delivery-mode headers (DeliveryMode/PERSISTENT)))
(.setJMSExpiration jms-message (:expiration headers 0))
(.setJMSPriority jms-message (:priority headers 4))
(.setJMSRedelivered jms-message (:redelivered headers false))
(.setJMSType jms-message (:jms-type headers))
;; (.setJMSReplyTo jms-message (:reply-to headers))
jms-message)

(defn- jms-message->properties [jms-message]
(when-let [properties (.getProperties jms-message)]
(zipmap
(map #(keyword %) (keys properties))
(vals properties))))

(defn- properties->jms-message [jms-message properties {:keys [key-fn]
:or {key-fn ->PascalCase}}]
(doseq [[k v] properties]
(.setStringProperty jms-message (key-fn (name k)) v))
jms-message)

(defn body-and-header-converter []
(reify
MessageConverter
(fromMessage [this message]
(let [body (.fromMessage (SimpleMessageConverter.) message)
hdrs (jms-message->headers message)
props (jms-message->properties message)]
{:headers (merge hdrs props) :body body}))

(toMessage [this message session]
(.toMessage (SimpleMessageConverter.) message session))))

(defn- text-message [session message headers opts]
(let [txt-msg (-> (.createTextMessage session message)
(headers->jms-message headers)
(properties->jms-message (:properties headers) opts))]
txt-msg))

(defn message-creator
"Creates an implementation of a Spring MessageCreator. This implementation creates a
TextMessage and sets any provided headers and properties on that message."
[message headers & [opts]]
(reify MessageCreator
(createMessage [_ session]
(text-message session message headers opts))))
82 changes: 49 additions & 33 deletions clamq-jms/src/clamq/jms.clj
Original file line number Diff line number Diff line change
@@ -1,65 +1,80 @@
(ns clamq.jms
(:require
[clamq.helpers :as helpers]
[clamq.helpers :as helpers]
[clamq.internal.macros :as macros]
[clamq.internal.utils :as utils]
[clamq.internal.utils :as utils]
[clamq.protocol.connection :as connection]
[clamq.protocol.consumer :as consumer]
[clamq.protocol.seqable :as seqable]
[clamq.protocol.producer :as producer])
[clamq.protocol.producer :as producer]
[clamq.converters :as conv])
(:import
[java.util.concurrent SynchronousQueue]
[javax.jms BytesMessage ObjectMessage TextMessage ExceptionListener MessageListener]
[org.springframework.jms.core JmsTemplate MessagePostProcessor]
[javax.jms BytesMessage ObjectMessage TextMessage ExceptionListener MessageListener Session]
[org.springframework.jms.core JmsTemplate MessagePostProcessor MessageCreator]
[org.springframework.jms.support.converter SimpleMessageConverter]
[org.springframework.jms.listener DefaultMessageListenerContainer]))

(defn- proxy-message-post-processor [attributes]
(proxy [MessagePostProcessor] []
(postProcessMessage [message]
(doseq [attribute attributes] (.setStringProperty message (attribute 0) (attribute 1)))
message)))

(defn- jms-producer [connection {pubSub :pubSub :or {pubSub false}}]
(when (nil? connection) (throw (IllegalArgumentException. "No value specified for connection!")))
(defn- jms-producer [connection {:keys [pubsub time-to-live receive-timeout]
:or {pubsub false
receive-timeout 10000}}]
(macros/validate connection "No value specified for connection!")
(let [template (JmsTemplate. connection)]
(doto template (.setMessageConverter (SimpleMessageConverter.)) (.setPubSubDomain pubSub))
(doto template
(.setMessageConverter (conv/body-and-header-converter))
(.setPubSubDomain pubsub)
(.setReceiveTimeout receive-timeout))
(when time-to-live
(doto template
(.setExplicitQosEnabled true)
(.setTimeToLive time-to-live)))

(reify producer/Producer
(publish [self destination message attributes]
(.convertAndSend template destination message (proxy-message-post-processor attributes)))
(publish [self destination message] (producer/publish self destination message {})))))
(publish [_ destination message headers]
(.send template destination (conv/message-creator message headers)))
(publish [self destination message] (producer/publish self destination message {}))
(request-reply [_ destination message headers]
(.sendAndReceive template destination (conv/message-creator message headers))))))

(defn- jms-consumer [connection {endpoint :endpoint handler-fn :on-message transacted :transacted pubSub :pubSub limit :limit failure-fn :on-failure :or {pubSub false limit 0 failure-fn helpers/rethrow-on-failure}}]
(when (nil? connection) (throw (IllegalArgumentException. "No value specified for connection!")))
(when (nil? endpoint) (throw (IllegalArgumentException. "No value specified for :endpoint!")))
(when (nil? transacted) (throw (IllegalArgumentException. "No value specified for :transacted!")))
(when (nil? handler-fn) (throw (IllegalArgumentException. "No value specified for :on-message!")))
(let [container (DefaultMessageListenerContainer.)
listener (macros/non-blocking-listener MessageListener onMessage (SimpleMessageConverter.) handler-fn failure-fn limit container)]
(defn- jms-consumer [connection {:keys [endpoint transacted pubsub limit convert-with-headers]
handler-fn :on-message
failure-fn :on-failure
:or {pubsub false
limit 0
failure-fn helpers/rethrow-on-failure
convert-with-headers false}}]
(macros/validate connection "No value specified for connection!")
(macros/validate endpoint "No value specified for :endpoint!")
(macros/validate transacted "No value specified for :transacted!")
(macros/validate handler-fn "No value specified for :on-message!")
(let [container (DefaultMessageListenerContainer.)
msg-converter (if convert-with-headers (conv/body-and-header-converter) (SimpleMessageConverter.))
listener (macros/non-blocking-listener MessageListener onMessage msg-converter handler-fn failure-fn limit container)]
(doto container
(.setConnectionFactory connection)
(.setDestinationName endpoint)
(.setMessageListener listener)
(.setSessionTransacted transacted)
(.setPubSubDomain pubSub)
(.setPubSubDomain pubsub)
(.setConcurrentConsumers 1))
(reify consumer/Consumer
(start [self] (do (doto container (.start) (.initialize)) nil))
(close [self] (do (.shutdown container) nil)))))

(defn- jms-seqable-consumer [connection {endpoint :endpoint timeout :timeout :or {timeout 0}}]
(when (nil? connection) (throw (IllegalArgumentException. "No value specified for connection!")))
(when (nil? endpoint) (throw (IllegalArgumentException. "No value specified for :endpoint!")))
(let [request-queue (SynchronousQueue.) reply-queue (SynchronousQueue.)
container (DefaultMessageListenerContainer.)
(defn- jms-seqable-consumer [connection {:keys [endpoint timeout] :or {timeout 0}}]
(macros/validate connection "No value specified for connection!")
(macros/validate endpoint "No value specified for :endpoint!")
(let [request-queue (SynchronousQueue.)
reply-queue (SynchronousQueue.)
container (DefaultMessageListenerContainer.)
listener (macros/blocking-listener MessageListener onMessage (SimpleMessageConverter.) request-queue reply-queue container)]
(doto container
(.setConnectionFactory connection)
(.setDestinationName endpoint)
(.setMessageListener listener)
(.setSessionTransacted true)
(.setConcurrentConsumers 1)
(.start)
(.start)
(.initialize))
(reify seqable/Seqable
(mseq [self]
Expand All @@ -72,8 +87,9 @@
(.offer reply-queue :rollback 5 java.util.concurrent.TimeUnit/SECONDS)
(.shutdown container)))))

(defn jms-connection [connectionFactory close-fn]
"Returns a JMS Connection from the given javax.jms.ConnectionFactory object."
(defn jms-connection
"Returns a JMS Connection from the given javax.jms.ConnectionFactory object."
[connectionFactory close-fn]
(reify connection/Connection
(producer [self]
(jms-producer connectionFactory {}))
Expand Down
Loading