Skip to content

Commit

Permalink
Merge pull request #1 from kepler16/jv/better-oplog-processing
Browse files Browse the repository at this point in the history
Handle mismatches between oplog + known ops
  • Loading branch information
julienvincent authored Feb 4, 2024
2 parents 83bbfa1 + 1d93338 commit 1a2fc93
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 146 deletions.
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

This is a Clojure migrations framework which aims to provide a generalized and versatile mechanism for running arbitrary migration operations. It has the following goals:

+ Allow running arbitrary migrations as code.
+ Allow storing migration state anywhere, including in a separate store to the data being migrated.
+ Handle locking to prevent multiple migration operations occurring at the same time.
+ Keep a log of all migration operations that have been ever been executed, including rollbacks, and record their date and timing information.
+ Allow 'compressing' migrations or cleaning up by removing previously applied migrations.
+ Allow running arbitrary operations as code.
+ Allow storing migration state anywhere including in a store separate to the data being operated on.
+ Have a mechanism for locking to prevent multiple operations from occurring at the same time.
+ Keep a log of all operations that have ever been executed - including rollbacks - and record their date and timing information.
+ Allow deleting operations after they have been applied.

## Quick start

Expand Down Expand Up @@ -96,11 +96,11 @@ The executor simply finds all unapplied migrations for the desired 'direction' a

## Compression/Cleanup

Migrations are once-off operations applied at a point in time, while your application itself changes through time. This makes keeping old migration code around often infeasible as it will often be referencing code in your application which has changed or been removed, or it is applying to data structures which have completely changed or also been removed.
Migrations are generally once-off operations applied at a point in time, while your application itself changes through time. This makes keeping old migration code around often infeasible as it will often be referencing code in your application which has changed or been removed, or it is applying to data structures which have completely changed or also been removed.

Some migrations scripts, such as index creation/modification, we _do_ want to keep around as they are useful to apply when setting up a database (either during development on a new machine, or when your application is often deployed to fresh infrastructure). However these kinds of migrations we might want to 'compress' into a single 'init' or 'seed' migration. For example, when indexes are added/removed/changed this would need to happen in a new migration - but once this has been applied everywhere it might make sense to 'merge' the code back into an original 'init' migration.

Both types of cleanup are indirectly supported by this executor so long as you keep in mind how it operates. The executor always needs the last applied migration (as derived from the op-log) to be present in the provided set of migrations. This is what is used as a reference point for where it should resume. So long as this referential migration is not dropped, any migrations before or after can be changed as you wish.
Both types of cleanup are indirectly supported by this executor. Any previously applied operations can be deleted and the executor should be able to figure out from where to continue. The only thing to be careful with is renaming operations - **do not rename an operation if you do not want it to be applied again**.

As an example the following state would execute just fine:

Expand All @@ -119,8 +119,6 @@ As an example the following state would execute just fine:
(find-unapplied op-log migrations :down) ;; => [{:id "1"}]
```

Notice the migration with id `"2"` is missing from the provided set, simulating it having been cleaned up. Once the op-log has moved on to `"4"`, the migration with id `"3"` could also theoretically be removed.

## GraalVM Native-Image Compatibility

This tool is fully compatible with graalvm native-image. To properly structure your project you need to make sure your migrations are analysed at build time.
Expand Down
12 changes: 6 additions & 6 deletions packages/mallard/src/k16/mallard/datastore.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

(set! *warn-on-reflection* true)

(def inst-codec
(def ^:private inst-codec
{:encode/json #(.toString ^java.time.Instant %)
:decode/json #(t/instant %)})

(def ?instant
(def ^:private ?instant
[:fn (merge inst-codec {:error/message "Should be an #instant"}) t/instant?])

(def ?Direction
[:or [:= :up] [:= :down]])
[:enum :up :down])

(def ?Operation
(def ?OpLogEntry
[:map {:closed true}
[:id :string]
[:direction ?Direction]
Expand All @@ -23,8 +23,8 @@

(def ?State
[:map
[:log {:description "A log of all migration operations that have been executed"}
[:sequential ?Operation]]])
[:log {:description "A log of all operations that have been executed"}
[:sequential ?OpLogEntry]]])

(defprotocol DataStore
"Protocol for a data store that can hold the migration log and
Expand Down
2 changes: 1 addition & 1 deletion packages/mallard/src/k16/mallard/dev.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[:or
[:map
[:migrations executor/?Migrations]]
[:migrations executor/?Operations]]

[:map
[:migrations-dir :string]]]])
Expand Down
167 changes: 101 additions & 66 deletions packages/mallard/src/k16/mallard/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

(set! *warn-on-reflection* true)

(def ?Migration
(def ?Operation
[:map
[:id :string]
[:run-up! {:error/message "should be a function with one argument"
Expand All @@ -18,63 +18,104 @@
:optional true}
[:=> [:cat :any] :any]]])

(def ?Migrations
[:sequential {:error/message "should be a sequence of migrations"}
?Migration])
(def ?Operations
[:sequential {:error/message "should be a sequence of operations"}
?Operation])

(def ?ExecuteProps
[:map
[:context {:optional true} [:maybe :any]]
[:store datastore.api/?DataStore]
[:migrations ?Migrations]
[:migrations ?Operations]
[:limit {:optional true} [:int {:min 1}]]
[:direction [:enum :up :down]]])

(defn- index-by [key-fn col]
(->> col
(map (fn [migration]
[(key-fn migration) migration]))
(into {})))

(defn- project-op-log
"Reduces over the op-log in order to determine what migration id is currently
applied. Returns the last applied migration id."
"Reduces over the op-log to project the concrete sequence of currently applied migrations ids.
The op-log contains a sequence of `:up` and `:down` operations which can be reduced down to a
sequence of only `:up` operation ids
Example:
```clojure
(project-op-log [{:id \"1\" :direction :up}
{:id \"1\" :direction :down}
{:id \"2\" :direction :up}
{:id \"3\" :direction :up}])
;; => [\"2\" \"3\"]
```"
[op-log]
(->> op-log
(reduce
(fn [ids op]
(case (:direction op)
:up (conj ids (:id op))
:down (if (= (:id op) (last ids))
(pop ids)
(throw (ex-info "Error reprocessing op-log. A :down operation did not
follow an :up migration of the same id"
{:last-op (last ids)
:current-op (:id op)})))))
[])
last))

(defn- index-of [item coll]
(let [index
(->> coll
(map-indexed vector)
(some (fn [[idx val]]
(when (= val item) idx))))]
(if index index -1)))

(defn- get-index
"Determines the index of the currently applied index in the given collection of migrations"
[op-log migrations]
(if op-log
(let [last-run-id (project-op-log op-log)
index (->> migrations
(map :id)
(index-of last-run-id))]

(if (and (not (nil? last-run-id)) (= index -1))
(throw (ex-info (str "The last run migration " last-run-id " was not found in the given set of migrations") {}))
index))

-1))
(reduce
(fn [operations op]
(case (:direction op)
:up (conj operations (:id op))
:down (if (= (:id op) (last operations))
(pop operations)
(throw (ex-info (str "Error reprocessing op-log. A :down operation did not "
"follow an :up migration of the same id")
{:last-op (last operations)
:current-op (:id op)})))))
[]
op-log))

(defn- derive-active-state
"Determine what the current working state is based on the given `op-log` and set of ordered `operations`.
Returns operations in two groups, those that have been applied and those that are yet to be applied.
Operations in the `:applied` set may be ordered differently to how to are provided as the order they
appear in `op-log` takes precedence.
Operations in the `:applied` section maybe also be `nil` in the event that the operation that was applied
as according to the `op-log` is no longer present or identifiable from the provided set of `operations`."
[op-log operations]
(let [operations-idx (index-by :id operations)
applied-operation-ids (project-op-log op-log)

applied-operations
(mapv
(fn [op-id]
(let [operation (get operations-idx op-id)]
{:id op-id
:operation operation}))
applied-operation-ids)

applied-idx (index-by :id applied-operations)

unapplied-operations
(->> operations
(filter
(fn [operation]
(not (get applied-idx (:id operation)))))
(mapv (fn [operation]
{:id (:id operation)
:operation operation})))]

{:applied applied-operations
:unapplied unapplied-operations}))

(defn- find-unapplied
"Return an ordered set of operations based on the current op-log state and desired `:direction`.
- If the direction is `:up` this will return the remaining set of *unapplied* migrations.
- If the direction is `:down` this will return the *applied* migrations in reverse order."
[op-log operations direction]
(let [{:keys [applied unapplied]} (derive-active-state op-log operations)]
(case direction
:up unapplied
:down (reverse applied))))

(defn- execute-one!
"Execute a single migration and return an ?Operation to be appended to the op-log."
[context migration direction]
(let [{:keys [id run-up! run-down!]} migration
"Execute a single migration and return an ?OpLogEntry to be appended to the op-log."
[context operation direction]
(let [{:keys [id run-up! run-down!]} operation
ts (t/now)]
(log/info (str "Executing migration " id " [" direction "]"))

Expand All @@ -89,44 +130,38 @@
:started_at ts
:finished_at (t/now)}))

(defn- find-unapplied
"Return an ordered set of unapplied migrations based on the current op-log state. This will
return migrations in reverse order if the direction is :down"
[op-log migrations direction]
(let [migrations' (if (= direction :down) (reverse migrations) migrations)
index (get-index op-log migrations')
;; An :up migration needs to start at the next migration whereas a :down migration
;; should start at the currently applied migration.
index' (if (= :up direction) (inc index) index)]
(subvec (vec migrations') index')))

(defn execute!
"Execute the given migrations and return the new log of operations. This will handle locking
and will mutate the datastore with the changing op-log as migrations are applied."
[{:keys [context store migrations direction limit] :as props}]
(when-not (m/validate ?ExecuteProps props)
(throw (ex-info "Migration props are invalid"
(throw (ex-info "Invalid arguments provided"
{:errors (me/humanize (m/explain ?ExecuteProps props))})))

(let [state (datastore.api/load-state store)
op-log (atom (or (:log state) []))
unapplied (find-unapplied (:log state) migrations direction)
unapplied' (if limit (take limit unapplied) unapplied)
unapplied (cond-> (find-unapplied (:log state) migrations direction)
limit ((partial take limit)))
lock (datastore.api/acquire-lock! store)]

(try
(if (pos? (count unapplied'))
(log/info (str "Running " (count unapplied') " operations"))
(log/info "No unapplied operations to run"))
(if (pos? (count unapplied))
(log/info (str "Running " (count unapplied) " operations"))
(log/info "No operations to run"))

(doseq [{:keys [id operation]} unapplied]
(when (not operation)
(log/error (str "Cannot run :down. Operation " id " is missing"))
(throw (ex-info (str "Missing operation " id) {:operation-id id})))

(doseq [migration unapplied']
(let [op (execute-one! context migration direction)
(let [op (execute-one! context operation direction)
op-log' (swap! op-log #(conj % op))]
(datastore.api/save-state! store {:log op-log'})))

(catch Exception e
(log/error "Failed to execute operation" e)
(throw e))
(finally (datastore.api/release-lock! store lock)))
(finally
(datastore.api/release-lock! store lock)))

@op-log))
Loading

0 comments on commit 1a2fc93

Please sign in to comment.