Skip to content

Commit

Permalink
wip: init exandra
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Noacco <[email protected]>
  • Loading branch information
noaccOS committed Apr 29, 2024
1 parent 7dd941c commit b16c476
Show file tree
Hide file tree
Showing 20 changed files with 669 additions and 272 deletions.
2 changes: 1 addition & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"test/**/*.{ex,exs}",
"mix.exs"
],
import_deps: [:skogsra]
import_deps: [:skogsra, :ecto]
]
23 changes: 5 additions & 18 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017-2024 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -20,23 +20,10 @@
# and its dependencies with the aid of the Mix.Config module.
import Config

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
# file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for
# 3rd-party users, it should be done in your "mix.exs" file.
config :astarte_data_access, ecto_repos: [Astarte.DataAccess.Repo]

# You can configure your application as:
#
# config :astarte_data_access, key: :value
#
# and access this configuration in your application as:
#
# Application.get_env(:astarte_data_access, :key)
#
# You can also configure a 3rd-party app:
#
# config :logger, level: :info
#
config :astarte_data_access, Astarte.DataAccess.Repo,
keyspace: "astarte",
sync_connect: 5000

import_config "#{config_env()}.exs"
4 changes: 2 additions & 2 deletions lib/astarte_data_access.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ defmodule Astarte.DataAccess do
def init(init_arg) do
xandra_options =
Keyword.fetch!(init_arg, :xandra_options)
|> Keyword.put(:name, :astarte_data_access_xandra)
# TODO move to string keys
|> Keyword.put(:atom_keys, true)

children = [
{Xandra.Cluster, xandra_options}
{Xandra.Cluster, xandra_options},
Astarte.DataAccess.Repo
]

opts = [strategy: :one_for_one, name: Astarte.DataAccess.Supervisor]
Expand Down
10 changes: 10 additions & 0 deletions lib/astarte_data_access/astarte/kv_store.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule Astarte.DataAccess.Astarte.KvStore do
use Ecto.Schema

@primary_key false
schema "kv_store" do
field :group, :string, primary_key: true
field :key, :string, primary_key: true
field :value, :binary
end
end
7 changes: 7 additions & 0 deletions lib/astarte_data_access/astarte/realm.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Astarte.DataAccess.Astarte.Realm do
use Ecto.Schema

@primary_key {:realm_name, :string, autogenerate: false}
schema "realms" do
end
end
1 change: 1 addition & 0 deletions lib/astarte_data_access/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ defmodule Astarte.DataAccess.Config do
@spec xandra_options!() :: Xandra.start_options()
def xandra_options! do
[
name: :astarte_data_access_xandra,
nodes: xandra_nodes!(),
authentication: xandra_authentication_options!(),
pool_size: pool_size!(),
Expand Down
213 changes: 69 additions & 144 deletions lib/astarte_data_access/data.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2018 - 2023 SECO Mind Srl
# Copyright 2018-2024 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -18,13 +18,15 @@

defmodule Astarte.DataAccess.Data do
require Logger
alias Astarte.DataAccess.XandraUtils
alias Astarte.DataAccess.Repo
alias Astarte.DataAccess.Realms.IndividualProperty
# alias Astarte.DataAccess.XandraUtils
alias Astarte.Core.CQLUtils
alias Astarte.Core.Device
alias Astarte.Core.InterfaceDescriptor
alias Astarte.Core.Mapping

@individual_properties_table "individual_properties"
alias Ecto.UUID
import Ecto.Query

@spec fetch_property(
String.t(),
Expand All @@ -42,48 +44,48 @@ defmodule Astarte.DataAccess.Data do
path
)
when is_binary(device_id) and is_binary(path) do
XandraUtils.run(
realm,
&do_fetch_property(&1, &2, device_id, interface_descriptor, mapping, path)
)
end

defp do_fetch_property(conn, realm_name, device_id, interface_descriptor, mapping, path) do
value_column = CQLUtils.type_to_db_column_name(mapping.value_type)

statement = """
SELECT #{value_column}
FROM #{realm_name}."#{interface_descriptor.storage}"
WHERE device_id=:device_id AND interface_id=:interface_id
AND endpoint_id=:endpoint_id AND path=:path
"""

params = %{
device_id: device_id,
interface_id: interface_descriptor.interface_id,
endpoint_id: mapping.endpoint_id,
path: path
}

with {:ok, %Xandra.Page{} = page} <-
XandraUtils.retrieve_page(conn, statement, params, consistency: :quorum) do
retrieve_property_value(page, value_column)
end
end

defp retrieve_property_value(%Xandra.Page{} = page, value_column) do
value_atom = String.to_existing_atom(value_column)

case Enum.to_list(page) do
[] ->
{:error, :property_not_set}

[%{^value_atom => value}] ->
if value != nil do
{:ok, value}
else
{:error, :undefined_property}
end
column =
mapping.value_type
|> CQLUtils.type_to_db_column_name()
|> String.to_existing_atom()

device_id = UUID.cast!(device_id)
interface_id = UUID.cast!(interface_descriptor.interface_id)
endpoint_id = UUID.cast!(mapping.endpoint_id)
# select all possible column types, we don't care even if it's nil
query =
from interface_descriptor.storage,
prefix: ^realm,
where: [
device_id: ^device_id,
interface_id: ^interface_id,
endpoint_id: ^endpoint_id,
path: ^path
],
select: [
:binaryblob_value,
:binaryblobarray_value,
:boolean_value,
:booleanarray_value,
:datetime_value,
:datetimearray_value,
:double_value,
:doublearray_value,
:integer_value,
:integerarray_value,
:longinteger_value,
:longintegerarray_value,
:string_value,
:stringarray_value
]

with {:ok, %{^column => value}} <-
Repo.fetch_one(query, consistency: :quorum, error: :property_not_set) do
case value do
nil -> {:error, :undefined_property}
value -> {:ok, value}
end
end
end

Expand All @@ -102,51 +104,11 @@ defmodule Astarte.DataAccess.Data do
path
)
when is_binary(device_id) and is_binary(path) do
XandraUtils.run(
realm,
&do_path_exists?(&1, &2, device_id, interface_descriptor, mapping, path)
)
end

defp do_path_exists?(conn, realm_name, device_id, interface_descriptor, mapping, path) do
# TODO: do not hardcode individual_properties here
statement = """
SELECT COUNT(*)
FROM #{realm_name}.#{@individual_properties_table}
WHERE device_id=:device_id AND interface_id=:interface_id
AND endpoint_id=:endpoint_id AND path=:path
"""

params = %{
device_id: device_id,
interface_id: interface_descriptor.interface_id,
endpoint_id: mapping.endpoint_id,
path: path
}

with {:ok, %Xandra.Page{} = page} <-
XandraUtils.retrieve_page(conn, statement, params, consistency: :quorum),
{:ok, value} <- retrieve_path_count(page) do
case value do
0 ->
{:ok, false}

1 ->
{:ok, true}
end
end
end

defp retrieve_path_count(page) do
case Enum.to_list(page) do
[] ->
{:error, :property_not_set}

[%{count: nil}] ->
{:error, :undefined_property}

[%{count: value}] ->
{:ok, value}
fetch(realm, device_id, interface_descriptor, mapping, path)
|> Repo.aggregate(:count, consistency: :quorum)
|> case do
0 -> {:ok, false}
1 -> {:ok, true}
end
end

Expand All @@ -167,63 +129,26 @@ defmodule Astarte.DataAccess.Data do
path
)
when is_binary(device_id) and is_binary(path) do
XandraUtils.run(
realm,
&do_fetch_last_path_update(&1, &2, device_id, interface_descriptor, mapping, path)
)
end

defp do_fetch_last_path_update(
conn,
realm_name,
device_id,
interface_descriptor,
mapping,
path
) do
# TODO: do not hardcode individual_properties here
statement = """
SELECT datetime_value, reception_timestamp, reception_timestamp_submillis
FROM #{realm_name}.#{@individual_properties_table}
WHERE device_id=:device_id AND interface_id=:interface_id
AND endpoint_id=:endpoint_id AND path=:path
"""
query =
fetch(realm, device_id, interface_descriptor, mapping, path)
|> select([:datetime_value, :reception_timestamp, :reception_timestamp_submillis])

params = %{
device_id: device_id,
interface_id: interface_descriptor.interface_id,
endpoint_id: mapping.endpoint_id,
path: path
}
with {:ok, last_update} <- Repo.fetch_one(query, error: :path_not_set) do
value_timestamp = last_update.datetime_value |> DateTime.truncate(:millisecond)
reception_timestamp = IndividualProperty.reception(last_update)

with {:ok, %Xandra.Page{} = page} <-
XandraUtils.retrieve_page(conn, statement, params, consistency: :quorum) do
retrieve_last_path_update(page)
{:ok, %{value_timestamp: value_timestamp, reception_timestamp: reception_timestamp}}
end
end

defp retrieve_last_path_update(page) do
case Enum.to_list(page) do
[] ->
{:error, :path_not_set}

[columns] ->
%{
reception_timestamp: reception_timestamp,
reception_timestamp_submillis: reception_timestamp_submillis,
datetime_value: datetime_value
} = columns

if is_integer(reception_timestamp) and is_integer(datetime_value) do
with {:ok, value_t} <- DateTime.from_unix(datetime_value, :millisecond),
reception_unix =
reception_timestamp * 1000 + div(reception_timestamp_submillis || 0, 10),
{:ok, reception_t} <- DateTime.from_unix(reception_unix, :microsecond) do
{:ok, %{value_timestamp: value_t, reception_timestamp: reception_t}}
end
else
{:error, :invalid_result}
end
end
defp fetch(source \\ IndividualProperty, realm, device_id, interface_descriptor, mapping, path) do
from source,
prefix: ^realm,
where: [
device_id: ^device_id,
interface_id: ^interface_descriptor.interface_id,
endpoint_id: ^mapping.endpoint_id,
path: ^path
]
end
end
32 changes: 9 additions & 23 deletions lib/astarte_data_access/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,26 @@

defmodule Astarte.DataAccess.Device do
require Logger
alias Astarte.DataAccess.XandraUtils
alias Astarte.Core.Device
alias Astarte.DataAccess.Repo
import Ecto.Query

@spec interface_version(String.t(), Device.device_id(), String.t()) ::
{:ok, integer} | {:error, atom}
def interface_version(realm, device_id, interface_name) do
XandraUtils.run(realm, &do_interface_version(&1, &2, device_id, interface_name))
end

defp do_interface_version(conn, realm_name, device_id, interface_name) do
statement = """
SELECT introspection
FROM #{realm_name}.devices
WHERE device_id=:device_id
"""
query =
from d in "devices",
prefix: ^realm,
where: d.device_id == ^device_id,
select: d.introspection

with {:ok, %Xandra.Page{} = page} <-
XandraUtils.retrieve_page(conn, statement, %{device_id: device_id}),
{:ok, introspection} <- retrieve_introspection(page),
with {:ok, introspection} <-
Repo.fetch_one(query, error: :device_not_found),
{:ok, major} <- retrieve_major(introspection, interface_name) do
{:ok, major}
end
end

defp retrieve_introspection(page) do
case Enum.to_list(page) do
[] ->
{:error, :device_not_found}

[%{introspection: introspection}] ->
{:ok, introspection}
end
end

defp retrieve_major(introspection, interface_name) do
with :error <- Map.fetch(introspection, interface_name) do
{:error, :interface_not_in_introspection}
Expand Down
Loading

0 comments on commit b16c476

Please sign in to comment.