Skip to content

Commit

Permalink
Merge pull request #226 from mattrent/cherrypicked-fixes
Browse files Browse the repository at this point in the history
Cherrypicked fixes
  • Loading branch information
mattrent authored Jan 23, 2025
2 parents a0b4246 + 5fc4a11 commit 42c9d3a
Show file tree
Hide file tree
Showing 14 changed files with 54 additions and 40 deletions.
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ export SHELLOPTS:=$(if $(SHELLOPTS),$(SHELLOPTS):)pipefail:errexit

## Compile core docker image
build-core-image:
cd core
docker build \
-f core/Dockerfile \
--build-arg SECRET_KEY_BASE=local-make-secret \
--build-arg MIX_ENV="prod" \
-t core .

## Compile worker docker image
build-worker-image:
cd worker
docker build --build-arg MIX_ENV="prod" -t worker .
docker build -f worker/Dockerfile --build-arg MIX_ENV="prod" -t worker .

## Run credo --strict
credo-core:
Expand Down
2 changes: 1 addition & 1 deletion core/lib/core/adapters/commands/test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Core.Adapters.Commands.Test do
alias Data.InvokeResult

@impl true
def send_invoke(_worker, name, _ns, _hash, _args) do
def send_invoke(_worker, name, _ns, _hash, _args, _metadata \\ {}) do
{:ok, %InvokeResult{result: name}}
end

Expand Down
4 changes: 2 additions & 2 deletions core/lib/core/adapters/commands/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ defmodule Core.Adapters.Commands.Worker do
# Only the send_invoke call should return this.

@impl true
def send_invoke(worker, name, mod, hash, args) do
def send_invoke(worker, name, mod, hash, args, metadata \\ {}) do
worker_addr = {:worker, worker}
cmd = {:invoke, %{name: name, module: mod, hash: hash}, args}
cmd = {:invoke, %{name: name, module: mod, hash: hash, metadata: metadata}, args}
Logger.info("Sending invoke for #{mod}/#{name} to #{inspect(worker_addr)}")

case GenServer.call(worker_addr, cmd, 60_000) do
Expand Down
3 changes: 2 additions & 1 deletion core/lib/core/adapters/telemetry/collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ defmodule Core.Adapters.Telemetry.Collector do
Enum.reduce(result_list, %{}, fn result, acc ->
case result do
%{"metric" => %{"__name__" => name}, "value" => [_, value]} ->
Map.put(acc, name, value)
{float_val, _} = Float.parse(value)
Map.put(acc, name, float_val)

_ ->
acc
Expand Down
12 changes: 6 additions & 6 deletions core/lib/core/domain/invoker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Core.Domain.Invoker do
require Logger

alias Core.FunctionsMetadata
alias Data.FunctionMetadata

alias Core.Domain.{
Functions,
Expand All @@ -30,6 +29,7 @@ defmodule Core.Domain.Invoker do
Scheduler
}

alias Data.FunctionMetadata
alias Data.FunctionStruct
alias Data.InvokeParams
alias Data.InvokeResult
Expand Down Expand Up @@ -68,11 +68,11 @@ defmodule Core.Domain.Invoker do
metadata: struct(FunctionMetadata, %{tag: metadata.tag, capacity: metadata.capacity})
})

with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config) do
with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config, ivk.args) do
update_concurrent(worker, +1)

out =
case invoke_without_code(worker, ivk, f.hash) do
case invoke_without_code(worker, ivk, f.hash, func.metadata) do
{:error, :code_not_found, handler} ->
[%{code: code}] = Functions.get_code_by_name_in_mod!(ivk.function, ivk.module)

Expand Down Expand Up @@ -121,12 +121,12 @@ defmodule Core.Domain.Invoker do
end
end

@spec invoke_without_code(atom(), InvokeParams.t(), binary()) ::
@spec invoke_without_code(atom(), InvokeParams.t(), binary(), FunctionMetadata.t()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | invoke_errors()
def invoke_without_code(worker, ivk, hash) do
def invoke_without_code(worker, ivk, hash, metadata \\ %FunctionMetadata{}) do
Logger.debug("Invoker: invoking #{ivk.module}/#{ivk.function} without code")
# send invocation without code
Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args)
Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args, metadata)
end

@spec invoke_with_code(atom(), pid(), InvokeParams.t(), FunctionStruct.t()) ::
Expand Down
17 changes: 11 additions & 6 deletions core/lib/core/domain/policies/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do
- configuration: an APP script (Data.Configurations.APP), generally obtained through parsing using the Core.Domain.Policies.Parsers.APP module.
- workers: a list of Data.Worker structs, each with relevant worker metrics.
- function: a Data.FunctionStruct struct, with the necessary function information. It must contain function metadata, specifically a tag and a capacity.
- args: not used
## Returns
- {:ok, wrk} if a suitable worker was found, with `wrk` being the worker.
Expand All @@ -43,12 +44,15 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do
- {:error, :no_function_metadata} if the given FunctionStruct does not include the necessary metadata (i.e. tag, capacity).
- {:error, :invalid_input} if the given input was invalid in any other way (e.g. wrong types).
"""
@spec select(APP.t(), [Data.Worker.t()], Data.FunctionStruct.t()) ::
@spec select(APP.t(), [Data.Worker.t()], Data.FunctionStruct.t(), map()) ::
{:ok, Data.Worker.t()} | select_errors()
def select(config, workers, function, args \\ %{})

def select(
%APP{tags: tags} = _configuration,
[_ | _] = workers,
%Data.FunctionStruct{metadata: %{tag: tag_name, capacity: _function_capacity}} = function
%Data.FunctionStruct{metadata: %{tag: tag_name, capacity: _function_capacity}} = function,
_
) do
default = tags |> Map.get("default")
tag = tags |> Map.get(tag_name, default)
Expand All @@ -75,15 +79,15 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do
end
end

def select(%APP{tags: _}, [], _) do
def select(%APP{tags: _}, [], _, _) do
{:error, :no_workers}
end

def select(%APP{tags: _}, _, %Data.FunctionStruct{metadata: nil}) do
def select(%APP{tags: _}, _, %Data.FunctionStruct{metadata: nil}, _) do
{:error, :no_function_metadata}
end

def select(_, _, _) do
def select(_, _, _, _) do
{:error, :invalid_input}
end

Expand Down Expand Up @@ -158,7 +162,8 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do
Core.Domain.Policies.SchedulingPolicy.select(
%Data.Configurations.Empty{},
wrk,
function
function,
%{}
)
end
end
Expand Down
9 changes: 6 additions & 3 deletions core/lib/core/domain/policies/default.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do
# since otherwise in situations where e.g. Prometheus is down, we would always have no workers
@spec select(Empty.t(), [Data.Worker.t()], Data.FunctionStruct.t()) ::
{:ok, Data.Worker.t()} | {:error, :no_workers} | {:error, :no_valid_workers}
def select(config, workers, function, args \\ %{})

def select(
%Empty{},
[_ | _] = workers,
%Data.FunctionStruct{metadata: %Data.FunctionMetadata{capacity: c}}
%Data.FunctionStruct{metadata: %Data.FunctionMetadata{capacity: c}},
_
) do
selected_worker =
workers
Expand All @@ -45,11 +48,11 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do
end
end

def select(%Empty{}, _, %Data.FunctionStruct{metadata: nil}) do
def select(%Empty{}, _, %Data.FunctionStruct{metadata: nil}, _) do
{:error, :no_function_metadata}
end

def select(%Empty{}, [], _) do
def select(%Empty{}, [], _, _) do
{:error, :no_workers}
end
end
4 changes: 2 additions & 2 deletions core/lib/core/domain/policies/scheduling_policy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defprotocol Core.Domain.Policies.SchedulingPolicy do
@doc """
Should select a worker from a list of workers, given a specific configuration.
"""
@spec select(t, [Data.Worker.t()], Data.FunctionStruct.t()) ::
@spec select(t, [Data.Worker.t()], Data.FunctionStruct.t(), map()) ::
{:ok, Data.Worker.t()} | {:error, any}
def select(configuration, workers, function)
def select(configuration, workers, function, args)
end
7 changes: 4 additions & 3 deletions core/lib/core/domain/ports/commands.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ defmodule Core.Domain.Ports.Commands do
Port for sending commands to workers.
"""

alias Data.FunctionMetadata
alias Data.FunctionStruct
alias Data.InvokeResult

@adapter :core |> Application.compile_env!(__MODULE__) |> Keyword.fetch!(:adapter)

@callback send_invoke(atom(), String.t(), String.t(), binary(), map()) ::
@callback send_invoke(atom(), String.t(), String.t(), binary(), map(), FunctionMetadata.t()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()}
@callback send_invoke_with_code(atom(), pid(), FunctionStruct.t()) ::
{:ok, InvokeResult.t()} | {:error, any()}
Expand All @@ -36,9 +37,9 @@ defmodule Core.Domain.Ports.Commands do
Sends an invoke command to a worker passing the function name, module, hash and args.
It requires a worker (a fully qualified name of another node with the :worker actor on) and function arguments can be empty.
"""
@spec send_invoke(atom(), String.t(), String.t(), binary(), map()) ::
@spec send_invoke(atom(), String.t(), String.t(), binary(), map(), FunctionMetadata.t()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()}
defdelegate send_invoke(worker, f_name, ns, hash, args), to: @adapter
defdelegate send_invoke(worker, f_name, ns, hash, args, metadata), to: @adapter

@doc """
Sends an invoke command to a worker passing a struct with the function name, module and the code (wasm file binary).
Expand Down
9 changes: 6 additions & 3 deletions core/lib/core/domain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ defmodule Core.Domain.Scheduler do
"""
@spec select([worker_atom()], Data.FunctionStruct.t(), configuration()) ::
{:ok, worker_atom()} | {:error, :no_workers} | {:error, :no_valid_workers}
def select([], _, _) do
def select(workers, function, config, args \\ %{})

def select([], _, _, _args) do
Logger.warn("Scheduler: tried selection with NO workers")
{:error, :no_workers}
end

# NOTE: if we move this to a NIF, we should only pass
# configuration information (to avoid serialising all parameters)
def select(workers, function, config) do
def select(workers, function, config, args) do
Logger.info("Scheduler: selection with #{length(workers)} workers")

# Get the resources
Expand All @@ -54,7 +56,8 @@ defmodule Core.Domain.Scheduler do
case SchedulingPolicy.select(
config,
resources,
function
function,
args
) do
{:ok, wrk} -> {:ok, wrk.name}
{:error, err} -> {:error, err}
Expand Down
2 changes: 1 addition & 1 deletion core/lib/core_web/auth.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule CoreWeb.Plug.Authenticate do
_error ->
conn
|> put_status(:unauthorized)
|> Phoenix.Controller.put_view(CoreWeb.ErrorView)
|> Phoenix.Controller.put_view(CoreWeb.ErrorJSON)
|> Phoenix.Controller.render(:"401")
|> halt()
end
Expand Down
8 changes: 4 additions & 4 deletions core/test/core/integration/invoke_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ defmodule Core.InvokeTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end)

pars = %InvokeParams{function: function.name, module: module.name}
assert Invoker.invoke(pars) == {:error, {:exec_error, "some error"}}
Expand All @@ -99,7 +99,7 @@ defmodule Core.InvokeTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end)

{:ok, resources} = Core.Telemetry.Metrics.Mock.resources(:worker@localhost)
concurrent = resources |> Map.get(:concurrent_functions, 0)
Expand Down Expand Up @@ -133,7 +133,7 @@ defmodule Core.InvokeTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:core@somewhere, :worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn worker, _, _, _, _ ->
|> Mox.expect(:send_invoke, fn worker, _, _, _, _, _ ->
{:ok, %InvokeResult{result: worker}}
end)

Expand Down Expand Up @@ -168,7 +168,7 @@ defmodule Core.InvokeTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, :code_not_found, self()} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, :code_not_found, self()} end)

Core.Commands.Mock
|> Mox.expect(:send_invoke_with_code, fn _worker, _handler, function ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ defmodule CoreWeb.FunctionControllerTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end)

conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}")
assert response(conn, 200)
Expand All @@ -467,7 +467,7 @@ defmodule CoreWeb.FunctionControllerTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end)

conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}", args: %{name: "World"})

Expand All @@ -482,7 +482,7 @@ defmodule CoreWeb.FunctionControllerTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, :code_not_found, self()} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, :code_not_found, self()} end)

conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}")
assert response(conn, 200)
Expand Down Expand Up @@ -519,7 +519,9 @@ defmodule CoreWeb.FunctionControllerTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some reason"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ ->
{:error, {:exec_error, "some reason"}}
end)

conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}")
assert response(conn, 422)
Expand Down
2 changes: 1 addition & 1 deletion data/lib/function_metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ defmodule Data.FunctionMetadata do
tag: String.t(),
capacity: integer()
}
defstruct tag: nil, capacity: -1
defstruct tag: "", capacity: -1
end

0 comments on commit 42c9d3a

Please sign in to comment.