Skip to content

Commit

Permalink
Merge pull request #34 from ovnanova/repo_and_mst
Browse files Browse the repository at this point in the history
Blockstore and other repo building blocks
  • Loading branch information
mekaem authored Oct 4, 2024
2 parents 2330b4f + 723d3e5 commit 2d2699c
Show file tree
Hide file tree
Showing 34 changed files with 679 additions and 205 deletions.
2 changes: 1 addition & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
locals_without_parens: [from: 2],
locals_without_parens: [from: 2]
]
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ auth.ets
# Cargo build output
/target/

/repos/

# SQLite
pds
pds-shm
Expand All @@ -40,4 +42,4 @@ pds-wal
/target/

#DS_Store
.DS_Store
.DS_Store
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Config

config :hexpds,
plc_server: "plc.directory",
appview_server: "public.api.bsky.app",
appview_server: "api.bsky.app",
relay_server: "bsky.network",
# ignore pls for now
pds_host: "kawaii.social",
Expand All @@ -28,5 +28,6 @@ config :hexpds, Hexpds.Database,
# Replace with Postgres URL in production!
url: "sqlite3:///pds"


config :mnesia,
dir: ~c".mnesia/#{Mix.env()}/#{node()}"
8 changes: 4 additions & 4 deletions lib/hexpds/auth/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ defmodule Hexpds.Auth.Session do
{:error, reason} ->
{:error, reason}

json ->
case json["scope"] do
"com.atproto.access" -> if find_r_jwt(jwt), do: json, else: {:error, "Invalid token"}
"com.atproto.refresh" -> if find_a_jwt(jwt), do: json, else: {:error, "Invalid token"}
%{"sub" => sub} = json ->
case sub["scope"] do
"com.atproto.access" -> if find_a_jwt(jwt), do: json, else: {:error, "Invalid token"}
"com.atproto.refresh" -> if find_r_jwt(jwt), do: json, else: {:error, "Invalid token"}
_ -> {:error, "Invalid scope"}
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/hexpds/blob.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ defmodule Hexpds.Blob do
though SQL is fast enough that it doesn't really matter.
(This is not part of the ATP spec, just a weird
hack I added)
hack I added) -- sj
"""
def hash(did, "bafkr" <> _ = cid) do
hash(did, CID.decode_cid!(cid))
Expand Down
54 changes: 25 additions & 29 deletions lib/hexpds/blockstore.ex
Original file line number Diff line number Diff line change
@@ -1,53 +1,49 @@
defmodule Hexpds.BlockStore do
@callback put_block(key :: binary(), value :: binary()) :: :ok | {:error, term()}
@callback put_block(value :: binary()) :: :ok | {:error, term()}
@callback get_block(key :: binary()) :: {:ok, binary()} | {:error, term()}
@callback del_block(key :: binary()) :: :ok | {:error, term()}
end

defmodule BlocksTable do
defmodule Hexpds.BlocksTable do
use Ecto.Schema

schema "blocks" do
field(:key, :string)
field(:value, :binary)
field(:block_cid, :string) # A CID
field(:block_value, :binary) # A Dag-CBOR blob

timestamps()
end
end

defmodule Hexpds.EctoBlockStore do
import Ecto.Query
@behaviour Hexpds.BlockStore

def init(_type, config) do
{:ok, Keyword.put(config, :database, :memory)}
end

def put_block(key, value) do
case Hexpds.Database.get_by(BlocksTable, key: key) do
nil ->
case Hexpds.Database.insert!(%BlocksTable{key: key, value: value}) do
{:ok, _} -> :ok
{:error, _} -> {:error, :insert_failed}
end

{:ok, res} when res == value ->
:ok

{:ok, _} ->
{:error, :different_value}

{:error, _} ->
{:error, :get_failed}
alias Hexpds.{BlockStore, DagCBOR, BlocksTable}
@behaviour BlockStore

@impl BlockStore
def put_block(value) do
cid = Hexpds.Repo.Helpers.term_to_dagcbor_cid(value)
case get_block(cid) do
{:error, :not_found} ->
%BlocksTable{
block_cid: cid,
block_value: DagCBOR.encode!(value)
}
|> Hexpds.User.Sqlite.insert!()
anything_else -> anything_else
end
end

@impl BlockStore
def get_block(key) do
case Hexpds.Database.get_by(BlocksTable, key: key) do
case Hexpds.User.Sqlite.get_by(BlocksTable, block_cid: key) do
nil -> {:error, :not_found}
block -> {:ok, block.value}
%BlocksTable{} = block -> block
end
end

@impl BlockStore
def del_block(key) do
Hexpds.Database.delete_all(from(b in BlocksTable, where: b.key == ^key))
Hexpds.User.Sqlite.delete_all(from(b in BlocksTable, where: b.block_cid == ^key))
end
end
8 changes: 8 additions & 0 deletions lib/hexpds/dagcbor.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Hexpds.DagCBOR do
import Hexpds.Helpers
defmodule Internal do
use Rustler, otp_app: :hexpds, crate: "hexpds_dagcbor_internal"
@spec encode_dag_cbor(binary()) :: {:ok, binary()} | {:error, String.t()}
Expand Down Expand Up @@ -33,6 +34,8 @@ defmodule Hexpds.DagCBOR do
with {:ok, json} <- Jason.encode(l), do: encode(json)
end

def! encode(json)

@doc """
Decodes a CBOR binary into a JSON string.
"""
Expand All @@ -53,4 +56,9 @@ defmodule Hexpds.DagCBOR do
def decode(cbor) do
with {:ok, json} <- decode_json(cbor), do: Jason.decode(json)
end

def bytes(bytes) do
"hexpds_dagcbor_bytes" <> Base.encode16(bytes, case: :lower)
end

end
2 changes: 1 addition & 1 deletion lib/hexpds/db/cid.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ defmodule Ecto.Types.Cid do
do:
{:ok,
term
|> CID.encode!(:base32_lower)}
|> Hexpds.Repo.Helpers.cid_string}
end
13 changes: 12 additions & 1 deletion lib/hexpds/db/user.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Hexpds.User do
field(:did, :string)
field(:handle, :string)
field(:password_hash, :string)
field(:signing_key, Ecto.Type.ErlangTerm)
field(:signing_key, Ecto.Type.ErlangTerm) # Why store as erlangterms? So that the type of key is automatically encoded with the key
field(:rotation_key, Ecto.Type.ErlangTerm)
field(:data, :map)
end
Expand Down Expand Up @@ -68,5 +68,16 @@ defmodule Hexpds.User do
data: %{"preferences" => %{}}
}
|> tap(&Hexpds.Database.insert/1)
|> tap(&setup_repo_for/1)
end

def setup_repo_for(%__MODULE__{did: did} = u) do
File.mkdir_p!("./repos/#{did}")
Hexpds.User.Sqlite.exec(u,
fn ->
Hexpds.User.Sqlite.Migrations.all()
|> Hexpds.User.Sqlite.migrate()
end
)
end
end
34 changes: 34 additions & 0 deletions lib/hexpds/db/user/sqlite.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Hexpds.User.Sqlite do
@moduledoc """
Users' repos will be stored in individual sqlite DBs
"""

use Ecto.Repo,
otp_app: :hexpds,
adapter: Ecto.Adapters.SQLite3

def get_for_user(%Hexpds.User{did: did}) do
repo_path = "repos/#{did}/repo.db"
{:ok, db} = start_link(name: nil, database: repo_path)
db
end


def migrate(migrations) do
order_migrations =
migrations
|> Enum.with_index()
|> Enum.map(fn {m, i} -> {i, m} end)
Ecto.Migrator.run(__MODULE__, order_migrations, :up, all: true, dynamic_repo: get_dynamic_repo())
end

def exec(user, callback) do
repo = get_for_user(user)
try do
put_dynamic_repo(repo)
callback.()
after
Supervisor.stop(repo)
end
end
end
74 changes: 74 additions & 0 deletions lib/hexpds/db/user/sqlite/migrations.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
defmodule Hexpds.User.Sqlite.Migrations.Macros do
defmacro migration(migration_name, do: change_block) do
# need to satisfy my DSL addiction - sj
quote do
@migrations __MODULE__.unquote(migration_name)
defmodule unquote(migration_name) do
use Ecto.Migration

def change do
unquote(change_block)
end
end
end
end
end

defmodule Hexpds.User.Sqlite.Migrations do
Module.register_attribute(__MODULE__, :migrations, accumulate: true)

import __MODULE__.Macros

migration SetupBlockstore do
create table(:blocks) do
add(:block_cid, :string, primary_key: true, null: false)
add(:block_value, :binary, null: false)

timestamps() # Maybe will be helpful if we do special sync APIs
end
end

migration CommitsAndRecords do
create table(:commits) do
add :seq, :integer, primary_key: true, null: false
add :cid, :string, null: false

timestamps() # Probably not strictly necessary, but why not? Better safe than sorry, can easily remove later
end
create table(:records) do
add :record_path, :text, primary_key: true, null: false
add :collection, :string, null: false
add :record_cid, :string, null: false

timestamps() # Will help with sorting for e.g. listRecords
end
end

migration MstNodes do
# Have probably gotten some of this wrong and may need to change implementation
create table(:mst_nodes) do
add :cid, :string, primary_key: true, null: false
add :left, :string, comment: "CID link, optional: link to sub-tree Node on a lower level and with all keys sorting before keys at this node"
add :parent_node_cid, :string
add :depth, :int, null: false
end
create table(:tree_entries) do
add :tree_entry_key, :text, primary_key: true, null: false # This is equivalent to record path
add :parent_node_cid, :string, null: false
add :value, :string, null: false # CID link to the record data (CBOR) for this entry
add :right, :string # link to a sub-tree Node at a lower level which has keys sorting after this TreeEntry's key (to the "right"), but before the next TreeEntry's key in this Node (if any)
end
end

migration SomeUsefulIndices do
# Admittedly I might also be getting this wrong
create index(:tree_entries, :parent_node_cid)
create index(:mst_nodes, :parent_node_cid)
create unique_index(:records, :record_cid)
create index(:records, :collection)
create unique_index(:tree_entries, :value)
create index(:mst_nodes, :depth)
end

def all, do: Enum.reverse(@migrations)
end
16 changes: 16 additions & 0 deletions lib/hexpds/firehose/firehose.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Hexpds.Firehose do
defp cbor_concat(header, op) do
[header, op]
|> Enum.map(&Hexpds.DagCBOR.encode!/1)
|> Enum.reverse()
|> Enum.reduce(&<>/2)
end

def error(type, message \\ nil) do
cbor_concat(%{op: -1}, optional_join(%{error: type}, if(message, do: %{message: message})))
end

defp optional_join(map, nil), do: map
defp optional_join(map1, map2), do: Map.merge(map1, map2)

end
26 changes: 26 additions & 0 deletions lib/hexpds/firehose/websocket.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Hexpds.Firehose.Websocket do
@behaviour WebSock

@impl WebSock
def init(_params) do
# We can ignore the params for now but we should use them for backfilling later
pid = self()
:syn.join(:firehose, :websockets, pid)
{:ok, nil}
end

@impl WebSock
def handle_info({:firehose_message, bindata}, _) do
{:push, {:binary, bindata}, nil}
end

@impl WebSock
def handle_in(_, _) do
# Ignore incoming messages
{:ok, nil}
end

def push_frame(frame) do
:syn.publish(:firehose, :websockets, {:firehose_message, frame})
end
end
2 changes: 1 addition & 1 deletion lib/hexpds/k256.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ defmodule Hexpds.K256 do
@doc """
Signs a binary message with a Secp256k1 private key. Returns a binary signature.
"""
def sign(%__MODULE__{privkey: privkey}, message)
def sign(%{privkey: privkey}, message)
when is_binary(message) and is_valid_key(privkey) do
with {:ok, sig} <- Hexpds.K256.Internal.sign_message(privkey, message),
{:ok, sig_bytes} <- Base.decode16(sig, case: :lower),
Expand Down
Empty file added lib/hexpds/mst_test.ex
Empty file.
Loading

0 comments on commit 2d2699c

Please sign in to comment.