minor: Handle mismatches between oplog + known ops
The current implementation handles mismatches between the op-log and
known set of operations quite badly.

It is required that the current op-log head points to a known and
explicitly provided operation.

This can cause issues in the following scenarios:

+ Working across two different branches, where operations applied from one
  branch do not exist in the other.
+ Deleting applied operations because it is no longer desirable to keep
  the code around.

Another issue exists as a result of this approach; information regarding
the order in which operations were actually applied is lost. This may be
distinct from the order in which operations were provided.

When executing in the :down direction it would be safer to execute in
the reverse of the order in which operations were applied.

This commit resolves this issue by:
+ No longer requiring operations that have been applied to be provided at all.
+ Re-ordering operations based on the order they appear in the op-log.
julienvincent committed Feb 4, 2024
1 parent 83bbfa1 commit 12fdd76
Showing 5 changed files with 271 additions and 137 deletions.
12 changes: 6 additions & 6 deletions packages/mallard/src/k16/mallard/datastore.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

(set! *warn-on-reflection* true)

(def inst-codec
(def ^:private inst-codec
{:encode/json #(.toString ^java.time.Instant %)
:decode/json #(t/instant %)})

(def ?instant
(def ^:private ?instant
[:fn (merge inst-codec {:error/message "Should be an #instant"}) t/instant?])

(def ?Direction
[:or [:= :up] [:= :down]])
[:enum :up :down])

(def ?Operation
(def ?OpLogEntry
[:map {:closed true}
[:id :string]
[:direction ?Direction]
Expand All @@ -23,8 +23,8 @@

(def ?State
[:log {:description "A log of all migration operations that have been executed"}
[:sequential ?Operation]]])
[:log {:description "A log of all operations that have been executed"}
[:sequential ?OpLogEntry]]])

(defprotocol DataStore
"Protocol for a data store that can hold the migration log and
Expand Down
2 changes: 1 addition & 1 deletion packages/mallard/src/k16/mallard/dev.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[:migrations executor/?Migrations]]
[:migrations executor/?Operations]]

[:migrations-dir :string]]]])
Expand Down
167 changes: 101 additions & 66 deletions packages/mallard/src/k16/mallard/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

(set! *warn-on-reflection* true)

(def ?Migration
(def ?Operation
[:id :string]
[:run-up! {:error/message "should be a function with one argument"
Expand All @@ -18,63 +18,104 @@
:optional true}
[:=> [:cat :any] :any]]])

(def ?Migrations
[:sequential {:error/message "should be a sequence of migrations"}
(def ?Operations
[:sequential {:error/message "should be a sequence of operations"}

(def ?ExecuteProps
[:context {:optional true} [:maybe :any]]
[:store datastore.api/?DataStore]
[:migrations ?Migrations]
[:migrations ?Operations]
[:limit {:optional true} [:int {:min 1}]]
[:direction [:enum :up :down]]])

(defn- index-by [key-fn col]
(->> col
(map (fn [migration]
[(key-fn migration) migration]))
(into {})))

(defn- project-op-log
"Reduces over the op-log in order to determine what migration id is currently
applied. Returns the last applied migration id."
"Reduces over the op-log to project the concrete sequence of currently applied migrations ids.
The op-log contains a sequence of `:up` and `:down` operations which can be reduced down to a
sequence of only `:up` operation ids
(project-op-log [{:id \"1\" :direction :up}
{:id \"1\" :direction :down}
{:id \"2\" :direction :up}
{:id \"3\" :direction :up}])
;; => [\"2\" \"3\"]
(->> op-log
(fn [ids op]
(case (:direction op)
:up (conj ids (:id op))
:down (if (= (:id op) (last ids))
(pop ids)
(throw (ex-info "Error reprocessing op-log. A :down operation did not
follow an :up migration of the same id"
{:last-op (last ids)
:current-op (:id op)})))))

(defn- index-of [item coll]
(let [index
(->> coll
(map-indexed vector)
(some (fn [[idx val]]
(when (= val item) idx))))]
(if index index -1)))

(defn- get-index
"Determines the index of the currently applied index in the given collection of migrations"
[op-log migrations]
(if op-log
(let [last-run-id (project-op-log op-log)
index (->> migrations
(map :id)
(index-of last-run-id))]

(if (and (not (nil? last-run-id)) (= index -1))
(throw (ex-info (str "The last run migration " last-run-id " was not found in the given set of migrations") {}))

(fn [operations op]
(case (:direction op)
:up (conj operations (:id op))
:down (if (= (:id op) (last operations))
(pop operations)
(throw (ex-info (str "Error reprocessing op-log. A :down operation did not "
"follow an :up migration of the same id")
{:last-op (last operations)
:current-op (:id op)})))))

(defn- derive-active-state
"Determine what the current working state is based on the given `op-log` and set of ordered `operations`.
Returns operations in two groups, those that have been applied and those that are yet to be applied.
Operations in the `:applied` set may be ordered differently to how they are provided, as the order they
appear in `op-log` takes precedence.
Operations in the `:applied` section may also be `nil` in the event that the operation that was applied
according to the `op-log` is no longer present or identifiable from the provided set of `operations`."
[op-log operations]
(let [operations-idx (index-by :id operations)
applied-operation-ids (project-op-log op-log)

(fn [op-id]
(let [operation (get operations-idx op-id)]
{:id op-id
:operation operation}))

applied-idx (index-by :id applied-operations)

(->> operations
(fn [operation]
(not (get applied-idx (:id operation)))))
(mapv (fn [operation]
{:id (:id operation)
:operation operation})))]

{:applied applied-operations
:unapplied unapplied-operations}))

(defn- find-unapplied
"Return an ordered set of operations based on the current op-log state and desired `:direction`.
- If the direction is `:up` this will return the remaining set of *unapplied* migrations.
- If the direction is `:down` this will return the *applied* migrations in reverse order."
[op-log operations direction]
(let [{:keys [applied unapplied]} (derive-active-state op-log operations)]
(case direction
:up unapplied
:down (reverse applied))))

(defn- execute-one!
"Execute a single migration and return an ?Operation to be appended to the op-log."
[context migration direction]
(let [{:keys [id run-up! run-down!]} migration
"Execute a single migration and return an ?OpLogEntry to be appended to the op-log."
[context operation direction]
(let [{:keys [id run-up! run-down!]} operation
ts (t/now)]
(log/info (str "Executing migration " id " [" direction "]"))

Expand All @@ -89,44 +130,38 @@
:started_at ts
:finished_at (t/now)}))

(defn- find-unapplied
"Return an ordered set of unapplied migrations based on the current op-log state. This will
return migrations in reverse order if the direction is :down"
[op-log migrations direction]
(let [migrations' (if (= direction :down) (reverse migrations) migrations)
index (get-index op-log migrations')
;; An :up migration needs to start at the next migration whereas a :down migration
;; should start at the currently applied migration.
index' (if (= :up direction) (inc index) index)]
(subvec (vec migrations') index')))

(defn execute!
"Execute the given migrations and return the new log of operations. This will handle locking
and will mutate the datastore with the changing op-log as migrations are applied."
[{:keys [context store migrations direction limit] :as props}]
(when-not (m/validate ?ExecuteProps props)
(throw (ex-info "Migration props are invalid"
(throw (ex-info "Invalid arguments provided"
{:errors (me/humanize (m/explain ?ExecuteProps props))})))

(let [state (datastore.api/load-state store)
op-log (atom (or (:log state) []))
unapplied (find-unapplied (:log state) migrations direction)
unapplied' (if limit (take limit unapplied) unapplied)
unapplied (cond-> (find-unapplied (:log state) migrations direction)
limit ((partial take limit)))
lock (datastore.api/acquire-lock! store)]

(if (pos? (count unapplied'))
(log/info (str "Running " (count unapplied') " operations"))
(log/info "No unapplied operations to run"))
(if (pos? (count unapplied))
(log/info (str "Running " (count unapplied) " operations"))
(log/info "No operations to run"))

(doseq [{:keys [id operation]} unapplied]
(when (not operation)
(log/error (str "Cannot run :down. Operation " id " is missing"))
(throw (ex-info (str "Missing operation " id) {:operation-id id})))

(doseq [migration unapplied']
(let [op (execute-one! context migration direction)
(let [op (execute-one! context operation direction)
op-log' (swap! op-log #(conj % op))]
(datastore.api/save-state! store {:log op-log'})))

(catch Exception e
(log/error "Failed to execute operation" e)
(throw e))
(finally (datastore.api/release-lock! store lock)))
(datastore.api/release-lock! store lock)))


0 comments on commit 12fdd76

