From 12fdd76451b676e947662cc851deb14d005de726 Mon Sep 17 00:00:00 2001 From: Julien Vincent Date: Sun, 4 Feb 2024 17:36:05 +0100 Subject: [PATCH 1/2] minor: Handle mismatches between oplog + known ops The current implementation handles mismatches between the op-log and known set of operations quite badly. It is required that the current op-log head points to a known and explicitly provided operation. This can cause issues in the following scenarios: + Working accross two different branches and applied operations from one branch not existing in the second. + Deleting applied operations because it is no longer desireable to keep the code around. Another issue exists as a result of this approach; information regarding the order in which operations were actually applied is lost. This may be distinct from the order in which operations were provided. When executing in the :down direction it would be safer to execute in the reverse order to which operations were applied. This commit resolves this issue by: + No longer requiring operations that have been applied to be provided at all. + Re-ordering operations based on the order they appear in the op-log. --- .../mallard/src/k16/mallard/datastore.clj | 12 +- packages/mallard/src/k16/mallard/dev.clj | 2 +- packages/mallard/src/k16/mallard/executor.clj | 167 +++++++++++------- .../test/k16/mallard/executor_test.clj | 163 ++++++++++------- .../mallard/test/k16/mallard/op_log_test.clj | 64 +++++++ 5 files changed, 271 insertions(+), 137 deletions(-) create mode 100644 packages/mallard/test/k16/mallard/op_log_test.clj diff --git a/packages/mallard/src/k16/mallard/datastore.clj b/packages/mallard/src/k16/mallard/datastore.clj index 0e8c56c..1bd007f 100644 --- a/packages/mallard/src/k16/mallard/datastore.clj +++ b/packages/mallard/src/k16/mallard/datastore.clj @@ -4,17 +4,17 @@ (set! *warn-on-reflection* true) -(def inst-codec +(def ^:private inst-codec {:encode/json #(.toString ^java.time.Instant %) :decode/json #(t/instant %)}) -(def ?instant +(def ^:private ?instant [:fn (merge inst-codec {:error/message "Should be an #instant"}) t/instant?]) (def ?Direction - [:or [:= :up] [:= :down]]) + [:enum :up :down]) -(def ?Operation +(def ?OpLogEntry [:map {:closed true} [:id :string] [:direction ?Direction] @@ -23,8 +23,8 @@ (def ?State [:map - [:log {:description "A log of all migration operations that have been executed"} - [:sequential ?Operation]]]) + [:log {:description "A log of all operations that have been executed"} + [:sequential ?OpLogEntry]]]) (defprotocol DataStore "Protocol for a data store that can hold the migration log and diff --git a/packages/mallard/src/k16/mallard/dev.clj b/packages/mallard/src/k16/mallard/dev.clj index 317e603..882c1a7 100644 --- a/packages/mallard/src/k16/mallard/dev.clj +++ b/packages/mallard/src/k16/mallard/dev.clj @@ -17,7 +17,7 @@ [:or [:map - [:migrations executor/?Migrations]] + [:migrations executor/?Operations]] [:map [:migrations-dir :string]]]]) diff --git a/packages/mallard/src/k16/mallard/executor.clj b/packages/mallard/src/k16/mallard/executor.clj index c27829a..6161eb8 100644 --- a/packages/mallard/src/k16/mallard/executor.clj +++ b/packages/mallard/src/k16/mallard/executor.clj @@ -8,7 +8,7 @@ (set! *warn-on-reflection* true) -(def ?Migration +(def ?Operation [:map [:id :string] [:run-up! {:error/message "should be a function with one argument" @@ -18,63 +18,104 @@ :optional true} [:=> [:cat :any] :any]]]) -(def ?Migrations - [:sequential {:error/message "should be a sequence of migrations"} - ?Migration]) +(def ?Operations + [:sequential {:error/message "should be a sequence of operations"} + ?Operation]) (def ?ExecuteProps [:map [:context {:optional true} [:maybe :any]] [:store datastore.api/?DataStore] - [:migrations ?Migrations] + [:migrations ?Operations] [:limit {:optional true} [:int {:min 1}]] [:direction [:enum :up :down]]]) +(defn- index-by [key-fn col] + (->> col + (map (fn [migration] + [(key-fn migration) migration])) + (into {}))) + (defn- project-op-log - "Reduces over the op-log in order to determine what migration id is currently - applied. Returns the last applied migration id." + "Reduces over the op-log to project the concrete sequence of currently applied migrations ids. + + The op-log contains a sequence of `:up` and `:down` operations which can be reduced down to a + sequence of only `:up` operation ids + + Example: + + ```clojure + (project-op-log [{:id \"1\" :direction :up} + {:id \"1\" :direction :down} + {:id \"2\" :direction :up} + {:id \"3\" :direction :up}]) + ;; => [\"2\" \"3\"] + ```" [op-log] - (->> op-log - (reduce - (fn [ids op] - (case (:direction op) - :up (conj ids (:id op)) - :down (if (= (:id op) (last ids)) - (pop ids) - (throw (ex-info "Error reprocessing op-log. A :down operation did not - follow an :up migration of the same id" - {:last-op (last ids) - :current-op (:id op)}))))) - []) - last)) - -(defn- index-of [item coll] - (let [index - (->> coll - (map-indexed vector) - (some (fn [[idx val]] - (when (= val item) idx))))] - (if index index -1))) - -(defn- get-index - "Determines the index of the currently applied index in the given collection of migrations" - [op-log migrations] - (if op-log - (let [last-run-id (project-op-log op-log) - index (->> migrations - (map :id) - (index-of last-run-id))] - - (if (and (not (nil? last-run-id)) (= index -1)) - (throw (ex-info (str "The last run migration " last-run-id " was not found in the given set of migrations") {})) - index)) - - -1)) + (reduce + (fn [operations op] + (case (:direction op) + :up (conj operations (:id op)) + :down (if (= (:id op) (last operations)) + (pop operations) + (throw (ex-info (str "Error reprocessing op-log. A :down operation did not " + "follow an :up migration of the same id") + {:last-op (last operations) + :current-op (:id op)}))))) + [] + op-log)) + +(defn- derive-active-state + "Determine what the current working state is based on the given `op-log` and set of ordered `operations`. + + Returns operations in two groups, those that have been applied and those that are yet to be applied. + + Operations in the `:applied` set may be ordered differently to how to are provided as the order they + appear in `op-log` takes precedence. + + Operations in the `:applied` section maybe also be `nil` in the event that the operation that was applied + as according to the `op-log` is no longer present or identifiable from the provided set of `operations`." + [op-log operations] + (let [operations-idx (index-by :id operations) + applied-operation-ids (project-op-log op-log) + + applied-operations + (mapv + (fn [op-id] + (let [operation (get operations-idx op-id)] + {:id op-id + :operation operation})) + applied-operation-ids) + + applied-idx (index-by :id applied-operations) + + unapplied-operations + (->> operations + (filter + (fn [operation] + (not (get applied-idx (:id operation))))) + (mapv (fn [operation] + {:id (:id operation) + :operation operation})))] + + {:applied applied-operations + :unapplied unapplied-operations})) + +(defn- find-unapplied + "Return an ordered set of operations based on the current op-log state and desired `:direction`. + + - If the direction is `:up` this will return the remaining set of *unapplied* migrations. + - If the direction is `:down` this will return the *applied* migrations in reverse order." + [op-log operations direction] + (let [{:keys [applied unapplied]} (derive-active-state op-log operations)] + (case direction + :up unapplied + :down (reverse applied)))) (defn- execute-one! - "Execute a single migration and return an ?Operation to be appended to the op-log." - [context migration direction] - (let [{:keys [id run-up! run-down!]} migration + "Execute a single migration and return an ?OpLogEntry to be appended to the op-log." + [context operation direction] + (let [{:keys [id run-up! run-down!]} operation ts (t/now)] (log/info (str "Executing migration " id " [" direction "]")) @@ -89,44 +130,38 @@ :started_at ts :finished_at (t/now)})) -(defn- find-unapplied - "Return an ordered set of unapplied migrations based on the current op-log state. This will - return migrations in reverse order if the direction is :down" - [op-log migrations direction] - (let [migrations' (if (= direction :down) (reverse migrations) migrations) - index (get-index op-log migrations') - ;; An :up migration needs to start at the next migration whereas a :down migration - ;; should start at the currently applied migration. - index' (if (= :up direction) (inc index) index)] - (subvec (vec migrations') index'))) - (defn execute! "Execute the given migrations and return the new log of operations. This will handle locking and will mutate the datastore with the changing op-log as migrations are applied." [{:keys [context store migrations direction limit] :as props}] (when-not (m/validate ?ExecuteProps props) - (throw (ex-info "Migration props are invalid" + (throw (ex-info "Invalid arguments provided" {:errors (me/humanize (m/explain ?ExecuteProps props))}))) (let [state (datastore.api/load-state store) op-log (atom (or (:log state) [])) - unapplied (find-unapplied (:log state) migrations direction) - unapplied' (if limit (take limit unapplied) unapplied) + unapplied (cond-> (find-unapplied (:log state) migrations direction) + limit ((partial take limit))) lock (datastore.api/acquire-lock! store)] (try - (if (pos? (count unapplied')) - (log/info (str "Running " (count unapplied') " operations")) - (log/info "No unapplied operations to run")) + (if (pos? (count unapplied)) + (log/info (str "Running " (count unapplied) " operations")) + (log/info "No operations to run")) + + (doseq [{:keys [id operation]} unapplied] + (when (not operation) + (log/error (str "Cannot run :down. Operation " id " is missing")) + (throw (ex-info (str "Missing operation " id) {:operation-id id}))) - (doseq [migration unapplied'] - (let [op (execute-one! context migration direction) + (let [op (execute-one! context operation direction) op-log' (swap! op-log #(conj % op))] (datastore.api/save-state! store {:log op-log'}))) (catch Exception e (log/error "Failed to execute operation" e) (throw e)) - (finally (datastore.api/release-lock! store lock))) + (finally + (datastore.api/release-lock! store lock))) @op-log)) diff --git a/packages/mallard/test/k16/mallard/executor_test.clj b/packages/mallard/test/k16/mallard/executor_test.clj index 141c3ce..605fef4 100644 --- a/packages/mallard/test/k16/mallard/executor_test.clj +++ b/packages/mallard/test/k16/mallard/executor_test.clj @@ -1,15 +1,21 @@ (ns k16.mallard.executor-test (:require - [clojure.test :refer [deftest is testing]] + [clojure.test :refer [deftest is testing use-fixtures]] [k16.mallard.datastore :as datastore.api] [k16.mallard.executor :as executor] - [matcher-combinators.matchers :as matchers] [k16.mallard.stores.memory :as stores.memory] - [malli.util :as mu] - [matcher-combinators.test]) + [matcher-combinators.test] + [taoensso.timbre :as log] + [tick.core :as t]) (:import [clojure.lang ExceptionInfo])) +(defn- disable-logs [test] + (log/set-config! {}) + (test)) + +(use-fixtures :once disable-logs) + (def migrations [{:id "1" :run-up! (fn [_]) @@ -25,7 +31,7 @@ (testing "Executor should thrown an exception with explanation" (let [ex (try (executor/execute! {:some :props}) (catch Exception e e))] (is (= ExceptionInfo (type ex))) - (is (= "Migration props are invalid" (ex-message ex))) + (is (= "Invalid arguments provided" (ex-message ex))) (is (= {:errors {:direction ["missing required key"], :migrations ["missing required key"], :store ["missing required key"]}} @@ -38,9 +44,9 @@ (catch Exception e e))] (is (= ExceptionInfo (type ex))) - (is (= "Migration props are invalid" (ex-message ex))) + (is (= "Invalid arguments provided" (ex-message ex))) (is (= {:errors {:direction ["should be either :up or :down"], - :migrations ["should be a sequence of migrations"], + :migrations ["should be a sequence of operations"], :store ["should Implement DataStore protocol"], :limit ["should be at least 1"]}} (ex-data ex)))))) @@ -68,29 +74,38 @@ :direction :down :migrations migs})))) -(deftest executor-test - (let [store (stores.memory/create-memory-datastore)] - (testing "Executing a single migration" - (let [op-log (executor/execute! {:store store - :migrations migrations - :direction :up - :limit 1})] +(deftest single-execution-test + (let [store (stores.memory/create-memory-datastore) + op-log (executor/execute! {:store store + :migrations migrations + :direction :up + :limit 1})] - (is (= 1 (count op-log))) - (is (match? [{:id "1" - :direction :up - :started_at inst? - :finished_at inst?}] - op-log)) + (is (= 1 (count op-log))) + (is (match? [{:id "1" + :direction :up + :started_at inst? + :finished_at inst?}] + op-log)) - (is (= {:log op-log} (datastore.api/load-state store))))) + (is (= {:log op-log} (datastore.api/load-state store))))) - (testing "Executing the remaining migrations" - (let [op-log (executor/execute! {:store store - :migrations migrations - :direction :up})] - (is (= 3 (count op-log))) - (is (= ["1" "2" "3"] (map :id op-log))))) +(deftest multi-execution-test + (let [store (stores.memory/create-memory-datastore) + op-log (executor/execute! {:store store + :migrations migrations + :direction :up})] + + (is (= 3 (count op-log))) + (is (= ["1" "2" "3"] (map :id op-log))))) + +(deftest down-migration-test + (let [store (stores.memory/create-memory-datastore)] + + (datastore.api/save-state! store {:log [{:id "1" + :direction :up + :started_at (t/now) + :finished_at (t/now)}]}) (testing "Undoing the last migration" (let [op-log (executor/execute! {:store store @@ -98,50 +113,70 @@ :direction :down :limit 1})] - (is (= 4 (count op-log))) + (is (= 2 (count op-log))) (is (= [{:id "1" :direction :up} - {:id "2" :direction :up} - {:id "3" :direction :up} - {:id "3" :direction :down}] - (map #(select-keys % [:id :direction]) op-log))))) + {:id "1" :direction :down}] + (map #(select-keys % [:id :direction]) op-log))))))) - (testing "Rerun the rolled back migration" +(deftest rerun-down-migration-test + (let [store (stores.memory/create-memory-datastore)] + + (datastore.api/save-state! store {:log [{:id "1" + :direction :up + :started_at (t/now) + :finished_at (t/now)} + {:id "1" + :direction :down + :started_at (t/now) + :finished_at (t/now)}]}) + + (testing "Rerunning the last migration" (let [op-log (executor/execute! {:store store :migrations migrations - :direction :up})] + :direction :up + :limit 1})] - (is (= 5 (count op-log))) + (is (= 3 (count op-log))) (is (= [{:id "1" :direction :up} - {:id "2" :direction :up} - {:id "3" :direction :up} - {:id "3" :direction :down} - {:id "3" :direction :up}] - (map #(select-keys % [:id :direction]) op-log))))) - - (testing "Failure state for missing ref migration" - (let [op-log (try (executor/execute! {:store store - :migrations (pop migrations) - :direction :up}) - (catch Exception _ false))] + {:id "1" :direction :down} + {:id "1" :direction :up}] + (map #(select-keys % [:id :direction]) op-log))))))) - (is (not op-log))))) +(deftest run-up-out-of-order-test + (let [store (stores.memory/create-memory-datastore)] + (datastore.api/save-state! store {:log [{:id "1" + :direction :up + :started_at (t/now) + :finished_at (t/now)} + {:id "3" + :direction :up + :started_at (t/now) + :finished_at (t/now)}]}) + + (let [op-log (executor/execute! {:store store + :migrations migrations + :direction :up + :limit 1})] + + (is (= 3 (count op-log))) + (is (= [{:id "1" :direction :up} + {:id "3" :direction :up} + {:id "2" :direction :up}] + (map #(select-keys % [:id :direction]) op-log)))))) + +(deftest run-down-missing-migration-test (let [store (stores.memory/create-memory-datastore)] - (testing "Rolling back a single migration" - (let [migrations' (take 1 migrations) - _ (executor/execute! {:store store - :migrations migrations' - :direction :up}) - _ (executor/execute! {:store store - :migrations migrations' - :direction :down - :limit 1}) - op-log (executor/execute! {:store store - :migrations migrations' - :direction :up - :limit 1})] - (is (= 3 (count op-log))) - (is (match? {:id "1" - :direction :up} - (last op-log))))))) + (datastore.api/save-state! store {:log [{:id "1" + :direction :up + :started_at (t/now) + :finished_at (t/now)}]}) + + (let [ex (try (executor/execute! {:store store + :migrations [] + :direction :down + :limit 1}) + (catch Exception e e))] + + (is (instance? Exception ex))))) diff --git a/packages/mallard/test/k16/mallard/op_log_test.clj b/packages/mallard/test/k16/mallard/op_log_test.clj new file mode 100644 index 0000000..874500b --- /dev/null +++ b/packages/mallard/test/k16/mallard/op_log_test.clj @@ -0,0 +1,64 @@ +(ns k16.mallard.op-log-test + (:require + [clojure.test :refer [deftest is]] + [k16.mallard.executor :as executor] + [matcher-combinators.test])) + +(def migrations + [{:id "1" + :run-up! (fn [_]) + :run-down! (fn [_])} + {:id "2" + :run-up! (fn [_]) + :run-down! (fn [_])} + + {:id "3" + :run-up! (fn [_]) + :run-down! (fn [_])}]) + +(deftest empty-op-log + (is (= {:unapplied [{:id "1" :operation (first migrations)} + {:id "2" :operation (second migrations)} + {:id "3" :operation (nth migrations 2)}] + :applied []} + (#'k16.mallard.executor/derive-active-state [] migrations)))) + +(deftest single-applied + (is (= {:applied [{:id "1" :operation (first migrations)}] + :unapplied [{:id "2" :operation (second migrations)} + {:id "3" :operation (nth migrations 2)}]} + (#'k16.mallard.executor/derive-active-state [{:id "1" :direction :up}] migrations)))) + +(deftest out-of-order-applied + (is (= {:applied [{:id "2" :operation (second migrations)}] + :unapplied [{:id "1" :operation (first migrations)} + {:id "3" :operation (nth migrations 2)}]} + (#'k16.mallard.executor/derive-active-state [{:id "2" :direction :up}] migrations)))) + +(deftest missing-applied-migration + (is (= {:applied [{:id "missing" :operation nil}] + :unapplied [{:id "1" :operation (nth migrations 0)} + {:id "2" :operation (nth migrations 1)} + {:id "3" :operation (nth migrations 2)}]} + (#'k16.mallard.executor/derive-active-state [{:id "missing" :direction :up}] migrations)))) + +(deftest complex-projection + (is (= {:applied [{:id "1" :operation (nth migrations 0)} + {:id "2" :operation (nth migrations 1)}] + + :unapplied [{:id "3" :operation (nth migrations 2)}]} + (#'k16.mallard.executor/derive-active-state [{:id "1" :direction :up} + {:id "2" :direction :up} + {:id "2" :direction :down} + {:id "2" :direction :up} + + {:id "3" :direction :up} + {:id "3" :direction :down}] migrations)))) + +(deftest corrupted-state + (let [ex (try + (#'k16.mallard.executor/derive-active-state [{:id "1" :direction :up} + {:id "missing" :direction :down}] migrations) + nil + (catch Exception e e))] + (is (instance? Exception ex)))) From 1d933386b96c0fda47890613910ff0f3b35a31fd Mon Sep 17 00:00:00 2001 From: Julien Vincent Date: Sun, 4 Feb 2024 17:52:42 +0100 Subject: [PATCH 2/2] Update README.md --- README.md | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 6abf346..3cf6269 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ This is a Clojure migrations framework which aims to provide a generalized and versatile mechanism for running arbitrary migration operations. It has the following goals: -+ Allow running arbitrary migrations as code. -+ Allow storing migration state anywhere, including in a separate store to the data being migrated. -+ Handle locking to prevent multiple migration operations occurring at the same time. -+ Keep a log of all migration operations that have been ever been executed, including rollbacks, and record their date and timing information. -+ Allow 'compressing' migrations or cleaning up by removing previously applied migrations. ++ Allow running arbitrary operations as code. ++ Allow storing migration state anywhere including in a store separate to the data being operated on. ++ Have a mechanism for locking to prevent multiple operations from occurring at the same time. ++ Keep a log of all operations that have ever been executed - including rollbacks - and record their date and timing information. ++ Allow deleting operations after they have been applied. ## Quick start @@ -96,11 +96,11 @@ The executor simply finds all unapplied migrations for the desired 'direction' a ## Compression/Cleanup -Migrations are once-off operations applied at a point in time, while your application itself changes through time. This makes keeping old migration code around often infeasible as it will often be referencing code in your application which has changed or been removed, or it is applying to data structures which have completely changed or also been removed. +Migrations are generally once-off operations applied at a point in time, while your application itself changes through time. This makes keeping old migration code around often infeasible as it will often be referencing code in your application which has changed or been removed, or it is applying to data structures which have completely changed or also been removed. Some migrations scripts, such as index creation/modification, we _do_ want to keep around as they are useful to apply when setting up a database (either during development on a new machine, or when your application is often deployed to fresh infrastructure). However these kinds of migrations we might want to 'compress' into a single 'init' or 'seed' migration. For example, when indexes are added/removed/changed this would need to happen in a new migration - but once this has been applied everywhere it might make sense to 'merge' the code back into an original 'init' migration. -Both types of cleanup are indirectly supported by this executor so long as you keep in mind how it operates. The executor always needs the last applied migration (as derived from the op-log) to be present in the provided set of migrations. This is what is used as a reference point for where it should resume. So long as this referential migration is not dropped, any migrations before or after can be changed as you wish. +Both types of cleanup are indirectly supported by this executor. Any previously applied operations can be deleted and the executor should be able to figure out from where to continue. The only thing to be careful with is renaming operations - **do not rename an operation if you do not want it to be applied again**. As an example the following state would execute just fine: @@ -119,8 +119,6 @@ As an example the following state would execute just fine: (find-unapplied op-log migrations :down) ;; => [{:id "1"}] ``` -Notice the migration with id `"2"` is missing from the provided set, simulating it having been cleaned up. Once the op-log has moved on to `"4"`, the migration with id `"3"` could also theoretically be removed. - ## GraalVM Native-Image Compatibility This tool is fully compatible with graalvm native-image. To properly structure your project you need to make sure your migrations are analysed at build time.