Skip to content

Commit

Permalink
Support trigger a job that uses a generic action
Browse files Browse the repository at this point in the history
  • Loading branch information
tunchamroeun committed Nov 29, 2024
1 parent 9a5ce56 commit bf93fce
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 9 deletions.
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ if Mix.env() == :test do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]

config :ash_oban, actor_persister: AshOban.Test.ActorPersister
Expand Down
67 changes: 67 additions & 0 deletions lib/transformers/define_schedulers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,73 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
end

defp work(trigger, worker, _atomic?, :action, pro?, _read_action, resource, domain) do
function_name =
if pro? do
:process
else
:perform
end

if trigger.state != :active do
quote location: :keep do
@impl unquote(worker)
def unquote(function_name)(_) do
{:discard, unquote(trigger.state)}
end
end
else
quote location: :keep, generated: true do
@impl unquote(worker)
def unquote(function_name)(%Oban.Job{args: %{"primary_key" => primary_key} = args} = job) do
case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
authorize? = AshOban.authorize?()

AshOban.debug(
"Trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)} triggered for primary key #{inspect(primary_key)}",
unquote(trigger.debug?)
)

input =
build_input(
args,
unquote(Macro.escape(trigger.action_input || %{})),
unquote(Macro.escape(trigger.read_metadata))
)

unquote(resource)
|> Ash.ActionInput.for_action(
unquote(trigger.action),
input,
authorize?: authorize?,
actor: actor,
domain: unquote(domain),
skip_unknown_inputs: Map.keys(input)
)
|> Ash.run_action!()

:ok

{:error, error} ->
raise Ash.Error.to_ash_error(error)
end
end

defp build_input(args, action_input, read_metadata) do
primary_key = Map.take(args, ["primary_key"])

metadata = if is_nil(read_metadata), do: %{}, else: %{metadata: args["metadata"]}

metadata
|> Map.merge(args["action_arguments"] || %{})
|> Map.merge(action_input)
|> Map.merge(primary_key)
end
end
end
end

defp work(trigger, worker, atomic?, trigger_action_type, pro?, read_action, resource, domain) do
function_name =
if pro? do
Expand Down
29 changes: 21 additions & 8 deletions test/ash_oban_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ defmodule AshObanTest do

setup do
Enum.each(
[:triggered_process, :triggered_process_2, :triggered_say_hello],
[
:triggered_process,
:triggered_process_2,
:triggered_say_hello,
:triggered_process_generic
],
&Oban.drain_queue(queue: &1)
)
end

test "nothing happens if no records exist" do
assert %{success: 2} = AshOban.Test.schedule_and_run_triggers(Triggered)
assert %{success: 3} = AshOban.Test.schedule_and_run_triggers(Triggered)
end

test "if a record exists, it is processed" do
Expand Down Expand Up @@ -66,15 +71,16 @@ defmodule AshObanTest do
|> Ash.Changeset.for_create(:create)
|> Ash.create!()

assert %{success: 3, failure: 1} =
assert %{success: 5, failure: 1} =
AshOban.Test.schedule_and_run_triggers(Triggered)
end

test "dsl introspection" do
assert [
%AshOban.Trigger{action: :process},
%AshOban.Trigger{action: :process_atomically},
%AshOban.Trigger{action: :process, scheduler: nil}
%AshOban.Trigger{action: :process, scheduler: nil},
%AshOban.Trigger{name: :process_generic}
] = AshOban.Info.oban_triggers(Triggered)
end

Expand All @@ -87,7 +93,8 @@ defmodule AshObanTest do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]
)

Expand All @@ -97,6 +104,7 @@ defmodule AshObanTest do
[
crontab: [
{"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessGeneric, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessAtomically, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process, []}
]
Expand All @@ -105,7 +113,8 @@ defmodule AshObanTest do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]
] = config
end
Expand All @@ -125,7 +134,8 @@ defmodule AshObanTest do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]}
],
queues: false
Expand All @@ -141,6 +151,8 @@ defmodule AshObanTest do
crontab: [
{"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello,
[paused: false]},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessGeneric,
[paused: false]},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessAtomically,
[paused: false]},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process,
Expand All @@ -151,7 +163,8 @@ defmodule AshObanTest do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]}
],
queues: false
Expand Down
6 changes: 6 additions & 0 deletions test/support/triggered.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ defmodule AshOban.Test.Triggered do
worker_read_action(:read)
scheduler_cron false
end

trigger :process_generic do
action :say_hello
max_attempts 2
scheduler_cron "* * * * *"
end
end

scheduled_actions do
Expand Down

0 comments on commit bf93fce

Please sign in to comment.