Skip to content

Commit

Permalink
Support trigger a job that uses a generic action
Browse files Browse the repository at this point in the history
  • Loading branch information
tunchamroeun committed Nov 28, 2024
1 parent 9a5ce56 commit 8cdbd97
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 17 deletions.
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ if Mix.env() == :test do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]

config :ash_oban, actor_persister: AshOban.Test.ActorPersister
Expand Down
90 changes: 82 additions & 8 deletions lib/transformers/define_schedulers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,32 @@ defmodule AshOban.Transformers.DefineSchedulers do
fn _ -> %{} end
end

trigger_action = Ash.Resource.Info.action(unquote(resource), unquote(trigger.action))

case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
unquote(resource)
|> stream(actor)
|> Stream.map(fn record ->
if(trigger_action.type == :action) do

Check warning on line 217 in lib/transformers/define_schedulers.ex

View workflow job for this annotation

GitHub Actions / ash-ci / mix credo --strict

The condition of `if` should not be wrapped in parentheses.
action_input = unquote(Macro.escape(trigger.action_input || %{}))

unquote(worker_module_name).new(%{
primary_key: Map.take(record, unquote(primary_key)),
metadata: metadata.(record),
actor: args["actor"]
actor: args["actor"],
action_arguments: action_input
})
end)
|> insert()
|> Oban.insert!()

:ok
else
unquote(resource)
|> stream(actor)
|> Stream.map(fn record ->
unquote(worker_module_name).new(%{
primary_key: Map.take(record, unquote(primary_key)),
metadata: metadata.(record),
actor: args["actor"]
})
end)
|> insert()
end

{:error, e} ->
raise Ash.Error.to_ash_error(e)
Expand Down Expand Up @@ -574,6 +588,66 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
end

defp work(trigger, worker, _atomic?, :action, pro?, _read_action, resource, domain) do
function_name =
if pro? do
:process
else
:perform
end

if trigger.state != :active do
quote location: :keep do
@impl unquote(worker)
def unquote(function_name)(_) do
{:discard, unquote(trigger.state)}
end
end
else
quote location: :keep, generated: true do
@impl unquote(worker)
def unquote(function_name)(%Oban.Job{args: args} = job) do
case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
authorize? = AshOban.authorize?()

AshOban.debug(
"Trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)} triggered",
unquote(trigger.debug?)
)

action_input = unquote(Macro.escape(trigger.action_input || %{}))

input =
if unquote(is_nil(trigger.read_metadata)) do
%{}
else
%{metadata: args["metadata"]}
end
|> Map.merge(args["action_arguments"] || %{})
|> Map.merge(action_input)

unquote(resource)
|> Ash.ActionInput.for_action(
unquote(trigger.action),
input,
authorize?: authorize?,
actor: actor,
domain: unquote(domain),
skip_unknown_inputs: Map.keys(input)
)
|> Ash.run_action!()

:ok

{:error, error} ->
raise Ash.Error.to_ash_error(error)
end
end
end
end
end

defp work(trigger, worker, atomic?, trigger_action_type, pro?, read_action, resource, domain) do
function_name =
if pro? do
Expand Down
29 changes: 21 additions & 8 deletions test/ash_oban_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ defmodule AshObanTest do

setup do
Enum.each(
[:triggered_process, :triggered_process_2, :triggered_say_hello],
[
:triggered_process,
:triggered_process_2,
:triggered_say_hello,
:triggered_process_generic
],
&Oban.drain_queue(queue: &1)
)
end

test "nothing happens if no records exist" do
assert %{success: 2} = AshOban.Test.schedule_and_run_triggers(Triggered)
assert %{success: 4} = AshOban.Test.schedule_and_run_triggers(Triggered)
end

test "if a record exists, it is processed" do
Expand Down Expand Up @@ -66,15 +71,16 @@ defmodule AshObanTest do
|> Ash.Changeset.for_create(:create)
|> Ash.create!()

assert %{success: 3, failure: 1} =
assert %{success: 5, failure: 1} =
AshOban.Test.schedule_and_run_triggers(Triggered)
end

test "dsl introspection" do
assert [
%AshOban.Trigger{action: :process},
%AshOban.Trigger{action: :process_atomically},
%AshOban.Trigger{action: :process, scheduler: nil}
%AshOban.Trigger{action: :process, scheduler: nil},
%AshOban.Trigger{name: :process_generic}
] = AshOban.Info.oban_triggers(Triggered)
end

Expand All @@ -87,7 +93,8 @@ defmodule AshObanTest do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]
)

Expand All @@ -97,6 +104,7 @@ defmodule AshObanTest do
[
crontab: [
{"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessGeneric, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessAtomically, []},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process, []}
]
Expand All @@ -105,7 +113,8 @@ defmodule AshObanTest do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]
] = config
end
Expand All @@ -125,7 +134,8 @@ defmodule AshObanTest do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]}
],
queues: false
Expand All @@ -141,6 +151,8 @@ defmodule AshObanTest do
crontab: [
{"0 0 1 1 *", AshOban.Test.Triggered.AshOban.ActionWorker.SayHello,
[paused: false]},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessGeneric,
[paused: false]},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.ProcessAtomically,
[paused: false]},
{"* * * * *", AshOban.Test.Triggered.AshOban.Scheduler.Process,
Expand All @@ -151,7 +163,8 @@ defmodule AshObanTest do
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
triggered_say_hello: 10,
triggered_process_generic: 10
]}
],
queues: false
Expand Down
6 changes: 6 additions & 0 deletions test/support/triggered.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ defmodule AshOban.Test.Triggered do
worker_read_action(:read)
scheduler_cron false
end

trigger :process_generic do
action :say_hello
max_attempts 2
scheduler_cron "* * * * *"
end
end

scheduled_actions do
Expand Down

0 comments on commit 8cdbd97

Please sign in to comment.