Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bulk operations #37

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 70 additions & 47 deletions lib/arke_postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ defmodule ArkePostgres do
case check_env() do
{:ok, nil} ->
try do
projects = Query.get_project_record()

projects =Query.get_project_record()
Enum.each(projects, fn %{id: project_id} = _project ->
start_managers(project_id)
end)
Expand All @@ -31,10 +31,16 @@ defmodule ArkePostgres do
_ in DBConnection.ConnectionError ->
IO.inspect("ConnectionError")
:error

err in Postgrex.Error ->
%{message: message,postgres: %{code: code, message: postgres_message}} = err
parsed_message = %{context: "postgrex_error", message: "#{message || postgres_message}"}
IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ])
%{message: message, postgres: %{code: code, message: postgres_message}} = err

parsed_message = %{
context: "postgrex_error",
message: "#{message || postgres_message}"
}

IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan])
:error
end

Expand Down Expand Up @@ -67,64 +73,82 @@ defmodule ArkePostgres do
end
end

# Managers are registered under atom project ids; normalize binary ids first.
# NOTE(review): `String.to_atom/1` creates atoms dynamically — presumably the
# set of project ids is small and trusted; confirm it cannot grow unbounded.
defp start_managers(project_id) when is_binary(project_id),
  do: start_managers(String.to_atom(project_id))

# Loads the project's parameter, arke and group units from the database and
# hands each set to its corresponding Arke manager.
defp start_managers(project_id) do
  {parameters, arke_list, groups} = Query.get_manager_units(project_id)

  Arke.handle_manager(parameters, project_id, :parameter)
  Arke.handle_manager(arke_list, project_id, :arke)
  Arke.handle_manager(groups, project_id, :group)
end

@doc """
Creates one unit or a list of units for `project`.

A single unit is wrapped in a list and sent through the bulk path.
Returns `{:ok, unit}` for a single (non-bulk) create,
`{:ok, count, valid, errors}` for bulk creates (`opts[:bulk] == true`),
or `{:error, errors}` on failure.
"""
def create(project, unit, opts \\ [])

# Single-unit convenience clause: the `arke_id` key distinguishes a unit map
# from a list; its value is unused here (avoids an unused-variable warning).
def create(project, %{arke_id: _} = unit, opts),
  do: create(project, [unit], opts)

# Nothing to insert: zero inserted, no valid units, no errors.
def create(_project, [], _opts), do: {:ok, 0, [], []}

def create(project, [%{arke_id: arke_id} | _] = unit_list, opts) do
  arke = Arke.Boundary.ArkeManager.get(arke_id, project)

  case handle_create(project, arke, unit_list, opts) do
    # Non-bulk path: a single unit comes back; stamp the project into metadata.
    {:ok, unit} ->
      {:ok,
       Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project}))}

    # Bulk path: stamp every valid unit, pass failures through untouched.
    {:ok, count, valid, errors} ->
      {:ok, count,
       Enum.map(valid, fn unit ->
         Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project}))
       end), errors}

    {:error, errors} ->
      {:error, errors}
  end
end

# Table-backed arkes: insert a single row via `Table.insert/3`.
# Bulk is not implemented for tables — only the FIRST unit in the list is
# inserted; the rest are silently ignored (see todo below).
defp handle_create(
       project,
       %{data: %{type: "table"}} = arke,
       [%{data: data, metadata: metadata} = unit | _] = _unit_list,
       _opts
     ) do
  # todo: handle bulk?
  # todo: remove once the project is not needed anymore
  data = data |> Map.merge(%{metadata: Map.delete(metadata, :project)}) |> data_as_klist
  Table.insert(project, arke, data)
  {:ok, unit}
end

# Arke-typed units go through the bulk-aware `ArkeUnit.insert/4`, which
# returns either `{:ok, unit}` or `{:ok, count, valid, errors}` depending on
# `opts[:bulk]`.
defp handle_create(project, %{data: %{type: "arke"}} = arke, unit_list, opts),
  do: ArkeUnit.insert(project, arke, unit_list, opts)

# Any other arke type is unsupported.
defp handle_create(_project, _arke, _unit, _opts) do
  {:error, "arke type not supported"}
end

@doc """
Updates one unit or a list of units for `project`.

A single unit is wrapped in a list and sent through the bulk path.
Return shape mirrors `create/3`: `{:ok, unit}`, `{:ok, count, valid, errors}`
when `opts[:bulk] == true`, or `{:error, reason}`.
"""
def update(project, unit, opts \\ [])

# Single-unit convenience clause; `arke_id` only identifies the map shape.
def update(project, %{arke_id: _} = unit, opts),
  do: update(project, [unit], opts)

# Nothing to update.
def update(_project, [], _opts), do: {:ok, 0, [], []}

def update(project, [%{arke_id: arke_id} | _] = unit_list, opts) do
  arke = Arke.Boundary.ArkeManager.get(arke_id, project)
  handle_update(project, arke, unit_list, opts)
end

def handle_update(
project,
%{data: %{type: "table"}} = arke,
%{data: data, metadata: metadata} = unit
[%{data: data, metadata: metadata} = unit | _] = _,
_opts
) do
# todo: handle bulk?
data =
unit
|> filter_primary_keys(false)
Expand All @@ -138,21 +162,30 @@ defmodule ArkePostgres do
{:ok, unit}
end

# Arke-typed units are updated in bulk by `ArkeUnit.update/4`.
def handle_update(project, %{data: %{type: "arke"}} = arke, unit_list, opts),
  do: ArkeUnit.update(project, arke, unit_list, opts)

# Any other arke type is unsupported.
def handle_update(_project, _arke, _unit, _opts) do
  {:error, "arke type not supported"}
end

@doc """
Deletes one unit or a list of units for `project`.

A single unit is wrapped in a list and delegated to the list clause.
"""
def delete(project, unit, opts \\ [])

def delete(project, %{arke_id: _} = unit, opts), do: delete(project, [unit], opts)

# NOTE(review): empty-list delete returns {:ok, nil} while create/update
# return {:ok, 0, [], []} — consider aligning the success shapes.
def delete(_project, [], _opts), do: {:ok, nil}

# `opts` is currently unused by the delete path (handle_delete/3 takes none).
def delete(project, [%{arke_id: arke_id} | _] = unit_list, _opts) do
  arke = Arke.Boundary.ArkeManager.get(arke_id, project)
  handle_delete(project, arke, unit_list)
end

defp handle_delete(project, %{data: %{type: "table"}} = arke, %{metadata: metadata} = unit) do
defp handle_delete(
project,
%{data: %{type: "table"}} = arke,
[%{metadata: metadata} = unit | _] = _
) do
# todo: handle bulk?
metadata = Map.delete(metadata, :project)

where = unit |> filter_primary_keys(true) |> Map.put_new(:metadata, metadata) |> data_as_klist
Expand All @@ -163,11 +196,8 @@ defmodule ArkePostgres do
end
end

# Arke-typed units are deleted in bulk; `ArkeUnit.delete/3` result is
# passed through unchanged.
defp handle_delete(project, %{data: %{type: "arke"}} = arke, unit_list) do
  ArkeUnit.delete(project, arke, unit_list)
end

defp handle_delete(_, _, _) do
Expand Down Expand Up @@ -202,13 +232,6 @@ defmodule ArkePostgres do
Enum.to_list(data)
end

# NOTE(review): name contains a typo ("errros"); this helper is being removed
# by this PR in favour of returning the raw errors to the caller.
# Plain binary messages pass through untouched.
defp handle_changeset_errros(errors)when is_binary(errors), do: errors
# Changeset-style {field, detail} pairs are rendered as "field: message".
defp handle_changeset_errros(errors) do
  Enum.map(errors, fn {field, detail} ->
    "#{field}: #{render_detail(detail)}"
  end)
end

defp render_detail({message, values}) do
Enum.reduce(values, message, fn {k, v}, acc ->
String.replace(acc, "%{#{k}}", to_string(v))
Expand Down
120 changes: 89 additions & 31 deletions lib/arke_postgres/arke_unit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,54 @@ defmodule ArkePostgres.ArkeUnit do

@record_fields [:id, :data, :metadata, :inserted_at, :updated_at]

@doc """
Inserts `unit_list` into the `arke_unit` table of `project` in one
`insert_all`.

Each unit gets a (possibly generated) id and `inserted_at`/`updated_at`
stamped with the same second-truncated UTC timestamp.

Returns `{:ok, count, valid, errors}` when `opts[:bulk] == true`, otherwise
`{:ok, unit}` with the first inserted unit; `{:error, _}` when nothing was
inserted.
"""
def insert(project, arke, unit_list, opts \\ []) do
  # insert_all does not autogenerate timestamps, so stamp them here;
  # truncate to seconds to match the column precision.
  now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)

  # Build the stamped units and their DB rows in one pass, preserving the
  # caller's order (the original reduce-with-prepend reversed it, which made
  # the non-bulk `List.first/1` below return the LAST input unit).
  {updated_unit_list, records} =
    unit_list
    |> Enum.map(fn unit ->
      id = handle_id(unit.id)

      updated_unit =
        unit |> Map.put(:id, id) |> Map.put(:inserted_at, now) |> Map.put(:updated_at, now)

      record = %{
        id: id,
        arke_id: Atom.to_string(unit.arke_id),
        data: encode_unit_data(arke, unit.data),
        metadata: unit.metadata,
        inserted_at: now,
        updated_at: now
      }

      {updated_unit, record}
    end)
    |> Enum.unzip()

  case ArkePostgres.Repo.insert_all(
         ArkePostgres.Tables.ArkeUnit,
         records,
         prefix: project,
         returning: true
       ) do
    {0, _} ->
      {:error, Error.create(:insert, "no records inserted")}

    {count, inserted} ->
      # MapSet gives O(1) membership instead of O(n) `in` per unit.
      inserted_ids = MapSet.new(inserted, & &1.id)

      {valid, errors} =
        Enum.split_with(updated_unit_list, fn unit ->
          MapSet.member?(inserted_ids, unit.id)
        end)

      case opts[:bulk] do
        true -> {:ok, count, valid, errors}
        _ -> {:ok, List.first(valid)}
      end
  end
end

Expand All @@ -46,23 +76,51 @@ defmodule ArkePostgres.ArkeUnit do
# TODO handle error
defp handle_id(id), do: id

@doc """
Updates `unit_list` in the `arke_unit` table of `project` via an upsert
(`insert_all` with `on_conflict: {:replace_all_except, [:id]}`).

NOTE(review): because this is an upsert, a unit whose id does not exist is
INSERTED rather than rejected — confirm that is the intended "update"
semantics.

Returns `{:ok, count, valid, errors}` when `opts[:bulk] == true`, otherwise
`{:ok, unit}` with the first updated unit; `{:error, _}` when no row changed.
"""
def update(project, arke, unit_list, opts) do
  records =
    Enum.map(unit_list, fn unit ->
      %{
        id: to_string(unit.id),
        arke_id: to_string(arke.id),
        data: encode_unit_data(arke, unit.data),
        metadata: Map.delete(unit.metadata, :project),
        # Truncate BOTH timestamps to seconds — the original truncated only
        # inserted_at, leaving updated_at with sub-second precision.
        inserted_at: DateTime.to_naive(unit.inserted_at) |> NaiveDateTime.truncate(:second),
        updated_at: DateTime.to_naive(unit.updated_at) |> NaiveDateTime.truncate(:second)
      }
    end)

  case ArkePostgres.Repo.insert_all(
         ArkePostgres.Tables.ArkeUnit,
         records,
         prefix: project,
         on_conflict: {:replace_all_except, [:id]},
         conflict_target: :id,
         returning: true
       ) do
    {0, _} ->
      {:error, Error.create(:update, "no records updated")}

    {count, updated} ->
      # MapSet gives O(1) membership instead of O(n) `in` per unit.
      updated_ids = MapSet.new(updated, & &1.id)

      {valid, errors} =
        Enum.split_with(unit_list, fn unit ->
          MapSet.member?(updated_ids, to_string(unit.id))
        end)

      case opts[:bulk] do
        true -> {:ok, count, valid, errors}
        _ -> {:ok, List.first(valid)}
      end
  end
end

def delete(project, arke, unit) do
where = [arke_id: Atom.to_string(arke.id), id: Atom.to_string(unit.id)]
query = from(a in "arke_unit", where: ^where)
def delete(project, arke, unit_list) do
query =
from(a in "arke_unit",
where: a.arke_id == ^Atom.to_string(arke.id),
where: a.id in ^Enum.map(unit_list, &Atom.to_string(&1.id))
)

case ArkePostgres.Repo.delete_all(query, prefix: project) do
{0, nil} ->
Expand Down
Loading