Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset mnesia run at from last run #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions lib/honeydew/queue/mnesia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Honeydew.Queue.Mnesia do
])

# inspect/1 here becase queue_name can be of the form {:global, poolname}
table = ["honeydew", inspect(queue_name)] |> Enum.join("_") |> String.to_atom
table = table_name(queue_name)
in_progress_table = ["honeydew", inspect(queue_name), "in_progress"] |> Enum.join("_") |> String.to_atom

tables = %{table => [type: :ordered_set],
Expand All @@ -94,7 +94,7 @@ defmodule Honeydew.Queue.Mnesia do
in_progress_table: in_progress_table,
access_context: access_context(table_opts)}

:ok = reset_after_crash(state)
:ok = reset_after_shutdown(state)

time_warp_mode_warning()
poll()
Expand Down Expand Up @@ -232,7 +232,14 @@ defmodule Honeydew.Queue.Mnesia do
{reply, state}
end

defp reset_after_crash(%PState{in_progress_table: in_progress_table} = state) do
defp reset_after_shutdown(state) do
reset_all_in_progress_jobs(state)
reset_all_pending_jobs(state)

:ok
end

defp reset_all_in_progress_jobs(%PState{in_progress_table: in_progress_table} = state) do
in_progress_table
|> :mnesia.dirty_first()
|> case do
Expand All @@ -244,8 +251,26 @@ defmodule Honeydew.Queue.Mnesia do
|> WrappedJob.id_from_key
|> move_to_pending_table(%{}, state)

reset_after_crash(state)
reset_all_in_progress_jobs(state)
end

:ok
end

defp reset_all_pending_jobs(%PState{access_context: access_context, table: table}) do
:mnesia.activity(access_context, fn ->
:mnesia.foldl(fn wrapped_job_record, _ ->
new_wrapped_job_record =
wrapped_job_record
|> WrappedJob.from_record()
|> WrappedJob.recalc_run_at()
|> WrappedJob.to_record()

:ok = :mnesia.delete_object(table, wrapped_job_record, :write)
:ok = :mnesia.write(table, new_wrapped_job_record, :write)
end, [], table)
end)

:ok
end

Expand Down Expand Up @@ -321,6 +346,12 @@ defmodule Honeydew.Queue.Mnesia do
{:noreply, Queue.dispatch(queue_state)}
end

def table_name(queue_name) do
["honeydew", inspect(queue_name)]
|> Enum.join("_")
|> String.to_atom()
end

defp poll do
{:ok, _} = :timer.send_after(@poll_interval, :__poll__)
end
Expand Down
28 changes: 21 additions & 7 deletions lib/honeydew/queue/mnesia/wrapped_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do
alias Honeydew.Job

@record_name :wrapped_job
@record_fields [:key, :job]
@record_fields [:key, :last_run, :job]

job_filter_map =
%Job{}
Expand All @@ -14,6 +14,7 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do
@job_filter struct(Job, job_filter_map)

defstruct [:run_at,
:last_run,
:id,
:job]

Expand All @@ -23,27 +24,28 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do
def new(%Job{delay_secs: delay_secs} = job) do
id = :erlang.unique_integer()
run_at = now() + delay_secs
last_run = System.system_time(:millisecond)

job = %{job | private: id}

%__MODULE__{run_at: run_at,
id: id,
job: job}
%__MODULE__{id: id, job: job, run_at: run_at, last_run: last_run}
end

def from_record({@record_name, {run_at, id}, job}) do
def from_record({@record_name, {run_at, id}, last_run, job}) do
%__MODULE__{run_at: run_at,
last_run: last_run,
id: id,
job: job}
end

def to_record(%__MODULE__{run_at: run_at,
last_run: last_run,
id: id,
job: job}) do
{@record_name, key(run_at, id), job}
{@record_name, key(run_at, id), last_run, job}
end

def key({@record_name, key, _job}) do
def key({@record_name, key, _last_run, _job}) do
key
end

Expand All @@ -55,10 +57,20 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do
id
end

def set_run_at_to_now(%__MODULE__{} = wrapped_job) do
%__MODULE__{wrapped_job | run_at: now()}
end

def recalc_run_at(%__MODULE__{last_run: last_run, job: job} = wrapped_job) do
delta_t = now - System.system_time(:second)
%__MODULE__{wrapped_job | run_at: round(last_run / 1000.0) + delta_t + job.delay_secs}
end

def id_pattern(id) do
%__MODULE__{
id: id,
run_at: :_,
last_run: :_,
job: :_
}
|> to_record
Expand All @@ -70,6 +82,7 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do
%__MODULE__{
id: :_,
run_at: :_,
last_run: :_,
job: job
}
|> to_record
Expand All @@ -80,6 +93,7 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do
%__MODULE__{
id: :_,
run_at: :"$1",
last_run: :"_",
job: :_
}
|> to_record
Expand Down
41 changes: 40 additions & 1 deletion test/honeydew/queue/mnesia_queue_integration_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Honeydew.MnesiaQueueIntegrationTest do
use ExUnit.Case, async: false # shares doctest queue name with ErlangQueue test
alias Honeydew.Job
alias Honeydew.Queue.Mnesia.WrappedJob
alias Honeydew.Processes

@moduletag :capture_log
Expand Down Expand Up @@ -223,7 +224,7 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do
assert Enum.count(monitors) < 20
end

test "resets in-progress jobs after crashing", %{queue: queue} do
test "resets in-progress jobs after restart", %{queue: queue} do
Enum.each(1..10, fn _ ->
Honeydew.async(fn -> Process.sleep(20_000) end, queue)
end)
Expand All @@ -244,6 +245,30 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do
assert in_progress == 0
end

test "resets job run_at after restart", %{queue: queue} do
:ok = Honeydew.stop_workers(queue)

num_jobs = 10
delay_secs = 15

Enum.each(1..num_jobs, fn _ ->
Honeydew.async(fn -> Process.sleep(20_000) end, queue, delay_secs: delay_secs)
end)

:ok = Honeydew.stop_queue(queue)

Process.sleep(2_000) # let the monotonic clock tick

now = :erlang.monotonic_time(:second)
:ok = start_queue(queue)

queue
|> wrapped_jobs()
|> Enum.each(fn %WrappedJob{run_at: run_at} ->
assert_in_delta run_at, now, 1
end)
end

@tag :skip_worker_pool
test "when workers join a queue with existing jobs", %{queue: queue} do
%Job{} = {:send_msg, [self(), :hi]} |> Honeydew.async(queue)
Expand Down Expand Up @@ -418,4 +443,18 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do

:ok
end

defp wrapped_jobs(queue_name) do
alias Honeydew.Queue.Mnesia, as: MnesiaQueue

table = MnesiaQueue.table_name(queue_name)

:mnesia.activity(:async_dirty, fn ->
:mnesia.foldl(fn wrapped_job_record, list ->
[wrapped_job_record | list]
end, [], table)
end)
|> Enum.map(&WrappedJob.from_record/1)
|> Enum.reverse()
end
end