From b793b72f5561fe83df00b64ea3e65c6f0ccf6a7a Mon Sep 17 00:00:00 2001 From: Michael Shapiro Date: Wed, 19 Feb 2020 17:32:31 -0800 Subject: [PATCH 1/4] Reset mnesia queue's run_at when queue restarts, issue #99. --- lib/honeydew/queue/mnesia.ex | 39 ++++++++++++++-- lib/honeydew/queue/mnesia/wrapped_job.ex | 14 +++--- .../queue/mnesia_queue_integration_test.exs | 45 ++++++++++++++++++- 3 files changed, 88 insertions(+), 10 deletions(-) diff --git a/lib/honeydew/queue/mnesia.ex b/lib/honeydew/queue/mnesia.ex index be890f9..2d879b0 100644 --- a/lib/honeydew/queue/mnesia.ex +++ b/lib/honeydew/queue/mnesia.ex @@ -67,7 +67,7 @@ defmodule Honeydew.Queue.Mnesia do ]) # inspect/1 here becase queue_name can be of the form {:global, poolname} - table = ["honeydew", inspect(queue_name)] |> Enum.join("_") |> String.to_atom + table = table_name(queue_name) in_progress_table = ["honeydew", inspect(queue_name), "in_progress"] |> Enum.join("_") |> String.to_atom tables = %{table => [type: :ordered_set], @@ -94,7 +94,7 @@ defmodule Honeydew.Queue.Mnesia do in_progress_table: in_progress_table, access_context: access_context(table_opts)} - :ok = reset_after_crash(state) + :ok = reset_after_shutdown(state) time_warp_mode_warning() poll() @@ -232,7 +232,14 @@ defmodule Honeydew.Queue.Mnesia do {reply, state} end - defp reset_after_crash(%PState{in_progress_table: in_progress_table} = state) do + defp reset_after_shutdown(state) do + reset_all_in_progress_jobs(state) + reset_all_pending_jobs(state) + + :ok + end + + defp reset_all_in_progress_jobs(%PState{in_progress_table: in_progress_table} = state) do in_progress_table |> :mnesia.dirty_first() |> case do @@ -244,8 +251,26 @@ defmodule Honeydew.Queue.Mnesia do |> WrappedJob.id_from_key |> move_to_pending_table(%{}, state) - reset_after_crash(state) + reset_all_in_progress_jobs(state) end + + :ok + end + + defp reset_all_pending_jobs(%PState{access_context: access_context, table: table}) do + :mnesia.activity(access_context, fn -> + :mnesia.foldl(fn wrapped_job_record, _ -> + new_wrapped_job_record = + wrapped_job_record + |> WrappedJob.from_record() + |> WrappedJob.reset_run_at() + |> WrappedJob.to_record() + + :ok = :mnesia.delete_object(table, wrapped_job_record, :write) + :ok = :mnesia.write(table, new_wrapped_job_record, :write) + end, [], table) + end) + :ok end @@ -321,6 +346,12 @@ defmodule Honeydew.Queue.Mnesia do {:noreply, Queue.dispatch(queue_state)} end + def table_name(queue_name) do + ["honeydew", inspect(queue_name)] + |> Enum.join("_") + |> String.to_atom() + end + defp poll do {:ok, _} = :timer.send_after(@poll_interval, :__poll__) end diff --git a/lib/honeydew/queue/mnesia/wrapped_job.ex b/lib/honeydew/queue/mnesia/wrapped_job.ex index 8ec71d6..3d071b7 100644 --- a/lib/honeydew/queue/mnesia/wrapped_job.ex +++ b/lib/honeydew/queue/mnesia/wrapped_job.ex @@ -20,15 +20,13 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do def record_name, do: @record_name def record_fields, do: @record_fields - def new(%Job{delay_secs: delay_secs} = job) do + def new(%Job{} = job) do id = :erlang.unique_integer() - run_at = now() + delay_secs job = %{job | private: id} - %__MODULE__{run_at: run_at, - id: id, - job: job} + %__MODULE__{id: id, job: job} + |> reset_run_at() end def from_record({@record_name, {run_at, id}, job}) do @@ -55,6 +53,12 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do id end + def reset_run_at(%__MODULE__{job: %Job{delay_secs: delay_secs}} = wrapped_job) do + run_at = now() + delay_secs + + %__MODULE__{wrapped_job | run_at: run_at} + end + def id_pattern(id) do %__MODULE__{ id: id, diff --git a/test/honeydew/queue/mnesia_queue_integration_test.exs b/test/honeydew/queue/mnesia_queue_integration_test.exs index c2dbf58..1e74dca 100644 --- a/test/honeydew/queue/mnesia_queue_integration_test.exs +++ b/test/honeydew/queue/mnesia_queue_integration_test.exs @@ -1,6 +1,7 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do use ExUnit.Case, async: false # shares doctest queue name with ErlangQueue test alias Honeydew.Job + alias Honeydew.Queue.Mnesia.WrappedJob @moduletag :capture_log @@ -222,7 +223,7 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do assert Enum.count(monitors) < 20 end - test "resets in-progress jobs after crashing", %{queue: queue} do + test "resets in-progress jobs after restart", %{queue: queue} do Enum.each(1..10, fn _ -> Honeydew.async(fn -> Process.sleep(20_000) end, queue) end) @@ -243,6 +244,34 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do assert in_progress == 0 end + test "resets job run_at after restart", %{queue: queue} do + :ok = Honeydew.stop_workers(queue) + + num_jobs = 10 + delay_secs = 15 + + Enum.each(1..num_jobs, fn _ -> + Honeydew.async(fn -> Process.sleep(20_000) end, queue, delay_secs: delay_secs) + end) + + %{queue: %{count: ^num_jobs, in_progress: 0}} = Honeydew.status(queue) + + :ok = Honeydew.stop_queue(queue) + jobs = wrapped_jobs(queue) + + Process.sleep(2_000) # let the monotonic clock tick + :ok = start_queue(queue) + %{queue: %{count: ^num_jobs, in_progress: 0}} = Honeydew.status(queue) + + reset_jobs = wrapped_jobs(queue) + + Enum.zip(jobs, reset_jobs) + |> Enum.each(fn {%WrappedJob{run_at: old_run_at}, %WrappedJob{run_at: new_run_at}} -> + refute_in_delta old_run_at, new_run_at, 1 + assert_in_delta old_run_at, new_run_at, 3 + end) + end + @tag :skip_worker_pool test "when workers join a queue with existing jobs", %{queue: queue} do %Job{} = {:send_msg, [self(), :hi]} |> Honeydew.async(queue) @@ -417,4 +446,18 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do :ok end + + defp wrapped_jobs(queue_name) do + alias Honeydew.Queue.Mnesia, as: MnesiaQueue + + table = MnesiaQueue.table_name(queue_name) + + :mnesia.activity(:async_dirty, fn -> + :mnesia.foldl(fn wrapped_job_record, list -> + [wrapped_job_record | list] + end, [], table) + end) + |> Enum.map(&WrappedJob.from_record/1) + |> Enum.reverse() + end end From 1efec65103bd19a8098314445b681d383e9bac24 Mon Sep 17 00:00:00 2001 From: Michael Shapiro Date: Thu, 20 Feb 2020 11:38:11 -0800 Subject: [PATCH 2/4] reset delay_secs upon recovery --- lib/honeydew/queue/mnesia.ex | 2 +- lib/honeydew/queue/mnesia/wrapped_job.ex | 12 +++++------- .../queue/mnesia_queue_integration_test.exs | 16 ++++++---------- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/lib/honeydew/queue/mnesia.ex b/lib/honeydew/queue/mnesia.ex index 2d879b0..2a8060d 100644 --- a/lib/honeydew/queue/mnesia.ex +++ b/lib/honeydew/queue/mnesia.ex @@ -263,7 +263,7 @@ defmodule Honeydew.Queue.Mnesia do new_wrapped_job_record = wrapped_job_record |> WrappedJob.from_record() - |> WrappedJob.reset_run_at() + |> WrappedJob.set_run_at_to_now() |> WrappedJob.to_record() :ok = :mnesia.delete_object(table, wrapped_job_record, :write) diff --git a/lib/honeydew/queue/mnesia/wrapped_job.ex b/lib/honeydew/queue/mnesia/wrapped_job.ex index 3d071b7..afe8bc5 100644 --- a/lib/honeydew/queue/mnesia/wrapped_job.ex +++ b/lib/honeydew/queue/mnesia/wrapped_job.ex @@ -20,13 +20,13 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do def record_name, do: @record_name def record_fields, do: @record_fields - def new(%Job{} = job) do + def new(%Job{delay_secs: delay_secs} = job) do id = :erlang.unique_integer() + run_at = now() + delay_secs job = %{job | private: id} - %__MODULE__{id: id, job: job} - |> reset_run_at() + %__MODULE__{id: id, job: job, run_at: run_at} end def from_record({@record_name, {run_at, id}, job}) do @@ -53,10 +53,8 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do id end - def reset_run_at(%__MODULE__{job: %Job{delay_secs: delay_secs}} = wrapped_job) do - run_at = now() + delay_secs - - %__MODULE__{wrapped_job | run_at: run_at} + def set_run_at_to_now(%__MODULE__{} = wrapped_job) do + %__MODULE__{wrapped_job | run_at: now()} end def id_pattern(id) do diff --git a/test/honeydew/queue/mnesia_queue_integration_test.exs b/test/honeydew/queue/mnesia_queue_integration_test.exs index 1e74dca..4d040dd 100644 --- a/test/honeydew/queue/mnesia_queue_integration_test.exs +++ b/test/honeydew/queue/mnesia_queue_integration_test.exs @@ -254,21 +254,17 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do Honeydew.async(fn -> Process.sleep(20_000) end, queue, delay_secs: delay_secs) end) - %{queue: %{count: ^num_jobs, in_progress: 0}} = Honeydew.status(queue) - :ok = Honeydew.stop_queue(queue) - jobs = wrapped_jobs(queue) Process.sleep(2_000) # let the monotonic clock tick - :ok = start_queue(queue) - %{queue: %{count: ^num_jobs, in_progress: 0}} = Honeydew.status(queue) - reset_jobs = wrapped_jobs(queue) + now = :erlang.monotonic_time(:second) + :ok = start_queue(queue) - Enum.zip(jobs, reset_jobs) - |> Enum.each(fn {%WrappedJob{run_at: old_run_at}, %WrappedJob{run_at: new_run_at}} -> - refute_in_delta old_run_at, new_run_at, 1 - assert_in_delta old_run_at, new_run_at, 3 + queue + |> wrapped_jobs() + |> Enum.each(fn %WrappedJob{run_at: run_at} -> + assert_in_delta run_at, now, 1 end) end From 3363f003ced56833b0f1a921bc322972d350caee Mon Sep 17 00:00:00 2001 From: Martin Chabot Date: Tue, 28 Feb 2023 15:31:05 -0500 Subject: [PATCH 3/4] Recalculate run_at on boot from the job last run --- lib/honeydew/queue/mnesia.ex | 2 +- lib/honeydew/queue/mnesia/wrapped_job.ex | 22 +++++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/lib/honeydew/queue/mnesia.ex b/lib/honeydew/queue/mnesia.ex index 2a8060d..6564713 100644 --- a/lib/honeydew/queue/mnesia.ex +++ b/lib/honeydew/queue/mnesia.ex @@ -263,7 +263,7 @@ defmodule Honeydew.Queue.Mnesia do new_wrapped_job_record = wrapped_job_record |> WrappedJob.from_record() - |> WrappedJob.set_run_at_to_now() + |> WrappedJob.recalc_run_at() |> WrappedJob.to_record() :ok = :mnesia.delete_object(table, wrapped_job_record, :write) diff --git a/lib/honeydew/queue/mnesia/wrapped_job.ex b/lib/honeydew/queue/mnesia/wrapped_job.ex index afe8bc5..6d9351e 100644 --- a/lib/honeydew/queue/mnesia/wrapped_job.ex +++ b/lib/honeydew/queue/mnesia/wrapped_job.ex @@ -2,7 +2,7 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do alias Honeydew.Job @record_name :wrapped_job - @record_fields [:key, :job] + @record_fields [:key, :last_run, :job] job_filter_map = %Job{} @@ -14,6 +14,7 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do @job_filter struct(Job, job_filter_map) defstruct [:run_at, + :last_run, :id, :job] @@ -23,25 +24,28 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do def new(%Job{delay_secs: delay_secs} = job) do id = :erlang.unique_integer() run_at = now() + delay_secs + last_run = System.system_time(:millisecond) job = %{job | private: id} - %__MODULE__{id: id, job: job, run_at: run_at} + %__MODULE__{id: id, job: job, run_at: run_at, last_run: last_run} end - def from_record({@record_name, {run_at, id}, job}) do + def from_record({@record_name, {run_at, id}, last_run, job}) do %__MODULE__{run_at: run_at, + last_run: last_run, id: id, job: job} end def to_record(%__MODULE__{run_at: run_at, + last_run: last_run, id: id, job: job}) do - {@record_name, key(run_at, id), job} + {@record_name, key(run_at, id), last_run, job} end - def key({@record_name, key, _job}) do + def key({@record_name, key, _last_run, _job}) do key end @@ -57,10 +61,16 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do %__MODULE__{wrapped_job | run_at: now()} end + def recalc_run_at(%__MODULE__{last_run: last_run, job: job} = wrapped_job) do + delta_t = now - System.system_time(:second) + %__MODULE__{wrapped_job | run_at: round(last_run / 1000.0) + delta_t + job.delay_secs} + end + def id_pattern(id) do %__MODULE__{ id: id, run_at: :_, + last_run: :_, job: :_ } |> to_record @@ -72,6 +82,7 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do %__MODULE__{ id: :_, run_at: :_, + last_run: :_, job: job } |> to_record @@ -82,6 +93,7 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do %__MODULE__{ id: :_, run_at: :"$1", + last_run: :"_", job: :_ } |> to_record From bea8e336a087157f6c3d779305883d86147387d8 Mon Sep 17 00:00:00 2001 From: Martin Chabot Date: Wed, 1 Mar 2023 16:17:49 -0500 Subject: [PATCH 4/4] merging master to reset-mnesia-run-at-from-last-run --- .gitignore | 1 + .tool-versions | 4 +- README.md | 2 +- config/test.exs | 4 +- {assets => diagrams}/diagrams.key | Bin examples/ecto_poll_queue/config/config.exs | 7 +- examples/ecto_poll_queue/config/test.exs | 8 +- examples/ecto_poll_queue/mix.postgres.lock | 16 +-- .../test/ecto_poll_queue_example_test.exs | 7 +- examples/global/global.exs | 2 +- lib/honeydew.ex | 132 ++++-------------- lib/honeydew/application.ex | 2 + lib/honeydew/failure_mode/abandon.ex | 3 +- lib/honeydew/failure_mode/move.ex | 3 +- lib/honeydew/failure_mode/retry.ex | 3 +- lib/honeydew/job_runner.ex | 4 +- .../process_group_scope_supervisor.ex | 25 ++++ lib/honeydew/processes.ex | 96 +++++++++++++ lib/honeydew/queue.ex | 13 +- lib/honeydew/queues.ex | 5 +- lib/honeydew/sources/ecto/erlang_term.ex | 6 +- lib/honeydew/sources/ecto/sql.ex | 4 +- lib/honeydew/sources/ecto/sql/postgres.ex | 2 +- lib/honeydew/worker.ex | 9 +- lib/honeydew/worker_group_supervisor.ex | 6 +- lib/honeydew/worker_starter.ex | 10 +- lib/honeydew/workers.ex | 4 +- mix.exs | 11 +- mix.lock | 20 +-- .../queue/erlang_queue_integration_test.exs | 3 +- .../queue/mnesia_queue_integration_test.exs | 3 +- test/honeydew/worker_test.exs | 4 +- test/honeydew_test.exs | 25 +--- test/support/cluster_setups.ex | 5 +- 34 files changed, 249 insertions(+), 200 deletions(-) rename {assets => diagrams}/diagrams.key (100%) create mode 100644 lib/honeydew/process_group_scope_supervisor.ex create mode 100644 lib/honeydew/processes.ex diff --git a/.gitignore b/.gitignore index 0a67105..95d2b85 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ erl_crash.dump *.ez /Mnesia.* .DS_Store +.elixir_ls \ No newline at end of file diff --git a/.tool-versions b/.tool-versions index b707056..e4379ae 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -erlang 21.3.2 -elixir 1.8.1 +erlang 24.0 +elixir 1.12.0-otp-24 diff --git a/README.md b/README.md index 0034658..3e06cc5 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ In your mix.exs file: ```elixir defp deps do - [{:honeydew, "~> 1.4.5"}] + [{:honeydew, "~> 1.5.0"}] end ``` diff --git a/config/test.exs b/config/test.exs index 424e7f7..8df480e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,7 +1,9 @@ use Mix.Config config :logger, - compile_time_purge_level: :warn + compile_time_purge_matching: [ + [level_lower_than: :warn] + ] config :logger, :console, level: :warn diff --git a/assets/diagrams.key b/diagrams/diagrams.key similarity index 100% rename from assets/diagrams.key rename to diagrams/diagrams.key diff --git a/examples/ecto_poll_queue/config/config.exs b/examples/ecto_poll_queue/config/config.exs index ad62229..8ec815f 100644 --- a/examples/ecto_poll_queue/config/config.exs +++ b/examples/ecto_poll_queue/config/config.exs @@ -3,7 +3,10 @@ use Mix.Config config :ecto_poll_queue_example, ecto_repos: [EctoPollQueueExample.Repo] config :ecto_poll_queue_example, interval: 0.5 -config :logger, compile_time_purge_level: :warn -config :logger, :console, level: :warn +config :logger, + compile_time_purge_matching: [ + [level_lower_than: :warn] + ], + console: [level: :warn] import_config "#{Mix.env()}.exs" diff --git a/examples/ecto_poll_queue/config/test.exs b/examples/ecto_poll_queue/config/test.exs index 517aa59..c6ecde6 100644 --- a/examples/ecto_poll_queue/config/test.exs +++ b/examples/ecto_poll_queue/config/test.exs @@ -1,7 +1,9 @@ use Mix.Config -config :logger, compile_time_purge_level: :warn - -config :logger, :console, level: :warn +config :logger, + compile_time_purge_matching: [ + [level_lower_than: :warn] + ], + console: [level: : warn] config :ecto_poll_queue_example, interval: 0.5 diff --git a/examples/ecto_poll_queue/mix.postgres.lock b/examples/ecto_poll_queue/mix.postgres.lock index 05d1fc2..3a9e3f1 100644 --- a/examples/ecto_poll_queue/mix.postgres.lock +++ b/examples/ecto_poll_queue/mix.postgres.lock @@ -1,10 +1,10 @@ %{ - "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"}, - "db_connection": {:hex, :db_connection, "2.0.5", "ddb2ba6761a08b2bb9ca0e7d260e8f4dd39067426d835c24491a321b7f92a4da", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"}, - "decimal": {:hex, :decimal, "1.6.0", "bfd84d90ff966e1f5d4370bdd3943432d8f65f07d3bab48001aebd7030590dcc", [:mix], [], "hexpm"}, - "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"}, - "ecto": {:hex, :ecto, "3.0.7", "44dda84ac6b17bbbdeb8ac5dfef08b7da253b37a453c34ab1a98de7f7e5fec7f", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"}, - "ecto_sql": {:hex, :ecto_sql, "3.0.5", "7e44172b4f7aca4469f38d7f6a3da394dbf43a1bcf0ca975e958cb957becd74e", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.0.6", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.3.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, - "postgrex": {:hex, :postgrex, "0.14.1", "63247d4a5ad6b9de57a0bac5d807e1c32d41e39c04b8a4156a26c63bcd8a2e49", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, - "telemetry": {:hex, :telemetry, "0.3.0", "099a7f3ce31e4780f971b4630a3c22ec66d22208bc090fe33a2a3a6a67754a73", [:rebar3], [], "hexpm"}, + "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"}, + "db_connection": {:hex, :db_connection, "2.0.5", "ddb2ba6761a08b2bb9ca0e7d260e8f4dd39067426d835c24491a321b7f92a4da", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "ced0780bed50430f770b74fcde870c4a50c815124ecf9fee20d67a465966eb4f"}, + "decimal": {:hex, :decimal, "1.6.0", "bfd84d90ff966e1f5d4370bdd3943432d8f65f07d3bab48001aebd7030590dcc", [:mix], [], "hexpm", "bbd124e240e3ff40f407d50fced3736049e72a73d547f69201484d3a624ab569"}, + "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"}, + "ecto": {:hex, :ecto, "3.0.7", "44dda84ac6b17bbbdeb8ac5dfef08b7da253b37a453c34ab1a98de7f7e5fec7f", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm", "8acd54c5c92c7dbe5a9e76adc22ffb4e2e76e5298989eed2068a4f04bc8e6fef"}, + "ecto_sql": {:hex, :ecto_sql, "3.0.5", "7e44172b4f7aca4469f38d7f6a3da394dbf43a1bcf0ca975e958cb957becd74e", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.0.6", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.3.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e5bd47a499d27084afaa3c2154cfedb478ea2fcc926ef59fa515ee089701e390"}, + "postgrex": {:hex, :postgrex, "0.14.1", "63247d4a5ad6b9de57a0bac5d807e1c32d41e39c04b8a4156a26c63bcd8a2e49", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "a20f189bdd5a219c484818fde18e09ace20cd15fe630a828fde70bd6efdeb23b"}, + "telemetry": {:hex, :telemetry, "0.3.0", "099a7f3ce31e4780f971b4630a3c22ec66d22208bc090fe33a2a3a6a67754a73", [:rebar3], [], "hexpm", "63d9f37d319ff331a51f6221310deb5aac8ea3dcf5e0369d689121b5e52f72d4"}, } diff --git a/examples/ecto_poll_queue/test/ecto_poll_queue_example_test.exs b/examples/ecto_poll_queue/test/ecto_poll_queue_example_test.exs index f9e1e80..665ed7c 100644 --- a/examples/ecto_poll_queue/test/ecto_poll_queue_example_test.exs +++ b/examples/ecto_poll_queue/test/ecto_poll_queue_example_test.exs @@ -8,6 +8,7 @@ defmodule EctoPollQueueExampleTest do alias Honeydew.EctoSource.State alias Honeydew.PollQueue.State, as: PollQueueState alias Honeydew.Queue.State, as: QueueState + alias Honeydew.Processes @moduletag :capture_log @@ -109,7 +110,7 @@ defmodule EctoPollQueueExampleTest do assert %{queue: %{stale: 1, ready: 0}} = Honeydew.status(User.notify_queue()) User.notify_queue() - |> Honeydew.get_queue + |> Processes.get_queue() |> send(:__reset_stale__) assert %{queue: %{stale: 0, ready: 1}} = Honeydew.status(User.notify_queue()) @@ -193,7 +194,7 @@ defmodule EctoPollQueueExampleTest do defp get_source_state(queue) do %QueueState{private: %PollQueueState{source: {EctoSource, state}}} = queue - |> Honeydew.get_queue() + |> Processes.get_queue() |> :sys.get_state state @@ -201,7 +202,7 @@ defmodule EctoPollQueueExampleTest do defp update_source_state(queue, state_fn) do queue - |> Honeydew.get_queue() + |> Processes.get_queue() |> :sys.replace_state(fn %QueueState{private: %PollQueueState{source: {EctoSource, state}} = poll_queue_state} = queue_state -> %QueueState{queue_state | private: %PollQueueState{poll_queue_state | diff --git a/examples/global/global.exs b/examples/global/global.exs index 88c7aa4..046918b 100644 --- a/examples/global/global.exs +++ b/examples/global/global.exs @@ -12,7 +12,7 @@ end defmodule QueueApp do def start do nodes = [node()] - :ok = Honeydew.start_queue({:global, :my_queue}, queue: {Honeydew.Queue.Mnesia, [nodes, [disc_copies: nodes], []]}) + :ok = Honeydew.start_queue({:global, :my_queue}, queue: {Honeydew.Queue.Mnesia, [disc_copies: nodes]}) end end diff --git a/lib/honeydew.ex b/lib/honeydew.ex index bdafdbe..8d99344 100644 --- a/lib/honeydew.ex +++ b/lib/honeydew.ex @@ -5,11 +5,12 @@ defmodule Honeydew do alias Honeydew.Job alias Honeydew.JobMonitor - alias Honeydew.Worker - alias Honeydew.WorkerStarter - alias Honeydew.WorkerGroupSupervisor + alias Honeydew.Processes alias Honeydew.Queue - alias Honeydew.{Queues, Workers} + alias Honeydew.Queues + alias Honeydew.Worker + alias Honeydew.Workers + require Logger @type mod_or_mod_args :: module | {module, args :: term} @@ -125,8 +126,10 @@ defmodule Honeydew do """ @spec suspend(queue_name) :: :ok def suspend(queue) do + Processes.start_process_group_scope(queue) + queue - |> get_all_members(Queues) + |> Processes.get_queues() |> Enum.each(&Queue.suspend/1) end @@ -135,13 +138,13 @@ defmodule Honeydew do """ @spec resume(queue_name) :: :ok def resume(queue) do + Processes.start_process_group_scope(queue) + queue - |> get_all_members(Queues) + |> Processes.get_queues() |> Enum.each(&Queue.resume/1) end - - @doc """ Returns the currrent status of the queue and all attached workers. @@ -153,9 +156,11 @@ defmodule Honeydew do @type status_opt :: {:timeout, pos_integer} @spec status(queue_name, [status_opt]) :: map() def status(queue, opts \\ []) do + Processes.start_process_group_scope(queue) + queue_status = queue - |> get_queue + |> Processes.get_queue() |> Queue.status(opts) busy_workers = @@ -174,7 +179,7 @@ defmodule Honeydew do workers = queue - |> get_all_members(Workers) + |> Processes.get_workers() |> Enum.map(&{&1, nil}) |> Enum.into(%{}) |> Map.merge(busy_workers) @@ -213,7 +218,7 @@ defmodule Honeydew do def filter(queue, filter) do {:ok, jobs} = queue - |> get_queue + |> Processes.get_queue() |> Queue.filter(filter) jobs @@ -231,7 +236,7 @@ defmodule Honeydew do @spec cancel(Job.t) :: :ok | {:error, :in_progress} | {:error, :not_found} def cancel(%Job{queue: queue} = job) do queue - |> get_queue + |> Processes.get_queue() |> Queue.cancel(job) end @@ -278,11 +283,7 @@ defmodule Honeydew do @doc false def enqueue(%Job{queue: queue} = job) do queue - |> get_queue - |> case do - nil -> raise RuntimeError, no_queues_running_error(job) - queue -> queue - end + |> Processes.get_queue() |> Queue.enqueue(job) end @@ -302,13 +303,13 @@ defmodule Honeydew do end @doc false - def no_queues_running_error(%Job{queue: {:global, _} = queue} = job) do - "can't enqueue job because there aren't any queue processes running for the distributed queue `#{inspect queue}, are you connected to the cluster? #{inspect job} `" + def no_queues_running_error({:global, _} = queue) do + "can't enqueue job because there aren't any queue processes running for the distributed queue `#{inspect queue}`, are you connected to the cluster?" end @doc false - def no_queues_running_error(%Job{queue: queue} = job) do - "can't enqueue job #{inspect job} because there aren't any queue processes running for `#{inspect queue}`" + def no_queues_running_error(queue) do + "can't enqueue job because there aren't any queue processes running for `#{inspect queue}`" end @deprecated "Honeydew now supervises your queue processes, please use `Honeydew.start_queue/2 instead.`" @@ -344,7 +345,7 @@ defmodule Honeydew do You can provide any of the following `opts`: - `queue`: is the module that queue will use. Defaults to - `Honeydew.Queue.ErlangQueue`. You may also provide args to the queue's + `Honeydew.Queue.Mnesia`. You may also provide args to the queue's `c:Honeydew.Queue.init/2` callback using the following format: `{module, args}`. - `dispatcher`: the job dispatching strategy, `{module, init_args}`. @@ -419,7 +420,7 @@ defmodule Honeydew do - `Honeydew.start_workers({:global, "my_awesome_queue"}, MyJobModule, nodes: [:clientfacing@dax, :queue@dax])` """ - defdelegate start_workers(name, module_and_args, opts \\ []), to: Honeydew.Workers + defdelegate start_workers(name, module_and_args, opts \\ []), to: Workers @deprecated "Honeydew now supervises your worker processes, please use `Honeydew.start_workers/3 instead.`" def worker_spec(_queue, _module_and_args, _opts) do @@ -434,7 +435,7 @@ defmodule Honeydew do @doc """ Re-initializes the given worker, this is intended to be used from - within a worker's `c:Honeydew.Worker.failed_init/0` callback. Using it otherwise + within a worker's `c:Honeydew.Worker.init_failed/0` callback. Using it otherwise may cause undefined behavior, at present, don't do it. """ @spec reinitialize_worker() :: :ok @@ -442,80 +443,6 @@ defmodule Honeydew do Worker.module_init(self()) end - @groups [Workers, Queues] - - Enum.each(@groups, fn group -> - @doc false - def group(queue, unquote(group)) do - name(queue, unquote(group)) - end - end) - - @processes [WorkerGroupSupervisor, WorkerStarter] - - Enum.each(@processes, fn process -> - @doc false - def process(queue, unquote(process)) do - name(queue, unquote(process)) - end - end) - - - @doc false - def create_groups(queue) do - Enum.each(@groups, fn name -> - queue |> group(name) |> :pg2.create - end) - end - - @doc false - def delete_groups(queue) do - Enum.each(@groups, fn name -> - queue |> group(name) |> :pg2.delete - end) - end - - @doc false - def get_all_members({:global, _} = queue, name) do - queue |> group(name) |> :pg2.get_members - end - - @doc false - def get_all_members(queue, name) do - get_all_local_members(queue, name) - end - - # we need to know local members to shut down local components - @doc false - def get_all_local_members(queue, name) do - queue |> group(name) |> :pg2.get_local_members - end - - - @doc false - def get_queue(queue) do - queue - |> get_all_queues - |> case do - {:error, {:no_such_group, _queue}} -> [] - queues -> queues - end - |> List.first - end - - @doc false - def get_all_queues({:global, _name} = queue) do - queue - |> group(Queues) - |> :pg2.get_members - end - - @doc false - def get_all_queues(queue) do - queue - |> group(Queues) - |> :pg2.get_local_members - end @doc false def table_name({:global, queue}) do @@ -527,14 +454,6 @@ defmodule Honeydew do to_string(queue) end - defp name({:global, queue}, component) do - name([:global, queue], component) - end - - defp name(queue, component) do - [component, queue] |> List.flatten |> Enum.join(".") |> String.to_atom - end - @doc false defmacro debug(ast) do quote do @@ -543,5 +462,4 @@ defmodule Honeydew do end end end - end diff --git a/lib/honeydew/application.ex b/lib/honeydew/application.ex index 43d892e..ade3e64 100644 --- a/lib/honeydew/application.ex +++ b/lib/honeydew/application.ex @@ -3,6 +3,7 @@ defmodule Honeydew.Application do alias Honeydew.Queues alias Honeydew.Workers + alias Honeydew.ProcessGroupScopeSupervisor use Application @@ -10,6 +11,7 @@ defmodule Honeydew.Application do children = [ {Queues, []}, {Workers, []}, + {ProcessGroupScopeSupervisor, []} ] opts = [strategy: :one_for_one, name: Honeydew.Supervisor] diff --git a/lib/honeydew/failure_mode/abandon.ex b/lib/honeydew/failure_mode/abandon.ex index a8f94d5..092b3c2 100644 --- a/lib/honeydew/failure_mode/abandon.ex +++ b/lib/honeydew/failure_mode/abandon.ex @@ -12,6 +12,7 @@ defmodule Honeydew.FailureMode.Abandon do require Logger alias Honeydew.Job alias Honeydew.Queue + alias Honeydew.Processes @behaviour Honeydew.FailureMode @@ -25,7 +26,7 @@ defmodule Honeydew.FailureMode.Abandon do # tell the queue that that job can be removed. queue - |> Honeydew.get_queue + |> Processes.get_queue() |> Queue.ack(job) # send the error to the awaiting process, if necessary diff --git a/lib/honeydew/failure_mode/move.ex b/lib/honeydew/failure_mode/move.ex index d9a775d..ae0aa8e 100644 --- a/lib/honeydew/failure_mode/move.ex +++ b/lib/honeydew/failure_mode/move.ex @@ -14,6 +14,7 @@ defmodule Honeydew.FailureMode.Move do alias Honeydew.Job alias Honeydew.Queue + alias Honeydew.Processes require Logger @@ -30,7 +31,7 @@ defmodule Honeydew.FailureMode.Move do # tell the queue that that job can be removed. queue - |> Honeydew.get_queue + |> Processes.get_queue() |> Queue.ack(job) {:ok, job} = diff --git a/lib/honeydew/failure_mode/retry.ex b/lib/honeydew/failure_mode/retry.ex index b268555..50ea0d6 100644 --- a/lib/honeydew/failure_mode/retry.ex +++ b/lib/honeydew/failure_mode/retry.ex @@ -1,6 +1,7 @@ defmodule Honeydew.FailureMode.Retry do alias Honeydew.Job alias Honeydew.Queue + alias Honeydew.Processes alias Honeydew.FailureMode.Abandon alias Honeydew.FailureMode.Move @@ -76,7 +77,7 @@ defmodule Honeydew.FailureMode.Retry do job = %Job{job | failure_private: private, delay_secs: delay_secs, result: {:retrying, reason}} queue - |> Honeydew.get_queue + |> Processes.get_queue() |> Queue.nack(job) # send the error to the awaiting process, if necessary diff --git a/lib/honeydew/job_runner.ex b/lib/honeydew/job_runner.ex index 3a8f9f9..5f6e48a 100644 --- a/lib/honeydew/job_runner.ex +++ b/lib/honeydew/job_runner.ex @@ -59,13 +59,13 @@ defmodule Honeydew.JobRunner do end {:ok, result} rescue e -> - {:error, Crash.new(:exception, e, System.stacktrace())} + {:error, Crash.new(:exception, e, __STACKTRACE__)} catch :exit, reason -> # catch exit signals and shut down in an orderly manner {:error, Crash.new(:exit, reason)} e -> - {:error, Crash.new(:throw, e, System.stacktrace())} + {:error, Crash.new(:throw, e, __STACKTRACE__)} end :ok = Worker.job_finished(worker, %{job | result: result}) diff --git a/lib/honeydew/process_group_scope_supervisor.ex b/lib/honeydew/process_group_scope_supervisor.ex new file mode 100644 index 0000000..152e2b1 --- /dev/null +++ b/lib/honeydew/process_group_scope_supervisor.ex @@ -0,0 +1,25 @@ +# +# Dynamic supervision of :pg scopes (one per queue). +# +defmodule Honeydew.ProcessGroupScopeSupervisor do + @moduledoc false + + use DynamicSupervisor + + def start_link([]) do + DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) + end + + def init(extra_args) do + DynamicSupervisor.init(strategy: :one_for_one, extra_arguments: extra_args) + end + + def start_scope(name) do + child_spec = %{ + id: name, + start: {:pg, :start_link, [name]} + } + + DynamicSupervisor.start_child(__MODULE__, child_spec) + end +end diff --git a/lib/honeydew/processes.ex b/lib/honeydew/processes.ex new file mode 100644 index 0000000..15eac90 --- /dev/null +++ b/lib/honeydew/processes.ex @@ -0,0 +1,96 @@ +defmodule Honeydew.Processes do + @moduledoc false + + alias Honeydew.ProcessGroupScopeSupervisor + alias Honeydew.Queues + alias Honeydew.WorkerGroupSupervisor + alias Honeydew.WorkerStarter + alias Honeydew.Workers + + @processes [WorkerGroupSupervisor, WorkerStarter] + + for process <- @processes do + def process(queue, unquote(process)) do + name(queue, unquote(process)) + end + end + + def start_process_group_scope(queue) do + queue + |> scope() + |> ProcessGroupScopeSupervisor.start_scope() + |> case do + {:ok, _pid} -> + :ok + + {:error, {:already_started, _pid}} -> + :ok + end + end + + def join_group(component, queue, pid) do + queue + |> scope() + |> :pg.join(component, pid) + end + + # + # this function may be in a hot path, so we don't call Processes.start_process_group/1 unless necessary + # + def get_queue(queue) do + case get_queues(queue) do + [queue_process | _rest] -> + queue_process + + [] -> + start_process_group_scope(queue) + + case get_queues(queue) do + [queue_process | _rest] -> + queue_process + + [] -> + raise RuntimeError, Honeydew.no_queues_running_error(queue) + end + end + end + + def get_queues(queue) do + get_members(queue, Queues) + end + + def get_workers(queue) do + get_members(queue, Workers) + end + + def get_local_members(queue, group) do + queue + |> scope() + |> :pg.get_local_members(group) + end + + defp get_members({:global, _} = queue, group) do + queue + |> scope() + |> :pg.get_members(group) + end + + defp get_members(queue, name) do + get_local_members(queue, name) + end + + defp scope(queue) do + name(queue, "scope") + end + + defp name({:global, queue}, component) do + name([:global, queue], component) + end + + defp name(queue, component) do + [component, queue] + |> List.flatten() + |> Enum.join(".") + |> String.to_atom() + end +end diff --git a/lib/honeydew/queue.ex b/lib/honeydew/queue.ex index a09ef7e..61167a7 100644 --- a/lib/honeydew/queue.ex +++ b/lib/honeydew/queue.ex @@ -1,6 +1,4 @@ defmodule Honeydew.Queue do - @moduledoc false - use GenServer, restart: :transient require Logger require Honeydew @@ -9,6 +7,7 @@ defmodule Honeydew.Queue do alias Honeydew.Worker alias Honeydew.WorkerStarter alias Honeydew.Queues + alias Honeydew.Processes defmodule State do @moduledoc false @@ -79,15 +78,7 @@ defmodule Honeydew.Queue do def init([queue, module, args, {dispatcher, dispatcher_args}, failure_mode, success_mode, suspended]) do Process.flag(:trap_exit, true) - :ok = - queue - |> Honeydew.group(Queues) - |> :pg2.create - - :ok = - queue - |> Honeydew.group(Queues) - |> :pg2.join(self()) + :ok = Processes.join_group(Queues, queue, self()) with {:global, _name} <- queue, do: :ok = :net_kernel.monitor_nodes(true) diff --git a/lib/honeydew/queues.ex b/lib/honeydew/queues.ex index 1e65468..93fa7d2 100644 --- a/lib/honeydew/queues.ex +++ b/lib/honeydew/queues.ex @@ -7,6 +7,7 @@ defmodule Honeydew.Queues do alias Honeydew.Dispatcher.LRUNode alias Honeydew.Dispatcher.LRU alias Honeydew.FailureMode.Abandon + alias Honeydew.Processes @type name :: Honeydew.queue_name() @type queue_spec_opt :: Honeydew.queue_spec_opt() @@ -65,8 +66,6 @@ defmodule Honeydew.Queues do suspended = Keyword.get(opts, :suspended, false) - Honeydew.create_groups(name) - module.validate_args!(args) opts = [name, module, args, dispatcher, failure_mode, success_mode, suspended] @@ -81,6 +80,8 @@ defmodule Honeydew.Queues do opts end + Processes.start_process_group_scope(name) + with {:ok, _} <- Supervisor.start_child(__MODULE__, Queue.child_spec(name, opts)) do :ok end diff --git a/lib/honeydew/sources/ecto/erlang_term.ex b/lib/honeydew/sources/ecto/erlang_term.ex index 49ca046..261573b 100644 --- a/lib/honeydew/sources/ecto/erlang_term.ex +++ b/lib/honeydew/sources/ecto/erlang_term.ex @@ -2,7 +2,11 @@ if Code.ensure_loaded?(Ecto) do defmodule Honeydew.EctoSource.ErlangTerm do @moduledoc false - @behaviour Ecto.Type + if macro_exported?(Ecto.Type, :__using__, 1) do + use Ecto.Type + else + @behaviour Ecto.Type + end @impl true def type, do: :binary diff --git a/lib/honeydew/sources/ecto/sql.ex b/lib/honeydew/sources/ecto/sql.ex index 439491c..2a394fe 100644 --- a/lib/honeydew/sources/ecto/sql.ex +++ b/lib/honeydew/sources/ecto/sql.ex @@ -46,8 +46,8 @@ defmodule Honeydew.EctoSource.SQL do defmacro ready_fragment(module) do quote do - unquote(module).ready - |> fragment + unquote(module).ready() + |> fragment() end end diff --git a/lib/honeydew/sources/ecto/sql/postgres.ex b/lib/honeydew/sources/ecto/sql/postgres.ex index c560f4d..b31b900 100644 --- a/lib/honeydew/sources/ecto/sql/postgres.ex +++ b/lib/honeydew/sources/ecto/sql/postgres.ex @@ -35,7 +35,7 @@ if Code.ensure_loaded?(Ecto) do @impl true def delay_ready(state) do "UPDATE #{state.table} - SET #{state.lock_field} = (#{ready()} + $1 * 1000), + SET #{state.lock_field} = (#{ready()} + CAST($1 AS BIGINT) * 1000), #{state.private_field} = $2 WHERE #{SQL.where_keys_fragment(state, 3)}" diff --git a/lib/honeydew/worker.ex b/lib/honeydew/worker.ex index 3943d32..817123e 100644 --- a/lib/honeydew/worker.ex +++ b/lib/honeydew/worker.ex @@ -10,6 +10,7 @@ defmodule Honeydew.Worker do alias Honeydew.Job alias Honeydew.JobMonitor alias Honeydew.Logger, as: HoneydewLogger + alias Honeydew.Processes alias Honeydew.Queue alias Honeydew.JobRunner alias Honeydew.Workers @@ -65,9 +66,7 @@ defmodule Honeydew.Worker do def init([_supervisor, queue, %{ma: {module, init_args}, init_retry_secs: init_retry_secs}, queue_pid] = start_opts) do Process.flag(:trap_exit, true) - queue - |> Honeydew.group(Workers) - |> :pg2.join(self()) + :ok = Processes.join_group(Workers, queue, self()) has_init_fcn = :functions @@ -111,14 +110,14 @@ defmodule Honeydew.Worker do %{state | ready: false} end rescue e -> - HoneydewLogger.worker_init_crashed(module, Crash.new(:exception, e, System.stacktrace())) + HoneydewLogger.worker_init_crashed(module, Crash.new(:exception, e, __STACKTRACE__)) %{state | ready: false} catch :exit, reason -> HoneydewLogger.worker_init_crashed(module, Crash.new(:exit, reason)) %{state | ready: false} e -> - HoneydewLogger.worker_init_crashed(module, Crash.new(:throw, e, System.stacktrace())) + HoneydewLogger.worker_init_crashed(module, Crash.new(:throw, e, __STACKTRACE__)) %{state | ready: false} end |> send_ready_or_callback diff --git a/lib/honeydew/worker_group_supervisor.ex b/lib/honeydew/worker_group_supervisor.ex index 1f8b871..36f50bb 100644 --- a/lib/honeydew/worker_group_supervisor.ex +++ b/lib/honeydew/worker_group_supervisor.ex @@ -2,10 +2,12 @@ defmodule Honeydew.WorkerGroupSupervisor do @moduledoc false use DynamicSupervisor + alias Honeydew.WorkersPerQueueSupervisor + alias Honeydew.Processes def start_link([queue, opts]) do - DynamicSupervisor.start_link(__MODULE__, [queue, opts], name: Honeydew.process(queue, __MODULE__)) + DynamicSupervisor.start_link(__MODULE__, [queue, opts], name: Processes.process(queue, __MODULE__)) end def init(extra_args) do @@ -14,7 +16,7 @@ defmodule Honeydew.WorkerGroupSupervisor do def start_worker_group(queue, queue_pid) do queue - |> Honeydew.process(__MODULE__) + |> Processes.process(__MODULE__) |> DynamicSupervisor.start_child({WorkersPerQueueSupervisor, queue_pid}) end end diff --git a/lib/honeydew/worker_starter.ex b/lib/honeydew/worker_starter.ex index 2c63a62..58531ad 100644 --- a/lib/honeydew/worker_starter.ex +++ b/lib/honeydew/worker_starter.ex @@ -3,22 +3,24 @@ defmodule Honeydew.WorkerStarter do @moduledoc false use GenServer + alias Honeydew.WorkerGroupSupervisor + alias Honeydew.Processes + require Logger # called by a queue to tell the workerstarter to start workers def queue_available(queue, node) do - GenServer.cast({Honeydew.process(queue, __MODULE__), node}, {:queue_available, self()}) + GenServer.cast({Processes.process(queue, __MODULE__), node}, {:queue_available, self()}) end def start_link(queue) do - GenServer.start_link(__MODULE__, queue, name: Honeydew.process(queue, __MODULE__)) + GenServer.start_link(__MODULE__, queue, name: Processes.process(queue, __MODULE__)) end def init(queue) do - # this process starts after the WorkerGroupSupervisor, so we can send it start requests queue - |> Honeydew.get_all_queues + |> Processes.get_queues() |> Enum.each(&WorkerGroupSupervisor.start_worker_group(queue, &1)) {:ok, queue} diff --git a/lib/honeydew/workers.ex b/lib/honeydew/workers.ex index 44d6223..cc5f9d7 100644 --- a/lib/honeydew/workers.ex +++ b/lib/honeydew/workers.ex @@ -2,7 +2,9 @@ defmodule Honeydew.Workers do @moduledoc false use Supervisor + alias Honeydew.WorkerRootSupervisor + alias Honeydew.Processes @type name :: Honeydew.queue_name() @type mod_or_mod_args :: Honeydew.mod_or_mod_args() @@ -43,7 +45,7 @@ defmodule Honeydew.Workers do raise ArgumentError, invalid_module_error(module) end - Honeydew.create_groups(name) + Processes.start_process_group_scope(name) with {:ok, _} <- Supervisor.start_child(__MODULE__, {WorkerRootSupervisor, [name, opts]}) do :ok diff --git a/mix.exs b/mix.exs index 13d57c3..3b3fdae 100644 --- a/mix.exs +++ b/mix.exs @@ -1,12 +1,12 @@ defmodule Honeydew.Mixfile do use Mix.Project - @version "1.4.5" + @version "1.5.0" def project do [app: :honeydew, version: @version, - elixir: "~> 1.7", + elixir: "~> 1.12.0", start_permanent: Mix.env() == :prod, docs: docs(), deps: deps(), @@ -16,8 +16,8 @@ defmodule Honeydew.Mixfile do dialyzer: [ plt_add_apps: [:mnesia, :ex_unit], flags: [ - # :unmatched_returns, - # :error_handling, + :unmatched_returns, + :error_handling, :race_conditions, :no_opaque ] @@ -33,6 +33,7 @@ defmodule Honeydew.Mixfile do # Type `mix help compile.app` for more information def application do [extra_applications: [:logger], + included_applications: [:mnesia], mod: {Honeydew.Application, []}] end @@ -40,7 +41,7 @@ defmodule Honeydew.Mixfile do [ {:ecto, "~> 3.0", optional: true, only: [:dev, :prod]}, {:ex_doc, ">= 0.0.0", only: :dev}, - {:dialyxir, "~> 0.5", only: [:dev, :test], runtime: false}, + {:dialyxir, "~> 1.0", only: [:dev, :test], runtime: false}, # {:eflame, git: "git@github.com:slfritchie/eflame", only: :dev}, ] end diff --git a/mix.lock b/mix.lock index d8f1d23..04e5b3a 100644 --- a/mix.lock +++ b/mix.lock @@ -1,17 +1,21 @@ %{ - "decimal": {:hex, :decimal, "1.6.0", "bfd84d90ff966e1f5d4370bdd3943432d8f65f07d3bab48001aebd7030590dcc", [:mix], [], "hexpm"}, - "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"}, - "earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm"}, - "ecto": {:hex, :ecto, "3.0.7", "44dda84ac6b17bbbdeb8ac5dfef08b7da253b37a453c34ab1a98de7f7e5fec7f", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"}, + "decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"}, + "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, + "earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm", "b42a23e9bd92d65d16db2f75553982e58519054095356a418bb8320bbacb58b1"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.13", "0c98163e7d04a15feb62000e1a891489feb29f3d10cb57d4f845c405852bbef8", [:mix], [], "hexpm", "d602c26af3a0af43d2f2645613f65841657ad6efc9f0e361c3b6c06b578214ba"}, + "ecto": {:hex, :ecto, "3.6.1", "7bb317e3fd0179ad725069fd0fe8a28ebe48fec6282e964ea502e4deccb0bd0f", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cbb3294a990447b19f0725488a749f8cf806374e0d9d0dffc45d61e7aeaf6553"}, "eflame": {:git, "git@github.com:slfritchie/eflame", "a08518142126f5fc541a3a3c4a04c27f24448bae", []}, - "ex_doc": {:hex, :ex_doc, "0.19.1", "519bb9c19526ca51d326c060cb1778d4a9056b190086a8c6c115828eaccea6cf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, + "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, + "ex_doc": {:hex, :ex_doc, "0.24.2", "e4c26603830c1a2286dae45f4412a4d1980e1e89dc779fcd0181ed1d5a05c8d9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "e134e1d9e821b8d9e4244687fb2ace58d479b67b282de5158333b0d57c6fb7da"}, "hamcrest": {:hex, :basho_hamcrest, "0.4.1", "fb7b2c92d252a1e9db936750b86089addaebeb8f87967fb4bbdda61e8863338e", [:make, :mix, :rebar3], [], "hexpm"}, - "makeup": {:hex, :makeup, "0.5.5", "9e08dfc45280c5684d771ad58159f718a7b5788596099bdfb0284597d368a882", [:mix], [{:nimble_parsec, "~> 0.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.10.0", "0f09c2ddf352887a956d84f8f7e702111122ca32fbbc84c2f0569b8b65cbf7fa", [:mix], [{:makeup, "~> 0.5.5", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, + "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "meck": {:hex, :meck, "0.8.4", "59ca1cd971372aa223138efcf9b29475bde299e1953046a0c727184790ab1520", [:make, :rebar], [], "hexpm"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.4.0", "ee261bb53214943679422be70f1658fff573c5d0b0a1ecd0f18738944f818efe", [:mix], [], "hexpm"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm"}, "protobuffs": {:hex, :protobuffs, "0.8.4", "d38ca5f7380d8477c274680273372011890f8d0037c0d7e7db5c0207b89a4e0b", [:make, :rebar], [{:meck, "~> 0.8.4", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"}, "riak_pb": {:hex, :riak_pb, "2.3.2", "48ffbf66dbb3f136ab9a7134bac4e496754baa5ef58c4f50a61326736d996390", [:make, :mix, :rebar3], [{:hamcrest, "~> 0.4.1", [hex: :basho_hamcrest, repo: "hexpm", optional: false]}], "hexpm"}, "riakc": {:hex, :riakc, "2.5.3", "6132d9e687a0dfd314b2b24c4594302ca8b55568a5d733c491d8fb6cd4004763", [:make, :mix, :rebar3], [{:riak_pb, "~> 2.3", [hex: :riak_pb, repo: "hexpm", optional: false]}], "hexpm"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, } diff --git a/test/honeydew/queue/erlang_queue_integration_test.exs b/test/honeydew/queue/erlang_queue_integration_test.exs index d8ed585..a5b9101 100644 --- a/test/honeydew/queue/erlang_queue_integration_test.exs +++ b/test/honeydew/queue/erlang_queue_integration_test.exs @@ -1,6 +1,7 @@ defmodule Honeydew.ErlangQueueIntegrationTest do use ExUnit.Case, async: false # shares doctest queue name with mnesia queue test alias Honeydew.Job + alias Honeydew.Processes setup [ :setup_queue_name, @@ -182,7 +183,7 @@ defmodule Honeydew.ErlangQueueIntegrationTest do end test "should not leak monitors", %{queue: queue} do - queue_process = Honeydew.get_queue(queue) + queue_process = Processes.get_queue(queue) Enum.each(0..500, fn _ -> me = self() diff --git a/test/honeydew/queue/mnesia_queue_integration_test.exs b/test/honeydew/queue/mnesia_queue_integration_test.exs index 4d040dd..4b42c99 100644 --- a/test/honeydew/queue/mnesia_queue_integration_test.exs +++ b/test/honeydew/queue/mnesia_queue_integration_test.exs @@ -2,6 +2,7 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do use ExUnit.Case, async: false # shares doctest queue name with ErlangQueue test alias Honeydew.Job alias Honeydew.Queue.Mnesia.WrappedJob + alias Honeydew.Processes @moduletag :capture_log @@ -211,7 +212,7 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do end test "should not leak monitors", %{queue: queue} do - queue_process = Honeydew.get_queue(queue) + queue_process = Processes.get_queue(queue) Enum.each(0..500, fn _ -> me = self() diff --git a/test/honeydew/worker_test.exs b/test/honeydew/worker_test.exs index a5d3439..1f48b55 100644 --- a/test/honeydew/worker_test.exs +++ b/test/honeydew/worker_test.exs @@ -1,6 +1,8 @@ defmodule Honeydew.WorkerTest do use ExUnit.Case + alias Honeydew.Processes + import Honeydew.CrashLoggerHelpers defmodule WorkerWithBadInit do @@ -17,7 +19,7 @@ defmodule Honeydew.WorkerTest do @tag :start_workers test "workers should die when their queue dies", %{queue: queue} do - queue_pid = Honeydew.get_queue(queue) + queue_pid = Processes.get_queue(queue) %{workers: workers} = Honeydew.status(queue) Process.exit(queue_pid, :kill) diff --git a/test/honeydew_test.exs b/test/honeydew_test.exs index 34aa886..2f77c9a 100644 --- a/test/honeydew_test.exs +++ b/test/honeydew_test.exs @@ -5,8 +5,7 @@ defmodule HoneydewTest do import Honeydew.CrashLoggerHelpers alias Honeydew.Job - alias Honeydew.WorkerGroupSupervisor - alias Honeydew.Workers + alias Honeydew.Processes @moduletag :capture_log @@ -24,7 +23,7 @@ defmodule HoneydewTest do # tell the queue that that job can be removed. queue - |> Honeydew.get_queue + |> Processes.get_queue |> Queue.ack(job) # send the error to the awaiting process, if necessary @@ -71,20 +70,6 @@ defmodule HoneydewTest do assert {:error, _} = Honeydew.start_workers(:a_queue, Stateless) end - test "group/1" do - assert Honeydew.group(:my_queue, Workers) == :"Elixir.Honeydew.Workers.my_queue" - end - - describe "process/2" do - test "local" do - assert Honeydew.process(:my_queue, WorkerGroupSupervisor) == :"Elixir.Honeydew.WorkerGroupSupervisor.my_queue" - end - - test "global" do - assert Honeydew.process({:global, :my_queue}, WorkerGroupSupervisor) == :"Elixir.Honeydew.WorkerGroupSupervisor.global.my_queue" - end - end - test "table_name/1" do assert Honeydew.table_name({:global, :my_queue}) == "global_my_queue" assert Honeydew.table_name(:my_queue) == "my_queue" @@ -125,7 +110,7 @@ defmodule HoneydewTest do :ok = Honeydew.start_queue(queue) :ok = Honeydew.start_workers(queue, Stateless, num: 5) - queue_pid = Honeydew.get_queue(queue) + queue_pid = Processes.get_queue(queue) Honeydew.suspend(queue) Enum.each(0..200, fn _ -> @@ -147,7 +132,7 @@ defmodule HoneydewTest do :ok end - assert queue_pid == Honeydew.get_queue(queue) + assert queue_pid == Processes.get_queue(queue) end test "workers don't restart after a successful job" do @@ -398,7 +383,7 @@ defmodule HoneydewTest do end defp assert_crash_logged(%Job{private: private}) do - assert_receive {:honeydew_crash_log, {level, pid, {_mod, _msg, _ts, metadata}}} + assert_receive {:honeydew_crash_log, {_level, _pid, {_mod, _msg, _ts, metadata}}} assert Keyword.has_key?(metadata, :honeydew_crash_reason) assert ^private = Keyword.fetch!(metadata, :honeydew_job) |> Map.fetch!(:private) diff --git a/test/support/cluster_setups.ex b/test/support/cluster_setups.ex index d27ce6b..b35ffdb 100644 --- a/test/support/cluster_setups.ex +++ b/test/support/cluster_setups.ex @@ -1,5 +1,6 @@ defmodule Honeydew.Support.ClusterSetups do alias Honeydew.Support.Cluster + alias Honeydew.Processes def start_queue_node(queue) do fn -> @@ -23,8 +24,8 @@ defmodule Honeydew.Support.ClusterSetups do me = self() - # seems to be necessary to get pg2 to sync with the slaves - Honeydew.create_groups(queue) + # seems to be necessary to get :pg to sync with the slaves + Processes.start_process_group_scope(queue) Node.spawn_link(node, fn -> function.()