From bf93fcee49d013f2b36dc01f4673d027fb30767c Mon Sep 17 00:00:00 2001 From: Tun Cham Roeun Date: Thu, 28 Nov 2024 10:32:50 +0700 Subject: [PATCH] Support trigger a job that uses a generic action --- config/config.exs | 3 +- lib/transformers/define_schedulers.ex | 67 +++++++++++++++++++++++++++ test/ash_oban_test.exs | 29 ++++++++---- test/support/triggered.ex | 6 +++ 4 files changed, 96 insertions(+), 9 deletions(-) diff --git a/config/config.exs b/config/config.exs index bf01fdb..67cf875 100644 --- a/config/config.exs +++ b/config/config.exs @@ -22,7 +22,8 @@ if Mix.env() == :test do queues: [ triggered_process: 10, triggered_process_2: 10, - triggered_say_hello: 10 + triggered_say_hello: 10, + triggered_process_generic: 10 ] config :ash_oban, actor_persister: AshOban.Test.ActorPersister diff --git a/lib/transformers/define_schedulers.ex b/lib/transformers/define_schedulers.ex index 2169e3c..42b7794 100644 --- a/lib/transformers/define_schedulers.ex +++ b/lib/transformers/define_schedulers.ex @@ -574,6 +574,73 @@ defmodule AshOban.Transformers.DefineSchedulers do end end + defp work(trigger, worker, _atomic?, :action, pro?, _read_action, resource, domain) do + function_name = + if pro? do + :process + else + :perform + end + + if trigger.state != :active do + quote location: :keep do + @impl unquote(worker) + def unquote(function_name)(_) do + {:discard, unquote(trigger.state)} + end + end + else + quote location: :keep, generated: true do + @impl unquote(worker) + def unquote(function_name)(%Oban.Job{args: %{"primary_key" => primary_key} = args} = job) do + case AshOban.lookup_actor(args["actor"]) do + {:ok, actor} -> + authorize? = AshOban.authorize?() + + AshOban.debug( + "Trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)} triggered for primary key #{inspect(primary_key)}", + unquote(trigger.debug?) + ) + + input = + build_input( + args, + unquote(Macro.escape(trigger.action_input || %{})), + unquote(Macro.escape(trigger.read_metadata)) + ) + + unquote(resource) + |> Ash.ActionInput.for_action( + unquote(trigger.action), + input, + authorize?: authorize?, + actor: actor, + domain: unquote(domain), + skip_unknown_inputs: Map.keys(input) + ) + |> Ash.run_action!() + + :ok + + {:error, error} -> + raise Ash.Error.to_ash_error(error) + end + end + + defp build_input(args, action_input, read_metadata) do + primary_key = Map.take(args, ["primary_key"]) + + metadata = if is_nil(read_metadata), do: %{}, else: %{metadata: args["metadata"]} + + metadata + |> Map.merge(args["action_arguments"] || %{}) + |> Map.merge(action_input) + |> Map.merge(primary_key) + end + end + end + end + defp work(trigger, worker, atomic?, trigger_action_type, pro?, read_action, resource, domain) do function_name = if pro? do diff --git a/test/ash_oban_test.exs b/test/ash_oban_test.exs index 93e942d..33c77e7 100644 --- a/test/ash_oban_test.exs +++ b/test/ash_oban_test.exs @@ -14,13 +14,18 @@ defmodule AshObanTest do setup do Enum.each( - [:triggered_process, :triggered_process_2, :triggered_say_hello], + [ + :triggered_process, + :triggered_process_2, + :triggered_say_hello, + :triggered_process_generic + ], &Oban.drain_queue(queue: &1) ) end test "nothing happens if no records exist" do - assert %{success: 2} = AshOban.Test.schedule_and_run_triggers(Triggered) + assert %{success: 3} = AshOban.Test.schedule_and_run_triggers(Triggered) end test "if a record exists, it is processed" do @@ -66,7 +71,7 @@ defmodule AshObanTest do |> Ash.Changeset.for_create(:create) |> Ash.create!() - assert %{success: 3, failure: 1} = + assert %{success: 5, failure: 1} = AshOban.Test.schedule_and_run_triggers(Triggered) end @@ -74,7 +79,8 @@ defmodule AshObanTest do assert [ %AshOban.Trigger{action: :process}, %AshOban.Trigger{action: :process_atomically}, - %AshOban.Trigger{action: :process, scheduler: nil} + %AshOban.Trigger{action: :process, scheduler: nil}, + %AshOban.Trigger{name: :process_generic} ] = AshOban.Info.oban_triggers(Triggered) end @@ -87,7 +93,8 @@ defmodule AshObanTest do queues: [ triggered_process: 10, triggered_process_2: 10, - triggered_say_hello: 10 + triggered_say_hello: 10, + triggered_process_generic: 10 ] ) @@ -97,6 +104,7 @@ defmodule AshObanTest do [ crontab: [ {"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello, []}, + {"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessGeneric, []}, {"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessAtomically, []}, {"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process, []} ] @@ -105,7 +113,8 @@ defmodule AshObanTest do queues: [ triggered_process: 10, triggered_process_2: 10, - triggered_say_hello: 10 + triggered_say_hello: 10, + triggered_process_generic: 10 ] ] = config end @@ -125,7 +134,8 @@ defmodule AshObanTest do queues: [ triggered_process: 10, triggered_process_2: 10, - triggered_say_hello: 10 + triggered_say_hello: 10, + triggered_process_generic: 10 ]} ], queues: false @@ -141,6 +151,8 @@ defmodule AshObanTest do crontab: [ {"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello, [paused: false]}, + {"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessGeneric, + [paused: false]}, {"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessAtomically, [paused: false]}, {"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process, @@ -151,7 +163,8 @@ defmodule AshObanTest do queues: [ triggered_process: 10, triggered_process_2: 10, - triggered_say_hello: 10 + triggered_say_hello: 10, + triggered_process_generic: 10 ]} ], queues: false diff --git a/test/support/triggered.ex b/test/support/triggered.ex index 6aaf427..9a63068 100644 --- a/test/support/triggered.ex +++ b/test/support/triggered.ex @@ -30,6 +30,12 @@ defmodule AshOban.Test.Triggered do worker_read_action(:read) scheduler_cron false end + + trigger :process_generic do + action :say_hello + max_attempts 2 + scheduler_cron "* * * * *" + end end scheduled_actions do