Skip to content

Commit

Permalink
#5 passing correlationId
Browse files Browse the repository at this point in the history
  • Loading branch information
ojow committed May 31, 2018
1 parent 70c589a commit e55043d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
33 changes: 17 additions & 16 deletions re/src/re/service/timeout.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
(ns re.service.timeout
(:require [integrant.core :as ig]
[clojure.tools.logging :refer [error info]]
[re.utils :as utils]
[proto-edn.core :as proto])
[clojure.edn :as edn])
(:import events.SystemEventOuterClass$TimerExpired
events.SystemEventOuterClass$Timer
(org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder KeyValue)
Expand All @@ -28,7 +27,8 @@
store-key {:expireAt (request "expireAt")
:partition (.partition ctx)
:offset (.offset ctx)}
store-value {:recipientId recipientId}
store-value {:recipientId recipientId
:correlationId (request "correlationId")}
timers-store (.getStateStore ctx "timers")]
(.put timers-store store-key store-value)
nil))
Expand All @@ -40,13 +40,13 @@
(serializer [_]
(reify Serializer
(configure [_ _ _] nil)
(serialize [_ topic {expireAt :expireAt partition :partition offset :offset}]
(serialize [_ _ {expireAt :expireAt partition :partition offset :offset}]
(.getBytes (format "%016X-%02X-%016X" expireAt partition offset)))
(close [_] nil)))
(deserializer [_]
(reify Deserializer
(configure [_ _ _] nil)
(deserialize [_ topic bytes]
(deserialize [_ _ bytes]
(let [str (String. bytes)
expireAt (Long/parseLong (.substring str 0 16) 16)
partition (Integer/parseInt (.substring str 17 19) 16)
Expand All @@ -61,31 +61,32 @@
(serializer [_]
(reify Serializer
(configure [_ _ _] nil)
(serialize [_ topic {recipientId :recipientId}]
(.getBytes (format "%s" recipientId)))
(serialize [_ _ map]
(.getBytes (str map)))
(close [_] nil)))
(deserializer [_]
(reify Deserializer
(configure [_ _ _] nil)
(deserialize [_ topic bytes]
(let [str (String. bytes)
recipientId str]
{:recipientId recipientId}))
(deserialize [_ _ bytes]
(edn/read-string (String. bytes)))
(close [_] nil)))))

(defn punctuator [context]
(let [ctx (:ctx context)]
(reify Punctuator
(punctuate [_ _]
(let [timers-store (.getStateStore ctx "timers")
t0 {:expireAt 0 :partition 0 :offset 0}
now {:expireAt (+ (quot (System/currentTimeMillis) 1000) 1) :partition 0 :offset 0}
timers (.range timers-store t0 now)]
t0-key {:expireAt 0 :partition 0 :offset 0}
now (quot (System/currentTimeMillis) 1000)
now-key {:expireAt now :partition 0 :offset 0}
timers (.range timers-store t0-key now-key)]
(try
(while (.hasNext timers)
(do (let [timer (.next timers)
event {"header" {"type" "events.TimerExpired" "createdAt" (:expireAt now) "source" "test"}
"correlationId" "0"
event {"header" {"type" "events.TimerExpired"
"createdAt" now
"source" "TimeoutService"}
"correlationId" (:correlationId (.-value timer))
"universe" "0"}
bytes ((:encoder context) "events.TimerExpired" event)]
(.delete timers-store (.-key timer))
Expand Down
6 changes: 4 additions & 2 deletions re/test/re/service/timeout_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
{:topic "timer-requests"
:key "entity#43"
:value (let [event {"header" {"type" "events.Timer" "createdAt" 0 "source" "test"}
"correlationId" "0"
"correlationId" "correl#11"
"expireAt" 1}
bytes ((:re/encoder system) "events.Timer" event)] bytes)})
(let [res (deref (::db system) 30000 ::timeout)]
(assert (not= ::timeout res))
(let [result ((:re/decoder system) (:value res))]
(is (= "events.TimerExpired" ((result "header") "type")))))))
(is (and
(= "events.TimerExpired" ((result "header") "type"))
(= "correl#11" (result "correlationId"))))))))

0 comments on commit e55043d

Please sign in to comment.