Skip to content

Commit

Permalink
refactor(appengine): merge_device_status
Browse files Browse the repository at this point in the history
rewrite merge_device_status so that all its queries now use exandra.

refactor some private functions in their own modules

Signed-off-by: Francesco Noacco <[email protected]>
  • Loading branch information
noaccOS committed Jan 22, 2025
1 parent 00f083f commit 870b897
Show file tree
Hide file tree
Showing 4 changed files with 399 additions and 321 deletions.
214 changes: 214 additions & 0 deletions apps/astarte_appengine_api/lib/astarte_appengine_api/device/aliases.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#
# This file is part of Astarte.
#
# Copyright 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule Astarte.AppEngine.API.Device.Aliases do
alias Astarte.Core.Device, as: CoreDevice
alias Astarte.DataAccess.Astarte.Realm
alias Astarte.DataAccess.Realms.Device
alias Astarte.DataAccess.Realms.Name
alias Ecto.Changeset

alias Astarte.DataAccess.Repo

import Ecto.Query

require Logger

defstruct to_update: %{}, to_delete: %{}

@type input :: %{alias_tag => alias_value} | [alias]
@type alias_tag :: String.t()
@type alias_value :: String.t()
@type alias :: {alias_tag, alias_value}
@type t :: %__MODULE__{
to_update: [alias],
to_delete: [alias]
}

@spec validate(input() | nil, String.t(), CoreDevice.device_id()) :: {:ok, t()} | term()
def validate(nil, _, _), do: {:ok, %__MODULE__{to_delete: [], to_update: []}}

def validate(aliases, realm_name, device_id) do
with :ok <- validate_format(aliases),
:ok <- validate_device_ownership(aliases, realm_name, device_id) do
{to_delete, to_update} =
aliases
|> Enum.split_with(fn {_key, value} -> is_nil(value) end)

{:ok, %__MODULE__{to_delete: to_delete, to_update: to_update}}
end
end

@spec apply(Changeset.t(), t()) :: Changeset.t()
def apply(changeset, aliases) do
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases

changeset
|> apply_delete(to_delete)
|> apply_update(to_update)
end

@spec validate_format(input()) :: :ok | {:error, :invalid_alias}
defp validate_format(aliases) do
Enum.find_value(aliases, :ok, fn
{_tag, ""} ->
:invalid_value

{"", _value} ->
:invalid_tag

_valid_format_tag ->
false
end)
|> case do
:ok ->
:ok

:invalid_tag ->
Logger.warning("Alias key cannot be an empty string.", tag: :invalid_alias_empty_key)
{:error, :invalid_alias}

:invalid_value ->
Logger.warning("Alias value cannot be an empty string.", tag: :invalid_alias_empty_value)
{:error, :invalid_alias}
end
end

@spec validate_device_ownership(input(), String.t(), CoreDevice.device_id()) :: :ok
defp validate_device_ownership(aliases, realm_name, device_id) do
keyspace = Realm.keyspace_name(realm_name)

chunked_aliases =
aliases |> Enum.map(fn {_tag, value} -> value end) |> Enum.chunk_every(99)

results =
for alias_chunk <- chunked_aliases do
from(n in Name, where: n.object_type == 1 and n.object_name in ^alias_chunk)
|> Repo.all(prefix: keyspace)
end
|> List.flatten()

invalid_name =
results |> Enum.find(fn name -> name.object_uuid != device_id end)

if is_nil(invalid_name) do
:ok
else
{invalid_tag, invalid_value} =
aliases |> Enum.find(fn {_tag, value} -> value == invalid_name.object_name end)

deleting? = is_nil(invalid_value)

if deleting? do
Logger.error("Inconsistent alias for #{invalid_tag}.",
device_id: device_id,
tag: "inconsistent_alias"
)

{:error, :database_error}
else
{:error, :alias_already_in_use}
end
end
end

@spec generate_batch_queries(t(), String.t(), Device.t()) :: [{String.t(), list()}]
def generate_batch_queries(aliases, keyspace, device) do
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases

{update_tags, update_values} = Enum.unzip(to_update)
{delete_tags, _delete_values} = Enum.unzip(to_delete)

all_tags = delete_tags ++ update_tags

tags_to_delete =
device.aliases
|> Enum.filter(fn {tag, _value} -> tag in all_tags end)

# we delete both aliases we mean to delete, and also existing aliases we want to update
# as the name is part of the primary key for the names table
delete_queries =
tags_to_delete
|> Enum.map(fn {_tag, value} -> value end)
|> Enum.chunk_every(99)
|> Enum.map(fn alias_chunk ->
query =
from n in Name,
prefix: ^keyspace,
where: n.object_type == 1 and n.object_name in ^alias_chunk

Repo.to_sql(:delete_all, query)
end)

insert_queries =
update_values
|> Enum.map(&update_batch_query(keyspace, device.device_id, &1))

delete_queries ++ insert_queries
end

defp update_batch_query(keyspace, device_id, value) do
names_table = %Name{}.__meta__.source

query =
"INSERT INTO #{keyspace}.#{names_table} (object_type, object_name, object_uuid) VALUES (1, ?, ?)"

params = [value, device_id]
{query, params}
end

@spec apply_delete(Changeset.t(), [alias]) :: Changeset.t()
defp apply_delete(%Changeset{valid?: false} = changeset, _delete_aliases),
do: changeset

defp apply_delete(changeset, delete_aliases) when length(delete_aliases) == 0,
do: changeset

defp apply_delete(changeset, delete_aliases) do
aliases = changeset |> Changeset.fetch_field!(:aliases)

{delete_tags_list, _delete_values} = Enum.unzip(delete_aliases)
delete_tags = delete_tags_list |> MapSet.new()

device_aliases = aliases |> Map.keys() |> MapSet.new()

if MapSet.subset?(delete_tags, device_aliases) do
aliases = aliases |> Map.drop(delete_tags_list)

changeset
|> Changeset.put_change(:aliases, aliases)
else
Changeset.add_error(changeset, :aliases, "", reason: :alias_tag_not_found)
end
end

@spec apply_update(Changeset.t(), [alias]) :: Changeset.t()
defp apply_update(%Changeset{valid?: false} = changeset, _update_aliases),
do: changeset

defp apply_update(changeset, update_aliases) when length(update_aliases) == 0,
do: changeset

defp apply_update(changeset, update_aliases) do
aliases =
changeset |> Changeset.fetch_field!(:aliases)

aliases = Map.merge(aliases, Map.new(update_aliases))

Changeset.put_change(changeset, :aliases, aliases)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#
# This file is part of Astarte.
#
# Copyright 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule Astarte.AppEngine.API.Device.Attributes do
alias Ecto.Changeset

require Logger

defstruct to_update: %{}, to_delete: %{}

@type input :: %{alias_tag => alias_value} | [alias]
@type alias_tag :: String.t()
@type alias_value :: String.t()
@type alias :: {alias_tag, alias_value}
@type t :: %__MODULE__{
to_update: [alias],
to_delete: [alias]
}

@spec validate(input() | nil) :: {:ok, t()} | term()
def validate(attributes) do
attributes =
case attributes do
nil -> []
attributes -> attributes
end

with :ok <- validate_format(attributes) do
# :ok <- validate_device_ownership(aliases, realm_name, device_id) do
{to_delete, to_update} =
attributes
|> Enum.split_with(fn {_key, value} -> is_nil(value) end)

{:ok, %__MODULE__{to_delete: to_delete, to_update: to_update}}
end
end

@spec apply(Changeset.t(), t()) :: Changeset.t()
def apply(changeset, aliases) do
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases

changeset
|> apply_delete(to_delete)
|> apply_update(to_update)
end

@spec validate_format(input()) :: :ok | {:error, :invalid_alias}
defp validate_format(attributes) do
invalid_attribute? =
Enum.any?(attributes, fn {attribute_key, _value} -> attribute_key == "" end)

if invalid_attribute? do
Logger.warning("Attribute key cannot be an empty string.",
tag: :invalid_attribute_empty_key
)

{:error, :invalid_attributes}
else
:ok
end
end

@spec apply_delete(Changeset.t(), [alias]) :: Changeset.t()
defp apply_delete(%Changeset{valid?: false} = changeset, _delete_aliases), do: changeset

defp apply_delete(changeset, delete_attributes) when length(delete_attributes) == 0,
do: changeset

defp apply_delete(changeset, delete_attributes) do
attributes = changeset |> Changeset.fetch_field!(:attributes)

{delete_keys, _delete_values} = Enum.unzip(delete_attributes)
attributes_to_delete = delete_keys |> MapSet.new()

device_attributes = attributes |> Map.keys() |> MapSet.new()

if MapSet.subset?(attributes_to_delete, device_attributes) do
attributes = attributes |> Map.drop(delete_keys)

changeset
|> Changeset.put_change(:attributes, attributes)
else
Changeset.add_error(changeset, :attributes, "", reason: :attribute_key_not_found)
end
end

@spec apply_update(Changeset.t(), [alias]) :: Changeset.t()
defp apply_update(%Changeset{valid?: false} = changeset, _update_attributes), do: changeset

defp apply_update(changeset, update_attributes) when length(update_attributes) == 0,
do: changeset

defp apply_update(changeset, update_attributes) do
attributes =
changeset |> Changeset.fetch_field!(:attributes)

attributes = Map.merge(attributes, Map.new(update_attributes))

# Works because inserts in CQL are upserts
Changeset.put_change(changeset, :attributes, attributes)
end
end
Loading

0 comments on commit 870b897

Please sign in to comment.