Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle function deletion in workers #215

Merged
merged 11 commits into from
Feb 25, 2024
12 changes: 11 additions & 1 deletion core/lib/core/adapters/commands/test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Core.Adapters.Commands.Test do
alias Data.InvokeResult

@impl true
def send_invoke(_worker, name, _ns, _args) do
def send_invoke(_worker, name, _ns, _hash, _args) do
{:ok, %InvokeResult{result: name}}
end

Expand All @@ -32,4 +32,14 @@ defmodule Core.Adapters.Commands.Test do
def send_store_function(_worker, _func) do
:ok
end

@impl true
def send_delete_function(_worker, _func, _mod, _hash) do
:ok
end

@impl true
def send_update_function(_worker, _prev_hash, _func) do
:ok
end
end
28 changes: 25 additions & 3 deletions core/lib/core/adapters/commands/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ defmodule Core.Adapters.Commands.Worker do
# Only the send_invoke call should return this.

@impl true
def send_invoke(worker, name, mod, args) do
def send_invoke(worker, name, mod, hash, args) do
worker_addr = {:worker, worker}
cmd = {:invoke, %{name: name, module: mod}, args}
cmd = {:invoke, %{name: name, module: mod, hash: hash}, args}
Logger.info("Sending invoke for #{mod}/#{name} to #{inspect(worker_addr)}")

case GenServer.call(worker_addr, cmd, 60_000) do
Expand All @@ -45,7 +45,7 @@ defmodule Core.Adapters.Commands.Worker do
end

@impl true
def send_invoke_with_code(_worker, worker_handler, %FunctionStruct{code: _} = func) do
def send_invoke_with_code(_worker, worker_handler, %FunctionStruct{code: _, hash: _} = func) do
worker_addr = worker_handler
cmd = {:invoke, func}

Expand All @@ -68,4 +68,26 @@ defmodule Core.Adapters.Commands.Worker do

GenServer.call(worker_addr, cmd, 60_000)
end

@impl true
def send_delete_function(worker, name, mod, hash) do
worker_addr = {:worker, worker}
cmd = {:delete_function, name, mod, hash}

Logger.info("Sending delete_function for #{mod}/#{name} to #{inspect(worker_addr)}")

GenServer.call(worker_addr, cmd, 60_000)
end

@impl true
def send_update_function(worker, prev_hash, %FunctionStruct{} = function) do
worker_addr = {:worker, worker}
cmd = {:update_function, prev_hash, function}

Logger.info(
"Sending update_function for #{function.module}/#{function.name} to #{inspect(worker_addr)}"
)

GenServer.call(worker_addr, cmd, 60_000)
end
end
4 changes: 2 additions & 2 deletions core/lib/core/domain/functions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ defmodule Core.Domain.Functions do
join: m in Module,
on: f.module_id == m.id,
where: m.name == ^mod_name and f.name == ^fun_name,
select: %Function{id: f.id, name: f.name, module_id: f.module_id}
select: %Function{id: f.id, name: f.name, module_id: f.module_id, hash: f.hash}
)

Repo.all(q)
Expand All @@ -107,7 +107,7 @@ defmodule Core.Domain.Functions do
join: m in Module,
on: f.module_id == m.id,
where: m.name == ^mod_name and f.name == ^fun_name,
select: %Function{code: f.code}
select: %Function{code: f.code, hash: f.hash}
)

Repo.all(q)
Expand Down
22 changes: 12 additions & 10 deletions core/lib/core/domain/invoker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,20 @@ defmodule Core.Domain.Invoker do

case Functions.get_code_by_name_in_mod!(ivk.function, ivk.module) do
[f] ->
func = %FunctionStruct{
name: ivk.function,
module: ivk.module,
code: f.code,
metadata: struct(FunctionMetadata, %{})
}
func =
struct(FunctionStruct, %{
name: ivk.function,
module: ivk.module,
code: f.code,
hash: f.hash,
metadata: struct(FunctionMetadata, %{})
})

with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func) do
update_concurrent(worker, +1)

out =
case invoke_without_code(worker, ivk) do
case invoke_without_code(worker, ivk, f.hash) do
{:error, :code_not_found, handler} ->
worker
|> invoke_with_code(handler, ivk, func)
Expand Down Expand Up @@ -113,12 +115,12 @@ defmodule Core.Domain.Invoker do
end
end

@spec invoke_without_code(atom(), InvokeParams.t()) ::
@spec invoke_without_code(atom(), InvokeParams.t(), binary()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | invoke_errors()
def invoke_without_code(worker, ivk) do
def invoke_without_code(worker, ivk, hash) do
Logger.debug("Invoker: invoking #{ivk.module}/#{ivk.function} without code")
# send invocation without code
Commands.send_invoke(worker, ivk.function, ivk.module, ivk.args)
Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args)
end

@spec invoke_with_code(atom(), pid(), InvokeParams.t(), FunctionStruct.t()) ::
Expand Down
13 changes: 9 additions & 4 deletions core/lib/core/domain/policies/default.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do
alias Data.Configurations.Empty

# select the worker with the highest available memory; if no memory information is available, it's treated as 0
# NOTE: we choose to include workers for which we have no data,
# since otherwise in situations where e.g. Prometheus is down, we would always have no workers
@spec select(Empty.t(), [Data.Worker.t()], Data.FunctionStruct.t()) ::
{:ok, Data.Worker.t()} | {:error, :no_workers} | {:error, :no_valid_workers}
def select(
Expand All @@ -24,12 +27,14 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do
) do
selected_worker =
workers
|> Enum.filter(fn %Data.Worker{resources: %{memory: %{available: available}}} ->
c <= available
|> Enum.filter(fn
%Data.Worker{resources: %{memory: %{available: available}}} -> c <= available
%Data.Worker{} -> true
end)
|> Enum.max_by(
fn %Data.Worker{resources: %{memory: %{available: available}}} ->
available
fn
%Data.Worker{resources: %{memory: %{available: available}}} -> available
%Data.Worker{} -> 0
end,
fn -> nil end
)
Expand Down
59 changes: 55 additions & 4 deletions core/lib/core/domain/ports/commands.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@ defmodule Core.Domain.Ports.Commands do

@adapter :core |> Application.compile_env!(__MODULE__) |> Keyword.fetch!(:adapter)

@callback send_invoke(atom(), String.t(), String.t(), map()) ::
@callback send_invoke(atom(), String.t(), String.t(), binary(), map()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()}
@callback send_invoke_with_code(atom(), pid(), FunctionStruct.t()) ::
{:ok, InvokeResult.t()} | {:error, any()}
@callback send_store_function(atom(), FunctionStruct.t()) ::
:ok | {:error, :invalid_input} | {:error, any()}
@callback send_delete_function(atom(), String.t(), String.t(), binary()) ::
:ok | {:error, any()}
@callback send_update_function(atom(), binary(), FunctionStruct.t()) :: :ok | {:error, any()}

@doc """
Sends an invoke command to a worker passing the function name, module and args.
Sends an invoke command to a worker passing the function name, module, hash and args.
It requires a worker (a fully qualified name of another node with the :worker actor on) and function arguments can be empty.
"""
@spec send_invoke(atom(), String.t(), String.t(), map()) ::
@spec send_invoke(atom(), String.t(), String.t(), binary(), map()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()}
defdelegate send_invoke(worker, f_name, ns, args), to: @adapter
defdelegate send_invoke(worker, f_name, ns, hash, args), to: @adapter

@doc """
Sends an invoke command to a worker passing a struct with the function name, module and the code (wasm file binary).
Expand All @@ -58,4 +61,52 @@ defmodule Core.Domain.Ports.Commands do
@spec send_store_function(atom(), FunctionStruct.t()) ::
:ok | {:error, :invalid_input} | {:error, any()}
defdelegate send_store_function(worker, function), to: @adapter

@doc """
Sends a delete_function command to a worker, passing the function's name and module.
The worker should delete the function's code stored locally (both raw and pre-compiled).
It requires a worker, the function's name, the function's module, and a hash of the function code,
to ensure the targeted version hasn't already been updated.
"""
@spec send_delete_function(atom(), String.t(), String.t(), binary()) :: :ok | {:error, any()}
defdelegate send_delete_function(worker, f_name, f_mod, hash), to: @adapter

@doc """
Sends an update_function command to a worker, passing a function struct containing (at least) the function's name and module.
The worker will substitute the local version of the function with the new one. In case the pre-compiled code was already cached, the new version
will be pre-compiled and cached instead.
It requires a worker (a fully qualified name of another node with the :worker actor on),
the hash of the previous version of the function, and a function struct.
"""
@spec send_update_function(atom(), binary(), FunctionStruct.t()) :: :ok | {:error, any()}
defdelegate send_update_function(worker, prev_hash, function), to: @adapter

@doc """
Sends one of the commands defined in this behaviour to all specified workers, without waiting for them to respond.
It requires a list of workers, the function to call for all workers, and the arguments for the function.
It should immediately return :ok.

This is a default implementation for this Port,
and at the time of writing there's no need to make it overridable.
"""
@spec send_to_multiple_workers([atom()], fun(), [any()]) :: :ok
def send_to_multiple_workers(workers, command, args) do
stream = Task.async_stream(workers, fn wrk -> apply(command, [wrk | args]) end)
Process.spawn(fn -> Stream.run(stream) end, [])
:ok
end

@doc """
Sends one of the commands defined in this behaviour to all specified workers, and waits for their response.
It requires a list of workers, the function to call for all workers, and the arguments for the function.
It returns a list with the response of each of the workers.

This is a default implementation for this Port,
and at the time of writing there's no need to make it overridable.
"""
@spec send_to_multiple_workers_sync([atom()], fun(), [any()]) :: [any()]
def send_to_multiple_workers_sync(workers, command, args) do
stream = Task.async_stream(workers, fn wrk -> apply(command, [wrk | args]) end)
Enum.reduce(stream, [], fn response, acc -> [response | acc] end)
end
end
16 changes: 16 additions & 0 deletions core/lib/core/schemas/function.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Core.Schemas.Function do
schema "functions" do
field(:code, :binary)
field(:name, :string)
field(:hash, :binary)

timestamps()

Expand All @@ -39,7 +40,22 @@ defmodule Core.Schemas.Function do
|> validate_required([:name, :code, :module_id])
|> validate_format(:name, regex, message: msg)
|> validate_length(:name, min: 1, max: 160)
|> insert_hash()
|> unique_constraint(:function_module_index_constraint, name: :function_module_index)
|> foreign_key_constraint(:module_id)
end

defp insert_hash(changeset) do
case changeset do
%Ecto.Changeset{valid?: true, changes: %{code: code}} ->
put_change(changeset, :hash, create_hash(code))

_ ->
changeset
end
end

defp create_hash(code) do
:crypto.hash(:sha3_256, code)
end
end
Loading
Loading