Skip to content

Commit

Permalink
Merge pull request #535 from FoxComm/hotfix/messaging-fails-at-decode
Browse files Browse the repository at this point in the history
Hotfix messaging decoding issue
  • Loading branch information
mempko authored Dec 5, 2016
2 parents b103e50 + a9e0d5a commit add66f8
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions messaging/src/messaging/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@
json/parse-string
decode-activity-json))


(defn- safe-decode-minimal
[message]
(try
(let [msg (-> message :value str)]
(try
(json/parse-string msg)
(catch Exception _ msg)))
(catch Exception _
message)))


(def stop (atom false))

(defn start-app
Expand All @@ -66,13 +78,16 @@
(log/info "Partitions subscribed to:" (partition-subscriptions c))
(loop []
(let [cr (poll! c)]
(doseq [record cr :let [msg (decode record)]]
(log/debug msg)
(doseq [record cr]
(try
(mail/handle-activity msg)
(commit-offsets-async! c {(select-keys record [:topic :partition])
{:offset (:offset record) :metadata ""}})
(catch Exception e (log/error "Caught exception: " e)))))
(let [msg (decode record)]
(log/debug msg)
(mail/handle-activity msg)
(commit-offsets-sync! c {(select-keys record [:topic :partition])
{:offset (:offset record) :metadata ""}}))
(catch Exception e
(log/errorf "Caught exception: %s\n\tDuring processing %s" e
(safe-decode-minimal record))))))

(when-not @stop
(recur))))
Expand Down

0 comments on commit add66f8

Please sign in to comment.