Skip to content

Commit

Permalink
Async transactions (#15)
Browse files Browse the repository at this point in the history
- [x] Implement with-buffer
- [x] Implement put!
- [x] Implement close! async stuff
- [x] Update readme on how to use with-buffer in conjunction with
cedric/Csv implementation
  • Loading branch information
verberktstan authored Feb 27, 2024
1 parent 7efd1b9 commit c9f30da
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:

strategy:
matrix:
os: [ubuntu-latest, macOS-latest]
os: [ubuntu-latest]

runs-on: ${{ matrix.os }}

Expand Down
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ You can use swark.cedric for the persistence part, and swark.authom for the auth
```
(ns my.ns
(:require [swark.authom :as authom]
[swark.cedric :as cedric])
[swark.cedric :as cedric]
[swark.core :as swark])
(:import [swark.cedric Csv]))
(def DB (cedric/Csv. "db.csv"))
(def DB (cedric/Csv. "/tmp/db.csv"))
(def PROPS (merge authom/CEDRIC-PROPS {:primary-key :user/id}))
```

Expand All @@ -82,13 +83,21 @@ You can use swark.cedric for the persistence part, and swark.authom for the auth
(-> user (authom/check :user/id "pass" "SECRET") assert))
```

5. Since the csv file might change in the mean while, it is advised to execute all db actions as an asynchronous transaction. You can make use of `cedric/make-connection` like so:
```
(let [{::cedric/keys [transact! close!]} (-> "/tmp/db.csv" cedric/Csv. cedric/make-connection)]
(transact! cedric/upsert-items {:primary-key :id} [{:test "data"} {:more "testdata" :something 123}]) ; Returns the upserted items.
(transact! cedric/read-items {}) ; Returns all items read.
(close!)) ; Don't forget to close the async connection.
```

## Tests

Run the tests with `clojure -X:test/run`

## Development

Start a repl simply by running `clj -M:repl/basic` command in your terminal.
Start a repl simply by running `clojure -M:repl/basic` command in your terminal.
You can connect your editor via nrepl afterwards, e.g. from emacs; `cider-connect-clj`
Or create a repl from your editor, e.g. from emacs; `cider-jack-in-clj`

Expand Down
7 changes: 4 additions & 3 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{:deps
;; Clojure standard library
{org.clojure/clojure {:mvn/version "1.11.0"}
org.clojure/data.csv {:mvn/version "1.0.1"}} ;; NOTE: For testing CSV input/output only..
{org.clojure/clojure {:mvn/version "1.11.0"}
org.clojure/core.async {:mvn/version "1.6.681"}
org.clojure/data.csv {:mvn/version "1.0.1"}} ;; NOTE: For testing CSV input/output only..
:aliases
{ :repl/basic
{:repl/basic
{:extra-deps {nrepl/nrepl {:mvn/version "1.0.0"}
cider/cider-nrepl {:mvn/version "0.30.0"}}
:main-opts ["-m" "nrepl.cmdline"
Expand Down
23 changes: 21 additions & 2 deletions src/swark/cedric.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@
[clojure.set :as set]
[clojure.data :as data]
#?(:cljs [goog.date :as gd])
#?(:clj [clojure.string :as str])
#?(:clj [clojure.java.io :as io])
[clojure.data.csv :as csv])
#?(:clj (:import [java.time Instant])))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; CEDRIC - the Cedric Event DRIven datapersistence Companion
;; Store associatve data (maps) as rows in an append-only EAV database.

;; TODO: Add headers to csv db file
;; TODO: Test in cljs as well
;; TODO: Move back in time by filtering on txd (transaction's utc date)
;; TODO: Add some memoization with swark.core/memoire
Expand Down Expand Up @@ -280,3 +279,23 @@
(write-csv! filename new-rows)
{::archived (count new-rows)}))))

(defn make-connection
"Returns a map with ::transact! and ::close! functions."
[db]
(let [conn (swark/with-buffer db)]
{::transact! (partial swark/put! conn)
::close! #(swark/close! conn)}))

(comment
(let [connection (-> "/tmp/testdb123.csv" Csv. make-connection)]
(def transact! (::transact! connection))
(def close! (::close! connection)))

;; Upsert items via the transact! function
(transact! upsert-items {:primary-key :user/id} [{:user/name "Arnold"} {:user/name "Naomi"} {:user/name "Theodor"}])
;; Read (all) items via the transact! function
(transact! read-items {})
;; Archive an item via the transact! function
(transact! archive-items {:primary-key :user/id} [{:user/id "4"}])
;; Close the connection via the close! function
(close!))
51 changes: 48 additions & 3 deletions src/swark/core.cljc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns swark.core
(:require [clojure.string :as str]))
(:require [clojure.core.async :as a]
[clojure.string :as str]))

;; SWiss ARmy Knife - Your everyday clojure toolbelt!
;; Copyright 2024 - Stan Verberkt ([email protected])
Expand Down Expand Up @@ -71,7 +72,7 @@
(filter-keys map (comp predicate namespace)))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Try and return nil when something is thrown
;; Try and catch

(defn jab
{:added "0.1.3"
Expand All @@ -84,8 +85,9 @@
(apply f args)
#?(:cljs (catch :default _ nil) :clj (catch Throwable _ nil))))

;; TODO: Add tests
(defn with-retries
{:added "0.1.41" ; NOTE: To be released!
{:added "0.1.41"
:arglist '([n f & args])
:doc "Returns the result of (apply f args) after running it n times. When
something is thrown on the last try, returns the throwable map."}
Expand Down Expand Up @@ -209,3 +211,46 @@
(summ [10 12])
(summ :flush) ; Flush the complete cache (for all inputs)
)

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Async stuff

;; TODO: Support channel transducers and ex-handler as well
(defn with-buffer
{:added "0.1.41"
:arglist '([x])
:doc "Starts a go-loop and returns a map with ::in and ::out async channels.
Input to ::in chan is expected to be [f & args] or [::closed!]. In the latter
case, the go-loop will stop. In the first case, (apply f x args) will be called
and the result is put on ::out chan."}
[x]
(let [in-chan (a/chan (a/sliding-buffer 99))
out-chan (a/chan (a/dropping-buffer 99))]
(a/go-loop [[f & args] (a/<! in-chan)]
(when-not (some-> f #{::closed!}) ; NOTE: Stop the go-loop in this case
(if-let [result (when f (apply f x args))]
(a/>! out-chan result)
(a/>! out-chan ::nil))
(recur (a/<! in-chan))))
{::in in-chan
::out out-chan}))

(defn put!
{:added "0.1.41"
:arglist '([buffered & args])
:doc "Put args on the ::in chan and blocks until something is returned via
::out chan. Returns the returned value."}
[{::keys [in out]} & args]
(assert in)
(assert out)
(a/go (a/>! in (or args [::closed!]))) ; NOTE: Close the go-loop when nil args
(a/<!! out))

(defn close!
{:added "0.1.41"
:arglist '([buffered])
:doc "Stops the underlying go-loop and closes all channels. Returns nil."}
[buffered]
(put! buffered nil) ; NOTE: Close the running go-loop
(let [channels (juxt ::in ::out)]
(run! a/close! (channels buffered))))
11 changes: 7 additions & 4 deletions test/swark/cedric_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@
(doseq [make-db [#(Mem. (atom nil))
#(Csv. (str "/tmp/testdb-" (swark/unid) ".csv"))]]
(let [db (make-db)
db-conn (swark/with-buffer db)
transact! (partial swark/put! db-conn)
props {:primary-key :person/id}
the-names (some-names 25)
persons (map (partial assoc nil :person/name) the-names)
result (sut/upsert-items db props persons)]
result (transact! sut/upsert-items props persons)]
;; result
(testing "upsert-items"
(testing "returns the upserted items"
Expand All @@ -68,15 +70,16 @@
shuffle
(take 3)
(map #(assoc %2 :person/name %1) new-names))
updated (sut/upsert-items db props persons)]
updated (transact! sut/upsert-items props persons)]
(testing "returns the updated items"
(is (-> updated count #{3}))
(is (->> updated (map :person/name) set (= (set new-names))))))
(let [persons (->> result
shuffle
(take 5))
archived (sut/archive-items db props persons)]
archived (transact! sut/archive-items props persons)]
(testing "returns the number of ::archived items"
(is (= {::sut/archived 5} archived))))
(testing "returns all the items"
(is (-> db (sut/read-items {}) count #{20}))))))
(is (-> (transact! sut/read-items {}) count #{20})))
(swark/close! db-conn))))
7 changes: 7 additions & 0 deletions test/swark/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,10 @@
#"Spec should be a map!" nil {:id -1}
#"All vals in spec should implement IFn" {:id "not IFn"} {:id -1} ; Spec
#"Input should be a map!" {:id nat-int?} false)))

(t/deftest with-buffer-put-close
(let [{::sut/keys [in out] :as m} (sut/with-buffer {:test "map"})]
(t/is (and in out))
(t/is (= {:test "map" :key :value} (sut/put! m assoc :key :value)))
(doto m sut/close!) ; Close it, after this every eval of put! returns nil
(t/is (nil? (sut/put! m assoc :another "entry")))))

0 comments on commit c9f30da

Please sign in to comment.