diff --git a/re/src/re/service/timeout.clj b/re/src/re/service/timeout.clj index 27989c4..084bd60 100644 --- a/re/src/re/service/timeout.clj +++ b/re/src/re/service/timeout.clj @@ -1,8 +1,7 @@ (ns re.service.timeout (:require [integrant.core :as ig] [clojure.tools.logging :refer [error info]] - [re.utils :as utils] - [proto-edn.core :as proto]) + [clojure.edn :as edn]) (:import events.SystemEventOuterClass$TimerExpired events.SystemEventOuterClass$Timer (org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder KeyValue) @@ -28,7 +27,8 @@ store-key {:expireAt (request "expireAt") :partition (.partition ctx) :offset (.offset ctx)} - store-value {:recipientId recipientId} + store-value {:recipientId recipientId + :correlationId (request "correlationId")} timers-store (.getStateStore ctx "timers")] (.put timers-store store-key store-value) nil)) @@ -40,13 +40,13 @@ (serializer [_] (reify Serializer (configure [_ _ _] nil) - (serialize [_ topic {expireAt :expireAt partition :partition offset :offset}] + (serialize [_ _ {expireAt :expireAt partition :partition offset :offset}] (.getBytes (format "%016X-%02X-%016X" expireAt partition offset))) (close [_] nil))) (deserializer [_] (reify Deserializer (configure [_ _ _] nil) - (deserialize [_ topic bytes] + (deserialize [_ _ bytes] (let [str (String. bytes) expireAt (Long/parseLong (.substring str 0 16) 16) partition (Integer/parseInt (.substring str 17 19) 16) @@ -61,16 +61,14 @@ (serializer [_] (reify Serializer (configure [_ _ _] nil) - (serialize [_ topic {recipientId :recipientId}] - (.getBytes (format "%s" recipientId))) + (serialize [_ _ map] + (.getBytes (str map))) (close [_] nil))) (deserializer [_] (reify Deserializer (configure [_ _ _] nil) - (deserialize [_ topic bytes] - (let [str (String. bytes) - recipientId str] - {:recipientId recipientId})) + (deserialize [_ _ bytes] + (edn/read-string (String. bytes))) (close [_] nil))))) (defn punctuator [context] @@ -78,14 +76,17 @@ (reify Punctuator (punctuate [_ _] (let [timers-store (.getStateStore ctx "timers") - t0 {:expireAt 0 :partition 0 :offset 0} - now {:expireAt (+ (quot (System/currentTimeMillis) 1000) 1) :partition 0 :offset 0} - timers (.range timers-store t0 now)] + t0-key {:expireAt 0 :partition 0 :offset 0} + now (quot (System/currentTimeMillis) 1000) + now-key {:expireAt now :partition 0 :offset 0} + timers (.range timers-store t0-key now-key)] (try (while (.hasNext timers) (do (let [timer (.next timers) - event {"header" {"type" "events.TimerExpired" "createdAt" (:expireAt now) "source" "test"} - "correlationId" "0" + event {"header" {"type" "events.TimerExpired" + "createdAt" now + "source" "TimeoutService"} + "correlationId" (:correlationId (.-value timer)) "universe" "0"} bytes ((:encoder context) "events.TimerExpired" event)] (.delete timers-store (.-key timer)) diff --git a/re/test/re/service/timeout_test.clj b/re/test/re/service/timeout_test.clj index df762ea..5241226 100644 --- a/re/test/re/service/timeout_test.clj +++ b/re/test/re/service/timeout_test.clj @@ -36,10 +36,12 @@ {:topic "timer-requests" :key "entity#43" :value (let [event {"header" {"type" "events.Timer" "createdAt" 0 "source" "test"} - "correlationId" "0" + "correlationId" "correl#11" "expireAt" 1} bytes ((:re/encoder system) "events.Timer" event)] bytes)}) (let [res (deref (::db system) 30000 ::timeout)] (assert (not= ::timeout res)) (let [result ((:re/decoder system) (:value res))] - (is (= "events.TimerExpired" ((result "header") "type"))))))) + (is (and + (= "events.TimerExpired" ((result "header") "type")) + (= "correl#11" (result "correlationId"))))))))