Improved redis performance in large keyspace, added pagination, and auto-expiration

This commit makes several interrelated changes:

1. Replaced the redis key scan to find job keys in `Client#find_workflow` with
   job class name persistence in `Workflow#to_hash`. This significantly improves
   performance when loading many workflows because it avoids n key scans.

2. Added `Client#workflow_ids` with sorting by creation timestamp and pagination
   as an alternative to `Client#all_workflows`, which has performance issues in
   large keyspaces and returns unwieldy amounts of data given a large number of
   workflows.

3. Added workflow and job indexes by `created_at` and `expires_at` timestamps.
   The former is necessary for paging through sorted workflow ids, and the latter
   is necessary to remove data on expiration.

4. Replaced use of redis key TTL with explicit expiration via `Client#expire_workflows`,
   since there's no other way to remove data from the indexes.

5. Added a migration file (and infrastructure) to migrate to the new indexes
   and expiration format.

Given a redis instance with 10,000 workflows, this set of changes allows a page
of the most recent 100 workflows to be loaded in 0.1 seconds, whereas previously
`all_workflows` would take hours to return data.

(Or, for a less extreme example of 1000 workflows, we can load 100 workflows in
0.1 seconds compared to `all_workflows` taking 42 seconds).
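
As an illustration (not part of the commit), the new indexes reduce to a handful of sorted-set operations; a sketch using the `redis` gem directly, with the key names introduced in `lib/gush/client.rb`:

```ruby
require "redis"

redis = Redis.new

# Client#persist_workflow indexes each workflow id by creation time,
# and by expiry time when a TTL is configured:
now = Time.now.to_f
redis.zadd("gush.idx.workflows.created_at", now, "some-workflow-id", nx: true)
redis.zadd("gush.idx.workflows.expires_at", now + 7 * 24 * 3600, "some-workflow-id", nx: true)

# Client#workflow_ids pages through the index, e.g. the 100 most recent ids:
ids = redis.zrange("gush.idx.workflows.created_at", 0, 99, rev: true)

# Client#expire_workflows selects everything past its expiry by score:
expired = redis.zrange("gush.idx.workflows.expires_at", "-inf", now, by_score: true)
```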
noahfpf committed Aug 6, 2024
1 parent bf3cc61 commit 2a0f336
Showing 11 changed files with 549 additions and 18 deletions.
51 changes: 48 additions & 3 deletions README.md
@@ -256,6 +256,28 @@ flow.status

`reload` is needed to see the latest status, since workflows are updated asynchronously.

## Loading workflows

### Finding a workflow by id

```ruby
flow = Workflow.find(id)
```

### Paging through workflows

To get workflows with pagination, use start and stop (inclusive) index values:

```ruby
flows = Workflow.page(0, 99)
```

Or in reverse order:

```ruby
flows = Workflow.page(0, 99, order: :desc)
```
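
`Workflow.page` delegates to `Gush::Client#workflow_ids`, which can also range by creation timestamp rather than by index (see `lib/gush/client.rb`); a sketch, assuming a configured client:

```ruby
client = Gush::Client.new

# Ids of workflows created in the last hour, oldest first
hour_ago = Time.now.to_f - 3600
ids = client.workflow_ids(hour_ago, "+inf", by_ts: true)
flows = ids.map { |id| client.find_workflow(id) }
```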

## Advanced features

### Global parameters for jobs
@@ -418,12 +440,18 @@ end
bundle exec gush show <workflow_id>
```

- of all created workflows:
- of a page of workflows:

```
bundle exec gush list
```

- of the most recent 100 workflows:

```
bundle exec gush list -100 -1
```

### Visualizing workflows as an image

This requires that you have imagemagick installed on your computer:
@@ -449,7 +477,9 @@ end

### Cleaning up afterwards

Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is ran. This data might be useful for analysis but at a certain point it can be purged via Redis TTL. By default gush and Redis will keep keys forever. To configure expiration you need to 2 things. Create initializer (specify config.ttl in seconds, be different per environment).
Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is run. This data might be useful for analysis but at a certain point it can be purged. By default gush and Redis will keep keys forever. To configure expiration you need to do two things.

1. Create an initializer that specifies `config.ttl` in seconds. Avoid setting the TTL too short (e.g. minutes); a length of about a week works well.

```ruby
# config/initializers/gush.rb
@@ -460,7 +490,9 @@ Gush.configure do |config|
end
```

And you need to call `flow.expire!` (optionally passing custom TTL value overriding `config.ttl`). This gives you control whether to expire data for specific workflow. Best NOT to set TTL to be too short (like minutes) but about a week in length. And you can run `Client.expire_workflow` and `Client.expire_job` passing appropriate IDs and TTL (pass -1 to NOT expire) values.
2. Call `Client#expire_workflows` periodically, which will clear all expired stored workflow and job data and their index entries. This method can be called at any frequency, but should ideally run at least once for every 1000 workflows created, for example via a scheduled task as sketched below.
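
For example, one way to schedule this in a Rails app (the rake task below is illustrative, not part of gush):

```ruby
# lib/tasks/gush.rake (hypothetical)
namespace :gush do
  desc "Purge expired workflow and job data from Redis"
  task expire_workflows: :environment do
    Gush::Client.new.expire_workflows
  end
end
```

Run it from cron or any scheduler, e.g. `rake gush:expire_workflows` hourly.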

If you need more control over individual workflow expiration, you can call `flow.expire!(ttl)` with a TTL different from the Gush configuration, or with -1 to never expire the workflow.

### Avoid overlapping workflows

@@ -478,6 +510,19 @@ def find_by_class klass
end
```

## Gush 3.0 Migration

Gush 3.0 adds indexing for fast workflow pagination and changes the mechanism for expiring workflow data from Redis.

### Migration

Run `bundle exec gush migrate` after upgrading. This will update internal data structures.

### Expiration API

Periodically run `Gush::Client.new.expire_workflows` to expire data. Workflows will be automatically enrolled in this expiration, so there is no longer a need to call `workflow.expire!`.


## Contributors

- [Mateusz Lenik](https://github.com/mlen)
1 change: 1 addition & 0 deletions lib/gush.rb
@@ -15,6 +15,7 @@
require "gush/configuration"
require "gush/errors"
require "gush/job"
require "gush/migration"
require "gush/worker"
require "gush/workflow"

29 changes: 26 additions & 3 deletions lib/gush/cli.rb
@@ -70,9 +70,14 @@ def rm(workflow_id)
client.destroy_workflow(workflow)
end

desc "list", "Lists all workflows with their statuses"
def list
workflows = client.all_workflows
desc "list START STOP", "Lists workflows from START index through STOP index with their statuses"
option :start, type: :numeric, default: nil
option :stop, type: :numeric, default: nil
def list(start=nil, stop=nil)
workflows = client.workflow_ids(start, stop).map do |id|
client.find_workflow(id)
end

rows = workflows.map do |workflow|
[workflow.id, (Time.at(workflow.started_at) if workflow.started_at), workflow.class, {alignment: :center, value: status_for(workflow)}]
end
@@ -120,6 +125,24 @@ def viz(class_or_id)
end
end

desc "migrate", "Runs all unapplied migrations to Gush storage"
def migrate
Dir[File.join(__dir__, 'migrate', '*.rb')].each {|file| require file }

applied = Gush::Migration.subclasses.sort_by(&:version).count do |klass|
migration = klass.new
next if migration.migrated?

puts "Migrating to #{klass.name} (#{migration.version})"
migration.migrate
puts "== #{migration.version} #{klass.name}: migrated ==="

true
end

puts "#{applied} #{"migrations".pluralize(applied)} applied"
end

private

def client
98 changes: 91 additions & 7 deletions lib/gush/client.rb
@@ -80,6 +80,37 @@ def next_free_workflow_id
id
end

# Returns the specified range of workflow ids, sorted by created timestamp.
#
# @param start, stop [Integer] see https://redis.io/docs/latest/commands/zrange/#index-ranges
# for details on the start and stop parameters.
# @param by_ts [Boolean] if true, start and stop are treated as timestamps
# rather than as element indexes, which allows the workflows to be indexed
# by created timestamp
# @param order [Symbol] if :asc, finds ids in ascending created timestamp;
# if :desc, finds ids in descending created timestamp
# @return [Array<String>] array of workflow ids
def workflow_ids(start=nil, stop=nil, by_ts: false, order: :asc)
start ||= 0
stop ||= 99

redis.zrange(
"gush.idx.workflows.created_at",
start,
stop,
by_score: by_ts,
rev: order&.to_sym == :desc
)
end

def workflows(start=nil, stop=nil, **kwargs)
workflow_ids(start, stop, **kwargs).map { |id| find_workflow(id) }
end

# Deprecated.
#
# This method is not performant when there are a large number of workflows
# or when the redis keyspace is large. Use workflows instead with pagination.
def all_workflows
redis.scan_each(match: "gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
@@ -92,7 +123,13 @@ def find_workflow(id)

unless data.nil?
hash = Gush::JSON.decode(data, symbolize_keys: true)
keys = redis.scan_each(match: "gush.jobs.#{id}.*")

if hash[:job_klasses]
keys = hash[:job_klasses].map { |klass| "gush.jobs.#{id}.#{klass}" }
else
# For backwards compatibility, get job keys via a full keyspace scan
keys = redis.scan_each(match: "gush.jobs.#{id}.*")
end

nodes = keys.each_with_object([]) do |key, array|
array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
@@ -105,15 +142,25 @@ def find_workflow(id)
end

def persist_workflow(workflow)
created_at = Time.now.to_f
added = redis.zadd("gush.idx.workflows.created_at", created_at, workflow.id, nx: true)

if added && configuration.ttl&.positive?
expires_at = created_at + configuration.ttl
redis.zadd("gush.idx.workflows.expires_at", expires_at, workflow.id, nx: true)
end

redis.set("gush.workflows.#{workflow.id}", workflow.to_json)

workflow.jobs.each {|job| persist_job(workflow.id, job) }
workflow.jobs.each {|job| persist_job(workflow.id, job, expires_at: expires_at) }
workflow.mark_as_persisted

true
end

def persist_job(workflow_id, job)
def persist_job(workflow_id, job, expires_at: nil)
redis.zadd("gush.idx.jobs.expires_at", expires_at, "#{workflow_id}.#{job.klass}", nx: true) if expires_at

redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json)
end

@@ -134,22 +181,59 @@ def find_job(workflow_id, job_name)

def destroy_workflow(workflow)
redis.del("gush.workflows.#{workflow.id}")
redis.zrem("gush.idx.workflows.created_at", workflow.id)
redis.zrem("gush.idx.workflows.expires_at", workflow.id)
workflow.jobs.each {|job| destroy_job(workflow.id, job) }
end

def destroy_job(workflow_id, job)
redis.del("gush.jobs.#{workflow_id}.#{job.klass}")
redis.zrem("gush.idx.jobs.expires_at", "#{workflow_id}.#{job.klass}")
end

def expire_workflows(expires_at=nil)
expires_at ||= Time.now.to_f

ids = redis.zrange("gush.idx.workflows.expires_at", "-inf", expires_at, by_score: true)
return if ids.empty?

redis.del(ids.map { |id| "gush.workflows.#{id}" })
redis.zrem("gush.idx.workflows.created_at", ids)
redis.zrem("gush.idx.workflows.expires_at", ids)

expire_jobs(expires_at)
end

def expire_jobs(expires_at=nil)
expires_at ||= Time.now.to_f

keys = redis.zrange("gush.idx.jobs.expires_at", "-inf", expires_at, by_score: true)
return if keys.empty?

redis.del(keys.map { |key| "gush.jobs.#{key}" })
redis.zrem("gush.idx.jobs.expires_at", keys)
end

def expire_workflow(workflow, ttl=nil)
ttl = ttl || configuration.ttl
redis.expire("gush.workflows.#{workflow.id}", ttl)
ttl ||= configuration.ttl

if ttl&.positive?
redis.zadd("gush.idx.workflows.expires_at", Time.now.to_f + ttl, workflow.id)
else
redis.zrem("gush.idx.workflows.expires_at", workflow.id)
end

workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) }
end

def expire_job(workflow_id, job, ttl=nil)
ttl = ttl || configuration.ttl
redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl)
ttl ||= configuration.ttl

if ttl&.positive?
redis.zadd("gush.idx.jobs.expires_at", Time.now.to_f + ttl, "#{workflow_id}.#{job.klass}")
else
redis.zrem("gush.idx.jobs.expires_at", "#{workflow_id}.#{job.klass}")
end
end

def enqueue_job(workflow_id, job)
21 changes: 21 additions & 0 deletions lib/gush/migrate/1_create_gush_workflows_created.rb
@@ -0,0 +1,21 @@
module Gush
class IndexWorkflowsByCreatedAtAndExpiresAt < Gush::Migration
def self.version
1
end

def up
redis.scan_each(match: "gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
workflow = client.find_workflow(id)

ttl = redis.ttl(key)
redis.persist(key)
workflow.jobs.each { |job| redis.persist("gush.jobs.#{id}.#{job.klass}") }

client.persist_workflow(workflow)
client.expire_workflow(workflow, ttl.positive? ? ttl : -1)
end
end
end
end
36 changes: 36 additions & 0 deletions lib/gush/migration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module Gush
class Migration
def migrate
return if migrated?

up
migrated!
end

def up
# subclass responsibility
end

def version
self.class.version
end

def migrated?
redis.sismember("gush.migration.schema_migrations", version)
end

private

def migrated!
redis.sadd("gush.migration.schema_migrations", version)
end

def client
@client ||= Client.new
end

def redis
Gush::Client.redis_connection(client.configuration)
end
end
end
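
For illustration, a later migration would follow the same shape as `1_create_gush_workflows_created.rb` above: subclass `Gush::Migration`, define `self.version`, and implement `up`. The class below is hypothetical:

```ruby
module Gush
  # Hypothetical example, not part of this commit
  class ExampleFutureMigration < Gush::Migration
    def self.version
      2
    end

    def up
      # Idempotent data transformation over Redis keys goes here.
    end
  end
end
```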
5 changes: 5 additions & 0 deletions lib/gush/workflow.rb
@@ -22,6 +22,10 @@ def self.find(id)
Gush::Client.new.find_workflow(id)
end

def self.page(start=0, stop=99, order: :asc)
Gush::Client.new.workflows(start, stop, order: order)
end

def self.create(*args, **kwargs)
flow = new(*args, **kwargs)
flow.save
@@ -184,6 +188,7 @@ def to_hash
total: jobs.count,
finished: jobs.count(&:finished?),
klass: name,
job_klasses: jobs.map(&:class).map(&:to_s).uniq,
status: status,
stopped: stopped,
started_at: started_at,