Skip to content

Commit

Permalink
Merge pull request #219 from mattrent/improvements
Browse files Browse the repository at this point in the history
Improvements
  • Loading branch information
mattrent authored Jun 11, 2024
2 parents cb75e37 + 9dc5698 commit a0b4246
Show file tree
Hide file tree
Showing 28 changed files with 735 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
defmodule Core.APPScripts do
defmodule Core.Domain.APPScripts do
@moduledoc """
The APPScripts context.
"""

import Ecto.Query, warn: false
alias Core.Repo

alias Core.APPScripts.APP
alias Core.Schemas.APPScripts.APP

@doc """
Returns the list of app_scripts.
Expand Down
1 change: 1 addition & 0 deletions core/lib/core/domain/functions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ defmodule Core.Domain.Functions do
[]
"""
@spec get_by_name_in_mod!(String.t(), String.t()) :: [any()]
def get_by_name_in_mod!(fun_name, mod_name) do
q =
from(f in Function,
Expand Down
73 changes: 73 additions & 0 deletions core/lib/core/domain/functions_metadata.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2024 Giuseppe De Palma, Matteo Trentin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule Core.FunctionsMetadata do
@moduledoc """
The FunctionsMetadata context.
"""

import Ecto.Query, warn: false
alias Core.Repo

alias Core.Schemas.FunctionMetadata

@doc """
Creates a function_metadata.
## Examples
iex> create_function_metadata(%{field: value})
{:ok, %FunctionMetadata{}}
iex> create_function_metadata(%{field: bad_value})
{:error, %Ecto.Changeset{}}
"""
def create_function_metadata(attrs \\ %{}) do
%FunctionMetadata{}
|> FunctionMetadata.changeset(attrs)
|> Repo.insert()
end

@doc """
Updates a function_metadata.
## Examples
iex> update_function_metadata(function_metadata, %{field: new_value})
{:ok, %FunctionMetadata{}}
iex> update_function_metadata(function_metadata, %{field: bad_value})
{:error, %Ecto.Changeset{}}
"""
def update_function_metadata(%FunctionMetadata{} = function_metadata, attrs) do
function_metadata
|> FunctionMetadata.changeset(attrs)
|> Repo.update()
end

@doc """
Gets function metadata from a function's ID.
Returns `{:ok, %FunctionMetadata}` if the metadata is found,
`{:error, :not_found}` otherwise.
"""
def get_function_metadata_by_function_id(function_id) do
case Repo.get_by(FunctionMetadata, function_id: function_id) do
nil -> {:error, :not_found}
m -> {:ok, m}
end
end
end
62 changes: 34 additions & 28 deletions core/lib/core/domain/invoker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Core.Domain.Invoker do
"""
require Logger

alias Core.FunctionsMetadata
alias Data.FunctionMetadata

alias Core.Domain.{
Expand Down Expand Up @@ -56,38 +57,43 @@ defmodule Core.Domain.Invoker do
def invoke(ivk) do
Logger.info("Invoker: invocation for #{ivk.module}/#{ivk.function} requested")

case Functions.get_code_by_name_in_mod!(ivk.function, ivk.module) do
[f] ->
func =
struct(FunctionStruct, %{
name: ivk.function,
module: ivk.module,
code: f.code,
hash: f.hash,
metadata: struct(FunctionMetadata, %{})
})

with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func) do
update_concurrent(worker, +1)

out =
case invoke_without_code(worker, ivk, f.hash) do
{:error, :code_not_found, handler} ->
worker
|> invoke_with_code(handler, ivk, func)
|> save_to_sinks(ivk.module, ivk.function)

res ->
save_to_sinks(res, ivk.module, ivk.function)
end

update_concurrent(worker, -1)
with [f] <- Functions.get_by_name_in_mod!(ivk.function, ivk.module),
{:ok, metadata} <- FunctionsMetadata.get_function_metadata_by_function_id(f.id) do
func =
struct(FunctionStruct, %{
name: ivk.function,
module: ivk.module,
hash: f.hash,
code: nil,
metadata: struct(FunctionMetadata, %{tag: metadata.tag, capacity: metadata.capacity})
})

with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config) do
update_concurrent(worker, +1)

out =
case invoke_without_code(worker, ivk, f.hash) do
{:error, :code_not_found, handler} ->
[%{code: code}] = Functions.get_code_by_name_in_mod!(ivk.function, ivk.module)

worker
|> invoke_with_code(handler, ivk, func |> Map.put(:code, code))
|> save_to_sinks(ivk.module, ivk.function)

res ->
save_to_sinks(res, ivk.module, ivk.function)
end

out
end
update_concurrent(worker, -1)

out
end
else
[] ->
{:error, :not_found}

e ->
e
end
end

Expand Down
65 changes: 65 additions & 0 deletions core/lib/core/domain/policies/parsers/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,69 @@ defmodule Core.Domain.Policies.Parsers.APP do
def parse_block(_) do
{:error, :no_block_workers}
end

@doc """
Converts a given APP script struct to a simple map.
"""
@spec to_map(Data.Configurations.APP.t()) :: map()
def to_map(app_script) do
%{tags: tags} = outer_map = Map.from_struct(app_script)

tag_maps =
tags
|> Map.new(fn {k, v} -> {k, v |> Map.from_struct() |> blocks_to_maps()} end)

outer_map |> Map.put(:tags, tag_maps)
end

@spec blocks_to_maps(Data.Configurations.APP.Tag.t()) :: map()
defp blocks_to_maps(%{blocks: blocks} = tag_map) do
block_maps =
blocks
|> Enum.map(&Map.from_struct(&1))

tag_map |> Map.put(:blocks, block_maps)
end

@doc """
Builds an APP script from a map with string keys (assuming it has the correct structure).
"""
@spec from_string_keys(map()) :: Data.Configurations.APP.t() | {:error, :unknown_construct}
def from_string_keys(%{"tags" => tags}) do
atom_tags = tags |> Map.new(fn {k, v} -> {k, v |> tag_from_string_keys} end)
%Data.Configurations.APP{tags: atom_tags}
end

def from_string_keys(_) do
{:error, :unknown_construct}
end

defp tag_from_string_keys(%{"blocks" => blocks, "followup" => followup}) do
atom_followup = followup |> String.to_existing_atom()
atom_blocks = blocks |> Enum.map(&block_from_string_keys/1)
%Data.Configurations.APP.Tag{blocks: atom_blocks, followup: atom_followup}
end

defp block_from_string_keys(%{
"affinity" => %{"affinity" => affinity, "antiaffinity" => antiaffinity},
"invalidate" => invalidate,
"strategy" => strategy,
"workers" => workers
}) do
atom_invalidate =
invalidate
|> Map.new(fn {k, v} ->
case v do
"infinity" -> {String.to_existing_atom(k), :infinity}
val -> {String.to_existing_atom(k), val}
end
end)

%Data.Configurations.APP.Block{
affinity: %{affinity: affinity, antiaffinity: antiaffinity},
invalidate: atom_invalidate,
strategy: String.to_existing_atom(strategy),
workers: workers
}
end
end
12 changes: 7 additions & 5 deletions core/lib/core/domain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@ defmodule Core.Domain.Scheduler do
require Logger

@type worker_atom :: atom()
@type configuration :: Data.Configurations.Empty.t() | Data.Configurations.APP.t()

@doc """
Receives a list of workers and chooses one which can be used for invocation.
"""
@spec select([worker_atom()], Data.FunctionStruct.t()) ::
@spec select([worker_atom()], Data.FunctionStruct.t(), configuration()) ::
{:ok, worker_atom()} | {:error, :no_workers} | {:error, :no_valid_workers}
def select([], _) do
def select([], _, _) do
Logger.warn("Scheduler: tried selection with NO workers")
{:error, :no_workers}
end

def select(workers, function) do
# NOTE: if we move this to a NIF, we should only pass
# configuration information (to avoid serialising all parameters)
def select(workers, function, config) do
Logger.info("Scheduler: selection with #{length(workers)} workers")

# Get the resources
Expand All @@ -48,9 +51,8 @@ defmodule Core.Domain.Scheduler do
{:ok, Enum.random(workers)}

[_ | _] ->
# This will be expanded to allow use of multiple policies (depending on function metadata)
case SchedulingPolicy.select(
%Data.Configurations.Empty{},
config,
resources,
function
) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule Core.APPScripts.APP do
defmodule Core.Schemas.APPScripts.APP do
@moduledoc """
The APP script schema.
"""
Expand All @@ -21,7 +21,7 @@ defmodule Core.APPScripts.APP do

schema "app_scripts" do
field(:name, :string)
field(:script, :string)
field(:script, :map)

timestamps()
end
Expand Down
3 changes: 3 additions & 0 deletions core/lib/core/schemas/function.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Core.Schemas.Function do
@moduledoc """
The Function schema.
"""
alias Core.Schemas.FunctionMetadata
use Ecto.Schema
import Ecto.Changeset

Expand All @@ -27,6 +28,8 @@ defmodule Core.Schemas.Function do
timestamps()

belongs_to(:module, Core.Schemas.Module, foreign_key: :module_id)

has_one(:metadata, FunctionMetadata, on_delete: :delete_all)
end

@doc false
Expand Down
38 changes: 38 additions & 0 deletions core/lib/core/schemas/function_metadata.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2024 Giuseppe De Palma, Matteo Trentin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule Core.Schemas.FunctionMetadata do
@moduledoc """
The FunctionMetadata schema.
"""
use Ecto.Schema
import Ecto.Changeset

schema "function_metadata" do
field(:capacity, :integer)
field(:tag, :string)

timestamps()
belongs_to(:function, Core.Schemas.Function, foreign_key: :function_id)
end

@doc false
def changeset(function_metadata, attrs) do
function_metadata
|> cast(attrs, [:tag, :capacity, :function_id])
|> validate_required([:function_id])
|> unique_constraint(:function_id)
|> foreign_key_constraint(:function_id)
end
end
19 changes: 15 additions & 4 deletions core/lib/core_web/controllers/app_script_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
defmodule CoreWeb.APPScriptController do
use CoreWeb, :controller

alias Core.APPScripts
alias Core.APPScripts.APP
alias Core.Domain.APPScripts
alias Core.Domain.Policies.Parsers
alias Core.Schemas.APPScripts.APP

action_fallback(CoreWeb.FallbackController)

Expand All @@ -24,14 +25,24 @@ defmodule CoreWeb.APPScriptController do
render(conn, :index, app_scripts: app_scripts)
end

def create(conn, %{"app_script" => app_script_params}) do
with {:ok, %APP{} = app_script} <- APPScripts.create_app_script(app_script_params) do
def create(conn, %{"name" => script_name, "file" => %Plug.Upload{path: tmp_path}}) do
with {:ok, app_script_string} <- File.read(tmp_path),
{:ok, app_script} <- Parsers.APP.parse(app_script_string),
{:ok, %APP{} = app_script} <-
APPScripts.create_app_script(%{
name: script_name,
script: app_script |> Parsers.APP.to_map()
}) do
conn
|> put_status(:created)
|> render(:show, app_script: app_script)
end
end

def create(_, _) do
{:error, :bad_params}
end

def show(conn, %{"app_name" => name}) do
app_script = APPScripts.get_app_script_by_name(name)
render(conn, :show, app_script: app_script)
Expand Down
Loading

0 comments on commit a0b4246

Please sign in to comment.