Skip to content

Commit

Permalink
feat: bulk operations
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyichv committed May 30, 2024
1 parent d15d644 commit 29671f3
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 58 deletions.
82 changes: 73 additions & 9 deletions lib/arke/query_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ defmodule Arke.QueryManager do
alias Arke.Utils.DatetimeHandler, as: DatetimeHandler
alias Arke.Core.{Arke, Unit, Query, Parameter}


@persistence Application.get_env(:arke, :persistence)
@record_fields [:id, :data, :metadata, :inserted_at, :updated_at]

Expand Down Expand Up @@ -142,31 +141,87 @@ defmodule Arke.QueryManager do
@spec create(project :: atom(), arke :: Arke.t(), args :: list()) :: func_return()
def create(project, arke, args) do
persistence_fn = @persistence[:arke_postgres][:create]

with %Unit{} = unit <- Unit.load(arke, args, :create),
{:ok, unit} <- Validator.validate(unit, :create, project),
{:ok, unit} <- ArkeManager.call_func(arke, :before_create, [arke, unit]),
%{valid: valid, errors: errors} <- Validator.validate(unit, :create, project),
# todo better valid / error handling
{:ok, unit} <- ArkeManager.call_func(arke, :before_create, [arke, List.first(valid)]),
{:ok, unit} <- handle_group_call_func(arke, unit, :before_unit_create),
{:ok, unit} <- handle_link_parameters_unit(arke, unit),
{:ok, unit} <- persistence_fn.(project, unit),
{:ok, unit} <- persistence_fn.(project, unit, []),
{:ok, unit} <- ArkeManager.call_func(arke, :on_create, [arke, unit]),
{:ok, unit} <- handle_link_parameters(unit, %{}),
{:ok, unit} <- handle_group_call_func(arke, unit, :on_unit_create),
do: {:ok, unit},
else: ({:error, errors} -> {:error, errors})
end

@spec create_bulk(project :: atom(), arke :: Arke.t(), data :: list(Arke.t()), args :: list()) ::
func_return()
def create_bulk(project, arke, data, args) do
persistence_fn = @persistence[:arke_postgres][:create]

with %{valid: valid, errors: errors} <- load_bulk_units(arke, data, :create, args),
%{valid: valid, errors: errors} <- Validator.validate(valid, :create, project),
%{valid: valid, errors: errors} <- process_bulk({valid, errors}, arke, :before),
{:ok, valid, persistence_errors} <- persistence_fn.(project, valid, bulk: true),
%{valid: valid, errors: errors} <-
process_bulk({valid, errors ++ persistence_errors}, arke, :after),
do: {:ok, valid, errors},
else: ({:error, errors} -> {:error, errors})
end

defp process_bulk({valid, errors}, arke, phase) do
Enum.reduce(valid, %{valid: [], errors: errors}, fn unit, acc ->
with {:ok, unit} <- call_persistence_fn(arke, unit, :create, phase),
{:ok, unit} <- call_group_persistence_fn(arke, unit, :create, phase),
{:ok, unit} <- call_link_parameters_fn(arke, unit, :create, phase),
do: Map.put(acc, :valid, [unit | acc.valid]),
else: ({:error, e} -> Map.put(acc, :errors, [{unit, e} | acc.errors]))
end)
end

defp load_bulk_units(arke, units, persistence_fn, args),
do:
Enum.reduce(units, %{valid: [], errors: []}, fn item, acc ->
case Unit.load(arke, data_as_klist(item) ++ args, persistence_fn) do
%Unit{} = unit -> Map.put(acc, :valid, [unit | acc.valid])
{:error, error} -> Map.put(acc, :errors, [error | acc.errors])
end
end)

defp call_persistence_fn(arke, unit, :create, :before),
do: ArkeManager.call_func(arke, :before_create, [arke, unit])

defp call_persistence_fn(arke, unit, :create, :after),
do: ArkeManager.call_func(arke, :on_create, [arke, unit])

defp call_group_persistence_fn(arke, unit, :create, :before),
do: handle_group_call_func(arke, unit, :before_unit_create)

defp call_group_persistence_fn(arke, unit, :create, :after),
do: handle_group_call_func(arke, unit, :on_unit_create)

defp call_link_parameters_fn(arke, unit, :create, :before),
do: handle_link_parameters_unit(arke, unit)

defp call_link_parameters_fn(arke, unit, :create, :after),
do: handle_link_parameters(unit, %{})

# todo: remove after atoms removal
defp data_as_klist(data) do
Enum.map(data, fn {key, value} -> {String.to_existing_atom(key), value} end)
end

defp handle_link_parameters_unit(%{id: :arke_link} = _, unit), do: {:ok, unit}

defp handle_link_parameters_unit(
%{data: parameters} = arke,
%{metadata: %{project: project}} = unit
) do


{errors, link_units} =
Enum.filter(ArkeManager.get_parameters(arke), fn p -> p.arke_id == :link end)
|> Enum.reduce({[], []}, fn p, {errors, link_units} ->

arke = ArkeManager.get(String.to_existing_atom(p.data.arke_or_group_id), project)

case handle_create_on_link_parameters_unit(
Expand Down Expand Up @@ -200,6 +255,11 @@ defmodule Arke.QueryManager do
end
end

defp handle_link_parameters_unit(any, any2) do
IO.inspect(any)
IO.inspect(any2)
end

defp handle_create_on_link_parameters_unit(project, unit, parameter, arke, value)
when is_nil(value),
do: {:ok, parameter, value}
Expand All @@ -222,7 +282,6 @@ defmodule Arke.QueryManager do
do: {:ok, parameter, value}

def handle_group_call_func(arke, unit, func) do

GroupManager.get_groups_by_arke(arke)
|> Enum.reduce_while(unit, fn group, new_unit ->
with {:ok, new_unit} <- GroupManager.call_func(group, func, [arke, new_unit]),
Expand Down Expand Up @@ -256,9 +315,11 @@ defmodule Arke.QueryManager do
def update(%{arke_id: arke_id, metadata: %{project: project}, data: data} = current_unit, args) do
persistence_fn = @persistence[:arke_postgres][:update]
arke = ArkeManager.get(arke_id, project)

with %Unit{} = unit <- Unit.update(current_unit, args),
{:ok, unit} <- update_at_on_update(unit),
{:ok, unit} <- Validator.validate(unit, :update, project),
%{valid: valid, errors: errors} <- Validator.validate(unit, :update, project),
# todo better valid / error handling
{:ok, unit} <- ArkeManager.call_func(arke, :before_update, [arke, unit]),
{:ok, unit} <- handle_group_call_func(arke, unit, :before_unit_update),
{:ok, unit} <- handle_link_parameters_unit(arke, unit),
Expand All @@ -269,10 +330,12 @@ defmodule Arke.QueryManager do
do: {:ok, unit},
else: ({:error, errors} -> {:error, errors})
end

defp update_at_on_update(unit) do
updated_at = DatetimeHandler.now(:datetime)
{:ok, Unit.update(unit, updated_at: updated_at)}
end

@doc """
Function to delete a given unit
## Parameters
Expand Down Expand Up @@ -524,6 +587,7 @@ defmodule Arke.QueryManager do

defp handle_filter(query, :group_id, :eq, value, negate) do
%{id: id} = group = get_group(value, query.project)

arke_list =
Enum.map(GroupManager.get_arke_list(group), fn a ->
Atom.to_string(a.id)
Expand Down
Loading

0 comments on commit 29671f3

Please sign in to comment.