diff --git a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/aliases.ex b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/aliases.ex new file mode 100644 index 000000000..5914f5682 --- /dev/null +++ b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/aliases.ex @@ -0,0 +1,218 @@ +# +# This file is part of Astarte. +# +# Copyright 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule Astarte.AppEngine.API.Device.Aliases do + alias Astarte.DataAccess.Astarte.Realm + alias Astarte.DataAccess.Realms.Device + alias Astarte.DataAccess.Realms.Name + alias Ecto.Changeset + + alias Astarte.DataAccess.Repo + + import Ecto.Query + + require Logger + + defstruct to_update: [], to_delete: [] + + @type input :: %{alias_tag => alias_value} | [alias] + @type alias_tag :: String.t() + @type alias_value :: String.t() + @type alias :: {alias_tag, alias_value} + @type t :: %__MODULE__{ + to_update: [alias], + to_delete: [alias_tag] + } + + @spec validate(input() | nil, String.t(), Device.t()) :: {:ok, t()} | term() + def validate(nil, _, _), do: {:ok, %__MODULE__{to_delete: [], to_update: []}} + + def validate(aliases, realm_name, device) do + with :ok <- validate_format(aliases) do + {to_delete, to_update} = aliases |> Enum.split_with(fn {_key, value} -> is_nil(value) end) + to_delete = to_delete |> Enum.map(fn {tag, nil} -> tag end) + state = %__MODULE__{to_delete: to_delete, to_update: to_update} + + with :ok <- validate_device_ownership(state, realm_name, device) do + {:ok, state} + end + end + end + + @spec apply(Changeset.t(), t()) :: Changeset.t() + def apply(changeset, aliases) do + %__MODULE__{to_delete: to_delete, to_update: to_update} = aliases + + changeset + |> apply_delete(to_delete) + |> apply_update(to_update) + end + + @spec validate_format(input()) :: :ok | {:error, :invalid_alias} + defp validate_format(aliases) do + Enum.find_value(aliases, :ok, fn + {_tag, ""} -> + :invalid_value + + {"", _value} -> + :invalid_tag + + _valid_format_tag -> + false + end) + |> case do + :ok -> + :ok + + :invalid_tag -> + Logger.warning("Alias key cannot be an empty string.", tag: :invalid_alias_empty_key) + {:error, :invalid_alias} + + :invalid_value -> + Logger.warning("Alias value cannot be an empty string.", tag: :invalid_alias_empty_value) + {:error, :invalid_alias} + end + end + + @spec validate_device_ownership(t(), String.t(), Device.t()) :: :ok + defp validate_device_ownership(aliases, realm_name, device) do + keyspace = Realm.keyspace_name(realm_name) + + %__MODULE__{to_delete: to_delete, to_update: to_update} = aliases + + to_delete = device.aliases |> Map.take(to_delete) |> Enum.map(fn {_tag, value} -> value end) + to_update = to_update |> Enum.map(fn {_tag, value} -> value end) + + chunked_aliases = Enum.concat(to_delete, to_update) |> Enum.chunk_every(99) + + results = + for alias_chunk <- chunked_aliases do + from(n in Name, where: n.object_type == 1 and n.object_name in ^alias_chunk) + |> Repo.all(prefix: keyspace) + end + |> List.flatten() + + invalid_name = + results |> Enum.find(fn name -> name.object_uuid != device.device_id end) + + if is_nil(invalid_name) do + :ok + else + existing_aliases = + Enum.find(device.aliases, fn {_tag, value} -> value == invalid_name.object_name end) + + inconsistent? = !is_nil(existing_aliases) + + if inconsistent? do + {invalid_tag, _value} = existing_aliases + + Logger.error("Inconsistent alias for #{invalid_tag}.", + device_id: device.device_id, + tag: "inconsistent_alias" + ) + + {:error, :database_error} + else + {:error, :alias_already_in_use} + end + end + end + + @spec generate_batch_queries(t(), String.t(), Device.t()) :: [{String.t(), list()}] + def generate_batch_queries(aliases, keyspace, device) do + %__MODULE__{to_delete: to_delete, to_update: to_update} = aliases + + {update_tags, update_values} = Enum.unzip(to_update) + + all_tags = to_delete ++ update_tags + + tags_to_delete = + device.aliases + |> Enum.filter(fn {tag, _value} -> tag in all_tags end) + + # we delete both aliases we mean to delete, and also existing aliases we want to update + # as the name is part of the primary key for the names table + delete_queries = + tags_to_delete + |> Enum.map(fn {_tag, value} -> value end) + |> Enum.chunk_every(99) + |> Enum.map(fn alias_chunk -> + query = + from n in Name, + prefix: ^keyspace, + where: n.object_type == 1 and n.object_name in ^alias_chunk + + Repo.to_sql(:delete_all, query) + end) + + insert_queries = + update_values + |> Enum.map(&update_batch_query(keyspace, device.device_id, &1)) + + delete_queries ++ insert_queries + end + + defp update_batch_query(keyspace, device_id, value) do + names_table = %Name{}.__meta__.source + + query = + "INSERT INTO #{keyspace}.#{names_table} (object_type, object_name, object_uuid) VALUES (1, ?, ?)" + + params = [value, device_id] + {query, params} + end + + @spec apply_delete(Changeset.t(), [alias]) :: Changeset.t() + defp apply_delete(%Changeset{valid?: false} = changeset, _delete_aliases), + do: changeset + + defp apply_delete(changeset, delete_aliases) when length(delete_aliases) == 0, + do: changeset + + defp apply_delete(changeset, delete_aliases) do + aliases = changeset |> Changeset.fetch_field!(:aliases) + + delete_tags = delete_aliases |> MapSet.new() + + device_aliases = aliases |> Map.keys() |> MapSet.new() + + if MapSet.subset?(delete_tags, device_aliases) do + aliases = aliases |> Map.drop(delete_aliases) + + changeset + |> Changeset.put_change(:aliases, aliases) + else + Changeset.add_error(changeset, :aliases, "", reason: :alias_tag_not_found) + end + end + + @spec apply_update(Changeset.t(), [alias]) :: Changeset.t() + defp apply_update(%Changeset{valid?: false} = changeset, _update_aliases), + do: changeset + + defp apply_update(changeset, update_aliases) when length(update_aliases) == 0, + do: changeset + + defp apply_update(changeset, update_aliases) do + aliases = + changeset |> Changeset.fetch_field!(:aliases) + + aliases = Map.merge(aliases, Map.new(update_aliases)) + + Changeset.put_change(changeset, :aliases, aliases) + end +end diff --git a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/attributes.ex b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/attributes.ex new file mode 100644 index 000000000..db33857a8 --- /dev/null +++ b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/attributes.ex @@ -0,0 +1,116 @@ +# +# This file is part of Astarte. +# +# Copyright 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule Astarte.AppEngine.API.Device.Attributes do + alias Ecto.Changeset + + require Logger + + defstruct to_update: %{}, to_delete: %{} + + @type input :: %{alias_tag => alias_value} | [alias] + @type alias_tag :: String.t() + @type alias_value :: String.t() + @type alias :: {alias_tag, alias_value} + @type t :: %__MODULE__{ + to_update: [alias], + to_delete: [alias] + } + + @spec validate(input() | nil) :: {:ok, t()} | term() + def validate(attributes) do + attributes = + case attributes do + nil -> [] + attributes -> attributes + end + + with :ok <- validate_format(attributes) do + # :ok <- validate_device_ownership(aliases, realm_name, device_id) do + {to_delete, to_update} = + attributes + |> Enum.split_with(fn {_key, value} -> is_nil(value) end) + + {:ok, %__MODULE__{to_delete: to_delete, to_update: to_update}} + end + end + + @spec apply(Changeset.t(), t()) :: Changeset.t() + def apply(changeset, aliases) do + %__MODULE__{to_delete: to_delete, to_update: to_update} = aliases + + changeset + |> apply_delete(to_delete) + |> apply_update(to_update) + end + + @spec validate_format(input()) :: :ok | {:error, :invalid_alias} + defp validate_format(attributes) do + invalid_attribute? = + Enum.any?(attributes, fn {attribute_key, _value} -> attribute_key == "" end) + + if invalid_attribute? do + Logger.warning("Attribute key cannot be an empty string.", + tag: :invalid_attribute_empty_key + ) + + {:error, :invalid_attributes} + else + :ok + end + end + + @spec apply_delete(Changeset.t(), [alias]) :: Changeset.t() + defp apply_delete(%Changeset{valid?: false} = changeset, _delete_aliases), do: changeset + + defp apply_delete(changeset, delete_attributes) when length(delete_attributes) == 0, + do: changeset + + defp apply_delete(changeset, delete_attributes) do + attributes = changeset |> Changeset.fetch_field!(:attributes) + + {delete_keys, _delete_values} = Enum.unzip(delete_attributes) + attributes_to_delete = delete_keys |> MapSet.new() + + device_attributes = attributes |> Map.keys() |> MapSet.new() + + if MapSet.subset?(attributes_to_delete, device_attributes) do + attributes = attributes |> Map.drop(delete_keys) + + changeset + |> Changeset.put_change(:attributes, attributes) + else + Changeset.add_error(changeset, :attributes, "", reason: :attribute_key_not_found) + end + end + + @spec apply_update(Changeset.t(), [alias]) :: Changeset.t() + defp apply_update(%Changeset{valid?: false} = changeset, _update_attributes), do: changeset + + defp apply_update(changeset, update_attributes) when length(update_attributes) == 0, + do: changeset + + defp apply_update(changeset, update_attributes) do + attributes = + changeset |> Changeset.fetch_field!(:attributes) + + attributes = Map.merge(attributes, Map.new(update_attributes)) + + # Works because inserts in CQL are upserts + Changeset.put_change(changeset, :attributes, attributes) + end +end diff --git a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex index 78aa832a6..ac45706d4 100644 --- a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex +++ b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex @@ -20,6 +20,8 @@ defmodule Astarte.AppEngine.API.Device do The Device context. """ alias Astarte.AppEngine.API.DataTransmitter + alias Astarte.AppEngine.API.Device.Aliases + alias Astarte.AppEngine.API.Device.Attributes alias Astarte.AppEngine.API.Device.Data alias Astarte.AppEngine.API.Device.DevicesList alias Astarte.AppEngine.API.Device.DevicesListOptions @@ -35,10 +37,11 @@ defmodule Astarte.AppEngine.API.Device do alias Astarte.Core.Mapping alias Astarte.Core.Mapping.EndpointsAutomaton alias Astarte.Core.Mapping.ValueType - alias Astarte.DataAccess.Database + alias Astarte.DataAccess.Astarte.Realm alias Astarte.DataAccess.Device, as: DeviceQueries alias Astarte.DataAccess.Interface, as: InterfaceQueries alias Astarte.DataAccess.Mappings + alias Astarte.DataAccess.Realms.Device, as: DatabaseDevice alias Astarte.DataAccess.Repo alias Ecto.Changeset @@ -94,96 +97,86 @@ defmodule Astarte.AppEngine.API.Device do end def merge_device_status(realm_name, encoded_device_id, device_status_merge) do - with {:ok, client} <- Database.connect(realm: realm_name), - {:ok, device_id} <- Device.decode_device_id(encoded_device_id), - {:ok, device_status} <- retrieve_device_status(realm_name, device_id), - changeset = DeviceStatus.changeset(device_status, device_status_merge), - {:ok, updated_device_status} <- Ecto.Changeset.apply_action(changeset, :update), - credentials_inhibited_change = Map.get(changeset.changes, :credentials_inhibited), - :ok <- change_credentials_inhibited(client, device_id, credentials_inhibited_change), - aliases_change = Map.get(changeset.changes, :aliases, %{}), - attributes_change = Map.get(changeset.changes, :attributes, %{}), - :ok <- update_aliases(client, device_id, aliases_change), - :ok <- update_attributes(client, device_id, attributes_change) do - # Manually merge aliases since changesets don't perform maps deep merge - merged_aliases = merge_data(device_status.aliases, updated_device_status.aliases) - merged_attributes = merge_data(device_status.attributes, updated_device_status.attributes) - - updated_map = - updated_device_status - |> Map.put(:aliases, merged_aliases) - |> Map.put(:attributes, merged_attributes) - - {:ok, updated_map} + keyspace = Realm.keyspace_name(realm_name) + + with {:ok, device_id} <- Device.decode_device_id(encoded_device_id), + {:ok, device} <- + Repo.fetch(DatabaseDevice, device_id, prefix: keyspace, error: :device_not_found), + status = DeviceStatus.from_device(device, realm_name), + {:ok, device_status} <- + update_device_status(realm_name, device, status, device_status_merge) do + {:ok, device_status} end end - defp update_attributes(client, device_id, attributes) do - Enum.reduce_while(attributes, :ok, fn - {"", _attribute_value}, _acc -> - Logger.warning("Attribute key cannot be an empty string.", - tag: :invalid_attribute_empty_key - ) - - {:halt, {:error, :invalid_attributes}} - - {attribute_key, nil}, _acc -> - case Queries.delete_attribute(client, device_id, attribute_key) do - :ok -> - {:cont, :ok} + defp update_device_status(realm_name, device, status, device_status_merge) do + keyspace = Realm.keyspace_name(realm_name) + aliases = device_status_merge["aliases"] + attributes = device_status_merge["attributes"] - {:error, reason} -> - {:halt, {:error, reason}} + with {:ok, aliases} <- Aliases.validate(aliases, realm_name, device), + {:ok, attributes} <- Attributes.validate(attributes) do + params = + case Map.fetch(device_status_merge, "credentials_inhibited") do + {:ok, credentials_inhibited} -> %{credentials_inhibited: credentials_inhibited} + :error -> %{} end - {attribute_key, attribute_value}, _acc -> - case Queries.insert_attribute(client, device_id, attribute_key, attribute_value) do - :ok -> - {:cont, :ok} + changeset = + status + |> Changeset.cast(params, [:credentials_inhibited]) + |> Aliases.apply(aliases) + |> Attributes.apply(attributes) - {:error, reason} -> - {:halt, {:error, reason}} - end - end) + case Changeset.apply_action(changeset, :update) do + {:ok, status} -> + execute_merge_queries(keyspace, device, changeset, aliases) + {:ok, status} + + {:error, changeset} -> + {:error, sanitize_error(changeset)} + end + end end - defp update_aliases(client, device_id, aliases) do - Enum.reduce_while(aliases, :ok, fn - {_alias_key, ""}, _acc -> - Logger.warning("Alias value cannot be an empty string.", tag: :invalid_alias_empty_value) - {:halt, {:error, :invalid_alias}} + defp execute_merge_queries(_keyspace, _device, %Changeset{changes: changes}, _aliases) + when map_size(changes) == 0, + do: :ok - {"", _alias_value}, _acc -> - Logger.warning("Alias key cannot be an empty string.", tag: :invalid_alias_empty_key) - {:halt, {:error, :invalid_alias}} + defp execute_merge_queries(keyspace, device, changeset, aliases) do + changes = + case Map.fetch(changeset.changes, :credentials_inhibited) do + {:ok, inhibit_credentials_request} -> + changeset.changes + |> Map.delete(:credentials_inhibited) + |> Map.put(:inhibit_credentials_request, inhibit_credentials_request) - {alias_key, nil}, _acc -> - case Queries.delete_alias(client, device_id, alias_key) do - :ok -> {:cont, :ok} - {:error, reason} -> {:halt, {:error, reason}} - end + :error -> + changeset.changes + end + |> Keyword.new() - {alias_key, alias_value}, _acc -> - case Queries.insert_alias(client, device_id, alias_key, alias_value) do - :ok -> {:cont, :ok} - {:error, reason} -> {:halt, {:error, reason}} - end - end) - end + device_query = + from d in DatabaseDevice, + prefix: ^keyspace, + where: d.device_id == ^device.device_id, + update: [set: ^changes] - defp merge_data(old_data, new_data) when is_map(old_data) and is_map(new_data) do - Map.merge(old_data, new_data) - |> Enum.reject(fn {_, v} -> v == nil end) - |> Enum.into(%{}) - end + device_query = Repo.to_sql(:update_all, device_query) + aliases_queries = Aliases.generate_batch_queries(aliases, keyspace, device) - defp change_credentials_inhibited(_client, _device_id, nil) do - :ok + queries = [device_query | aliases_queries] + + Exandra.execute_batch(Repo, %Exandra.Batch{queries: queries}, consistency: :each_quorum) end - defp change_credentials_inhibited(client, device_id, credentials_inhibited) - when is_boolean(credentials_inhibited) do - Queries.set_inhibit_credentials_request(client, device_id, credentials_inhibited) + defp sanitize_error(changeset) do + # if there is a custom error, return it: it was created by Aliases.apply or Attributes.apply + Enum.find_value(changeset.errors, changeset, fn + {:aliases, {"", [reason: reason]}} -> reason + {:attributes, {"", [reason: reason]}} -> reason + _ -> false + end) end @doc """ diff --git a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex index 44ea404dd..3d298b33c 100644 --- a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex +++ b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex @@ -22,8 +22,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do alias Astarte.AppEngine.API.Config alias Astarte.AppEngine.API.Device.InterfaceValuesOptions alias Astarte.Core.CQLUtils - alias CQEx.Query, as: DatabaseQuery - alias CQEx.Result, as: DatabaseResult alias Astarte.DataAccess.Realms.Device, as: DatabaseDevice alias Astarte.DataAccess.Realms.Endpoint, as: DatabaseEndpoint @@ -34,7 +32,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do alias Astarte.DataAccess.Astarte.KvStore alias Astarte.DataAccess.Astarte.Realm - require CQEx require Logger def retrieve_interfaces_list(realm_name) do @@ -158,252 +155,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do limit: ^limit end - def insert_attribute(client, device_id, attribute_key, attribute_value) do - insert_attribute_statement = """ - UPDATE devices - SET attributes[:attribute_key] = :attribute_value - WHERE device_id = :device_id - """ - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(insert_attribute_statement) - |> DatabaseQuery.put(:attribute_key, attribute_key) - |> DatabaseQuery.put(:attribute_value, attribute_value) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:each_quorum) - - with {:ok, _result} <- DatabaseQuery.call(client, query) do - :ok - else - %{acc: _, msg: error_message} -> - _ = Logger.warning("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - _ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} - end - end - - def delete_attribute(client, device_id, attribute_key) do - retrieve_attribute_statement = """ - SELECT attributes FROM devices WHERE device_id = :device_id - """ - - retrieve_attribute_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(retrieve_attribute_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:quorum) - - with {:ok, result} <- DatabaseQuery.call(client, retrieve_attribute_query), - [attributes: attributes] <- DatabaseResult.head(result), - {^attribute_key, _attribute_value} <- - Enum.find(attributes || [], fn m -> match?({^attribute_key, _}, m) end) do - delete_attribute_statement = """ - DELETE attributes[:attribute_key] - FROM devices - WHERE device_id = :device_id - """ - - delete_attribute_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(delete_attribute_statement) - |> DatabaseQuery.put(:attribute_key, attribute_key) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:each_quorum) - - case DatabaseQuery.call(client, delete_attribute_query) do - {:ok, _result} -> - :ok - - %{acc: _, msg: error_message} -> - _ = Logger.warning("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - _ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} - end - else - nil -> - {:error, :attribute_key_not_found} - - %{acc: _, msg: error_message} -> - _ = Logger.warning("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - _ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} - end - end - - def insert_alias(client, device_id, alias_tag, alias_value) do - insert_alias_to_names_statement = """ - INSERT INTO names - (object_name, object_type, object_uuid) - VALUES (:alias, 1, :device_id) - """ - - insert_alias_to_names_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(insert_alias_to_names_statement) - |> DatabaseQuery.put(:alias, alias_value) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:each_quorum) - |> DatabaseQuery.convert() - - insert_alias_to_device_statement = """ - UPDATE devices - SET aliases[:alias_tag] = :alias - WHERE device_id = :device_id - """ - - insert_alias_to_device_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(insert_alias_to_device_statement) - |> DatabaseQuery.put(:alias_tag, alias_tag) - |> DatabaseQuery.put(:alias, alias_value) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:each_quorum) - |> DatabaseQuery.convert() - - insert_batch = - CQEx.cql_query_batch( - consistency: :each_quorum, - mode: :logged, - queries: [insert_alias_to_names_query, insert_alias_to_device_query] - ) - - with {:existing, {:error, :device_not_found}} <- - {:existing, device_alias_to_device_id(client, alias_value)}, - :ok <- try_delete_alias(client, device_id, alias_tag), - {:ok, _result} <- DatabaseQuery.call(client, insert_batch) do - :ok - else - {:existing, {:ok, _device_uuid}} -> - {:error, :alias_already_in_use} - - {:existing, {:error, reason}} -> - {:error, reason} - - {:error, :device_not_found} -> - {:error, :device_not_found} - - %{acc: _, msg: error_message} -> - _ = Logger.warning("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - _ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} - end - end - - def delete_alias(client, device_id, alias_tag) do - retrieve_aliases_statement = """ - SELECT aliases - FROM devices - WHERE device_id = :device_id - """ - - retrieve_aliases_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(retrieve_aliases_statement) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:quorum) - - with {:ok, result} <- DatabaseQuery.call(client, retrieve_aliases_query), - [aliases: aliases] <- DatabaseResult.head(result), - {^alias_tag, alias_value} <- - Enum.find(aliases || [], fn a -> match?({^alias_tag, _}, a) end), - {:check, {:ok, ^device_id}} <- {:check, device_alias_to_device_id(client, alias_value)} do - delete_alias_from_device_statement = """ - DELETE aliases[:alias_tag] - FROM devices - WHERE device_id = :device_id - """ - - delete_alias_from_device_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(delete_alias_from_device_statement) - |> DatabaseQuery.put(:alias_tag, alias_tag) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:each_quorum) - |> DatabaseQuery.convert() - - delete_alias_from_names_statement = """ - DELETE FROM names - WHERE object_name = :alias AND object_type = 1 - """ - - delete_alias_from_names_query = - DatabaseQuery.new() - |> DatabaseQuery.statement(delete_alias_from_names_statement) - |> DatabaseQuery.put(:alias, alias_value) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:each_quorum) - |> DatabaseQuery.convert() - - delete_batch = - CQEx.cql_query_batch( - consistency: :each_quorum, - mode: :logged, - queries: [delete_alias_from_device_query, delete_alias_from_names_query] - ) - - with {:ok, _result} <- DatabaseQuery.call(client, delete_batch) do - :ok - else - %{acc: _, msg: error_message} -> - _ = Logger.warning("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - _ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} - end - else - {:check, _} -> - _ = - Logger.error("Inconsistent alias for #{alias_tag}.", - device_id: device_id, - tag: "inconsistent_alias" - ) - - {:error, :database_error} - - :empty_dataset -> - {:error, :device_not_found} - - nil -> - {:error, :alias_tag_not_found} - - %{acc: _, msg: error_message} -> - _ = Logger.warning("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - _ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} - end - end - - defp try_delete_alias(client, device_id, alias_tag) do - case delete_alias(client, device_id, alias_tag) do - :ok -> - :ok - - {:error, :alias_tag_not_found} -> - :ok - - not_ok -> - not_ok - end - end - def device_alias_to_device_id(realm_name, device_alias) do keyspace = Realm.keyspace_name(realm_name) @@ -413,33 +164,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do where: [object_type: 1, object_name: ^device_alias] end - def set_inhibit_credentials_request(client, device_id, inhibit_credentials_request) do - statement = """ - UPDATE devices - SET inhibit_credentials_request = :inhibit_credentials_request - WHERE device_id = :device_id - """ - - query = - DatabaseQuery.new() - |> DatabaseQuery.statement(statement) - |> DatabaseQuery.put(:inhibit_credentials_request, inhibit_credentials_request) - |> DatabaseQuery.put(:device_id, device_id) - |> DatabaseQuery.consistency(:each_quorum) - - with {:ok, _result} <- DatabaseQuery.call(client, query) do - :ok - else - %{acc: _, msg: error_message} -> - _ = Logger.warning("Database error: #{error_message}.", tag: "db_error") - {:error, :database_error} - - {:error, reason} -> - _ = Logger.warning("Update failed, reason: #{inspect(reason)}.", tag: "db_error") - {:error, :database_error} - end - end - def retrieve_object_datastream_values( realm_name, device_id, @@ -461,28 +185,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do |> limit(^query_limit) end - def get_results_count(_client, _count_query, %InterfaceValuesOptions{downsample_to: nil}) do - # Count will be ignored since theres no downsample_to - nil - end - - def get_results_count(client, count_query, opts) do - with {:ok, result} <- DatabaseQuery.call(client, count_query), - [{_count_key, count}] <- DatabaseResult.head(result) do - limit = opts.limit || Config.max_results_limit!() - - min(count, limit) - else - error -> - _ = - Logger.warning("Can't retrieve count for #{inspect(count_query)}: #{inspect(error)}.", - tag: "db_error" - ) - - nil - end - end - def all_properties_for_endpoint!(realm_name, device_id, interface_row, endpoint_id) do table = interface_row.storage interface_id = interface_row.interface_id @@ -701,19 +403,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do |> Map.merge(value_attributes) end - def to_db_friendly_type(array) when is_list(array) do - # If we have an array, we convert its elements to a db friendly type - Enum.map(array, &to_db_friendly_type/1) - end - - def to_db_friendly_type(%DateTime{} = datetime) do - DateTime.to_unix(datetime, :millisecond) - end - - def to_db_friendly_type(value) do - value - end - def timestamp_and_submillis(%DateTime{} = datetime) do timestamp_sub = datetime |> DateTime.to_unix(:microsecond) |> rem(100) {datetime, timestamp_sub}