Skip to content

Commit

Permalink
Persist dependencies so that Workflow#configure is not required on …
Browse files Browse the repository at this point in the history
…load (#118)

Previously, `Workflow#configure` was called every time a Workflow was
instantiated. This could create broken dependency graphs, e.g. if a
configure method sets up job dependencies based on some mutable data like
a timestamp argument or a database value.

Instead, serialize workflow dependencies along with the rest of a workflow's
data and reload it via `Client#workflow_from_hash` and tell `Workflow#initialize`
not to run setup/configure.

Note that for backwards compatibility with workflows persisted before this
change, the setup method will still be called if dependencies in the
deserialized hash are nil.
  • Loading branch information
noahfpf authored Jul 27, 2024
1 parent 68bc23f commit bf3cc61
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 21 deletions.
27 changes: 16 additions & 11 deletions lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,25 @@ def find_job_by_klass(workflow_id, job_name)
end

def workflow_from_hash(hash, nodes = [])
flow = hash[:klass].constantize.new(
*hash[:arguments],
**hash[:kwargs],
globals: hash[:globals]
)
flow.jobs = []
flow.stopped = hash.fetch(:stopped, false)
flow.id = hash[:id]

flow.jobs = nodes.map do |node|
jobs = nodes.map do |node|
Gush::Job.from_hash(node)
end

flow
internal_state = {
persisted: true,
jobs: jobs,
# For backwards compatibility, setup can only be skipped for a persisted
# workflow if there is no data missing from the persistence.
# 2024-07-23: dependencies added to persistence
skip_setup: !hash[:dependencies].nil?
}.merge(hash)

hash[:klass].constantize.new(
*hash[:arguments],
**hash[:kwargs],
globals: hash[:globals],
internal_state: internal_state
)
end

def redis
Expand Down
18 changes: 10 additions & 8 deletions lib/gush/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

module Gush
class Workflow
attr_accessor :id, :jobs, :stopped, :persisted, :arguments, :kwargs, :globals
attr_accessor :id, :jobs, :dependencies, :stopped, :persisted, :arguments, :kwargs, :globals

def initialize(*args, globals: nil, **kwargs)
@id = id
@jobs = []
@dependencies = []
@persisted = false
@stopped = false
def initialize(*args, globals: nil, internal_state: {}, **kwargs)
@arguments = args
@kwargs = kwargs
@globals = globals || {}

setup
@id = internal_state[:id] || id
@jobs = internal_state[:jobs] || []
@dependencies = internal_state[:dependencies] || []
@persisted = internal_state[:persisted] || false
@stopped = internal_state[:stopped] || false

setup unless internal_state[:skip_setup]
end

def self.find(id)
Expand Down Expand Up @@ -179,6 +180,7 @@ def to_hash
arguments: @arguments,
kwargs: @kwargs,
globals: @globals,
dependencies: @dependencies,
total: jobs.count,
finished: jobs.count(&:finished?),
klass: name,
Expand Down
4 changes: 2 additions & 2 deletions spec/features/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ def configure
INTERNAL_CONFIGURE_SPY = double('configure spy')
expect(INTERNAL_SPY).to receive(:some_method).exactly(110).times

# One time when persisting, second time when reloading in the spec
expect(INTERNAL_CONFIGURE_SPY).to receive(:some_method).exactly(2).times
# One time when persisting; reloading does not call configure again
expect(INTERNAL_CONFIGURE_SPY).to receive(:some_method).exactly(1).time

class SimpleJob < Gush::Job
def perform
Expand Down
3 changes: 3 additions & 0 deletions spec/gush/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
it "returns Workflow object" do
expected_workflow = TestWorkflow.create
workflow = client.find_workflow(expected_workflow.id)
dependencies = workflow.dependencies

expect(workflow.id).to eq(expected_workflow.id)
expect(workflow.persisted).to eq(true)
expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name))
expect(workflow.dependencies).to eq(dependencies)
end

context "when workflow has parameters" do
Expand Down
37 changes: 37 additions & 0 deletions spec/gush/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ def configure(*args, **kwargs)
flow = TestWorkflow.new(globals: { global1: 'foo' })
expect(flow.globals[:global1]).to eq('foo')
end

it "accepts internal_state" do
flow = TestWorkflow.new

internal_state = {
id: flow.id,
jobs: flow.jobs,
dependencies: flow.dependencies,
persisted: true,
stopped: true,
}

flow_copy = TestWorkflow.new(internal_state: internal_state)

expect(flow_copy.id).to eq(flow.id)
expect(flow_copy.jobs).to eq(flow.jobs)
expect(flow_copy.dependencies).to eq(flow.dependencies)
expect(flow_copy.persisted).to eq(true)
expect(flow_copy.stopped).to eq(true)
end

it "does not call #configure if needs_setup is false" do
INTERNAL_SETUP_SPY = double('configure spy')
klass = Class.new(Gush::Workflow) do
def configure(*args)
INTERNAL_SETUP_SPY.some_method
end
end

expect(INTERNAL_SETUP_SPY).to receive(:some_method).never

flow = TestWorkflow.new(internal_state: { needs_setup: false })
end
end

describe "#status" do
Expand Down Expand Up @@ -118,6 +151,10 @@ def configure(*args)
"started_at" => nil,
"finished_at" => nil,
"stopped" => false,
"dependencies" => [{
"from" => "FetchFirstJob",
"to" => job_with_id("PersistFirstJob"),
}],
"arguments" => ["arg1", "arg2"],
"kwargs" => {"arg3" => 123},
"globals" => {}
Expand Down

0 comments on commit bf3cc61

Please sign in to comment.