Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into timeout_service
Browse files Browse the repository at this point in the history
  • Loading branch information
ojow committed May 31, 2018
2 parents 0b8ee6d + c267a5d commit 70c589a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 13 deletions.
2 changes: 1 addition & 1 deletion re/src/proto_edn/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@
;; (.getContainingOneof field))))
;; :let [v (.getField m field)]
(->>
(for [[field v] (.getAllFields m)]
(for [[^Descriptors$FieldDescriptor field v] (.getAllFields m)]
[(.getJsonName field)
(cond
(.isMapField field) (to-edn-map field v)
Expand Down
24 changes: 16 additions & 8 deletions re/src/re/kafka_consumer.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
(ns re.kafka-consumer
(:require [integrant.core :as ig]
[clojure.tools.logging :refer [error info]]
(:require [clojure.tools.logging :refer [error]]
[integrant.core :as ig]
[re.metrics :as metrics]
[re.utils :as utils])
(:import [java.util Collection Map]
[org.apache.kafka.clients.consumer ConsumerRecord KafkaConsumer]
org.apache.kafka.common.errors.WakeupException
[org.apache.kafka.common.serialization ByteArrayDeserializer StringDeserializer]))

(defonce metric-consume-summary
(metrics/make-summary
{:name "kafka_consume_seconds"
:help "Kafka events consuming time"
:labels ["topic" "partition"]}))

(defn consumer-loop [running? ^KafkaConsumer consumer ^Collection topics handler]
(.subscribe consumer topics)
(try
Expand All @@ -15,12 +22,13 @@
(doseq [^ConsumerRecord record res]
(utils/with-retry
(when @running?
(handler {:value (.value record)

:key (.key record)
:offset (.offset record)
:partition (.partition record)
:topic (.topic record)}))))))
(metrics/with-timer metric-consume-summary [(.topic record)
(str (.partition record))]
(handler {:value (.value record)
:key (.key record)
:offset (.offset record)
:partition (.partition record)
:topic (.topic record)})))))))
(catch WakeupException e
(when @running?
(throw e)))
Expand Down
19 changes: 18 additions & 1 deletion re/src/re/metrics.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns re.metrics
(:require [integrant.core :as ig])
(:import io.prometheus.client.hotspot.DefaultExports
(:import [io.prometheus.client Summary Summary$Timer]
io.prometheus.client.hotspot.DefaultExports
io.prometheus.client.jetty.JettyStatisticsCollector
org.eclipse.jetty.server.handler.StatisticsHandler))

Expand All @@ -14,3 +15,19 @@
(.setHandler server stats)
(-> (JettyStatisticsCollector. stats)
(.register)))))

(defn make-summary [{:keys [name help labels]}]
(-> (Summary/build)
(.name name)
(.labelNames (into-array String labels))
(.help help)
(.register)))

(defn with-timer* [^Summary metric labels f]
(let [timer (-> (.labels metric (into-array String labels))
(.startTimer))]
(f)
(.observeDuration ^Summary$Timer timer)))

(defmacro with-timer [metric labels & body]
`(with-timer* ~metric ~labels (fn [] ~@body)))
6 changes: 3 additions & 3 deletions re/src/re/oam.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
(ns re.oam
(:require [integrant.core :as ig])
(:import [com.d360.oam BCDeserializer Deserializer Inter PrimsKt Serializer]
(:import [com.d360.oam BCDeserializer Coeffect Deserializer Inter PrimsKt Serializer Snapshot]
java.io.InputStream))

(defn make-result [res]
(defn make-result [^Snapshot res]
(let [state (when (.isRunning (.getState res))
(.serialize (Serializer. (.getState res))))]
{:values (seq (.getValues res))
:coeffects (->> (.getCoeffects res)
(map (fn [c] [(.getId c) (.getDescription c)])))
(map (fn [^Coeffect c] [(.getId c) (.getDescription c)])))
:state state}))

(defn run [next data ^InputStream input-stream]
Expand Down

0 comments on commit 70c589a

Please sign in to comment.