Skip to content

Commit

Permalink
fix a potential lifetime bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Dec 9, 2021
1 parent 7d9c74d commit cb4fa14
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions src/jepsen/hstream.clj
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@

(def gen-adds (->> (range)
(map (fn [x] {:type :invoke, :f :add, :value x}))))
(def gen-read (gen/once {:type :invoke, :f :read, :value nil}))
(defn gen-read [id] (gen/once {:type :invoke, :f :read, :value nil, :consumer-id id}))

(defrecord Client [opts test-streams test-subscription-stream]
(defrecord Client [opts test-streams test-subscription-stream subscription-results]
client/Client

(open! [this _ node]
Expand Down Expand Up @@ -105,8 +105,7 @@
producer (get @test-producers (:stream this))]
(.get (write-data producer test-data))
(assoc op :type :ok :stream (:stream this))))
:read (dosync
(let [subscription-results (atom [])]
:read (let [subscription-result (get subscription-results (:consumer-id op))]
(try+ (subscribe (:client this)
test-subscription-id
test-subscription-stream
Expand All @@ -115,12 +114,12 @@
(catch Exception e nil))
(consume (:client this)
test-subscription-id
(gen-collect-value-callback subscription-results))
(gen-collect-value-callback subscription-result))
(Thread/sleep (* 1000 (:fetch-wait-time opts)))
(assoc op
:type :ok
:value @subscription-results
:stream test-subscription-stream))))
:value @subscription-result
:stream test-subscription-stream)))
(catch java.net.SocketTimeoutException e
(assoc op :type :fail :error :timeout))
(catch Exception e
Expand All @@ -145,14 +144,15 @@
test-streams (into [] (repeatedly (:max-streams opts)
#(rs/string test-stream-name-length)))
; "The stream of the only subscription."
test-subscription-stream (rand-nth test-streams)]
test-subscription-stream (rand-nth test-streams)
subscription-results (into [] (repeatedly (:consumer-number opts) #(ref [])))]

(merge tests/noop-test
opts
{:pure-generators true
:name "HStream"
:db (db "0.6.0")
:client (Client. opts test-streams test-subscription-stream)
:client (Client. opts test-streams test-subscription-stream subscription-results)
:nemesis nemesis/noop ;(nemesis/clock-scrambler 86400);(nemesis/hammer-time "hstream-server")
:ssh {:dummy? (:dummy opts)}
:checker (checker/compose
Expand All @@ -168,8 +168,7 @@
(->> gen-adds
(gen/stagger (/ (:rate opts)))
(gen/time-limit (:write-time opts)))
(gen/repeat (:consumer-number opts)
(gen/clients gen-read))
(map gen-read (range 0 (:consumer-number opts)))
(gen/sleep (+ 10 (:fetch-wait-time opts))))
(->> (->> [(gen/sleep 5)
{:type :info :f :start}
Expand Down

0 comments on commit cb4fa14

Please sign in to comment.