From 94f7c4383139b1874ea4429296bf1daa94d0e4ec Mon Sep 17 00:00:00 2001 From: Michal Pisanko Date: Thu, 13 Feb 2025 13:11:56 +0100 Subject: [PATCH] separate DMLs from queries, rollback after fail, accommodate results in the UI --- src/clj/xt_play/transactions.clj | 48 ++++++++++++++++++++++++-------- src/cljs/xt_play/view.cljs | 29 ++++++++++++++++--- 2 files changed, 62 insertions(+), 15 deletions(-) diff --git a/src/clj/xt_play/transactions.clj b/src/clj/xt_play/transactions.clj index c86739b..faf92c5 100644 --- a/src/clj/xt_play/transactions.clj +++ b/src/clj/xt_play/transactions.clj @@ -19,22 +19,40 @@ ;;else txs)) +(defn- dml? [statement] + (when statement + (let [upper-st (str/upper-case statement)] + (or (str/includes? upper-st "INSERT") + (str/includes? upper-st "UPDATE") + (str/includes? upper-st "DELETE") + (str/includes? upper-st "ERASE") + (str/includes? upper-st "MERGE") + (str/includes? upper-st "PATCH"))))) + (defn- prepare-statements "Takes a batch of transactions and prepares the jdbc execution args to - be run sequentially" + be run sequentially. It groups statements by type and wraps DMLs in transactions if system time specified." [tx-batches] (for [{:keys [txs system-time]} tx-batches] (remove nil? (when txs - (concat - (when system-time - [[(format "BEGIN AT SYSTEM_TIME TIMESTAMP '%s'" system-time)]]) - (vec - (keep (fn [q] (when-not (empty? q) - (vector (str/trim q)))) - (str/split txs #"\s*;\s*"))) - (when system-time - [["COMMIT"]])))))) + (let [statements (str/split txs #"\s*;\s*") + by-type (partition-by dml? statements)] + (log/warn "by-type" by-type) + (mapcat + (fn [grp] + (let [dmls? (dml? (first grp))] + (concat + (when (and dmls? system-time) + [[(format "BEGIN AT SYSTEM_TIME TIMESTAMP '%s'" system-time)]]) + (vec + (keep (fn [q] (when-not (empty? q) + [(str/trim q)])) + grp)) + (when (and dmls? system-time) + [["COMMIT"]])))) + by-type) + ))))) (defn format-system-time [s] (when s (read-instant-date s))) @@ -93,18 +111,26 @@ (defn- run!-with-jdbc-conn [tx-batches] (xtdb/with-jdbc (fn [conn] - (let [res (mapv (fn [txs] + (let [tx-in-progress? (atom false) + res (mapv (fn [txs] (vec (mapcat (fn [statement] (log/info "beta executing statement:" statement) + (when (str/includes? (str/upper-case (first statement)) "BEGIN") + (reset! tx-in-progress? true)) (try (let [res (parse-result (xtdb/jdbc-execute! conn statement))] + (when (str/includes? (str/upper-case (first statement)) "COMMIT") + (reset! tx-in-progress? false)) (if-not (vector? (ffirst res)) [res] res)) (catch Exception ex (log/error "Exception while running statement" (ex-message ex)) + (when @tx-in-progress? + (log/warn "Rolling back transaction") + (xtdb/jdbc-execute! conn ["ROLLBACK"])) (parse-result [{:message (ex-message ex) :exception (.getClass ex) diff --git a/src/cljs/xt_play/view.cljs b/src/cljs/xt_play/view.cljs index cd35f13..fd50eee 100644 --- a/src/cljs/xt_play/view.cljs +++ b/src/cljs/xt_play/view.cljs @@ -167,7 +167,7 @@ "next.jdbc/update-count" ^{:key idx} [:p.mb-2.mx-2 "Transaction succeeded."] ^{:key idx}[display-table row tx-type idx]))) -(defn- tx-result? [row] +(defn- tx-result-or-error? [row] (let [[[[msg-k _msg] _]] row] (#{"next.jdbc/update-count" "message"} msg-k))) @@ -179,11 +179,29 @@ ;; stop editor expanding beyond the viewport "md:max-w-[48vw] lg:max-w-[49vw] ")) +(defn- prune-tx-results [results] + (if (every? tx-result-or-error? + (map vector results)) + (drop 1 (drop-last results)) + (:pruned + (reduce (fn [{:keys [tx-err] :as acc} res] + (if (tx-result-or-error? [res]) + (cond + (= 1 tx-err) (-> acc + (update :tx-err inc) + (update :pruned conj res)) + (= 2 tx-err) (assoc acc :tx-err 0) + :else (update acc :tx-err inc)) + (update acc :pruned conj res))) + {:tx-err 0 + :pruned []} + results)))) + (defn- tx-results "If there is a system-time - there was a transaction, so discard those results." [{:keys [system-time]} the-result tx-type] (if system-time - (map-indexed (partial display-tx-result tx-type) (drop 1 (drop-last the-result))) + (map-indexed (partial display-tx-result tx-type) (prune-tx-results the-result)) (map-indexed (partial display-tx-result tx-type) the-result))) (defn- results [position] @@ -205,7 +223,7 @@ (if failure [display-error failure position] (let [the-result (get results position)] - (if (tx-result? the-result) + (if (some tx-result-or-error? (map vector the-result)) [spacer-header (count results) (tx-results statements the-result tx-type)] (cond @@ -216,7 +234,10 @@ (every? empty? the-result) [spacer-header (count results) (empty-rows-message the-result)] :else - [display-table (first the-result) tx-type position]))))))])))) + (map-indexed (fn [idx sub-result] + ^{:key idx} + [display-table sub-result tx-type position]) + the-result)))))))])))) (defn- captions-row [text] (let [show-results? (rf/subscribe [::run/show-results?])]