Skip to content

Commit

Permalink
Merge pull request #180 from edgurgel/resilient-verk-improvements
Browse files Browse the repository at this point in the history
Resilient verk improvements
  • Loading branch information
edgurgel authored Jan 22, 2019
2 parents 36666a9 + be7d769 commit 719939a
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 130 deletions.
6 changes: 0 additions & 6 deletions lib/verk/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ defmodule Verk.Manager do
@doc false
def init(queues) do
ets = :ets.new(@table, @ets_options)
local_verk_node_id = Application.fetch_env!(:verk, :local_node_id)

for {queue, size} <- queues do
:ets.insert_new(@table, {queue, size, :running})
Verk.Node.add_queue!(local_verk_node_id, queue, Verk.Redis)
end

{:ok, ets}
Expand Down Expand Up @@ -75,8 +73,6 @@ defmodule Verk.Manager do
Logger.error("Queue #{queue} is already running")
end

local_verk_node_id = Application.fetch_env!(:verk, :local_node_id)
Verk.Node.add_queue!(local_verk_node_id, queue, Verk.Redis)
Verk.Manager.Supervisor.start_child(queue, size)
end

Expand All @@ -87,8 +83,6 @@ defmodule Verk.Manager do
# Stops processing `queue` on this node.
#
# Deletes the queue's entry from the `@table` ETS table, removes the queue from
# the local node's queue set via `Verk.Node.remove_queue!/3`, and finally asks
# `Verk.Manager.Supervisor` to stop the child for that queue.
#
# The return value is whatever `Verk.Manager.Supervisor.stop_child/1` returns.
@spec remove(atom) :: :ok | {:error, :not_found}
def remove(queue) do
:ets.delete(@table, queue)
# `fetch_env!/2` raises when `:local_node_id` is not set for the `:verk` app.
local_verk_node_id = Application.fetch_env!(:verk, :local_node_id)
Verk.Node.remove_queue!(local_verk_node_id, queue, Verk.Redis)
Verk.Manager.Supervisor.stop_child(queue)
end
end
49 changes: 26 additions & 23 deletions lib/verk/node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,6 @@ defmodule Verk.Node do

@verk_nodes_key "verk_nodes"

@doc """
Registers `verk_node_id` in the set of known Verk nodes and marks it as alive.

Sends `SADD` (node id into the nodes set) and `PSETEX` (node key with `ttl`,
in milliseconds) in a single pipeline.

Returns `:ok` when `SADD` reports that the id was newly added and
`{:error, :node_id_already_running}` otherwise. Because both commands travel in
the same pipeline, the node key is refreshed even when the id was already
present. Raises if the pipeline itself fails (`Redix.pipeline!/2`).
"""
@spec register(String.t(), non_neg_integer, GenServer.t()) ::
        :ok | {:error, :node_id_already_running}
def register(verk_node_id, ttl, redis) do
  commands = [
    ["SADD", @verk_nodes_key, verk_node_id],
    ["PSETEX", "verk:node:#{verk_node_id}", ttl, "alive"]
  ]

  case Redix.pipeline!(redis, commands) do
    [1, _] -> :ok
    _ -> {:error, :node_id_already_running}
  end
end

@spec deregister!(String.t(), GenServer.t()) :: :ok
def deregister!(verk_node_id, redis) do
Redix.pipeline!(redis, [
Expand All @@ -29,11 +17,12 @@ defmodule Verk.Node do
end

@doc """
Reads one page of the set of registered Verk nodes using `SSCAN`.

  * `cursor` - where to resume the scan (`0` starts a new scan)
  * `count` - value passed as the `COUNT` hint
  * `redis` - Redix connection

Returns `{:ok, verk_nodes}` when Redis answers with the cursor `"0"` (scan
finished), `{:more, verk_nodes, next_cursor}` when another page must be
requested with `next_cursor`, and `{:error, reason}` when the command fails.
"""
@spec members(integer | String.t(), non_neg_integer, GenServer.t()) ::
        {:ok, [String.t()]} | {:more, [String.t()], String.t()} | {:error, term}
def members(cursor \\ 0, count \\ 25, redis) do
  case Redix.command(redis, ["SSCAN", @verk_nodes_key, cursor, "COUNT", count]) do
    # Redis returns the cursor as a string; "0" means the iteration is complete.
    {:ok, ["0", verk_nodes]} -> {:ok, verk_nodes}
    {:ok, [next_cursor, verk_nodes]} -> {:more, verk_nodes, next_cursor}
    {:error, reason} -> {:error, reason}
  end
end

Expand All @@ -42,9 +31,9 @@ defmodule Verk.Node do
Redix.command!(redis, ["PTTL", verk_node_key(verk_node_id)])
end

@doc """
Sets the node key of `verk_node_id` to `"alive"` with an expiry of `ttl`
milliseconds (`PSETEX`).

Returns the Redis status reply and raises if the command fails.
"""
@spec expire_in!(String.t(), integer, GenServer.t()) :: String.t()
def expire_in!(verk_node_id, ttl, redis) do
  Redix.command!(redis, ["PSETEX", verk_node_key(verk_node_id), ttl, "alive"])
end

@doc """
Non-raising variant of `expire_in!/3`.

Returns `{:ok, status}` with the Redis status reply, or `{:error, reason}` when
the command could not be executed.
"""
@spec expire_in(String.t(), integer, GenServer.t()) :: {:ok, String.t()} | {:error, term}
def expire_in(verk_node_id, ttl, redis) do
  Redix.command(redis, ["PSETEX", verk_node_key(verk_node_id), ttl, "alive"])
end

@spec queues!(String.t(), integer, non_neg_integer, GenServer.t()) ::
Expand All @@ -62,12 +51,26 @@ defmodule Verk.Node do
end
end

@doc """
Adds `queue` to the set of queues that node `verk_node_id` is processing.

Runs the command built by `add_queue_redis_command/2` on `redis` and raises if
it fails.
"""
@spec add_queue!(String.t(), String.t() | atom, GenServer.t()) :: integer
def add_queue!(verk_node_id, queue, redis) do
  Redix.command!(redis, add_queue_redis_command(verk_node_id, queue))
end

@doc """
Redis command to add a queue to the set of queues that a node is processing.

    iex> Verk.Node.add_queue_redis_command("123", "default")
    ["SADD", "verk:node:123:queues", "default"]
"""
@spec add_queue_redis_command(String.t(), String.t() | atom) :: [String.t() | atom]
def add_queue_redis_command(verk_node_id, queue) do
  ["SADD", verk_node_queues_key(verk_node_id), queue]
end

@doc """
Removes `queue` from the set of queues that node `verk_node_id` is processing.

Issues `SREM` on `redis` and raises if the command fails.
"""
@spec remove_queue!(String.t(), String.t() | atom, GenServer.t()) :: integer
def remove_queue!(verk_node_id, queue, redis) do
  Redix.command!(redis, ["SREM", verk_node_queues_key(verk_node_id), queue])
end

@doc """
Redis command to add a node to the set of known Verk nodes.

    iex> Verk.Node.add_node_redis_command("123")
    ["SADD", "verk_nodes", "123"]
"""
@spec add_node_redis_command(String.t()) :: [String.t()]
def add_node_redis_command(verk_node_id) do
  ["SADD", @verk_nodes_key, verk_node_id]
end

defp verk_node_key(verk_node_id), do: "verk:node:#{verk_node_id}"
Expand Down
42 changes: 29 additions & 13 deletions lib/verk/node/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ defmodule Verk.Node.Manager do
"Node Manager started for node #{local_verk_node_id}. Heartbeat will run every #{frequency} milliseconds"
)

:ok = Verk.Node.register(local_verk_node_id, 2 * frequency, Verk.Redis)
heartbeat(local_verk_node_id, frequency)

Process.send_after(self(), :heartbeat, frequency)
Process.flag(:trap_exit, true)
Verk.Scripts.load(Verk.Redis)
Expand All @@ -28,20 +29,37 @@ defmodule Verk.Node.Manager do

@doc false
# Periodic heartbeat tick.
#
# Refreshes the local node's key, then looks for faulty nodes; for each one its
# queues are cleaned up and the node is deregistered. A failure while looking
# for faulty nodes is logged and the tick carries on. The next tick is always
# scheduled `frequency` milliseconds from now, exactly once.
def handle_info(:heartbeat, state = {local_verk_node_id, frequency}) do
  heartbeat(local_verk_node_id, frequency)

  case find_faulty_nodes(local_verk_node_id) do
    {:ok, faulty_nodes} ->
      Enum.each(faulty_nodes, fn faulty_verk_node_id ->
        Logger.warn("Verk Node #{faulty_verk_node_id} seems to be down. Restoring jobs!")

        cleanup_queues(faulty_verk_node_id)

        Verk.Node.deregister!(faulty_verk_node_id, Verk.Redis)
      end)

    {:error, reason} ->
      Logger.error("Failed while looking for faulty nodes. Reason: #{inspect(reason)}")
  end

  Process.send_after(self(), :heartbeat, frequency)
  {:noreply, state}
end

# Refreshes the local node's key with an expiry of twice the heartbeat
# frequency. A failure is logged instead of raised.
defp heartbeat(local_verk_node_id, frequency) do
  ttl = 2 * frequency
  result = Verk.Node.expire_in(local_verk_node_id, ttl, Verk.Redis)

  case result do
    {:error, reason} ->
      Logger.error(
        "Failed to heartbeat node '#{local_verk_node_id}'. Reason: #{inspect(reason)}"
      )

    {:ok, _} ->
      :ok
  end
end

# Termination with a `{:shutdown, _}` reason: delegate to `do_terminate/2`,
# passing along the reason and the local node id taken from the state.
def terminate({:shutdown, _} = reason, {local_verk_node_id, _frequency}) do
  do_terminate(reason, local_verk_node_id)
end
Expand Down Expand Up @@ -82,11 +100,14 @@ defmodule Verk.Node.Manager do
# Walks the set of registered nodes page by page, following the cursor handed
# back by `Verk.Node.members/2`, and collects what `do_find_faulty_nodes/2`
# reports for every page.
#
# Returns `{:ok, faulty_nodes}` once the last page has been read, or the first
# `{:error, reason}` returned while fetching a page.
defp find_faulty_nodes(local_verk_node_id, cursor \\ 0) do
  case Verk.Node.members(cursor, Verk.Redis) do
    {:ok, verk_nodes} ->
      {:ok, do_find_faulty_nodes(verk_nodes, local_verk_node_id)}

    {:more, verk_nodes, next_cursor} ->
      # Inspect the current page before asking for the next one.
      faulty_here = do_find_faulty_nodes(verk_nodes, local_verk_node_id)

      # The recursive call returns a tagged tuple, so it must be unwrapped
      # before concatenating; an error from a later page is passed through.
      with {:ok, faulty_rest} <- find_faulty_nodes(local_verk_node_id, next_cursor) do
        {:ok, faulty_here ++ faulty_rest}
      end

    {:error, reason} ->
      {:error, reason}
  end
end

Expand All @@ -96,11 +117,6 @@ defmodule Verk.Node.Manager do
end)
end

# Raising heartbeat: refreshes the local node's key with an expiry of twice the
# heartbeat frequency and schedules the next `:heartbeat` message.
defp heartbeat!(local_verk_node_id, frequency) do
  ttl = 2 * frequency
  Verk.Node.expire_in!(local_verk_node_id, ttl, Verk.Redis)
  Process.send_after(self(), :heartbeat, frequency)
end

defp enqueue_inprogress(node_id, queue) do
case InProgressQueue.enqueue_in_progress(queue, node_id, Verk.Redis) do
{:ok, [0, m]} ->
Expand Down
57 changes: 43 additions & 14 deletions lib/verk/queue_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Verk.QueueManager do

defmodule State do
  @moduledoc false
  # `track_node_id` defaults to `false` so that a state built without it still
  # matches the `track_node_id: false` clause of the `:dequeue` handler.
  defstruct [:queue_name, :redis, :node_id, track_node_id: false]
end

@doc """
Expand Down Expand Up @@ -80,7 +80,14 @@ defmodule Verk.QueueManager do
{:ok, redis} = Redix.start_link(Confex.get_env(:verk, :redis_url))
Verk.Scripts.load(redis)

state = %State{queue_name: queue_name, redis: redis, node_id: node_id}
track_node_id = Application.get_env(:verk, :generate_node_id, false)

state = %State{
queue_name: queue_name,
redis: redis,
node_id: node_id,
track_node_id: track_node_id
}

Logger.info("Queue Manager started for queue #{queue_name}")
{:ok, state}
Expand Down Expand Up @@ -118,18 +125,8 @@ defmodule Verk.QueueManager do
end
end

def handle_call({:dequeue, n}, _from, state) do
case Redix.command(state.redis, [
"EVALSHA",
@mrpop_lpush_src_dest_script_sha,
2,
"queue:#{state.queue_name}",
inprogress(state.queue_name, state.node_id),
min(@max_jobs, n)
]) do
{:ok, []} ->
{:reply, [], state}

def handle_call({:dequeue, n}, _from, state = %State{track_node_id: false}) do
case Redix.command(state.redis, mrpop_lpush_src_dest(state.node_id, state.queue_name, n)) do
{:ok, jobs} ->
{:reply, jobs, state}

Expand All @@ -142,6 +139,27 @@ defmodule Verk.QueueManager do
end
end

# Dequeues up to `n` jobs while also recording this node and queue in Redis.
#
# The node registration, the queue registration and the dequeue script are sent
# as one MULTI/EXEC transaction. The jobs are the reply of the script, i.e. the
# last entry of the EXEC reply, which is itself the last pipeline reply.
def handle_call({:dequeue, n}, _from, state = %State{track_node_id: true}) do
  commands = [
    ["MULTI"],
    Verk.Node.add_node_redis_command(state.node_id),
    Verk.Node.add_queue_redis_command(state.node_id, state.queue_name),
    mrpop_lpush_src_dest(state.node_id, state.queue_name, n),
    ["EXEC"]
  ]

  case Redix.pipeline(state.redis, commands) do
    {:ok, response} ->
      # Redis-level errors of pipelined commands come back as `%Redix.Error{}`
      # entries inside the `{:ok, _}` list: either in place of the EXEC reply
      # (aborted transaction) or as the script's entry within it.
      script_reply =
        case List.last(response) do
          exec_replies when is_list(exec_replies) -> List.last(exec_replies)
          other -> other
        end

      case script_reply do
        %Redix.Error{message: message} ->
          Logger.error("Failed to fetch jobs: #{message}")
          {:stop, :redis_failed, :redis_failed, state}

        jobs ->
          {:reply, jobs, state}
      end

    {:error, %Redix.Error{message: message}} ->
      Logger.error("Failed to fetch jobs: #{message}")
      {:stop, :redis_failed, :redis_failed, state}

    {:error, _} ->
      {:reply, :redis_failed, state}
  end
end

def handle_call({:retry, job, failed_at, exception, stacktrace}, _from, state) do
retry_count = (job.retry_count || 0) + 1
job = build_retry_job(job, retry_count, failed_at, exception, stacktrace)
Expand Down Expand Up @@ -215,4 +233,15 @@ defmodule Verk.QueueManager do
end

# Renders a stacktrace as a string using `inspect/1`.
defp format_stacktrace(stacktrace) do
  inspect(stacktrace)
end

# Builds the `EVALSHA` command for the dequeue script: two keys (the queue and
# the in-progress list for this node) followed by the number of jobs to move,
# capped at `@max_jobs`.
defp mrpop_lpush_src_dest(node_id, queue_name, n) do
  source_key = "queue:#{queue_name}"
  destination_key = inprogress(queue_name, node_id)
  job_count = min(@max_jobs, n)

  ["EVALSHA", @mrpop_lpush_src_dest_script_sha, 2, source_key, destination_key, job_count]
end
end
32 changes: 29 additions & 3 deletions test/integration/test/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ defmodule IntegrationTest do
end
end

# Runs before every test rather than once per module: a test in this module
# switches `:generate_node_id` to true, so the application env and the Redis
# database are reset for each test.
setup do
  Application.put_env(:verk, :generate_node_id, false, persistent: true)
  Application.delete_env(:verk, :local_node_id, persistent: true)
  {:ok, redis} = Redix.start_link(Confex.get_env(:verk, :redis_url))
  Redix.command!(redis, ["FLUSHDB"])
  {:ok, redis: redis}
end

@tag integration: true
test "shutdown", %{redis: redis} do
defp enqueue_jobs!(redis) do
for _x <- 0..10 do
Verk.enqueue(
%Verk.Job{queue: "queue_one", class: Integration.SleepWorker, args: [1_500]},
Expand All @@ -37,9 +38,34 @@ defmodule IntegrationTest do
redis
)
end
end

@tag integration: true
test "shutdown gracefully stops queues", %{redis: redis} do
  enqueue_jobs!(redis)

  Application.ensure_all_started(:integration)
  {:ok, _consumer} = Consumer.start()

  # With node id generation disabled, no node is expected in the nodes set.
  registered_nodes = Redix.command!(redis, ["SMEMBERS", "verk_nodes"])
  assert registered_nodes == []

  Application.stop(:integration)

  queues = [:queue_one, :queue_two]

  # Same expectation order as before: both pausing events, then both paused.
  for queue <- queues do
    assert_receive %Verk.Events.QueuePausing{queue: ^queue}
  end

  for queue <- queues do
    assert_receive %Verk.Events.QueuePaused{queue: ^queue}
  end
end

@tag integration: true
test "generate_node_id true maintains verk_nodes", %{redis: redis} do
enqueue_jobs!(redis)

Application.put_env(:verk, :generate_node_id, true, persistent: true)
Application.ensure_all_started(:integration)
{:ok, _consumer} = Consumer.start()
node_id = Application.fetch_env!(:verk, :local_node_id)
assert Redix.command!(redis, ["SMEMBERS", "verk_nodes"]) == [node_id]
assert Redix.command!(redis, ["TTL", "verk:node:#{node_id}"]) > 0

Application.stop(:integration)

assert_receive %Verk.Events.QueuePausing{queue: :queue_one}
Expand Down
5 changes: 0 additions & 5 deletions test/manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ defmodule Verk.ManagerTest do
describe "init/1" do
test "creates an ETS table with queues" do
queues = [default: 25, low_priority: 10]
expect(Verk.Node, :add_queue!, [@node_id, :default, Verk.Redis], :ok)
expect(Verk.Node, :add_queue!, [@node_id, :low_priority, Verk.Redis], :ok)
init(queues)

assert :ets.tab2list(:verk_manager) == [
Expand Down Expand Up @@ -128,7 +126,6 @@ defmodule Verk.ManagerTest do
init_table([])

expect(Verk.Manager.Supervisor, :start_child, [:default, 25], {:ok, :child})
expect(Verk.Node, :add_queue!, [@node_id, :default, Verk.Redis], :ok)

assert add(:default, 25) == {:ok, :child}
assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}]
Expand All @@ -142,7 +139,6 @@ defmodule Verk.ManagerTest do
init_table(queues)

expect(Verk.Manager.Supervisor, :stop_child, [:default], :ok)
expect(Verk.Node, :remove_queue!, [@node_id, :default, Verk.Redis], :ok)

assert remove(:default) == :ok
assert :ets.tab2list(:verk_manager) == [{:low_priority, 10, :running}]
Expand All @@ -154,7 +150,6 @@ defmodule Verk.ManagerTest do
init_table(queues)

expect(Verk.Manager.Supervisor, :stop_child, [:default], {:error, :not_found})
expect(Verk.Node, :remove_queue!, [@node_id, :default, Verk.Redis], :ok)

assert remove(:default) == {:error, :not_found}
assert validate(Verk.Manager.Supervisor)
Expand Down
Loading

0 comments on commit 719939a

Please sign in to comment.