From 32de50347409f4e6788056e264a7f472a00b8f35 Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Mon, 15 Jul 2024 21:23:15 -0700 Subject: [PATCH 01/10] mix format --- .formatter.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.formatter.exs b/.formatter.exs index 3520796..f353925 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,5 +1,5 @@ # Used by "mix format" [ inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], - locals_without_parens: [from: 2], + locals_without_parens: [from: 2] ] From a476ef98362e26aa6f64be42078481707ad33d9a Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Mon, 15 Jul 2024 21:26:06 -0700 Subject: [PATCH 02/10] fix type issue --- lib/hexpds/blockstore.ex | 3 ++- lib/hexpds/service/http.ex | 9 +-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/lib/hexpds/blockstore.ex b/lib/hexpds/blockstore.ex index 3b6cf3b..5f0d390 100644 --- a/lib/hexpds/blockstore.ex +++ b/lib/hexpds/blockstore.ex @@ -4,7 +4,7 @@ defmodule Hexpds.BlockStore do @callback del_block(key :: binary()) :: :ok | {:error, term()} end -defmodule BlocksTable do +defmodule Hexpds.BlocksTable do use Ecto.Schema schema "blocks" do @@ -15,6 +15,7 @@ end defmodule Hexpds.EctoBlockStore do import Ecto.Query + alias Hexpds.BlocksTable @behaviour Hexpds.BlockStore def init(_type, config) do diff --git a/lib/hexpds/service/http.ex b/lib/hexpds/service/http.ex index aa2a725..d6c8ac1 100644 --- a/lib/hexpds/service/http.ex +++ b/lib/hexpds/service/http.ex @@ -121,14 +121,7 @@ defmodule Hexpds.Http do {statuscode, json_resp} = xrpc_procedure(conn, method, body, get_context(conn)) case json_resp do - %{ - accessJwt: _accessJwt, - did: _did, - error: _error, - handle: _handle, - message: _message, - refreshJwt: _refreshJwt - } = blob -> + {:blob, blob} -> conn |> Plug.Conn.put_resp_content_type(blob.mime_type) |> Plug.Conn.send_resp(200, blob.data) From aba7ad04f1438c7364848376202acc7d680d454e Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Tue, 16 Jul 2024 16:07:36 -0700 Subject: [PATCH 03/10] commit schema pt 1 --- lib/hexpds/service/http.ex | 2 +- lib/hexpds/service/xrpc.ex | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/hexpds/service/http.ex b/lib/hexpds/service/http.ex index d6c8ac1..d9deda9 100644 --- a/lib/hexpds/service/http.ex +++ b/lib/hexpds/service/http.ex @@ -301,7 +301,7 @@ defmodule Hexpds.Http do case ctx do %{user: %Hexpds.User{} = user, token_type: :access} -> Hexpds.User.Preferences.put(user, prefs) - {200, {:blob, %{mime_type: "application/octet-stream", data: ""}}} + {200, XRPC.blank} _ -> {401, %{error: "Unauthorized", message: "Not authorized"}} diff --git a/lib/hexpds/service/xrpc.ex b/lib/hexpds/service/xrpc.ex index 815d1aa..b415b25 100644 --- a/lib/hexpds/service/xrpc.ex +++ b/lib/hexpds/service/xrpc.ex @@ -3,6 +3,10 @@ defmodule Hexpds.XRPC do Just some macros to make the XRPC interface easier to write """ + def blank do + {:blob, %{mime_type: "application/octet-stream", data: ""}} + end + defmacro query(conn, method, params, ctx, do: block) do quote do def xrpc_query(unquote(conn), unquote(method), unquote(params), unquote(ctx)) do From d419555d6f7cd739b300dd04fa14ecb925b04a98 Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Tue, 16 Jul 2024 16:08:59 -0700 Subject: [PATCH 04/10] commit schema actual oops --- lib/hexpds/repo/commit.ex | 22 +++++++++++++++++++ .../migrations/20240716214040_repos.exs | 17 ++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 lib/hexpds/repo/commit.ex create mode 100644 priv/database/migrations/20240716214040_repos.exs diff --git a/lib/hexpds/repo/commit.ex b/lib/hexpds/repo/commit.ex new file mode 100644 index 0000000..a46a482 --- /dev/null +++ b/lib/hexpds/repo/commit.ex @@ -0,0 +1,22 @@ +defmodule Hexpds.Repo.Commit do + defmodule UnsignedCommit do + defstruct [:did, :data, :rev, prev: nil, version: 3] + + @type t :: %UnsignedCommit{ + did: String.t(), + data: Hexpds.CID.cid_string(), + rev: Hexpds.Tid.t(), + prev: Hexpds.CID.cid_string() | nil, + version: 3 + } + + def to_dagcbor(%UnsignedCommit{rev: rev} = commit) do + %{Map.delete(commit, :__struct__) | rev: to_string(rev)} + |> Hexpds.DagCBOR.encode() + end + + end + + defstruct [:unsigned_commit, :sig] + +end diff --git a/priv/database/migrations/20240716214040_repos.exs b/priv/database/migrations/20240716214040_repos.exs new file mode 100644 index 0000000..4704a36 --- /dev/null +++ b/priv/database/migrations/20240716214040_repos.exs @@ -0,0 +1,17 @@ +defmodule Hexpds.Database.Migrations.Repos do + use Ecto.Migration + + def change do + # It would be really nice to have all these things go in per-user dbs later, but ¯\_(ツ)_/¯ for now + create table(:records) do + add :path, :string, null: false + add :did, :string, null: false + add :cid, :binary, null: false + end + create table(:commits) do + add :seq, :integer, null: false + add :did, :string, null: false + add :cid, :binary, null: false + end + end +end From dfdaa6598d3356e20cfe2d25a8dd23384a4c1279 Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Tue, 16 Jul 2024 20:22:30 -0700 Subject: [PATCH 05/10] commit signing + ungodly dagcbor bytes hack --- lib/hexpds/dagcbor.ex | 5 +++ lib/hexpds/k256.ex | 2 +- lib/hexpds/repo/commit.ex | 41 +++++++++++++++++++++-- native/hexpds_dagcbor_internal/Cargo.toml | 1 + native/hexpds_dagcbor_internal/src/lib.rs | 18 +++++++++- 5 files changed, 63 insertions(+), 4 deletions(-) diff --git a/lib/hexpds/dagcbor.ex b/lib/hexpds/dagcbor.ex index 761bc83..17bd099 100644 --- a/lib/hexpds/dagcbor.ex +++ b/lib/hexpds/dagcbor.ex @@ -53,4 +53,9 @@ defmodule Hexpds.DagCBOR do def decode(cbor) do with {:ok, json} <- decode_json(cbor), do: Jason.decode(json) end + + def bytes(bytes) do + "hexpds_dagcbor_bytes" <> Base.encode16(bytes, case: :lower) + end + end diff --git a/lib/hexpds/k256.ex b/lib/hexpds/k256.ex index c92e261..c25a9d4 100644 --- a/lib/hexpds/k256.ex +++ b/lib/hexpds/k256.ex @@ -81,7 +81,7 @@ defmodule Hexpds.K256 do @doc """ Signs a binary message with a Secp256k1 private key. Returns a binary signature. """ - def sign(%__MODULE__{privkey: privkey}, message) + def sign(%{privkey: privkey}, message) when is_binary(message) and is_valid_key(privkey) do with {:ok, sig} <- Hexpds.K256.Internal.sign_message(privkey, message), {:ok, sig_bytes} <- Base.decode16(sig, case: :lower), diff --git a/lib/hexpds/repo/commit.ex b/lib/hexpds/repo/commit.ex index a46a482..f855c18 100644 --- a/lib/hexpds/repo/commit.ex +++ b/lib/hexpds/repo/commit.ex @@ -1,8 +1,14 @@ defmodule Hexpds.Repo.Commit do + alias Hexpds.DagCBOR + import Hexpds.Helpers + defstruct [:unsigned_commit, :sig] + defmodule UnsignedCommit do defstruct [:did, :data, :rev, prev: nil, version: 3] - @type t :: %UnsignedCommit{ + import Hexpds.Helpers + + @type t :: %__MODULE__{ did: String.t(), data: Hexpds.CID.cid_string(), rev: Hexpds.Tid.t(), @@ -15,8 +21,39 @@ defmodule Hexpds.Repo.Commit do |> Hexpds.DagCBOR.encode() end + def! to_dagcbor(commit) + + def sign_with_privkey(%UnsignedCommit{} = commit, privkey) do + Hexpds.K256.PrivateKey.sign(privkey, to_dagcbor!(commit)) + end + + def sign_with_user_privkey(%UnsignedCommit{did: did} = commit) do + sign_with_privkey(commit, Hexpds.User.get(did).signing_key) + end + + def to_signed(commit, privkey \\ nil) do + %Hexpds.Repo.Commit{ + unsigned_commit: commit, + sig: case privkey do + nil -> sign_with_user_privkey(commit) + %Hexpds.K256.PrivateKey{} -> sign_with_privkey(commit, privkey) + end + } + end + end - defstruct [:unsigned_commit, :sig] + @type t :: %__MODULE__{ + unsigned_commit: UnsignedCommit.t(), + sig: Hexpds.K256.Signature.t() + } + + def to_dagcbor(%__MODULE__{unsigned_commit: raw_commit = %{rev: rev}, sig: %Hexpds.K256.Signature{sig: sig_bytes}}) do + %{Map.delete(raw_commit, :__struct__) | rev: to_string(rev)} + |> Map.put(:sig, DagCBOR.bytes(sig_bytes)) + |> DagCBOR.encode() + end + + def! to_dagcbor(comm) end diff --git a/native/hexpds_dagcbor_internal/Cargo.toml b/native/hexpds_dagcbor_internal/Cargo.toml index c863169..a058b32 100644 --- a/native/hexpds_dagcbor_internal/Cargo.toml +++ b/native/hexpds_dagcbor_internal/Cargo.toml @@ -12,6 +12,7 @@ crate-type = ["cdylib"] [dependencies] libipld = "0.16.0" rustler = "0.33.0" +hex = "0.4.3" data-encoding = "2.5.0" serde_json = "1.0.113" [features] diff --git a/native/hexpds_dagcbor_internal/src/lib.rs b/native/hexpds_dagcbor_internal/src/lib.rs index 255f384..6046c04 100644 --- a/native/hexpds_dagcbor_internal/src/lib.rs +++ b/native/hexpds_dagcbor_internal/src/lib.rs @@ -12,6 +12,7 @@ use serde_json::json; use serde_json::Value as JsonValue; use std::collections::BTreeMap; use std::str::FromStr; +use hex; mod atoms { rustler::atoms! { @@ -25,7 +26,7 @@ pub fn json_to_ipld(val: JsonValue) -> Ipld { match val { JsonValue::Null => Ipld::Null, JsonValue::Bool(b) => Ipld::Bool(b), - JsonValue::String(s) => Ipld::String(s), + JsonValue::String(s) => ipld_bytes_or_string(s), JsonValue::Number(v) => { if let Some(f) = v.as_f64() { if v.is_i64() { @@ -53,6 +54,20 @@ pub fn json_to_ipld(val: JsonValue) -> Ipld { } } +pub fn ipld_bytes_or_string(string: String) -> Ipld { + if string.starts_with("hexpds_dagcbor_bytes") { + let prefix_length = "hexpds_dagcbor_bytes".len(); + let bytes_string = &string[prefix_length..]; + + // Decode bytes from hexadecimal representation + let bytes = hex::decode(bytes_string).expect("Failed to decode hexadecimal bytes"); + + Ipld::Bytes(bytes) + } else { + Ipld::String(string) + } +} + // Taken from bnewbold/adenosine pub fn ipld_to_json(val: Ipld) -> JsonValue { match val { @@ -127,3 +142,4 @@ rustler::init!( "Elixir.Hexpds.DagCBOR.Internal", [encode_dag_cbor, decode_dag_cbor] ); + From 76e6e83345fbe7090af496df917bba2c83734883 Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Sat, 20 Jul 2024 08:13:14 -0700 Subject: [PATCH 06/10] more strange blockstore stuff? --- config/config.exs | 2 +- lib/hexpds/blockstore.ex | 43 ++++++++----------- lib/hexpds/dagcbor.ex | 3 ++ lib/hexpds/mst_test.ex | 0 lib/hexpds/repo/helpers.ex | 24 +++++++++++ .../20240720002413_blockstore_pt2.exs | 12 ++++++ .../20240720060416_blockstore_remove_did.exs | 9 ++++ .../20240720144549_blockstore_unique_cids.exs | 7 +++ 8 files changed, 73 insertions(+), 27 deletions(-) create mode 100644 lib/hexpds/mst_test.ex create mode 100644 lib/hexpds/repo/helpers.ex create mode 100644 priv/database/migrations/20240720002413_blockstore_pt2.exs create mode 100644 priv/database/migrations/20240720060416_blockstore_remove_did.exs create mode 100644 priv/database/migrations/20240720144549_blockstore_unique_cids.exs diff --git a/config/config.exs b/config/config.exs index 8d26c72..3f3cb2e 100644 --- a/config/config.exs +++ b/config/config.exs @@ -5,7 +5,7 @@ import Config config :hexpds, plc_server: "plc.directory", - appview_server: "public.api.bsky.app", + appview_server: "api.bsky.app", relay_server: "bsky.network", # ignore pls for now pds_host: "abyss.computer", diff --git a/lib/hexpds/blockstore.ex b/lib/hexpds/blockstore.ex index 5f0d390..20ac672 100644 --- a/lib/hexpds/blockstore.ex +++ b/lib/hexpds/blockstore.ex @@ -1,5 +1,5 @@ defmodule Hexpds.BlockStore do - @callback put_block(key :: binary(), value :: binary()) :: :ok | {:error, term()} + @callback put_block(value :: binary()) :: :ok | {:error, term()} @callback get_block(key :: binary()) :: {:ok, binary()} | {:error, term()} @callback del_block(key :: binary()) :: :ok | {:error, term()} end @@ -8,47 +8,38 @@ defmodule Hexpds.BlocksTable do use Ecto.Schema schema "blocks" do - field(:key, :string) - field(:value, :binary) + field(:block_cid, :string) # A CID + field(:block_value, :binary) # A Dag-CBOR blob end end defmodule Hexpds.EctoBlockStore do import Ecto.Query + alias Hexpds.DagCBOR alias Hexpds.BlocksTable @behaviour Hexpds.BlockStore - def init(_type, config) do - {:ok, Keyword.put(config, :database, :memory)} - end - - def put_block(key, value) do - case Hexpds.Database.get_by(BlocksTable, key: key) do - nil -> - case Hexpds.Database.insert!(%BlocksTable{key: key, value: value}) do - {:ok, _} -> :ok - {:error, _} -> {:error, :insert_failed} - end - - {:ok, res} when res == value -> - :ok - - {:ok, _} -> - {:error, :different_value} - - {:error, _} -> - {:error, :get_failed} + def put_block(value) do + cid = Hexpds.Repo.Helpers.term_to_dagcbor_cid(value) + case get_block(cid) do + {:error, :not_found} -> + %BlocksTable{ + block_cid: cid, + block_value: DagCBOR.encode!(value) + } + |> Hexpds.Database.insert!() + anything_else -> anything_else end end def get_block(key) do - case Hexpds.Database.get_by(BlocksTable, key: key) do + case Hexpds.Database.get_by(BlocksTable, block_cid: key) do nil -> {:error, :not_found} - block -> {:ok, block.value} + block -> block.block_value end end def del_block(key) do - Hexpds.Database.delete_all(from(b in BlocksTable, where: b.key == ^key)) + Hexpds.Database.delete_all(from(b in BlocksTable, where: b.block_cid == ^key)) end end diff --git a/lib/hexpds/dagcbor.ex b/lib/hexpds/dagcbor.ex index 17bd099..398508c 100644 --- a/lib/hexpds/dagcbor.ex +++ b/lib/hexpds/dagcbor.ex @@ -1,4 +1,5 @@ defmodule Hexpds.DagCBOR do + import Hexpds.Helpers defmodule Internal do use Rustler, otp_app: :hexpds, crate: "hexpds_dagcbor_internal" @spec encode_dag_cbor(binary()) :: {:ok, binary()} | {:error, String.t()} @@ -33,6 +34,8 @@ defmodule Hexpds.DagCBOR do with {:ok, json} <- Jason.encode(l), do: encode(json) end + def! encode(json) + @doc """ Decodes a CBOR binary into a JSON string. """ diff --git a/lib/hexpds/mst_test.ex b/lib/hexpds/mst_test.ex new file mode 100644 index 0000000..e69de29 diff --git a/lib/hexpds/repo/helpers.ex b/lib/hexpds/repo/helpers.ex new file mode 100644 index 0000000..0fd3e56 --- /dev/null +++ b/lib/hexpds/repo/helpers.ex @@ -0,0 +1,24 @@ +defmodule Hexpds.Repo.Helpers do + alias Hexpds.{DagCBOR, CID} + + def dagcbor_cid(encoded_dagcbor) do + :crypto.hash(:sha256, encoded_dagcbor) + |> CID.cid!("dag-cbor") + end + + def cid_string(%Hexpds.CID{} = cid) do + CID.encode!(cid, :base32_lower) + end + + def term_to_dagcbor_cid(term, [string: stringify] \\ [string: true]) do + DagCBOR.encode!(term) + |> dagcbor_cid() + |> then(fn encoded -> {encoded, stringify} end) + |> case do + {encoded, true} -> cid_string(encoded) + {encoded, false} -> encoded + _ -> {:error, "term_to_dagcbor_cid got string: #{inspect(stringify)} but stringify must be a bool"} + end + end + +end diff --git a/priv/database/migrations/20240720002413_blockstore_pt2.exs b/priv/database/migrations/20240720002413_blockstore_pt2.exs new file mode 100644 index 0000000..22d4a93 --- /dev/null +++ b/priv/database/migrations/20240720002413_blockstore_pt2.exs @@ -0,0 +1,12 @@ +defmodule Hexpds.Database.Migrations.BlockstorePt2 do + use Ecto.Migration + + def change do + drop table(:blocks) + create table(:blocks) do + add :block_cid, :string + add :repo_did, :string + add :block_value, :binary + end + end +end diff --git a/priv/database/migrations/20240720060416_blockstore_remove_did.exs b/priv/database/migrations/20240720060416_blockstore_remove_did.exs new file mode 100644 index 0000000..9e8f6ce --- /dev/null +++ b/priv/database/migrations/20240720060416_blockstore_remove_did.exs @@ -0,0 +1,9 @@ +defmodule Hexpds.Database.Migrations.BlockstoreRemoveDid do + use Ecto.Migration + + def change do + alter table(:blocks) do + remove :repo_did + end + end +end diff --git a/priv/database/migrations/20240720144549_blockstore_unique_cids.exs b/priv/database/migrations/20240720144549_blockstore_unique_cids.exs new file mode 100644 index 0000000..92bdf4c --- /dev/null +++ b/priv/database/migrations/20240720144549_blockstore_unique_cids.exs @@ -0,0 +1,7 @@ +defmodule Hexpds.Database.Migrations.BlockstoreUniqueCids do + use Ecto.Migration + + def change do + create unique_index(:blocks, :block_cid) + end +end From b33b7ac418ab9913a684860a0e3313de6ff4351f Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Sat, 20 Jul 2024 09:40:44 -0700 Subject: [PATCH 07/10] per-user sqlite dbs with blockstores! --- .gitignore | 4 ++- config/config.exs | 2 +- lib/hexpds/blockstore.ex | 8 +++-- lib/hexpds/db/user.ex | 13 ++++++- lib/hexpds/db/user/sqlite.ex | 34 ++++++++++++++++++ lib/hexpds/db/user/sqlite/migrations.ex | 48 +++++++++++++++++++++++++ 6 files changed, 103 insertions(+), 6 deletions(-) create mode 100644 lib/hexpds/db/user/sqlite.ex create mode 100644 lib/hexpds/db/user/sqlite/migrations.ex diff --git a/.gitignore b/.gitignore index 7bca3f6..02dc631 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,8 @@ auth.ets # Cargo build output /target/ +/repos/ + # SQLite pds pds-shm @@ -40,4 +42,4 @@ pds-wal /target/ #DS_Store -.DS_Store \ No newline at end of file +.DS_Store diff --git a/config/config.exs b/config/config.exs index 3f3cb2e..f635cda 100644 --- a/config/config.exs +++ b/config/config.exs @@ -13,7 +13,7 @@ config :hexpds, admin_password: "admin", # or Ecto.Adapters.Postgres in production ecto_adapter: Ecto.Adapters.SQLite3, - ecto_repos: [Hexpds.Database], + ecto_repos: [Hexpds.Database, Hexpds.User.Sqlite], port: (case Mix.env() do :prod -> 3999 diff --git a/lib/hexpds/blockstore.ex b/lib/hexpds/blockstore.ex index 20ac672..918b5df 100644 --- a/lib/hexpds/blockstore.ex +++ b/lib/hexpds/blockstore.ex @@ -10,6 +10,8 @@ defmodule Hexpds.BlocksTable do schema "blocks" do field(:block_cid, :string) # A CID field(:block_value, :binary) # A Dag-CBOR blob + + timestamps() end end @@ -27,19 +29,19 @@ defmodule Hexpds.EctoBlockStore do block_cid: cid, block_value: DagCBOR.encode!(value) } - |> Hexpds.Database.insert!() + |> Hexpds.User.Sqlite.insert!() anything_else -> anything_else end end def get_block(key) do - case Hexpds.Database.get_by(BlocksTable, block_cid: key) do + case Hexpds.User.Sqlite.get_by(BlocksTable, block_cid: key) do nil -> {:error, :not_found} block -> block.block_value end end def del_block(key) do - Hexpds.Database.delete_all(from(b in BlocksTable, where: b.block_cid == ^key)) + Hexpds.User.Sqlite.delete_all(from(b in BlocksTable, where: b.block_cid == ^key)) end end diff --git a/lib/hexpds/db/user.ex b/lib/hexpds/db/user.ex index fd9ca5c..de1f171 100644 --- a/lib/hexpds/db/user.ex +++ b/lib/hexpds/db/user.ex @@ -11,7 +11,7 @@ defmodule Hexpds.User do field(:did, :string) field(:handle, :string) field(:password_hash, :string) - field(:signing_key, Ecto.Type.ErlangTerm) + field(:signing_key, Ecto.Type.ErlangTerm) # Why store as erlangterms? So that the type of key is automatically encoded with the key field(:rotation_key, Ecto.Type.ErlangTerm) field(:data, :map) end @@ -68,5 +68,16 @@ defmodule Hexpds.User do data: %{"preferences" => %{}} } |> tap(&Hexpds.Database.insert/1) + |> tap(&setup_repo_for/1) + end + + def setup_repo_for(%__MODULE__{did: did} = u) do + File.mkdir_p!("./repos/#{did}") + Hexpds.User.Sqlite.exec(u, + fn -> + Hexpds.User.Sqlite.Migrations.all() + |> Hexpds.User.Sqlite.migrate() + end + ) end end diff --git a/lib/hexpds/db/user/sqlite.ex b/lib/hexpds/db/user/sqlite.ex new file mode 100644 index 0000000..7be46b0 --- /dev/null +++ b/lib/hexpds/db/user/sqlite.ex @@ -0,0 +1,34 @@ +defmodule Hexpds.User.Sqlite do + @moduledoc """ + Users' repos will be stored in individual sqlite DBs + """ + + use Ecto.Repo, + otp_app: :hexpds, + adapter: Ecto.Adapters.SQLite3 + + def get_for_user(%Hexpds.User{did: did}) do + repo_path = "repos/#{did}/repo.db" + {:ok, db} = start_link(name: nil, database: repo_path) + db + end + + + def migrate(migrations) do + order_migrations = + migrations + |> Enum.with_index() + |> Enum.map(fn {m, i} -> {i, m} end) + Ecto.Migrator.run(__MODULE__, order_migrations, :up, all: true, dynamic_repo: get_dynamic_repo()) + end + + def exec(user, callback) do + repo = get_for_user(user) + try do + put_dynamic_repo(repo) + callback.() + after + Supervisor.stop(repo) + end + end +end diff --git a/lib/hexpds/db/user/sqlite/migrations.ex b/lib/hexpds/db/user/sqlite/migrations.ex new file mode 100644 index 0000000..544d1b8 --- /dev/null +++ b/lib/hexpds/db/user/sqlite/migrations.ex @@ -0,0 +1,48 @@ +defmodule Hexpds.User.Sqlite.Migrations.Macros do + defmacro migration(migration_name, do: change_block) do + # need to satisfy my DSL addiction - sj + quote do + @migrations __MODULE__.unquote(migration_name) + defmodule unquote(migration_name) do + use Ecto.Migration + + def change do + unquote(change_block) + end + end + end + end +end + +defmodule Hexpds.User.Sqlite.Migrations do + Module.register_attribute(__MODULE__, :migrations, accumulate: true) + + import __MODULE__.Macros + + migration SetupBlockstore do + create table(:blocks) do + add(:block_cid, :string, primary_key: true, null: false) + add(:block_value, :binary, null: false) + + timestamps() # Maybe will be helpful if we do special sync APIs + end + end + + migration CommitsAndRecords do + create table(:commits) do + add :seq, :integer, primary_key: true, null: false + add :cid, :string, null: false + + timestamps() # Probably not strictly necessary, but why not? Better safe than sorry, can easily remove later + end + create table(:records) do + add :record_path, :text, primary_key: true, null: false + add :collection, :string, null: false + add :record_cid, :string, null: false + + timestamps() # Will help with sorting for e.g. listRecords + end + end + + def all, do: Enum.reverse(@migrations) +end From 5d15e0c0e8395599094b33566b14e02128bea8ee Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Tue, 23 Jul 2024 18:39:50 -0400 Subject: [PATCH 08/10] uploadBlob + first pass at MST db schema --- config/config.exs | 5 +-- lib/hexpds/auth/session.ex | 8 ++--- lib/hexpds/blob.ex | 2 +- lib/hexpds/db/cid.ex | 2 +- lib/hexpds/db/user/sqlite/migrations.ex | 26 +++++++++++++++ lib/hexpds/repo/commit.ex | 14 ++++++++ lib/hexpds/repo/helpers.ex | 7 ++++ lib/hexpds/repo/record.ex | 18 ++++++++++ lib/hexpds/repo/repo.ex | 3 ++ lib/hexpds/service/http.ex | 33 ++++++++++++++----- lib/hexpds/service/xrpc/listBlobs.ex | 2 +- ...0240720164549_no_more_redundant_tables.exs | 10 ++++++ test/hexpds_mst_helpers_test.exs | 11 +++++++ 13 files changed, 123 insertions(+), 18 deletions(-) create mode 100644 lib/hexpds/repo/record.ex create mode 100644 lib/hexpds/repo/repo.ex create mode 100644 priv/database/migrations/20240720164549_no_more_redundant_tables.exs create mode 100644 test/hexpds_mst_helpers_test.exs diff --git a/config/config.exs b/config/config.exs index f635cda..9e51cd2 100644 --- a/config/config.exs +++ b/config/config.exs @@ -8,12 +8,12 @@ config :hexpds, appview_server: "api.bsky.app", relay_server: "bsky.network", # ignore pls for now - pds_host: "abyss.computer", + pds_host: "secretly-vast-elf.ngrok-free.app", # For testing multicodec_csv_path: "multicodec.csv", admin_password: "admin", # or Ecto.Adapters.Postgres in production ecto_adapter: Ecto.Adapters.SQLite3, - ecto_repos: [Hexpds.Database, Hexpds.User.Sqlite], + ecto_repos: [Hexpds.Database], port: (case Mix.env() do :prod -> 3999 @@ -28,5 +28,6 @@ config :hexpds, Hexpds.Database, # Replace with Postgres URL in production! url: "sqlite3:///pds" + config :mnesia, dir: ~c".mnesia/#{Mix.env()}/#{node()}" diff --git a/lib/hexpds/auth/session.ex b/lib/hexpds/auth/session.ex index efd63bd..2c9513e 100644 --- a/lib/hexpds/auth/session.ex +++ b/lib/hexpds/auth/session.ex @@ -84,10 +84,10 @@ defmodule Hexpds.Auth.Session do {:error, reason} -> {:error, reason} - json -> - case json["scope"] do - "com.atproto.access" -> if find_r_jwt(jwt), do: json, else: {:error, "Invalid token"} - "com.atproto.refresh" -> if find_a_jwt(jwt), do: json, else: {:error, "Invalid token"} + %{"sub" => sub} = json -> + case sub["scope"] do + "com.atproto.access" -> if find_a_jwt(jwt), do: json, else: {:error, "Invalid token"} + "com.atproto.refresh" -> if find_r_jwt(jwt), do: json, else: {:error, "Invalid token"} _ -> {:error, "Invalid scope"} end end diff --git a/lib/hexpds/blob.ex b/lib/hexpds/blob.ex index 49ebed2..8bd5d2a 100644 --- a/lib/hexpds/blob.ex +++ b/lib/hexpds/blob.ex @@ -65,7 +65,7 @@ defmodule Hexpds.Blob do though SQL is fast enough that it doesn't really matter. (This is not part of the ATP spec, just a weird - hack I added) + hack I added) -- sj """ def hash(did, "bafkr" <> _ = cid) do hash(did, CID.decode_cid!(cid)) diff --git a/lib/hexpds/db/cid.ex b/lib/hexpds/db/cid.ex index 16196a4..012d09b 100644 --- a/lib/hexpds/db/cid.ex +++ b/lib/hexpds/db/cid.ex @@ -16,5 +16,5 @@ defmodule Ecto.Types.Cid do do: {:ok, term - |> CID.encode!(:base32_lower)} + |> Hexpds.Repo.Helpers.cid_string} end diff --git a/lib/hexpds/db/user/sqlite/migrations.ex b/lib/hexpds/db/user/sqlite/migrations.ex index 544d1b8..b8a25ca 100644 --- a/lib/hexpds/db/user/sqlite/migrations.ex +++ b/lib/hexpds/db/user/sqlite/migrations.ex @@ -44,5 +44,31 @@ defmodule Hexpds.User.Sqlite.Migrations do end end + migration MstNodes do + # Have probably gotten some of this wrong and may need to change implementation + create table(:mst_nodes) do + add :cid, :string, primary_key: true, null: false + add :left, :string, comment: "CID link, optional: link to sub-tree Node on a lower level and with all keys sorting before keys at this node" + add :parent_node_cid, :string + add :depth, :int, null: false + end + create table(:tree_entries) do + add :tree_entry_key, :text, primary_key: true, null: false # This is equivalent to record path + add :parent_node_cid, :string, null: false + add :value, :string, null: false # CID link to the record data (CBOR) for this entry + add :right, :string # link to a sub-tree Node at a lower level which has keys sorting after this TreeEntry's key (to the "right"), but before the next TreeEntry's key in this Node (if any) + end + end + + migration SomeUsefulIndices do + # Admittedly I might also be getting this wrong + create index(:tree_entries, :parent_node_cid) + create index(:mst_nodes, :parent_node_cid) + create unique_index(:records, :record_cid) + create index(:records, :collection) + create unique_index(:tree_entries, :value) + create index(:mst_nodes, :depth) + end + def all, do: Enum.reverse(@migrations) end diff --git a/lib/hexpds/repo/commit.ex b/lib/hexpds/repo/commit.ex index f855c18..c0b58e2 100644 --- a/lib/hexpds/repo/commit.ex +++ b/lib/hexpds/repo/commit.ex @@ -57,3 +57,17 @@ defmodule Hexpds.Repo.Commit do def! to_dagcbor(comm) end + +defmodule Hexpds.Repo.CommitBlock do + + # These come from Hexpds.User.Sqlite!!! Not from Hexpds.Database!!! (This whole naming thing will probably need to be moved around later) + + use Ecto.Schema + + schema "commits" do + field :seq, :integer + field :cid, :string + + timestamps() + end +end diff --git a/lib/hexpds/repo/helpers.ex b/lib/hexpds/repo/helpers.ex index 0fd3e56..fe888b1 100644 --- a/lib/hexpds/repo/helpers.ex +++ b/lib/hexpds/repo/helpers.ex @@ -21,4 +21,11 @@ defmodule Hexpds.Repo.Helpers do end end + def key_depth(key) do + :crypto.hash(:sha256, key) + |> key_hash_depth() + end + defp key_hash_depth(bin, depth \\ 0) + defp key_hash_depth(<<0::integer-size(2), rest::bitstring>>, depth), do: key_hash_depth(rest, depth + 1) + defp key_hash_depth(_no_more_zeroes, depth), do: depth end diff --git a/lib/hexpds/repo/record.ex b/lib/hexpds/repo/record.ex new file mode 100644 index 0000000..ad5ccb0 --- /dev/null +++ b/lib/hexpds/repo/record.ex @@ -0,0 +1,18 @@ +defmodule Hexpds.Repo.Record do + use Ecto.Schema + import Ecto.Query + + # Again, this belongs in Hexpds.User.Sqlite + + schema "records" do + field :record_path, :string + field :collection, :string + field :record_cid, :string + timestamps() + end + + def all_in_collection(collection) do + from r in __MODULE__, where: r.collection == ^collection, order_by: r.inserted_at + end + +end diff --git a/lib/hexpds/repo/repo.ex b/lib/hexpds/repo/repo.ex new file mode 100644 index 0000000..8bf8eba --- /dev/null +++ b/lib/hexpds/repo/repo.ex @@ -0,0 +1,3 @@ +defmodule Hexpds.Repo do + +end diff --git a/lib/hexpds/service/http.ex b/lib/hexpds/service/http.ex index d9deda9..fcce9af 100644 --- a/lib/hexpds/service/http.ex +++ b/lib/hexpds/service/http.ex @@ -10,11 +10,11 @@ defmodule Hexpds.Http do plug(:match) plug(:dispatch) - plug(Plug.Parsers, - parsers: [:json], - pass: ["text/*"], - json_decoder: Jason - ) + # plug(Plug.Parsers, + # parsers: [:json], + # pass: ["text/*"], + # json_decoder: Jason + # ) options "/xrpc/:any" do conn @@ -116,7 +116,12 @@ defmodule Hexpds.Http do {:ok, body, _} = Plug.Conn.read_body(conn) body = - for {key, val} <- Jason.decode!(body), into: %{}, do: {String.to_atom(key), val} + case Jason.decode(body) do + {:ok, map} -> + for {key, value} <- map, into: %{}, do: {String.to_atom(key), value} + {:error, _} -> + {:blob, body} + end {statuscode, json_resp} = xrpc_procedure(conn, method, body, get_context(conn)) @@ -239,8 +244,6 @@ defmodule Hexpds.Http do end XRPC.query _, "com.atproto.server.describeServer", _, _ do - IO.puts("Describing server...") - {200, %{ # These will all change, obviously @@ -301,10 +304,22 @@ defmodule Hexpds.Http do case ctx do %{user: %Hexpds.User{} = user, token_type: :access} -> Hexpds.User.Preferences.put(user, prefs) - {200, XRPC.blank} + {200, XRPC.blank()} _ -> {401, %{error: "Unauthorized", message: "Not authorized"}} end end + + XRPC.procedure _, "com.atproto.repo.uploadBlob", {:blob, blob_bytes}, ctx do + case ctx do + %{user: %Hexpds.User{} = user, token_type: :access} -> + blob = + Hexpds.Blob.new(blob_bytes, user) + |> tap(&Hexpds.Blob.save/1) + + {200, %{cid: Hexpds.Repo.Helpers.cid_string(blob.cid)}} + _ -> {401, %{error: "Unauthorized", message: "Not authorized"}} + end + end end diff --git a/lib/hexpds/service/xrpc/listBlobs.ex b/lib/hexpds/service/xrpc/listBlobs.ex index cdd7e98..bbb9525 100644 --- a/lib/hexpds/service/xrpc/listBlobs.ex +++ b/lib/hexpds/service/xrpc/listBlobs.ex @@ -15,7 +15,7 @@ defmodule Hexpds.Xrpc.Query.ListBlobs do end def list_blobs(did, tid \\ nil, limit \\ nil, cursor \\ nil) do - query = ecto_query_for(did, tid || "#{Hexpds.Tid.empty()}") + query = ecto_query_for(did, tid || "#{Hexpds.Tid.empty()}") # Note that we just assume the Repo rev to be correct here cids = if cursor do diff --git a/priv/database/migrations/20240720164549_no_more_redundant_tables.exs b/priv/database/migrations/20240720164549_no_more_redundant_tables.exs new file mode 100644 index 0000000..7042026 --- /dev/null +++ b/priv/database/migrations/20240720164549_no_more_redundant_tables.exs @@ -0,0 +1,10 @@ +defmodule Hexpds.Database.Migrations.NoMoreRedundantTables do + use Ecto.Migration + + def change do + # All these tables now belong in per-user SQLite DBs + drop table(:blocks) + drop table(:records) + drop table(:commits) + end +end diff --git a/test/hexpds_mst_helpers_test.exs b/test/hexpds_mst_helpers_test.exs new file mode 100644 index 0000000..e898d26 --- /dev/null +++ b/test/hexpds_mst_helpers_test.exs @@ -0,0 +1,11 @@ +defmodule HexpdsMstHelpersTest do + use ExUnit.Case + doctest Hexpds.Repo.Helpers + + test "gets correct depth for paths" do + assert Hexpds.Repo.Helpers.key_depth("2653ae71") == 0 + assert Hexpds.Repo.Helpers.key_depth("blue") == 1 + assert Hexpds.Repo.Helpers.key_depth("app.bsky.feed.post/454397e440ec") == 4 + assert Hexpds.Repo.Helpers.key_depth("app.bsky.feed.post/9adeb165882c") == 8 + end +end From b841a6e9272dd3d6e3fed8342645e2781de35231 Mon Sep 17 00:00:00 2001 From: ShreyanJain9 Date: Fri, 2 Aug 2024 00:37:02 -0700 Subject: [PATCH 09/10] firehose skeleton --- lib/hexpds/blockstore.ex | 10 +- lib/hexpds/firehose/websocket.ex | 26 ++++ lib/hexpds/repo/helpers.ex | 1 + lib/hexpds/repo/repo.ex | 24 ++++ lib/hexpds/service/http.ex | 192 ++++++------------------------ lib/hexpds/service/http/routes.ex | 155 ++++++++++++++++++++++++ lib/hexpds/service/xrpc.ex | 12 ++ lib/main.ex | 2 +- mix.exs | 2 + mix.lock | 14 +++ 10 files changed, 275 insertions(+), 163 deletions(-) create mode 100644 lib/hexpds/firehose/websocket.ex create mode 100644 lib/hexpds/service/http/routes.ex diff --git a/lib/hexpds/blockstore.ex b/lib/hexpds/blockstore.ex index 918b5df..90cd48b 100644 --- a/lib/hexpds/blockstore.ex +++ b/lib/hexpds/blockstore.ex @@ -17,10 +17,10 @@ end defmodule Hexpds.EctoBlockStore do import Ecto.Query - alias Hexpds.DagCBOR - alias Hexpds.BlocksTable - @behaviour Hexpds.BlockStore + alias Hexpds.{BlockStore, DagCBOR, BlocksTable} + @behaviour BlockStore + @impl BlockStore def put_block(value) do cid = Hexpds.Repo.Helpers.term_to_dagcbor_cid(value) case get_block(cid) do @@ -34,13 +34,15 @@ defmodule Hexpds.EctoBlockStore do end end + @impl BlockStore def get_block(key) do case Hexpds.User.Sqlite.get_by(BlocksTable, block_cid: key) do nil -> {:error, :not_found} - block -> block.block_value + %BlocksTable{} = block -> block end end + @impl BlockStore def del_block(key) do Hexpds.User.Sqlite.delete_all(from(b in BlocksTable, where: b.block_cid == ^key)) end diff --git a/lib/hexpds/firehose/websocket.ex b/lib/hexpds/firehose/websocket.ex new file mode 100644 index 0000000..9e1a6fc --- /dev/null +++ b/lib/hexpds/firehose/websocket.ex @@ -0,0 +1,26 @@ +defmodule Hexpds.Firehose.Websocket do + @behaviour WebSock + + @impl WebSock + def init(_params) do + # We can ignore the params for now but we should use them for backfilling later + pid = self() + :syn.join(:firehose, :websockets, pid) + {:ok, nil} + end + + @impl WebSock + def handle_info({:firehose_message, bindata}, _) do + {:push, {:binary, bindata}, nil} + end + + @impl WebSock + def handle_in(_, _) do + # Ignore incoming messages + {:ok, nil} + end + + def push_frame(frame) do + :syn.publish(:firehose, :websockets, {:firehose_message, frame}) + end +end diff --git a/lib/hexpds/repo/helpers.ex b/lib/hexpds/repo/helpers.ex index fe888b1..c8f8c9b 100644 --- a/lib/hexpds/repo/helpers.ex +++ b/lib/hexpds/repo/helpers.ex @@ -3,6 +3,7 @@ defmodule Hexpds.Repo.Helpers do def dagcbor_cid(encoded_dagcbor) do :crypto.hash(:sha256, encoded_dagcbor) + |> then(fn hash -> {:ok, multihash} = Multihash.encode(:sha2_256, hash); multihash end) |> CID.cid!("dag-cbor") end diff --git a/lib/hexpds/repo/repo.ex b/lib/hexpds/repo/repo.ex index 8bf8eba..3e5553e 100644 --- a/lib/hexpds/repo/repo.ex +++ b/lib/hexpds/repo/repo.ex @@ -1,3 +1,27 @@ defmodule Hexpds.Repo do + def create_record( + %Hexpds.User{did: did} = user, + %{"$type" => collection} = record, + collection, + rkey \\ "#{Hexpds.Tid.now()}" + ) do + Hexpds.User.Sqlite.exec(user, fn -> + block = Hexpds.EctoBlockStore.put_block(record) + record = + %Hexpds.Repo.Record{ + record_cid: block.block_cid, + collection: collection, + record_path: "#{collection}/#{rkey}" + } + |> Hexpds.User.Sqlite.insert!() + + # Obviously MST and Commits stuff are missing here + + %{ + uri: "at://#{did}/#{record.record_path}", + cid: record.record_cid + } + end) + end end diff --git a/lib/hexpds/service/http.ex b/lib/hexpds/service/http.ex index fcce9af..97a2201 100644 --- a/lib/hexpds/service/http.ex +++ b/lib/hexpds/service/http.ex @@ -2,9 +2,6 @@ defmodule Hexpds.Http do @moduledoc """ The XRPC interface to the PDS, including AppView proxying """ - alias Hexpds.XRPC - require XRPC - use Plug.Router plug(:match) @@ -45,6 +42,34 @@ defmodule Hexpds.Http do send_resp(conn, 200, "Why would a PDS need a favicon?") end + def respond_with(conn, {statuscode, resp_body}) do + case resp_body do + {:blob, blob} -> + conn + |> Plug.Conn.put_resp_content_type(blob.mime_type) + |> Plug.Conn.send_resp(statuscode, blob.data) + |> Plug.Conn.halt() + |> IO.inspect() + + {:websock, {module, args, [timeout: timeout]}} -> + conn + |> WebSockAdapter.upgrade(module, args, timeout: timeout) + |> halt() + + _ -> + conn + |> Plug.Conn.put_resp_content_type("application/json") + |> Plug.Conn.put_resp_header("access-control-allow-origin", "*") + |> Plug.Conn.send_resp(statuscode, Jason.encode!(resp_body)) + end + end + + get "/ws/firehose_test" do + conn + |> WebSockAdapter.upgrade(Hexpds.Firehose.Websocket, [], timeout: 60_000) + |> halt() + end + get "/.well-known/atproto-did" do {status, resp} = case Hexpds.User.get(conn.host) do @@ -65,12 +90,12 @@ defmodule Hexpds.Http do context = get_context(conn) - {statuscode, json_body} = + resp = try do # We can handle the method IO.puts("Got query: #{method} #{inspect(params)}") - xrpc_query(conn, method, params, context) + Hexpds.Http.Routes.xrpc_query(conn, method, params, context) |> IO.inspect() catch _, e_from_method -> @@ -96,20 +121,7 @@ defmodule Hexpds.Http do end end - case json_body do - {:blob, blob} -> - conn - |> Plug.Conn.put_resp_content_type(blob.mime_type) - |> Plug.Conn.send_resp(200, blob.data) - |> Plug.Conn.halt() - |> IO.inspect() - - _ -> - conn - |> Plug.Conn.put_resp_content_type("application/json") - |> Plug.Conn.put_resp_header("access-control-allow-origin", "*") - |> Plug.Conn.send_resp(statuscode, Jason.encode!(json_body)) - end + respond_with(conn, resp) end post "/xrpc/:method" do @@ -119,24 +131,14 @@ defmodule Hexpds.Http do case Jason.decode(body) do {:ok, map} -> for {key, value} <- map, into: %{}, do: {String.to_atom(key), value} + {:error, _} -> {:blob, body} end - {statuscode, json_resp} = xrpc_procedure(conn, method, body, get_context(conn)) - - case json_resp do - {:blob, blob} -> - conn - |> Plug.Conn.put_resp_content_type(blob.mime_type) - |> Plug.Conn.send_resp(200, blob.data) + resp = Hexpds.Http.Routes.xrpc_procedure(conn, method, body, get_context(conn)) - _ -> - conn - |> Plug.Conn.put_resp_content_type("application/json") - |> Plug.Conn.put_resp_header("access-control-allow-origin", "*") - |> Plug.Conn.send_resp(statuscode, Jason.encode!(json_resp)) - end + respond_with(conn, resp) end defp appview_for(%Plug.Conn{req_headers: r_h}) do @@ -196,130 +198,4 @@ defmodule Hexpds.Http do |> Map.get("authorization") |> Hexpds.Auth.Context.parse_header() end - - @spec xrpc_query(Plug.Conn.t(), String.t(), map(), Hexpds.Auth.Context.t()) :: - {integer(), map() | {:blob, Hexpds.Blob.t()}} - - XRPC.query _, "app.bsky.actor.getPreferences", %{}, ctx do - case ctx do - %{user: %Hexpds.User{} = user, token_type: :access} -> - {200, %{preferences: user.data["preferences"]}} - - _ -> - {401, %{error: "Unauthorized", message: "Not authorized"}} - end - end - - XRPC.query _, "com.atproto.sync.getBlob", %{did: did, cid: cid}, _ do - with %Hexpds.Blob{} = blob <- Hexpds.Blob.get(cid, did) do - {200, {:blob, blob}} - else - _ -> {400, %{error: "InvalidRequest", message: "No such blob"}} - end - end - - XRPC.query _, "com.atproto.sync.listBlobs", opts, _ do - case Hexpds.Xrpc.Query.ListBlobs.list_blobs( - opts[:did], - opts[:since], - String.to_integer(opts[:limit] || 500), - Hexpds.CID.decode_cid!(opts[:cursor]) - ) do - %{cids: cids, cursor: next_cursor} -> - {200, %{cursor: next_cursor, cids: Enum.map(cids, &to_string/1)}} - - _ -> - {400, %{error: "InvalidRequest", message: "Unknown user."}} - end - end - - XRPC.query _, "com.atproto.server.getSession", %{}, ctx do - case ctx do - %{user: %Hexpds.User{did: did, handle: handle}, token_type: :access} -> - {200, %{handle: handle, did: did}} - - _ -> - {401, %{error: "Unauthorized", message: "Not authorized"}} - end - end - - XRPC.query _, "com.atproto.server.describeServer", _, _ do - {200, - %{ - # These will all change, obviously - availableUserDomains: ["localhost"], - did: "did:web:localhost" - }} - end - - @spec xrpc_procedure(Plug.Conn.t(), String.t(), map(), Hexpds.Auth.Context.t()) :: - {integer(), map() | {:blob, Hexpds.Blob.t()}} - - XRPC.procedure _, - "com.atproto.server.createSession", - %{identifier: username, password: pw}, - _ do - {200, Hexpds.Auth.Session.new(username, pw)} - end - - XRPC.procedure c, "com.atproto.server.refreshSession", _, ctx do - case ctx do - %{user: %Hexpds.User{}, token_type: :refresh} -> - c.req_headers - |> Enum.into(%{}) - |> Map.get("authorization") - |> case do - "Bearer " <> token -> - case Hexpds.Auth.Session.refresh(token) do - %{} = session -> {200, session} - _ -> {400, %{error: "InvalidToken", message: "Refresh session failed"}} - end - - _ -> - {400, %{error: "InvalidToken", message: "Refresh session failed"}} - end - - _ -> - {401, %{error: "Unauthorized", message: "Not authorized"}} - end - end - - XRPC.procedure conn, "com.atproto.server.deleteSession", _, _ do - conn.req_headers - |> Enum.into(%{}) - |> Map.get("authorization") - |> case do - "Bearer " <> token -> - case Hexpds.Auth.Session.delete(token) do - :ok -> {200, %{}} - _ -> {401, %{error: "InvalidToken", message: "Delete session failed"}} - end - - _ -> - {401, %{error: "InvalidToken", message: "Delete session failed"}} - end - end - - XRPC.procedure _, "app.bsky.actor.putPreferences", %{preferences: prefs}, ctx do - case ctx do - %{user: %Hexpds.User{} = user, token_type: :access} -> - Hexpds.User.Preferences.put(user, prefs) - {200, XRPC.blank()} - - _ -> - {401, %{error: "Unauthorized", message: "Not authorized"}} - end - end - - XRPC.procedure _, "com.atproto.repo.uploadBlob", {:blob, blob_bytes}, ctx do - case ctx do - %{user: %Hexpds.User{} = user, token_type: :access} -> - blob = - Hexpds.Blob.new(blob_bytes, user) - |> tap(&Hexpds.Blob.save/1) - - {200, %{cid: Hexpds.Repo.Helpers.cid_string(blob.cid)}} - _ -> {401, %{error: "Unauthorized", message: "Not authorized"}} - end - end end diff --git a/lib/hexpds/service/http/routes.ex b/lib/hexpds/service/http/routes.ex new file mode 100644 index 0000000..5b219b8 --- /dev/null +++ b/lib/hexpds/service/http/routes.ex @@ -0,0 +1,155 @@ +defmodule Hexpds.Http.Routes do + alias Hexpds.XRPC + require XRPC + + @spec xrpc_query(Plug.Conn.t(), String.t(), map(), Hexpds.Auth.Context.t()) :: + {integer(), map() | {:blob, Hexpds.Blob.t()}} + + XRPC.query _, "app.bsky.actor.getPreferences", %{}, ctx do + case ctx do + %{user: %Hexpds.User{} = user, token_type: :access} -> + {200, %{preferences: user.data["preferences"]}} + + _ -> + {401, %{error: "Unauthorized", message: "Not authorized"}} + end + end + + XRPC.query _, "com.atproto.sync.getBlob", %{did: did, cid: cid}, _ do + with %Hexpds.Blob{} = blob <- Hexpds.Blob.get(cid, did) do + {200, {:blob, blob}} + else + _ -> {400, %{error: "InvalidRequest", message: "No such blob"}} + end + end + + XRPC.query _, "com.atproto.sync.listBlobs", opts, _ do + case Hexpds.Xrpc.Query.ListBlobs.list_blobs( + opts[:did], + opts[:since], + String.to_integer(opts[:limit] || 500), + Hexpds.CID.decode_cid!(opts[:cursor]) + ) do + %{cids: cids, cursor: next_cursor} -> + {200, %{cursor: next_cursor, cids: Enum.map(cids, &to_string/1)}} + + _ -> + {400, %{error: "InvalidRequest", message: "Unknown user."}} + end + end + + XRPC.query _, "com.atproto.server.getSession", %{}, ctx do + case ctx do + %{user: %Hexpds.User{did: did, handle: handle}, token_type: :access} -> + {200, %{handle: handle, did: did}} + + _ -> + {401, %{error: "Unauthorized", message: "Not authorized"}} + end + end + + XRPC.query _, "com.atproto.server.describeServer", _, _ do + domain = Application.get_env(:hexpds, :pds_host) + {200, + %{ + # These will all change, obviously + availableUserDomains: [domain], + did: "did:web:#{domain}" + }} + end + + XRPC.query _, "com.atproto.sync.subscribeRepos", _params_for_backfilling_to_implement_at_some_point, _ do + {200, {:websock, {Hexpds.Firehose.Websocket, [], timeout: :infinity}}} + end + + @spec xrpc_procedure(Plug.Conn.t(), String.t(), map(), Hexpds.Auth.Context.t()) :: + {integer(), map() | {:blob, Hexpds.Blob.t()}} + + XRPC.procedure _, + "com.atproto.server.createSession", + %{identifier: username, password: pw}, + _ do + {200, Hexpds.Auth.Session.new(username, pw)} + end + + XRPC.procedure c, "com.atproto.server.refreshSession", _, ctx do + XRPC.if_authed ctx, :refresh do + c.req_headers + |> Enum.into(%{}) + |> Map.get("authorization") + |> case do + "Bearer " <> token -> + case Hexpds.Auth.Session.refresh(token) do + %{} = session -> {200, session} + _ -> {400, %{error: "InvalidToken", message: "Refresh session failed"}} + end + + _ -> + {400, %{error: "InvalidToken", message: "Refresh session failed"}} + end + end + end + + XRPC.procedure conn, "com.atproto.server.deleteSession", _, _ do + conn.req_headers + |> Enum.into(%{}) + |> Map.get("authorization") + |> case do + "Bearer " <> token -> + case Hexpds.Auth.Session.delete(token) do + :ok -> {200, %{}} + _ -> {401, %{error: "InvalidToken", message: "Delete session failed"}} + end + + _ -> + {401, %{error: "InvalidToken", message: "Delete session failed"}} + end + end + + XRPC.procedure _, "app.bsky.actor.putPreferences", %{preferences: prefs}, ctx do + XRPC.if_authed ctx do + Hexpds.User.Preferences.put(ctx.user, prefs) + {200, XRPC.blank()} + end + end + + XRPC.procedure _, "com.atproto.repo.uploadBlob", {:blob, blob_bytes}, ctx do + XRPC.if_authed ctx do + blob = + Hexpds.Blob.new(blob_bytes, ctx.user) + |> tap(&Hexpds.Blob.save/1) + + response = %{ + blob: %{ + "$type": "blob", + ref: %{ + "$link": to_string(blob.cid) + }, + mimeType: blob.mime_type, + size: byte_size(blob.data) + } + } + + {200, response} + end + end + + XRPC.procedure _, + "com.atproto.repo.createRecord", + %{repo: did, collection: collection, record: record} = params, + ctx do + XRPC.if_authed ctx do + unless did == ctx.user.did do + {401, %{error: "Unauthorized", message: "Not authorized"}} + else + {200, + Hexpds.Repo.create_record( + ctx.user, + record, + collection, + params[:rkey] || "#{Hexpds.Tid.now()}" + )} + end + end + end +end diff --git a/lib/hexpds/service/xrpc.ex b/lib/hexpds/service/xrpc.ex index b415b25..5063163 100644 --- a/lib/hexpds/service/xrpc.ex +++ b/lib/hexpds/service/xrpc.ex @@ -22,4 +22,16 @@ defmodule Hexpds.XRPC do end end end + + defmacro if_authed(ctx, token_type \\ :access, do: block) do + quote do + case unquote(ctx) do + %{user: %Hexpds.User{}, token_type: unquote(token_type)} -> + unquote(block) + _ -> + {401, %{error: "Unauthorized", message: "Not authorized"}} + end + end + end + end diff --git a/lib/main.ex b/lib/main.ex index 5317f33..8c84d96 100644 --- a/lib/main.ex +++ b/lib/main.ex @@ -4,7 +4,7 @@ defmodule Hexpds.Application do @impl Application def start(_type, _args) do Hexpds.Database.Mnesia.create_tables() - + :syn.add_node_to_scopes([:firehose]) Supervisor.start_link( [ {Bandit, plug: Hexpds.Http, scheme: :http, port: Application.get_env(:hexpds, :port)}, diff --git a/mix.exs b/mix.exs index f6e6f12..0f2d7fb 100644 --- a/mix.exs +++ b/mix.exs @@ -42,6 +42,8 @@ defmodule Hexpds.MixProject do {:ecto_sqlite3, "~> 0.15"}, {:matcha, "~> 0.1.10"}, {:witchcraft, "~> 1.0.4"}, + {:websock_adapter, "~> 0.5.6"}, # Websockets + {:syn, "~> 3.3.0"}, # type checking {:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false}, # for linting diff --git a/mix.lock b/mix.lock index af69438..3aa4bcf 100644 --- a/mix.lock +++ b/mix.lock @@ -9,6 +9,9 @@ "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "comeonin": {:hex, :comeonin, "5.4.0", "246a56ca3f41d404380fc6465650ddaa532c7f98be4bda1b4656b3a37cc13abe", [:mix], [], "hexpm", "796393a9e50d01999d56b7b8420ab0481a7538d0caf80919da493b4a6e51faf1"}, "cors_plug": {:hex, :cors_plug, "3.0.3", "7c3ac52b39624bc616db2e937c282f3f623f25f8d550068b6710e58d04a0e330", [:mix], [{:plug, "~> 1.13", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "3f2d759e8c272ed3835fab2ef11b46bddab8c1ab9528167bd463b6452edf830d"}, + "cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"}, + "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, + "cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"}, "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, @@ -18,6 +21,7 @@ "ecto_sql": {:hex, :ecto_sql, "3.11.1", "e9abf28ae27ef3916b43545f9578b4750956ccea444853606472089e7d169470", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 0.17.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ce14063ab3514424276e7e360108ad6c2308f6d88164a076aac8a387e1fea634"}, "ecto_sqlite3": {:hex, :ecto_sqlite3, "0.15.1", "40f2fbd9e246455f8c42e7e0a77009ef806caa1b3ce6f717b2a0a80e8432fcfd", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.11", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:exqlite, "~> 0.19", [hex: :exqlite, repo: "hexpm", optional: false]}], "hexpm", "28b16e177123c688948357176662bf9ff9084daddf950ef5b6baf3ee93707064"}, "elixir_make": {:hex, :elixir_make, "0.8.3", "d38d7ee1578d722d89b4d452a3e36bcfdc644c618f0d063b874661876e708683", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "5c99a18571a756d4af7a4d89ca75c28ac899e6103af6f223982f09ce44942cc9"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.28.6", "2bbd7a143d3014fc26de9056793e97600ae8978af2ced82c2575f130b7c0d7d7", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bca1441614654710ba37a0e173079273d619f9160cbcc8cd04e6bd59f1ad0e29"}, "ex_multihash": {:hex, :ex_multihash, "2.0.0", "7fb36f842a2ec1c6bbba550f28fcd16d3c62981781b9466c9c1975c43d7db43c", [:mix], [], "hexpm", "66a08a86a1ba00d95736c595d7975696e5691308cdf7770c50b0f84a2a1172b0"}, @@ -38,23 +42,33 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "msgpax": {:hex, :msgpax, "2.4.0", "4647575c87cb0c43b93266438242c21f71f196cafa268f45f91498541148c15d", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "ca933891b0e7075701a17507c61642bf6e0407bb244040d5d0a58597a06369d2"}, "multibase": {:hex, :multibase, "0.0.1", "9243bebe6f7f0f9f873c770ddcddab285cc554a8dd2374b8de6b2add087eccbc", [:mix], [{:base1, "~> 0.1.0", [hex: :base1, repo: "hexpm", optional: false]}, {:base2, "~> 0.1.0", [hex: :base2, repo: "hexpm", optional: false]}, {:basefiftyeight, "~> 0.1.0", [hex: :basefiftyeight, repo: "hexpm", optional: false]}, {:zbase32, "~> 2.0.0", [hex: :zbase32, repo: "hexpm", optional: false]}], "hexpm", "6ea13a19e47da727c2def81b241ad10a39440c5705a4abed18e8310b46f45f51"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "operator": {:hex, :operator, "0.2.1", "4572312bbd3e63a5c237bf15c3a7670d568e3651ea744289130780006e70e5f5", [:mix], [], "hexpm", "1990cc6dc651d7fff04636eef06fc64e6bc1da83a1da890c08ca3432e17e267a"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, + "plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"}, "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, + "poison": {:hex, :poison, "5.0.0", "d2b54589ab4157bbb82ec2050757779bfed724463a544b6e20d79855a9e43b24", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "11dc6117c501b80c62a7594f941d043982a1bd05a1184280c0d9166eb4d8d3fc"}, "quark": {:hex, :quark, "2.3.2", "066e0d431440d077684469967f54d732443ea2a48932e0916e974633e8b39c95", [:mix], [], "hexpm", "2f6423779b02afe7e3e4af3cfecfcd94572f2051664d4d8329ffa872d24b10a8"}, + "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "recon": {:hex, :recon, "2.5.5", "c108a4c406fa301a529151a3bb53158cadc4064ec0c5f99b03ddb8c0e4281bdf", [:mix, :rebar3], [], "hexpm", "632a6f447df7ccc1a4a10bdcfce71514412b16660fe59deca0fcf0aa3c054404"}, + "riverside": {:hex, :riverside, "2.2.1", "059e2971d44500f98c2bc2ef150875777ced3e2b37aee91e84d6b11d95698f57", [:mix], [{:cowboy, "~> 2.9", [hex: :cowboy, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.12", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.5", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:poison, "~> 5.0", [hex: :poison, repo: "hexpm", optional: false]}, {:secure_random, "~> 0.5", [hex: :secure_random, repo: "hexpm", optional: false]}, {:socket, "~> 0.3", [hex: :socket, repo: "hexpm", optional: false]}, {:the_end, "~> 1.1", [hex: :the_end, repo: "hexpm", optional: false]}], "hexpm", "85b682eb6cd7ac92b0ff43772dc633e5662a511b02007794d27d635d37a009e4"}, "rustler": {:hex, :rustler, "0.32.1", "f4cf5a39f9e85d182c0a3f75fa15b5d0add6542ab0bf9ceac6b4023109ebd3fc", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "b96be75526784f86f6587f051bc8d6f4eaff23d6e0f88dbcfe4d5871f52946f7"}, + "secure_random": {:hex, :secure_random, "0.5.1", "c5532b37c89d175c328f5196a0c2a5680b15ebce3e654da37129a9fe40ebf51b", [:mix], [], "hexpm", "1b9754f15e3940a143baafd19da12293f100044df69ea12db5d72878312ae6ab"}, + "socket": {:hex, :socket, "0.3.13", "98a2ab20ce17f95fb512c5cadddba32b57273e0d2dba2d2e5f976c5969d0c632", [:mix], [], "hexpm", "f82ea9833ef49dde272e6568ab8aac657a636acb4cf44a7de8a935acb8957c2e"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, + "syn": {:hex, :syn, "3.3.0", "4684a909efdfea35ce75a9662fc523e4a8a4e8169a3df275e4de4fa63f99c486", [:rebar3], [], "hexpm", "e58ee447bc1094bdd21bf0acc102b1fbf99541a508cd48060bf783c245eaf7d6"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "the_end": {:hex, :the_end, "1.1.0", "cd11af29051d8823b2e4d0109aa0bf0f08a00ce60ca36080ad2014e11e7f9d52", [:mix], [], "hexpm", "e1bfadb140ca43d9010a0da1c159782697f96ef50c68dcd7c59cb9215a4779d7"}, "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "type_class": {:hex, :type_class, "1.2.8", "349db84be8c664e119efaae1a09a44b113bc8e81af1d032f4e3e38feef4fac32", [:mix], [{:exceptional, "~> 2.1", [hex: :exceptional, repo: "hexpm", optional: false]}], "hexpm", "bb93de2cacfd6f0ee43f4616f7a139816a73deba4ae8ee3364bcfa4abe3eef3e"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "varint": {:hex, :varint, "1.4.0", "b7405c8a99db7b95d4341fa9cb15e7c3af6c8dda43e21bbe1c4a9cdff50b6502", [:mix], [], "hexpm", "0fd461901b7120c03467530dff3c58fa3475328fd75ba72c7d3cbf13bce6b0d2"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.6", "0437fe56e093fd4ac422de33bf8fc89f7bc1416a3f2d732d8b2c8fd54792fe60", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "e04378d26b0af627817ae84c92083b7e97aca3121196679b73c73b99d0d133ea"}, "witchcraft": {:hex, :witchcraft, "1.0.4", "8733ac0ee769d4d2f73610de5a2b601a4ccbe385d1fca6419280f2511d21d0c9", [:mix], [{:exceptional, "~> 2.1", [hex: :exceptional, repo: "hexpm", optional: false]}, {:operator, "~> 0.2", [hex: :operator, repo: "hexpm", optional: false]}, {:quark, "~> 2.2", [hex: :quark, repo: "hexpm", optional: false]}, {:type_class, "~> 1.2", [hex: :type_class, repo: "hexpm", optional: false]}], "hexpm", "a380f439f1962d2e56cdad874ed7eb4612ddd6ec5ee3c6ad0c5d63e60539e6b0"}, "zbase32": {:hex, :zbase32, "2.0.0", "5a61d5ee8f39092d4a243da2a42b5b4339ef226d9b182603f63d5a3f16d192ee", [:mix], [], "hexpm", "798f81895658f9773e1dcf30ba3c118547f482502c5e1e19e72752f9a6f23e44"}, } From 8c341b7173a9a564647ea811dcd6414d88316dc1 Mon Sep 17 00:00:00 2001 From: Shreyan Jain Date: Tue, 24 Sep 2024 19:57:07 -0700 Subject: [PATCH 10/10] this will be useful later on in the firehose --- lib/hexpds/firehose/firehose.ex | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 lib/hexpds/firehose/firehose.ex diff --git a/lib/hexpds/firehose/firehose.ex b/lib/hexpds/firehose/firehose.ex new file mode 100644 index 0000000..951ee48 --- /dev/null +++ b/lib/hexpds/firehose/firehose.ex @@ -0,0 +1,16 @@ +defmodule Hexpds.Firehose do + defp cbor_concat(header, op) do + [header, op] + |> Enum.map(&Hexpds.DagCBOR.encode!/1) + |> Enum.reverse() + |> Enum.reduce(&<>/2) + end + + def error(type, message \\ nil) do + cbor_concat(%{op: -1}, optional_join(%{error: type}, if(message, do: %{message: message}))) + end + + defp optional_join(map, nil), do: map + defp optional_join(map1, map2), do: Map.merge(map1, map2) + +end