Skip to content

Commit

Permalink
separate DMLs from queries, rollback after fail, accommodate results …
Browse files Browse the repository at this point in the history
…in the UI
  • Loading branch information
mpisanko committed Feb 13, 2025
1 parent 603563d commit 94f7c43
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 15 deletions.
48 changes: 37 additions & 11 deletions src/clj/xt_play/transactions.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,40 @@
;;else
txs))

(defn- dml? [statement]
(when statement
(let [upper-st (str/upper-case statement)]
(or (str/includes? upper-st "INSERT")
(str/includes? upper-st "UPDATE")
(str/includes? upper-st "DELETE")
(str/includes? upper-st "ERASE")
(str/includes? upper-st "MERGE")
(str/includes? upper-st "PATCH")))))

(defn- prepare-statements
"Takes a batch of transactions and prepares the jdbc execution args to
be run sequentially"
be run sequentially. It groups statements by type and wraps DMLs in transactions if system time specified."
[tx-batches]
(for [{:keys [txs system-time]} tx-batches]
(remove nil?
(when txs
(concat
(when system-time
[[(format "BEGIN AT SYSTEM_TIME TIMESTAMP '%s'" system-time)]])
(vec
(keep (fn [q] (when-not (empty? q)
(vector (str/trim q))))
(str/split txs #"\s*;\s*")))
(when system-time
[["COMMIT"]]))))))
(let [statements (str/split txs #"\s*;\s*")
by-type (partition-by dml? statements)]
(log/warn "by-type" by-type)
(mapcat
(fn [grp]
(let [dmls? (dml? (first grp))]
(concat
(when (and dmls? system-time)
[[(format "BEGIN AT SYSTEM_TIME TIMESTAMP '%s'" system-time)]])
(vec
(keep (fn [q] (when-not (empty? q)
[(str/trim q)]))
grp))
(when (and dmls? system-time)
[["COMMIT"]]))))
by-type)
)))))

(defn format-system-time [s]
(when s (read-instant-date s)))
Expand Down Expand Up @@ -93,18 +111,26 @@
(defn- run!-with-jdbc-conn [tx-batches]
(xtdb/with-jdbc
(fn [conn]
(let [res (mapv (fn [txs]
(let [tx-in-progress? (atom false)
res (mapv (fn [txs]
(vec
(mapcat
(fn [statement]
(log/info "beta executing statement:" statement)
(when (str/includes? (str/upper-case (first statement)) "BEGIN")
(reset! tx-in-progress? true))
(try
(let [res (parse-result (xtdb/jdbc-execute! conn statement))]
(when (str/includes? (str/upper-case (first statement)) "COMMIT")
(reset! tx-in-progress? false))
(if-not (vector? (ffirst res))
[res]
res))
(catch Exception ex
(log/error "Exception while running statement" (ex-message ex))
(when @tx-in-progress?
(log/warn "Rolling back transaction")
(xtdb/jdbc-execute! conn ["ROLLBACK"]))
(parse-result
[{:message (ex-message ex)
:exception (.getClass ex)
Expand Down
29 changes: 25 additions & 4 deletions src/cljs/xt_play/view.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
"next.jdbc/update-count" ^{:key idx} [:p.mb-2.mx-2 "Transaction succeeded."]
^{:key idx}[display-table row tx-type idx])))

(defn- tx-result? [row]
(defn- tx-result-or-error? [row]
(let [[[[msg-k _msg] _]] row]
(#{"next.jdbc/update-count" "message"} msg-k)))

Expand All @@ -179,11 +179,29 @@
;; stop editor expanding beyond the viewport
"md:max-w-[48vw] lg:max-w-[49vw] "))

(defn- prune-tx-results [results]
(if (every? tx-result-or-error?
(map vector results))
(drop 1 (drop-last results))
(:pruned
(reduce (fn [{:keys [tx-err] :as acc} res]
(if (tx-result-or-error? [res])
(cond
(= 1 tx-err) (-> acc
(update :tx-err inc)
(update :pruned conj res))
(= 2 tx-err) (assoc acc :tx-err 0)
:else (update acc :tx-err inc))
(update acc :pruned conj res)))
{:tx-err 0
:pruned []}
results))))

(defn- tx-results
"If there is a system-time - there was a transaction, so discard those results."
[{:keys [system-time]} the-result tx-type]
(if system-time
(map-indexed (partial display-tx-result tx-type) (drop 1 (drop-last the-result)))
(map-indexed (partial display-tx-result tx-type) (prune-tx-results the-result))
(map-indexed (partial display-tx-result tx-type) the-result)))

(defn- results [position]
Expand All @@ -205,7 +223,7 @@
(if failure
[display-error failure position]
(let [the-result (get results position)]
(if (tx-result? the-result)
(if (some tx-result-or-error? (map vector the-result))
[spacer-header (count results)
(tx-results statements the-result tx-type)]
(cond
Expand All @@ -216,7 +234,10 @@
(every? empty? the-result) [spacer-header (count results)
(empty-rows-message the-result)]
:else
[display-table (first the-result) tx-type position]))))))]))))
(map-indexed (fn [idx sub-result]
^{:key idx}
[display-table sub-result tx-type position])
the-result)))))))]))))

(defn- captions-row [text]
(let [show-results? (rf/subscribe [::run/show-results?])]
Expand Down

0 comments on commit 94f7c43

Please sign in to comment.