From c7873caf4981fc6375d6336a3bd124ffe86dc985 Mon Sep 17 00:00:00 2001 From: edgurgel Date: Thu, 17 Jan 2019 12:56:35 +1300 Subject: [PATCH 1/7] Change Node.Manager to not crash if Redis fails --- lib/verk/node.ex | 15 ++++++++------- lib/verk/node/manager.ex | 39 ++++++++++++++++++++++++++------------ test/node/manager_test.exs | 6 +++--- test/node_test.exs | 4 ++-- 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/lib/verk/node.ex b/lib/verk/node.ex index 085240b..ce5b2dd 100644 --- a/lib/verk/node.ex +++ b/lib/verk/node.ex @@ -29,11 +29,12 @@ defmodule Verk.Node do end @spec members(integer, non_neg_integer, GenServer.t()) :: - {:ok, [String.t()]} | {:more, [String.t()], integer} + {:ok, [String.t()]} | {:more, [String.t()], integer} | {:error, term} def members(cursor \\ 0, count \\ 25, redis) do - case Redix.command!(redis, ["SSCAN", @verk_nodes_key, cursor, "COUNT", count]) do - ["0", verk_nodes] -> {:ok, verk_nodes} - [cursor, verk_nodes] -> {:more, verk_nodes, cursor} + case Redix.command(redis, ["SSCAN", @verk_nodes_key, cursor, "COUNT", count]) do + {:ok, ["0", verk_nodes]} -> {:ok, verk_nodes} + {:ok, [cursor, verk_nodes]} -> {:more, verk_nodes, cursor} + {:error, reason} -> {:error, reason} end end @@ -42,9 +43,9 @@ defmodule Verk.Node do Redix.command!(redis, ["PTTL", verk_node_key(verk_node_id)]) end - @spec expire_in!(String.t(), integer, GenServer.t()) :: integer - def expire_in!(verk_node_id, ttl, redis) do - Redix.command!(redis, ["PSETEX", verk_node_key(verk_node_id), ttl, "alive"]) + @spec expire_in(String.t(), integer, GenServer.t()) :: {:ok, integer} | {:error, term} + def expire_in(verk_node_id, ttl, redis) do + Redix.command(redis, ["PSETEX", verk_node_key(verk_node_id), ttl, "alive"]) end @spec queues!(String.t(), integer, non_neg_integer, GenServer.t()) :: diff --git a/lib/verk/node/manager.ex b/lib/verk/node/manager.ex index 07a4818..881bcc4 100644 --- a/lib/verk/node/manager.ex +++ b/lib/verk/node/manager.ex @@ -28,20 +28,37 @@ defmodule Verk.Node.Manager do @doc false def handle_info(:heartbeat, state = {local_verk_node_id, frequency}) do - faulty_nodes = find_faulty_nodes(local_verk_node_id) + heartbeat(local_verk_node_id, frequency) - for faulty_verk_node_id <- faulty_nodes do - Logger.warn("Verk Node #{faulty_verk_node_id} seems to be down. Restoring jobs!") + with {:ok, faulty_nodes} <- find_faulty_nodes(local_verk_node_id) do + for faulty_verk_node_id <- faulty_nodes do + Logger.warn("Verk Node #{faulty_verk_node_id} seems to be down. Restoring jobs!") - cleanup_queues(faulty_verk_node_id) + cleanup_queues(faulty_verk_node_id) - Verk.Node.deregister!(faulty_verk_node_id, Verk.Redis) + Verk.Node.deregister!(faulty_verk_node_id, Verk.Redis) + end + else + {:error, reason} -> + Logger.error("Failed while looking for faulty nodes. Reason: #{inspect(reason)}") end - heartbeat!(local_verk_node_id, frequency) + Process.send_after(self(), :heartbeat, frequency) {:noreply, state} end + defp heartbeat(local_verk_node_id, frequency) do + case Verk.Node.expire_in(local_verk_node_id, 2 * frequency, Verk.Redis) do + {:ok, _} -> + :ok + + {:error, reason} -> + Logger.error( + "Failed to heartbeat node '#{local_verk_node_id}'. Reason: #{inspect(reason)}" + ) + end + end + def terminate(reason = {:shutdown, _}, {local_verk_node_id, _}) do do_terminate(reason, local_verk_node_id) end @@ -82,11 +99,14 @@ defmodule Verk.Node.Manager do defp find_faulty_nodes(local_verk_node_id, cursor \\ 0) do case Verk.Node.members(cursor, Verk.Redis) do {:ok, verk_nodes} -> - do_find_faulty_nodes(verk_nodes, local_verk_node_id) + {:ok, do_find_faulty_nodes(verk_nodes, local_verk_node_id)} {:more, verk_nodes, cursor} -> do_find_faulty_nodes(verk_nodes, local_verk_node_id) ++ find_faulty_nodes(local_verk_node_id, cursor) + + {:error, reason} -> + {:error, reason} end end @@ -96,11 +116,6 @@ defmodule Verk.Node.Manager do end) end - defp heartbeat!(local_verk_node_id, frequency) do - Verk.Node.expire_in!(local_verk_node_id, 2 * frequency, Verk.Redis) - Process.send_after(self(), :heartbeat, frequency) - end - defp enqueue_inprogress(node_id, queue) do case InProgressQueue.enqueue_in_progress(queue, node_id, Verk.Redis) do {:ok, [0, m]} -> diff --git a/test/node/manager_test.exs b/test/node/manager_test.exs index 3cdc110..818e12d 100644 --- a/test/node/manager_test.exs +++ b/test/node/manager_test.exs @@ -35,7 +35,7 @@ defmodule Verk.Node.ManagerTest do test "heartbeat when only one node" do state = {@verk_node_id, @frequency} expect(Verk.Node, :members, [0, Verk.Redis], {:ok, [@verk_node_id]}) - expect(Verk.Node, :expire_in!, [@verk_node_id, 2 * @frequency, Verk.Redis], :ok) + expect(Verk.Node, :expire_in, [@verk_node_id, 2 * @frequency, Verk.Redis], {:ok, 1}) assert handle_info(:heartbeat, state) == {:noreply, state} assert_receive :heartbeat end @@ -45,7 +45,7 @@ defmodule Verk.Node.ManagerTest do state = {@verk_node_id, @frequency} expect(Verk.Node, :members, [0, Verk.Redis], {:ok, [@verk_node_id, alive_node_id]}) expect(Verk.Node, :ttl!, [alive_node_id, Verk.Redis], 500) - expect(Verk.Node, :expire_in!, [@verk_node_id, 2 * @frequency, Verk.Redis], :ok) + expect(Verk.Node, :expire_in, [@verk_node_id, 2 * @frequency, Verk.Redis], {:ok, 1}) assert handle_info(:heartbeat, state) == {:noreply, state} assert_receive :heartbeat end @@ -57,7 +57,7 @@ defmodule Verk.Node.ManagerTest do expect(Verk.Node, :members, [0, Verk.Redis], {:more, [@verk_node_id], 123}) expect(Verk.Node, :members, [123, Verk.Redis], {:ok, [dead_node_id]}) expect(Verk.Node, :ttl!, [dead_node_id, Verk.Redis], -2) - expect(Verk.Node, :expire_in!, [@verk_node_id, 2 * @frequency, Verk.Redis], :ok) + expect(Verk.Node, :expire_in, [@verk_node_id, 2 * @frequency, Verk.Redis], {:ok, 1}) expect(Verk.Node, :queues!, ["dead-node", 0, Verk.Redis], {:more, ["queue_1"], 123}) expect(Verk.Node, :queues!, ["dead-node", 123, Verk.Redis], {:ok, ["queue_2"]}) diff --git a/test/node_test.exs b/test/node_test.exs index b88ac37..48f223f 100644 --- a/test/node_test.exs +++ b/test/node_test.exs @@ -80,9 +80,9 @@ defmodule Verk.NodeTest do end end - describe "expire_in!/3" do + describe "expire_in/3" do test "resets expiration item", %{redis: redis} do - assert expire_in!(@node, 888, redis) + assert {:ok, _} = expire_in(@node, 888, redis) assert_in_delta Redix.command!(redis, ["PTTL", @node_key]), 888, 5 end end From d9eed804585f264113a34e1a4b0ce6bce3a33921 Mon Sep 17 00:00:00 2001 From: edgurgel Date: Tue, 22 Jan 2019 14:14:10 +1300 Subject: [PATCH 2/7] Add Verk.Node.add_node_redis_command/1 --- lib/verk/node.ex | 11 +++++++++++ test/node_test.exs | 1 + 2 files changed, 12 insertions(+) diff --git a/lib/verk/node.ex b/lib/verk/node.ex index ce5b2dd..19482da 100644 --- a/lib/verk/node.ex +++ b/lib/verk/node.ex @@ -63,6 +63,17 @@ defmodule Verk.Node do end end + @doc """ + Redis command to add a queue to the set of queues that a node is processing + + iex> Verk.Node.add_node_redis_command("123") + ["SADD", "verk_nodes", "123"] + """ + @spec add_node_redis_command(String.t()) :: [String.t()] + def add_node_redis_command(verk_node_id) do + ["SADD", @verk_nodes_key, verk_node_id] + end + def add_queue!(verk_node_id, queue, redis) do Redix.command!(redis, ["SADD", verk_node_queues_key(verk_node_id), queue]) end diff --git a/test/node_test.exs b/test/node_test.exs index 48f223f..e47a1f1 100644 --- a/test/node_test.exs +++ b/test/node_test.exs @@ -1,6 +1,7 @@ defmodule Verk.NodeTest do use ExUnit.Case import Verk.Node + doctest Verk.Node @verk_nodes_key "verk_nodes" From 71bcdef9ef4d158021b22608c9f298ab94186985 Mon Sep 17 00:00:00 2001 From: edgurgel Date: Tue, 22 Jan 2019 14:14:32 +1300 Subject: [PATCH 3/7] Add Verk.Node.add_queue_redis_command/2 --- lib/verk/node.ex | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/verk/node.ex b/lib/verk/node.ex index 19482da..98f8e38 100644 --- a/lib/verk/node.ex +++ b/lib/verk/node.ex @@ -63,6 +63,17 @@ defmodule Verk.Node do end end + @doc """ + Redis command to add a queue to the set of queues that a node is processing + + iex> Verk.Node.add_queue_redis_command("123", "default") + ["SADD", "verk:node:123:queues", "default"] + """ + @spec add_queue_redis_command(String.t(), String.t()) :: [String.t()] + def add_queue_redis_command(verk_node_id, queue) do + ["SADD", verk_node_queues_key(verk_node_id), queue] + end + @doc """ Redis command to add a queue to the set of queues that a node is processing From fa7c89647b8510ebbd8c96425851cef5289ad721 Mon Sep 17 00:00:00 2001 From: edgurgel Date: Tue, 22 Jan 2019 14:16:07 +1300 Subject: [PATCH 4/7] Change QueueManager to conditionally track node information If generate_node_id is true it will ensure that the set of nodes includes the local node_id and that the set of tracked queues includes the managed queue If generate_node_id is false then it will work as previously --- lib/verk/queue_manager.ex | 57 ++++++++--- test/queue_manager_test.exs | 182 ++++++++++++++++++++++++++++++++---- 2 files changed, 208 insertions(+), 31 deletions(-) diff --git a/lib/verk/queue_manager.ex b/lib/verk/queue_manager.ex index 31d7a12..11d9653 100644 --- a/lib/verk/queue_manager.ex +++ b/lib/verk/queue_manager.ex @@ -16,7 +16,7 @@ defmodule Verk.QueueManager do defmodule State do @moduledoc false - defstruct [:queue_name, :redis, :node_id] + defstruct [:queue_name, :redis, :node_id, :track_node_id] end @doc """ @@ -80,7 +80,14 @@ defmodule Verk.QueueManager do {:ok, redis} = Redix.start_link(Confex.get_env(:verk, :redis_url)) Verk.Scripts.load(redis) - state = %State{queue_name: queue_name, redis: redis, node_id: node_id} + track_node_id = Application.get_env(:verk, :generate_node_id, false) + + state = %State{ + queue_name: queue_name, + redis: redis, + node_id: node_id, + track_node_id: track_node_id + } Logger.info("Queue Manager started for queue #{queue_name}") {:ok, state} @@ -118,18 +125,8 @@ defmodule Verk.QueueManager do end end - def handle_call({:dequeue, n}, _from, state) do - case Redix.command(state.redis, [ - "EVALSHA", - @mrpop_lpush_src_dest_script_sha, - 2, - "queue:#{state.queue_name}", - inprogress(state.queue_name, state.node_id), - min(@max_jobs, n) - ]) do - {:ok, []} -> - {:reply, [], state} - + def handle_call({:dequeue, n}, _from, state = %State{track_node_id: false}) do + case Redix.command(state.redis, mrpop_lpush_src_dest(state.node_id, state.queue_name, n)) do {:ok, jobs} -> {:reply, jobs, state} @@ -142,6 +139,27 @@ defmodule Verk.QueueManager do end end + def handle_call({:dequeue, n}, _from, state = %State{track_node_id: true}) do + case Redix.pipeline(state.redis, [ + ["MULTI"], + Verk.Node.add_node_redis_command(state.node_id), + Verk.Node.add_queue_redis_command(state.node_id, state.queue_name), + mrpop_lpush_src_dest(state.node_id, state.queue_name, n), + ["EXEC"] + ]) do + {:ok, response} -> + jobs = response |> List.last() |> List.last() + {:reply, jobs, state} + + {:error, %Redix.Error{message: message}} -> + Logger.error("Failed to fetch jobs: #{message}") + {:stop, :redis_failed, :redis_failed, state} + + {:error, _} -> + {:reply, :redis_failed, state} + end + end + def handle_call({:retry, job, failed_at, exception, stacktrace}, _from, state) do retry_count = (job.retry_count || 0) + 1 job = build_retry_job(job, retry_count, failed_at, exception, stacktrace) @@ -215,4 +233,15 @@ defmodule Verk.QueueManager do end defp format_stacktrace(stacktrace), do: inspect(stacktrace) + + defp mrpop_lpush_src_dest(node_id, queue_name, n) do + [ + "EVALSHA", + @mrpop_lpush_src_dest_script_sha, + 2, + "queue:#{queue_name}", + inprogress(queue_name, node_id), + min(@max_jobs, n) + ] + end end diff --git a/test/queue_manager_test.exs b/test/queue_manager_test.exs index bc4f49c..d4e0335 100644 --- a/test/queue_manager_test.exs +++ b/test/queue_manager_test.exs @@ -13,6 +13,7 @@ defmodule Verk.QueueManagerTest do on_exit(fn -> unload() Application.delete_env(:verk, :local_node_id) + Application.delete_env(:verk, :generate_node_id) end) :ok @@ -26,7 +27,7 @@ defmodule Verk.QueueManagerTest do end describe "init/1" do - test "sets up redis connection" do + test "sets up proper State" do redis_url = Confex.get_env(:verk, :redis_url) node_id = Confex.fetch_env!(:verk, :local_node_id) @@ -34,7 +35,33 @@ defmodule Verk.QueueManagerTest do expect(Verk.Scripts, :load, [:redis], :ok) assert init(["queue_name"]) == - {:ok, %State{node_id: node_id, queue_name: "queue_name", redis: :redis}} + {:ok, + %State{ + node_id: node_id, + queue_name: "queue_name", + redis: :redis, + track_node_id: false + }} + + assert validate([Redix, Verk.Scripts]) + end + + test "sets up proper State with generate_node_id true" do + redis_url = Confex.get_env(:verk, :redis_url) + node_id = Confex.fetch_env!(:verk, :local_node_id) + Application.put_env(:verk, :generate_node_id, true) + + expect(Redix, :start_link, [redis_url], {:ok, :redis}) + expect(Verk.Scripts, :load, [:redis], :ok) + + assert init(["queue_name"]) == + {:ok, + %State{ + node_id: node_id, + queue_name: "queue_name", + redis: :redis, + track_node_id: true + }} assert validate([Redix, Verk.Scripts]) end @@ -98,8 +125,19 @@ defmodule Verk.QueueManagerTest do end end - describe "handle_call/3 dequeue" do - test "dequeue with an empty queue" do + describe "handle_call/3 dequeue track_node_id false" do + setup do + state = %State{ + queue_name: "test_queue", + redis: :redis, + node_id: "test_node", + track_node_id: false + } + + {:ok, state: state} + end + + test "dequeue with an empty queue", %{state: state} do expect( Redix, :command, @@ -110,14 +148,12 @@ defmodule Verk.QueueManagerTest do {:ok, []} ) - state = %State{queue_name: "test_queue", redis: :redis, node_id: "test_node"} - assert handle_call({:dequeue, 3}, :from, state) == {:reply, [], state} assert validate(Redix) end - test "dequeue with a non empty queue" do + test "dequeue with a non empty queue", %{state: state} do expect( Redix, :command, @@ -128,13 +164,11 @@ defmodule Verk.QueueManagerTest do {:ok, ["job"]} ) - state = %State{queue_name: "test_queue", redis: :redis, node_id: "test_node"} - assert handle_call({:dequeue, 3}, :from, state) == {:reply, ["job"], state} assert validate([Redix]) end - test "dequeue with a non empty queue and more than max_jobs" do + test "dequeue with a non empty queue and more than max_jobs", %{state: state} do expect( Redix, :command, @@ -152,26 +186,140 @@ defmodule Verk.QueueManagerTest do {:ok, ["job"]} ) - state = %State{queue_name: "test_queue", redis: :redis, node_id: "test_node"} - assert handle_call({:dequeue, 500}, :from, state) == {:reply, ["job"], state} assert validate([Redix]) end - test "dequeue and redis failed" do + test "dequeue and redis failed", %{state: state} do expect(Redix, :command, 2, {:error, :reason}) - state = %State{queue_name: "test_queue", redis: :redis} - assert handle_call({:dequeue, 3}, :from, state) == {:reply, :redis_failed, state} assert validate(Redix) end - test "dequeue and redis failed to evaluate the script" do + test "dequeue and redis failed to evaluate the script", %{state: state} do expect(Redix, :command, 2, {:error, %Redix.Error{message: "a message"}}) - state = %State{queue_name: "test_queue", redis: :redis} + assert handle_call({:dequeue, 3}, :from, state) == + {:stop, :redis_failed, :redis_failed, state} + + assert validate(Redix) + end + + test "dequeue and timeout" do + pid = spawn_link(fn -> :timer.sleep(5000) end) + assert dequeue(pid, 1, 1) == :timeout + end + end + + describe "handle_call/3 dequeue track_node_id true" do + setup do + state = %State{ + queue_name: "test_queue", + redis: :redis, + node_id: "test_node", + track_node_id: true + } + + {:ok, state: state} + end + + test "dequeue with an empty queue", %{state: state} do + expect( + Redix, + :pipeline, + [ + :redis, + [ + ["MULTI"], + ["SADD", "verk_nodes", state.node_id], + ["SADD", "verk:node:#{state.node_id}:queues", state.queue_name], + [ + "EVALSHA", + @mrpop_script, + 2, + "queue:test_queue", + "inprogress:test_queue:test_node", + 3 + ], + ["EXEC"] + ] + ], + {:ok, ["OK", "QUEUED", "QUEUED", "QUEUED", [0, 0, []]]} + ) + + assert handle_call({:dequeue, 3}, :from, state) == {:reply, [], state} + + assert validate(Redix) + end + + test "dequeue with a non empty queue", %{state: state} do + expect( + Redix, + :pipeline, + [ + :redis, + [ + ["MULTI"], + ["SADD", "verk_nodes", state.node_id], + ["SADD", "verk:node:#{state.node_id}:queues", state.queue_name], + [ + "EVALSHA", + @mrpop_script, + 2, + "queue:test_queue", + "inprogress:test_queue:test_node", + 3 + ], + ["EXEC"] + ] + ], + {:ok, ["OK", "QUEUED", "QUEUED", "QUEUED", [0, 0, ["job"]]]} + ) + + assert handle_call({:dequeue, 3}, :from, state) == {:reply, ["job"], state} + assert validate([Redix]) + end + + test "dequeue with a non empty queue and more than max_jobs", %{state: state} do + expect( + Redix, + :pipeline, + [ + :redis, + [ + ["MULTI"], + ["SADD", "verk_nodes", state.node_id], + ["SADD", "verk:node:#{state.node_id}:queues", state.queue_name], + [ + "EVALSHA", + @mrpop_script, + 2, + "queue:test_queue", + "inprogress:test_queue:test_node", + 100 + ], + ["EXEC"] + ] + ], + {:ok, ["OK", "QUEUED", "QUEUED", "QUEUED", [0, 0, ["job"]]]} + ) + + assert handle_call({:dequeue, 500}, :from, state) == {:reply, ["job"], state} + assert validate([Redix]) + end + + test "dequeue and redis failed", %{state: state} do + expect(Redix, :pipeline, 2, {:error, :reason}) + + assert handle_call({:dequeue, 3}, :from, state) == {:reply, :redis_failed, state} + + assert validate(Redix) + end + + test "dequeue and redis failed to evaluate the script", %{state: state} do + expect(Redix, :pipeline, 2, {:error, %Redix.Error{message: "a message"}}) assert handle_call({:dequeue, 3}, :from, state) == {:stop, :redis_failed, :redis_failed, state} From 435f0f65b5133ba45481b0df6a0c18475b9f6b34 Mon Sep 17 00:00:00 2001 From: edgurgel Date: Tue, 22 Jan 2019 14:23:20 +1300 Subject: [PATCH 5/7] Remove Verk.Node.(add|remove)_queue/2 This data will be tracked automatically by QueueManager --- lib/verk/manager.ex | 6 ------ lib/verk/node.ex | 8 -------- test/manager_test.exs | 5 ----- test/node_test.exs | 25 ++++--------------------- 4 files changed, 4 insertions(+), 40 deletions(-) diff --git a/lib/verk/manager.ex b/lib/verk/manager.ex index 9efa2ac..1746f90 100644 --- a/lib/verk/manager.ex +++ b/lib/verk/manager.ex @@ -16,11 +16,9 @@ defmodule Verk.Manager do @doc false def init(queues) do ets = :ets.new(@table, @ets_options) - local_verk_node_id = Application.fetch_env!(:verk, :local_node_id) for {queue, size} <- queues do :ets.insert_new(@table, {queue, size, :running}) - Verk.Node.add_queue!(local_verk_node_id, queue, Verk.Redis) end {:ok, ets} @@ -75,8 +73,6 @@ defmodule Verk.Manager do Logger.error("Queue #{queue} is already running") end - local_verk_node_id = Application.fetch_env!(:verk, :local_node_id) - Verk.Node.add_queue!(local_verk_node_id, queue, Verk.Redis) Verk.Manager.Supervisor.start_child(queue, size) end @@ -87,8 +83,6 @@ defmodule Verk.Manager do @spec remove(atom) :: :ok | {:error, :not_found} def remove(queue) do :ets.delete(@table, queue) - local_verk_node_id = Application.fetch_env!(:verk, :local_node_id) - Verk.Node.remove_queue!(local_verk_node_id, queue, Verk.Redis) Verk.Manager.Supervisor.stop_child(queue) end end diff --git a/lib/verk/node.ex b/lib/verk/node.ex index 98f8e38..712018e 100644 --- a/lib/verk/node.ex +++ b/lib/verk/node.ex @@ -85,14 +85,6 @@ defmodule Verk.Node do ["SADD", @verk_nodes_key, verk_node_id] end - def add_queue!(verk_node_id, queue, redis) do - Redix.command!(redis, ["SADD", verk_node_queues_key(verk_node_id), queue]) - end - - def remove_queue!(verk_node_id, queue, redis) do - Redix.command!(redis, ["SREM", verk_node_queues_key(verk_node_id), queue]) - end - defp verk_node_key(verk_node_id), do: "verk:node:#{verk_node_id}" defp verk_node_queues_key(verk_node_id), do: "verk:node:#{verk_node_id}:queues" end diff --git a/test/manager_test.exs b/test/manager_test.exs index 1e63345..3fd68d1 100644 --- a/test/manager_test.exs +++ b/test/manager_test.exs @@ -27,8 +27,6 @@ defmodule Verk.ManagerTest do describe "init/1" do test "creates an ETS table with queues" do queues = [default: 25, low_priority: 10] - expect(Verk.Node, :add_queue!, [@node_id, :default, Verk.Redis], :ok) - expect(Verk.Node, :add_queue!, [@node_id, :low_priority, Verk.Redis], :ok) init(queues) assert :ets.tab2list(:verk_manager) == [ @@ -128,7 +126,6 @@ defmodule Verk.ManagerTest do init_table([]) expect(Verk.Manager.Supervisor, :start_child, [:default, 25], {:ok, :child}) - expect(Verk.Node, :add_queue!, [@node_id, :default, Verk.Redis], :ok) assert add(:default, 25) == {:ok, :child} assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}] @@ -142,7 +139,6 @@ defmodule Verk.ManagerTest do init_table(queues) expect(Verk.Manager.Supervisor, :stop_child, [:default], :ok) - expect(Verk.Node, :remove_queue!, [@node_id, :default, Verk.Redis], :ok) assert remove(:default) == :ok assert :ets.tab2list(:verk_manager) == [{:low_priority, 10, :running}] @@ -154,7 +150,6 @@ defmodule Verk.ManagerTest do init_table(queues) expect(Verk.Manager.Supervisor, :stop_child, [:default], {:error, :not_found}) - expect(Verk.Node, :remove_queue!, [@node_id, :default, Verk.Redis], :ok) assert remove(:default) == {:error, :not_found} assert validate(Verk.Manager.Supervisor) diff --git a/test/node_test.exs b/test/node_test.exs index e47a1f1..74c3526 100644 --- a/test/node_test.exs +++ b/test/node_test.exs @@ -90,10 +90,10 @@ defmodule Verk.NodeTest do describe "queues!/2" do setup %{redis: redis} do - add_queue!(@node, "queue_1", redis) - add_queue!(@node, "queue_2", redis) - add_queue!(@node, "queue_3", redis) - add_queue!(@node, "queue_4", redis) + Redix.command!(redis, add_queue_redis_command(@node, "queue_1")) + Redix.command!(redis, add_queue_redis_command(@node, "queue_2")) + Redix.command!(redis, add_queue_redis_command(@node, "queue_3")) + Redix.command!(redis, add_queue_redis_command(@node, "queue_4")) :ok end @@ -103,21 +103,4 @@ defmodule Verk.NodeTest do assert MapSet.equal?(queues, MapSet.new(["queue_1", "queue_2", "queue_3", "queue_4"])) end end - - describe "add_queue!/3" do - test "add queue to verk:node::queues", %{redis: redis} do - queue_name = "default" - assert add_queue!(@node, queue_name, redis) - assert Redix.command!(redis, ["SMEMBERS", @node_queues_key]) == [queue_name] - end - end - - describe "remove_queue!/3" do - test "remove queue from verk:node::queues", %{redis: redis} do - queue_name = "default" - assert Redix.command!(redis, ["SADD", @node_queues_key, queue_name]) == 1 - assert remove_queue!(@node, queue_name, redis) - assert Redix.command!(redis, ["SMEMBERS", @node_queues_key]) == [] - end - end end From 0bb216447566566e8bd9e91f710a2fedbde8b704 Mon Sep 17 00:00:00 2001 From: edgurgel Date: Tue, 22 Jan 2019 14:58:18 +1300 Subject: [PATCH 6/7] Remove Verk.Node.register/3 Verk.Node.register/3 is not necessary anymore. Verk.Node.expire_in/3 can be used instead as QueueManager adds the running node to the nodes key --- lib/verk/node.ex | 12 ------------ lib/verk/node/manager.ex | 3 ++- test/node/manager_test.exs | 2 +- test/node_test.exs | 34 ++++++++++++---------------------- 4 files changed, 15 insertions(+), 36 deletions(-) diff --git a/lib/verk/node.ex b/lib/verk/node.ex index 712018e..7e980b4 100644 --- a/lib/verk/node.ex +++ b/lib/verk/node.ex @@ -5,18 +5,6 @@ defmodule Verk.Node do @verk_nodes_key "verk_nodes" - @spec register(String.t(), non_neg_integer, GenServer.t()) :: - :ok | {:error, :verk_node_id_already_running} - def register(verk_node_id, ttl, redis) do - case Redix.pipeline!(redis, [ - ["SADD", @verk_nodes_key, verk_node_id], - ["PSETEX", "verk:node:#{verk_node_id}", ttl, "alive"] - ]) do - [1, _] -> :ok - _ -> {:error, :node_id_already_running} - end - end - @spec deregister!(String.t(), GenServer.t()) :: :ok def deregister!(verk_node_id, redis) do Redix.pipeline!(redis, [ diff --git a/lib/verk/node/manager.ex b/lib/verk/node/manager.ex index 881bcc4..319aaab 100644 --- a/lib/verk/node/manager.ex +++ b/lib/verk/node/manager.ex @@ -19,7 +19,8 @@ defmodule Verk.Node.Manager do "Node Manager started for node #{local_verk_node_id}. Heartbeat will run every #{frequency} milliseconds" ) - :ok = Verk.Node.register(local_verk_node_id, 2 * frequency, Verk.Redis) + heartbeat(local_verk_node_id, frequency) + Process.send_after(self(), :heartbeat, frequency) Process.flag(:trap_exit, true) Verk.Scripts.load(Verk.Redis) diff --git a/test/node/manager_test.exs b/test/node/manager_test.exs index 818e12d..1d074dc 100644 --- a/test/node/manager_test.exs +++ b/test/node/manager_test.exs @@ -23,7 +23,7 @@ defmodule Verk.Node.ManagerTest do describe "init/1" do test "registers local verk node id" do - expect(Verk.Node, :register, [@verk_node_id, 2 * @frequency, Verk.Redis], :ok) + expect(Verk.Node, :expire_in, [@verk_node_id, 2 * @frequency, Verk.Redis], {:ok, 1}) expect(Verk.Scripts, :load, 1, :ok) assert init([]) == {:ok, {@verk_node_id, @frequency}} diff --git a/test/node_test.exs b/test/node_test.exs index 74c3526..ef8e950 100644 --- a/test/node_test.exs +++ b/test/node_test.exs @@ -15,25 +15,14 @@ defmodule Verk.NodeTest do {:ok, %{redis: redis}} end - defp register(%{redis: redis}), do: register(@node, 555, redis) - - describe "register/3" do - test "add to verk_nodes", %{redis: redis} do - assert register(@node, 555, redis) == :ok - assert Redix.command!(redis, ["SMEMBERS", "verk_nodes"]) == ["123"] - assert register(@node, 555, redis) == {:error, :node_id_already_running} - end - - test "set verk:node: key", %{redis: redis} do - assert register(@node, 555, redis) == :ok - assert Redix.command!(redis, ["GET", @node_key]) == "alive" - end + defp register(%{redis: redis}) do + register(redis, @node) + :ok + end - test "expire verk:node: key", %{redis: redis} do - assert register(@node, 555, redis) == :ok - ttl = Redix.command!(redis, ["PTTL", @node_key]) - assert_in_delta ttl, 555, 5 - end + defp register(redis, verk_node) do + expire_in(verk_node, 555, redis) + Redix.command!(redis, add_node_redis_command(verk_node)) end describe "deregister/2" do @@ -60,10 +49,11 @@ defmodule Verk.NodeTest do describe "members/3" do setup %{redis: redis} do - register("node_1", 555, redis) - register("node_2", 555, redis) - register("node_3", 555, redis) - register("node_4", 555, redis) + register(redis, "node_1") + register(redis, "node_2") + register(redis, "node_3") + register(redis, "node_4") + :ok end test "list verk nodes", %{redis: redis} do From be7d7693beec411c120ef39a9633ce19c3ad40e1 Mon Sep 17 00:00:00 2001 From: edgurgel Date: Wed, 23 Jan 2019 11:00:45 +1300 Subject: [PATCH 7/7] Add integration test to ensure node ids are tracked --- test/integration/test/integration_test.exs | 32 ++++++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/test/integration/test/integration_test.exs b/test/integration/test/integration_test.exs index ca52308..8abd16f 100644 --- a/test/integration/test/integration_test.exs +++ b/test/integration/test/integration_test.exs @@ -18,14 +18,15 @@ defmodule IntegrationTest do end end - setup_all do + setup do + Application.put_env(:verk, :generate_node_id, false, persistent: true) + Application.delete_env(:verk, :local_node_id, persistent: true) {:ok, redis} = Redix.start_link(Confex.get_env(:verk, :redis_url)) Redix.command!(redis, ["FLUSHDB"]) {:ok, redis: redis} end - @tag integration: true - test "shutdown", %{redis: redis} do + defp enqueue_jobs!(redis) do for _x <- 0..10 do Verk.enqueue( %Verk.Job{queue: "queue_one", class: Integration.SleepWorker, args: [1_500]}, @@ -37,9 +38,34 @@ defmodule IntegrationTest do redis ) end + end + + @tag integration: true + test "shutdown gracefully stops queues", %{redis: redis} do + enqueue_jobs!(redis) Application.ensure_all_started(:integration) {:ok, _consumer} = Consumer.start() + assert Redix.command!(redis, ["SMEMBERS", "verk_nodes"]) == [] + Application.stop(:integration) + + assert_receive %Verk.Events.QueuePausing{queue: :queue_one} + assert_receive %Verk.Events.QueuePausing{queue: :queue_two} + assert_receive %Verk.Events.QueuePaused{queue: :queue_one} + assert_receive %Verk.Events.QueuePaused{queue: :queue_two} + end + + @tag integration: true + test "generate_node_id true maintains verk_nodes", %{redis: redis} do + enqueue_jobs!(redis) + + Application.put_env(:verk, :generate_node_id, true, persistent: true) + Application.ensure_all_started(:integration) + {:ok, _consumer} = Consumer.start() + node_id = Application.fetch_env!(:verk, :local_node_id) + assert Redix.command!(redis, ["SMEMBERS", "verk_nodes"]) == [node_id] + assert Redix.command!(redis, ["TTL", "verk:node:#{node_id}"]) > 0 + Application.stop(:integration) assert_receive %Verk.Events.QueuePausing{queue: :queue_one}