diff --git a/.gitignore b/.gitignore index 4aaa98a..d61d804 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,6 @@ dev.db* rpc.toml daemon.toml grpcurl* +config.toml test/.terraform* test/local.tfstate* diff --git a/.sqlx/query-127e6b12137e638062be0641dd2cecb72906706050357cef77290a623ca24037.json b/.sqlx/query-127e6b12137e638062be0641dd2cecb72906706050357cef77290a623ca24037.json new file mode 100644 index 0000000..4f7f8b8 --- /dev/null +++ b/.sqlx/query-127e6b12137e638062be0641dd2cecb72906706050357cef77290a623ca24037.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT id, email, auth_provider, auth_provider_id\n FROM users WHERE auth_provider_id = $1;\n ", + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "email", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "auth_provider", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "auth_provider_id", + "ordinal": 3, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "127e6b12137e638062be0641dd2cecb72906706050357cef77290a623ca24037" +} diff --git a/.sqlx/query-c467aa589b04d75815556bf41a142728fd69d73846807cf230a5a95add35c73e.json b/.sqlx/query-c467aa589b04d75815556bf41a142728fd69d73846807cf230a5a95add35c73e.json new file mode 100644 index 0000000..ad26d9c --- /dev/null +++ b/.sqlx/query-c467aa589b04d75815556bf41a142728fd69d73846807cf230a5a95add35c73e.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO users (id, email, auth_provider, auth_provider_id)\n VALUES ($1, $2, $3, $4)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "c467aa589b04d75815556bf41a142728fd69d73846807cf230a5a95add35c73e" +} diff --git a/Cargo.lock b/Cargo.lock index b9d751e..0e25b41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -99,6 +99,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.3.0" @@ -118,7 +124,7 @@ dependencies = [ "futures-util", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.29", + "hyper 0.14.30", "itoa", "matchit", "memchr", @@ -127,7 +133,7 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", + "sync_wrapper 0.1.2", "tower", "tower-layer", "tower-service", @@ -213,6 +219,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "byteorder" version = "1.5.0" @@ -384,6 +396,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.10.7" @@ -408,7 +429,7 @@ dependencies = [ [[package]] name = "dmtri" version = "0.1.0" -source = "git+https://github.com/demeter-run/specs.git#8262e943d03c729c965b0c4dcc8f4a0cf30cbeef" +source = "git+https://github.com/demeter-run/specs.git#838880d359c1505b82ac702c1997cea9d3a37156" dependencies = [ "bytes", "pbjson", @@ -445,6 +466,15 @@ dependencies = [ "serde", ] +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -488,6 +518,7 @@ dependencies = [ "dmtri", "dotenv", "futures", + "jsonwebtoken", "k8s-openapi", "kube", "mockall", @@ -495,6 +526,7 @@ dependencies = [ "protoc-wkt", "rand", "rdkafka", + "reqwest", "serde", "serde_json", "sqlx", @@ -535,6 +567,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -667,8 +714,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -696,6 +745,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -869,15 +937,15 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.29" +version = "0.14.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" +checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -893,13 +961,14 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4fe55fb7a772d59a5ff1dfbff4fe0258d19b89fec4b233e75d35d5d2316badc" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.0", "httparse", @@ -920,7 +989,7 @@ dependencies = [ "futures-util", "headers", "http 1.1.0", - "hyper 1.4.0", + "hyper 1.4.1", "hyper-rustls", "hyper-util", "pin-project-lite", @@ -938,7 +1007,7 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.0", + "hyper 1.4.1", "hyper-util", "log", "rustls 0.23.11", @@ -947,6 +1016,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", + "webpki-roots 0.26.3", ] [[package]] @@ -955,7 +1025,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.29", + "hyper 0.14.30", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -967,13 +1037,29 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 1.4.0", + "hyper 1.4.1", "hyper-util", "pin-project-lite", "tokio", "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.6" @@ -985,7 +1071,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.4.0", + "hyper 1.4.1", "pin-project-lite", "socket2", "tokio", @@ -1024,6 +1110,12 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "ipnet" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + [[package]] name = "itertools" version = "0.10.5" @@ -1048,6 +1140,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "js-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "json5" version = "0.4.1" @@ -1074,6 +1175,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ae10193d25051e74945f1ea2d0b42e03cc3b890f7e4cc5faa44997d808193f" +dependencies = [ + "base64 0.21.7", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "k8s-openapi" version = "0.22.0" @@ -1113,7 +1229,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.0", + "hyper 1.4.1", "hyper-http-proxy", "hyper-rustls", "hyper-timeout 0.5.1", @@ -1319,6 +1435,23 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nom" version = "7.1.3" @@ -1339,6 +1472,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1356,6 +1499,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1432,12 +1581,50 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openssl" +version = "0.10.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.70", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -1674,6 +1861,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1827,6 +2020,53 @@ version = "1.0.0+3.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1d0fdc23ea945d58449259496ba12d3b520da1a45ce5011f631c990add9029" +[[package]] +name = "quinn" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls 0.23.11", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" +dependencies = [ + "bytes", + "rand", + "ring", + "rustc-hash", + "rustls 0.23.11", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" +dependencies = [ + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -1958,6 +2198,54 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "reqwest" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.1", + "hyper-rustls", + "hyper-tls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls 0.23.11", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "system-configuration", + "tokio", + "tokio-native-tls", + "tokio-rustls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 0.26.3", + "winreg", +] + [[package]] name = "ring" version = "0.17.8" @@ -2021,6 +2309,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustix" version = "0.38.34" @@ -2239,6 +2533,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_yaml" version = "0.9.34+deprecated" @@ -2302,6 +2608,18 @@ dependencies = [ "rand_core", ] +[[package]] +name = "simple_asn1" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror", + "time", +] + [[package]] name = "slab" version = "0.4.9" @@ -2408,7 +2726,7 @@ dependencies = [ "tokio-stream", "tracing", "url", - "webpki-roots", + "webpki-roots 0.25.4", ] [[package]] @@ -2597,6 +2915,33 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tempfile" version = "3.10.1" @@ -2645,6 +2990,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -2708,6 +3084,16 @@ dependencies = [ "syn 2.0.70", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -2799,10 +3185,10 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.29", + "hyper 0.14.30", "hyper-timeout 0.4.1", "percent-encoding", "pin-project", @@ -3075,12 +3461,97 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" +[[package]] +name = "wasm-bindgen" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.70", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.70", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" + +[[package]] +name = "web-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -3282,6 +3753,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/Cargo.toml b/Cargo.toml index 751c6a0..c8d0533 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ protoc-wkt = "1.0.0" config = { version = "0.14.0", features = ["toml"] } rdkafka = "0.36.2" uuid = { version = "1.10.0", features = ["v4"] } +reqwest = { version = "0.12.5", features = ["json", "rustls-tls-webpki-roots"] } +jsonwebtoken = "9.3.0" [dev-dependencies] mockall = "0.12.1" diff --git a/README.md b/README.md index 6078e5c..76a664f 100644 --- a/README.md +++ b/README.md @@ -35,14 +35,20 @@ If there are updates in the schemas, execute the command below to update the sql cargo sqlx prepare ``` -### Queue dependence +### Dependences -Start the dependences with the command below +The system is connected using the kafka protocol, so it's necessary to set up a Kafka instance. There is an example using redpanda and docker in the examples folder. To start it's necessary to run the command below. ```sh docker compose up -d ``` +The fabric is using a default topic which is called `events`. After the redpanda is running, the docker will expose port 8080 to access the console, it's necessary to open the console and create the topic. + +``` +http://localhost:8080 +``` + ### Run binaries Command to run RPC diff --git a/examples/README.md b/examples/README.md index 4271598..d61f891 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,12 +4,15 @@ Two binaries need to be executed, `rpc` which will allow external requests to cr ## rpc -It's possible to create a toml file as config and set the file path in the env `RPC_CONFIG`, but it's possible to set the config using the prefix `RPC_` +It's possible to create a toml file as config and set the file path in the env `RPC_CONFIG`, but it's possible to set the config using the prefix `RPC_`. The auth url is the endpoint to integrate with auth0. ``` addr="0.0.0.0:5000" db_path="dev.db" brokers="localhost:19092" + +[auth] +url="" ``` ## daemon diff --git a/examples/config.toml b/examples/config.toml index 783caaf..b75c809 100644 --- a/examples/config.toml +++ b/examples/config.toml @@ -1,3 +1,6 @@ addr="0.0.0.0:5000" db_path="dev.db" brokers="localhost:19092" + +[auth] +url="" diff --git a/src/bin/rpc.rs b/src/bin/rpc.rs index 7839f00..87ef027 100644 --- a/src/bin/rpc.rs +++ b/src/bin/rpc.rs @@ -2,6 +2,7 @@ use std::env; use anyhow::Result; use dotenv::dotenv; +use fabric::drivers::{event::EventConfig, grpc::GrpcConfig}; use serde::Deserialize; use tracing::Level; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; @@ -23,19 +24,24 @@ async fn main() -> Result<()> { let config = Config::new()?; futures::future::try_join( - fabric::drivers::grpc::server(&config.addr, &config.db_path, &config.brokers), - fabric::drivers::event::subscribe(&config.db_path, &config.brokers), + fabric::drivers::grpc::server(config.clone().into()), + fabric::drivers::event::subscribe(config.clone().into()), ) .await?; Ok(()) } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Deserialize)] +struct Auth { + url: String, +} +#[derive(Debug, Clone, Deserialize)] struct Config { addr: String, db_path: String, brokers: String, + auth: Auth, } impl Config { pub fn new() -> Result { @@ -51,3 +57,23 @@ impl Config { Ok(config) } } + +impl From for GrpcConfig { + fn from(value: Config) -> Self { + Self { + addr: value.addr, + brokers: value.brokers, + db_path: value.db_path, + auth_url: value.auth.url, + } + } +} + +impl From for EventConfig { + fn from(value: Config) -> Self { + Self { + brokers: value.brokers, + db_path: value.db_path, + } + } +} diff --git a/src/domain/daemon/namespace.rs b/src/domain/daemon/namespace.rs index c3a5b87..c75da6b 100644 --- a/src/domain/daemon/namespace.rs +++ b/src/domain/daemon/namespace.rs @@ -1,4 +1,4 @@ -use anyhow::{Error, Result}; +use anyhow::{bail, Result}; use k8s_openapi::api::core::v1::Namespace; use kube::{api::ObjectMeta, ResourceExt}; use std::sync::Arc; @@ -11,7 +11,7 @@ pub async fn create_namespace( project: ProjectCreated, ) -> Result<()> { if cluster.find_by_name(&project.slug).await?.is_some() { - return Err(Error::msg("namespace alread exist")); + bail!("namespace alread exist") } let ns: Namespace = project.into(); diff --git a/src/domain/events.rs b/src/domain/events.rs index c27e7d6..170ca7a 100644 --- a/src/domain/events.rs +++ b/src/domain/events.rs @@ -7,8 +7,11 @@ pub struct ProjectCreated { pub slug: String, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AccountCreated { - pub name: String, +pub struct UserCreated { + pub id: String, + pub email: String, + pub auth_provider: String, + pub auth_provider_id: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PortCreated { @@ -23,7 +26,7 @@ pub struct PortCreated { #[allow(clippy::enum_variant_names)] pub enum Event { ProjectCreated(ProjectCreated), - AccountCreated(AccountCreated), + UserCreated(UserCreated), PortCreated(PortCreated), } diff --git a/src/domain/management/account.rs b/src/domain/management/account.rs deleted file mode 100644 index c0c55e5..0000000 --- a/src/domain/management/account.rs +++ /dev/null @@ -1,111 +0,0 @@ -use anyhow::Result; -use std::sync::Arc; -use tracing::info; - -use crate::domain::events::{AccountCreated, Event, EventBridge}; - -pub async fn create( - _cache: Arc, - event: Arc, - account: Account, -) -> Result<()> { - let account_event = Event::AccountCreated(account.clone().into()); - - event.dispatch(account_event).await?; - info!(account = account.name, "new account created"); - - Ok(()) -} - -//TODO: remove later -#[allow(dead_code)] -pub async fn create_cache(cache: Arc, account: AccountCreated) -> Result<()> { - cache.create(&account.into()).await?; - - Ok(()) -} - -#[derive(Debug, Clone)] -pub struct Account { - pub name: String, -} -impl Account { - pub fn new(name: String) -> Self { - Self { name } - } -} -impl From for Account { - fn from(value: AccountCreated) -> Self { - Self { name: value.name } - } -} -impl From for AccountCreated { - fn from(value: Account) -> Self { - AccountCreated { name: value.name } - } -} - -#[async_trait::async_trait] -pub trait AccountCache: Send + Sync { - async fn create(&self, account: &Account) -> Result<()>; -} - -#[cfg(test)] -mod tests { - use mockall::mock; - - use super::*; - - mock! { - pub FakeAccountCache { } - - #[async_trait::async_trait] - impl AccountCache for FakeAccountCache { - async fn create(&self, account: &Account) -> Result<()>; - } - } - - mock! { - pub FakeEventBridge { } - - #[async_trait::async_trait] - impl EventBridge for FakeEventBridge { - async fn dispatch(&self, event: Event) -> Result<()>; - } - } - - impl Default for Account { - fn default() -> Self { - Self { - name: "New Account".into(), - } - } - } - - #[tokio::test] - async fn it_should_create_account() { - let account_cache = MockFakeAccountCache::new(); - let mut event_bridge = MockFakeEventBridge::new(); - event_bridge.expect_dispatch().return_once(|_| Ok(())); - - let account = Account::default(); - - let result = create(Arc::new(account_cache), Arc::new(event_bridge), account).await; - if let Err(err) = result { - unreachable!("{err}") - } - } - - #[tokio::test] - async fn it_should_create_account_cache() { - let mut account_cache = MockFakeAccountCache::new(); - account_cache.expect_create().return_once(|_| Ok(())); - - let account = Account::default(); - - let result = create_cache(Arc::new(account_cache), account.into()).await; - if let Err(err) = result { - unreachable!("{err}") - } - } -} diff --git a/src/domain/management/mod.rs b/src/domain/management/mod.rs index b4c783b..cbfb54a 100644 --- a/src/domain/management/mod.rs +++ b/src/domain/management/mod.rs @@ -1,3 +1,3 @@ -pub mod account; pub mod port; pub mod project; +pub mod user; diff --git a/src/domain/management/port.rs b/src/domain/management/port.rs index d0c8c6a..8fa0860 100644 --- a/src/domain/management/port.rs +++ b/src/domain/management/port.rs @@ -1,4 +1,4 @@ -use anyhow::{Error, Result}; +use anyhow::{bail, Result}; use std::sync::Arc; use tracing::info; use uuid::Uuid; @@ -13,7 +13,7 @@ pub async fn create( port: Port, ) -> Result<()> { if project_cache.find_by_slug(&port.project).await?.is_none() { - return Err(Error::msg("Invalid project")); + bail!("Invalid project") } let port_event = Event::PortCreated(port.clone().into()); diff --git a/src/domain/management/project.rs b/src/domain/management/project.rs index 393adda..0ecb52e 100644 --- a/src/domain/management/project.rs +++ b/src/domain/management/project.rs @@ -1,4 +1,4 @@ -use anyhow::{Error, Result}; +use anyhow::{bail, Result}; use rand::{distributions::Alphanumeric, Rng}; use std::sync::Arc; use tracing::info; @@ -11,7 +11,7 @@ pub async fn create( project: Project, ) -> Result<()> { if cache.find_by_slug(&project.slug).await?.is_some() { - return Err(Error::msg("invalid project slug")); + bail!("invalid project slug") } let namespace = Event::ProjectCreated(project.clone().into()); diff --git a/src/domain/management/user.rs b/src/domain/management/user.rs new file mode 100644 index 0000000..0ea52e9 --- /dev/null +++ b/src/domain/management/user.rs @@ -0,0 +1,261 @@ +use anyhow::{bail, Result}; +use std::sync::Arc; +use tracing::{error, info}; +use uuid::Uuid; + +use crate::domain::events::{Event, EventBridge, UserCreated}; + +const AUTH_PROVIDER: &str = "auth0"; + +pub async fn create( + cache: Arc, + auth: Arc, + event: Arc, + token: String, +) -> Result { + let verify_result = auth.verify(&token).await; + if let Err(err) = verify_result { + error!(error = err.to_string(), "invalid access token"); + bail!("invalid access token"); + } + + let auth_provider_id = verify_result.unwrap(); + if let Some(user) = cache.get_by_auth_provider_id(&auth_provider_id).await? { + return Ok(user); + } + + let email_result = auth.get_profile(&token).await; + if let Err(err) = email_result { + error!(error = err.to_string(), "error to get user info"); + bail!("invalid access token"); + } + let email = email_result.unwrap(); + + let user = User::new(email, auth_provider_id); + let user_event = Event::UserCreated(user.clone().into()); + + event.dispatch(user_event).await?; + info!(user = user.id, "new user created"); + + Ok(user) +} + +pub async fn create_cache(cache: Arc, user: UserCreated) -> Result<()> { + if let Some(user) = cache + .get_by_auth_provider_id(&user.auth_provider_id) + .await? + { + info!(user = user.id, "user already exists"); + return Ok(()); + } + + cache.create(&user.into()).await?; + + Ok(()) +} + +#[derive(Debug, Clone)] +pub struct User { + pub id: String, + pub email: String, + pub auth_provider: String, + pub auth_provider_id: String, +} +impl User { + pub fn new(email: String, auth_provider_id: String) -> Self { + let id = Uuid::new_v4().to_string(); + + Self { + id, + email, + auth_provider: AUTH_PROVIDER.into(), + auth_provider_id, + } + } +} +impl From for UserCreated { + fn from(value: User) -> Self { + UserCreated { + id: value.id, + email: value.email, + auth_provider: value.auth_provider, + auth_provider_id: value.auth_provider_id, + } + } +} +impl From for User { + fn from(value: UserCreated) -> Self { + User { + id: value.id, + email: value.email, + auth_provider: value.auth_provider, + auth_provider_id: value.auth_provider_id, + } + } +} + +#[async_trait::async_trait] +pub trait UserCache: Send + Sync { + async fn create(&self, user: &User) -> Result<()>; + async fn get_by_auth_provider_id(&self, id: &str) -> Result>; +} + +#[async_trait::async_trait] +pub trait AuthProvider: Send + Sync { + async fn verify(&self, token: &str) -> Result; + async fn get_profile(&self, token: &str) -> Result; +} + +#[cfg(test)] +mod tests { + use mockall::mock; + + use super::*; + + mock! { + pub FakeUserCache { } + + #[async_trait::async_trait] + impl UserCache for FakeUserCache { + async fn create(&self, user: &User) -> Result<()>; + async fn get_by_auth_provider_id(&self, id: &str) -> Result>; + } + } + + mock! { + pub FakeAuthProvider { } + + #[async_trait::async_trait] + impl AuthProvider for FakeAuthProvider { + async fn verify(&self, token: &str) -> Result; + async fn get_profile(&self, token: &str) -> Result; + } + } + + mock! { + pub FakeEventBridge { } + + #[async_trait::async_trait] + impl EventBridge for FakeEventBridge { + async fn dispatch(&self, event: Event) -> Result<()>; + } + } + + impl Default for User { + fn default() -> Self { + Self { + id: Uuid::new_v4().into(), + email: "cw@txpipe.io".into(), + auth_provider: AUTH_PROVIDER.into(), + auth_provider_id: "google-oauth2|xxx".into(), + } + } + } + + #[tokio::test] + async fn it_should_create_user() { + let mut auth_provider = MockFakeAuthProvider::new(); + auth_provider + .expect_verify() + .return_once(|_| Ok("google-oauth2|xxx".into())); + auth_provider + .expect_get_profile() + .return_once(|_| Ok("cw@txpipe.io".into())); + + let mut user_cache = MockFakeUserCache::new(); + user_cache + .expect_get_by_auth_provider_id() + .return_once(|_| Ok(None)); + user_cache.expect_create().return_once(|_| Ok(())); + + let mut event_bridge = MockFakeEventBridge::new(); + event_bridge.expect_dispatch().return_once(|_| Ok(())); + + let result = create( + Arc::new(user_cache), + Arc::new(auth_provider), + Arc::new(event_bridge), + Default::default(), + ) + .await; + if let Err(err) = result { + unreachable!("{err}") + } + } + #[tokio::test] + async fn it_should_return_user_existing() { + let mut auth_provider = MockFakeAuthProvider::new(); + auth_provider + .expect_verify() + .return_once(|_| Ok("google-oauth2|xxx".into())); + + let mut user_cache = MockFakeUserCache::new(); + user_cache + .expect_get_by_auth_provider_id() + .return_once(|_| Ok(Some(User::default()))); + + let event_bridge = MockFakeEventBridge::new(); + + let result = create( + Arc::new(user_cache), + Arc::new(auth_provider), + Arc::new(event_bridge), + Default::default(), + ) + .await; + if let Err(err) = result { + unreachable!("{err}") + } + } + #[tokio::test] + async fn it_should_fail_when_invalid_token() { + let mut auth_provider = MockFakeAuthProvider::new(); + auth_provider + .expect_verify() + .return_once(|_| bail!("invalid token")); + + let user_cache = MockFakeUserCache::new(); + let event_bridge = MockFakeEventBridge::new(); + + let result = create( + Arc::new(user_cache), + Arc::new(auth_provider), + Arc::new(event_bridge), + Default::default(), + ) + .await; + if result.is_ok() { + unreachable!("it should fail when invalid token") + } + } + + #[tokio::test] + async fn it_should_create_user_cache() { + let mut user_cache = MockFakeUserCache::new(); + user_cache + .expect_get_by_auth_provider_id() + .return_once(|_| Ok(None)); + user_cache.expect_create().return_once(|_| Ok(())); + + let user = User::default(); + + let result = create_cache(Arc::new(user_cache), user.into()).await; + if let Err(err) = result { + unreachable!("{err}") + } + } + #[tokio::test] + async fn it_should_ignore_create_user_cache() { + let mut user_cache = MockFakeUserCache::new(); + user_cache + .expect_get_by_auth_provider_id() + .return_once(|_| Ok(Some(User::default()))); + + let user = User::default(); + + let result = create_cache(Arc::new(user_cache), user.into()).await; + if let Err(err) = result { + unreachable!("{err}") + } + } +} diff --git a/src/driven/auth0/mod.rs b/src/driven/auth0/mod.rs new file mode 100644 index 0000000..dc6d6b9 --- /dev/null +++ b/src/driven/auth0/mod.rs @@ -0,0 +1,84 @@ +use anyhow::{bail, Result}; +use jsonwebtoken::jwk::{AlgorithmParameters, JwkSet}; +use jsonwebtoken::{decode, decode_header, DecodingKey, Validation}; +use serde::Deserialize; + +use crate::domain::management::user; + +pub struct Auth0Provider { + client: reqwest::Client, + url: String, +} +impl Auth0Provider { + pub fn new(url: &str) -> Self { + let client = reqwest::Client::new(); + Self { + client, + url: url.into(), + } + } +} + +#[async_trait::async_trait] +impl user::AuthProvider for Auth0Provider { + async fn verify(&self, token: &str) -> Result { + let jwks_request = self + .client + .get(format!("{}/.well-known/jwks.json", self.url)) + .build()?; + + let jwks_response = self.client.execute(jwks_request).await?; + let jwks: JwkSet = jwks_response.json().await?; + + let header = decode_header(token)?; + + let Some(kid) = header.kid else { + bail!("token doesn't have a `kid` header field"); + }; + let Some(jwk) = jwks.find(&kid) else { + bail!("no matching jwk found for the given kid"); + }; + + let decoding_key = match &jwk.algorithm { + AlgorithmParameters::RSA(rsa) => DecodingKey::from_rsa_components(&rsa.n, &rsa.e)?, + _ => bail!("algorithm should be a RSA"), + }; + + let validation = { + let mut validation = Validation::new(header.alg); + validation.set_audience(&["demeter-api"]); + validation.validate_exp = true; + validation + }; + + let decoded_token = decode::(token, &decoding_key, &validation)?; + + Ok(decoded_token.claims.sub) + } + async fn get_profile(&self, token: &str) -> Result { + let profile_request = self + .client + .get(format!("{}/userinfo", self.url)) + .header("Authorization", format!("Bearer {token}")) + .build()?; + + let profile_response = self + .client + .execute(profile_request) + .await? + .error_for_status()?; + + let profile = profile_response.json::().await?; + + Ok(profile.email) + } +} + +#[derive(Deserialize)] +struct Claims { + sub: String, +} +#[derive(Deserialize)] +struct UserInfo { + email: String, +} diff --git a/src/driven/cache/migrations/20240606_tables.sql b/src/driven/cache/migrations/20240606_tables.sql index 384969b..cd3e9b4 100644 --- a/src/driven/cache/migrations/20240606_tables.sql +++ b/src/driven/cache/migrations/20240606_tables.sql @@ -4,9 +4,16 @@ CREATE TABLE IF NOT EXISTS projects ( ); CREATE TABLE IF NOT EXISTS ports ( - id TEXT PRIMARY KEY, + id TEXT PRIMARY KEY NOT NULL, project TEXT NOT NULL, kind TEXT NOT NULL, data TEXT NOT NULL, FOREIGN KEY(project) REFERENCES projects(slug) ); + +CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY NOT NULL, + email TEXT NOT NULL, + auth_provider TEXT NOT NULL, + auth_provider_id TEXT NOT NULL UNIQUE +); diff --git a/src/driven/cache/mod.rs b/src/driven/cache/mod.rs index f9f6412..d5f7d28 100644 --- a/src/driven/cache/mod.rs +++ b/src/driven/cache/mod.rs @@ -3,6 +3,7 @@ use std::path::Path; pub mod port; pub mod project; +pub mod user; pub struct SqliteCache { db: sqlx::sqlite::SqlitePool, diff --git a/src/driven/cache/user.rs b/src/driven/cache/user.rs new file mode 100644 index 0000000..18bb3b8 --- /dev/null +++ b/src/driven/cache/user.rs @@ -0,0 +1,60 @@ +use anyhow::Result; +use std::sync::Arc; + +use crate::domain::management::user::{User, UserCache}; + +use super::SqliteCache; + +pub struct SqliteUserCache { + sqlite: Arc, +} +impl SqliteUserCache { + pub fn new(sqlite: Arc) -> Self { + Self { sqlite } + } +} +#[async_trait::async_trait] +impl UserCache for SqliteUserCache { + async fn create(&self, user: &User) -> Result<()> { + sqlx::query!( + r#" + INSERT INTO users (id, email, auth_provider, auth_provider_id) + VALUES ($1, $2, $3, $4) + "#, + user.id, + user.email, + user.auth_provider, + user.auth_provider_id + ) + .execute(&self.sqlite.db) + .await?; + + Ok(()) + } + async fn get_by_auth_provider_id(&self, id: &str) -> Result> { + let result = sqlx::query!( + r#" + SELECT id, email, auth_provider, auth_provider_id + FROM users WHERE auth_provider_id = $1; + "#, + id + ) + .fetch_optional(&self.sqlite.db) + .await?; + + if result.is_none() { + return Ok(None); + } + + let result = result.unwrap(); + + let user = User { + id: result.id, + email: result.email, + auth_provider: result.auth_provider, + auth_provider_id: result.auth_provider_id, + }; + + Ok(Some(user)) + } +} diff --git a/src/driven/mod.rs b/src/driven/mod.rs index 9b9a62c..2f9e7ed 100644 --- a/src/driven/mod.rs +++ b/src/driven/mod.rs @@ -1,3 +1,4 @@ +pub mod auth0; pub mod cache; pub mod k8s; pub mod kafka; diff --git a/src/drivers/event/mod.rs b/src/drivers/event/mod.rs index ff72b9a..bf78a0a 100644 --- a/src/drivers/event/mod.rs +++ b/src/drivers/event/mod.rs @@ -9,22 +9,25 @@ use tracing::{error, info}; use crate::{ domain::{ events::Event, - management::{port, project::create_cache}, + management::{port, project::create_cache, user}, + }, + driven::cache::{ + port::SqlitePortCache, project::SqliteProjectCache, user::SqliteUserCache, SqliteCache, }, - driven::cache::{port::SqlitePortCache, project::SqliteProjectCache, SqliteCache}, }; -pub async fn subscribe(db_path: &str, brokers: &str) -> Result<()> { - let sqlite_cache = Arc::new(SqliteCache::new(Path::new(db_path)).await?); +pub async fn subscribe(config: EventConfig) -> Result<()> { + let sqlite_cache = Arc::new(SqliteCache::new(Path::new(&config.db_path)).await?); sqlite_cache.migrate().await?; let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache.clone())); let port_cache = Arc::new(SqlitePortCache::new(sqlite_cache.clone())); + let user_cache = Arc::new(SqliteUserCache::new(sqlite_cache.clone())); let topic = String::from("events"); let consumer: StreamConsumer = ClientConfig::new() - .set("bootstrap.servers", brokers) + .set("bootstrap.servers", &config.brokers) .set("group.id", "cache") .create()?; @@ -39,11 +42,13 @@ pub async fn subscribe(db_path: &str, brokers: &str) -> Result<()> { let event: Event = serde_json::from_slice(payload)?; match event { Event::ProjectCreated(namespace) => { - create_cache(project_cache.clone(), namespace).await?; + create_cache(project_cache.clone(), namespace).await? + } + Event::UserCreated(user) => { + user::create_cache(user_cache.clone(), user).await? } - Event::AccountCreated(_) => todo!(), Event::PortCreated(port) => { - port::create_cache(port_cache.clone(), port).await?; + port::create_cache(port_cache.clone(), port).await? } }; consumer.commit_message(&message, CommitMode::Async)?; @@ -52,3 +57,8 @@ pub async fn subscribe(db_path: &str, brokers: &str) -> Result<()> { }; } } + +pub struct EventConfig { + pub db_path: String, + pub brokers: String, +} diff --git a/src/drivers/grpc/account.rs b/src/drivers/grpc/account.rs deleted file mode 100644 index 305467f..0000000 --- a/src/drivers/grpc/account.rs +++ /dev/null @@ -1,46 +0,0 @@ -use dmtri::demeter::ops::v1alpha as proto; -use std::sync::Arc; -use tonic::{async_trait, Status}; - -use crate::domain::{ - events::EventBridge, - management::{ - self, - account::{Account, AccountCache}, - }, -}; - -pub struct AccountServiceImpl { - pub cache: Arc, - pub event: Arc, -} - -//TODO: remove later -#[allow(dead_code)] -impl AccountServiceImpl { - pub fn new(cache: Arc, event: Arc) -> Self { - Self { cache, event } - } -} - -#[async_trait] -impl proto::account_service_server::AccountService for AccountServiceImpl { - async fn create_account( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let req = request.into_inner(); - - let account = Account::new(req.name); - let result = - management::account::create(self.cache.clone(), self.event.clone(), account.clone()) - .await; - - if let Err(err) = result { - return Err(Status::failed_precondition(err.to_string())); - } - - let message = proto::CreateAccountResponse { name: account.name }; - Ok(tonic::Response::new(message)) - } -} diff --git a/src/drivers/grpc/mod.rs b/src/drivers/grpc/mod.rs index fd60b9a..7d53047 100644 --- a/src/drivers/grpc/mod.rs +++ b/src/drivers/grpc/mod.rs @@ -1,5 +1,6 @@ use anyhow::Result; use dmtri::demeter::ops::v1alpha::port_service_server::PortServiceServer; +use dmtri::demeter::ops::v1alpha::user_service_server::UserServiceServer; use std::net::SocketAddr; use std::str::FromStr; use std::{path::Path, sync::Arc}; @@ -8,18 +9,23 @@ use tracing::info; use dmtri::demeter::ops::v1alpha::project_service_server::ProjectServiceServer; +use crate::driven::auth0::Auth0Provider; +use crate::driven::cache::user::SqliteUserCache; use crate::driven::cache::{project::SqliteProjectCache, SqliteCache}; use crate::driven::kafka::KafkaProducer; -mod account; mod port; mod project; +mod user; -pub async fn server(addr: &str, db_path: &str, brokers: &str) -> Result<()> { - let sqlite_cache = Arc::new(SqliteCache::new(Path::new(&db_path)).await?); - let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache)); +pub async fn server(config: GrpcConfig) -> Result<()> { + let sqlite_cache = Arc::new(SqliteCache::new(Path::new(&config.db_path)).await?); + let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache.clone())); + let user_cache = Arc::new(SqliteUserCache::new(sqlite_cache.clone())); - let event_bridge = Arc::new(KafkaProducer::new(brokers, "events")?); + let event_bridge = Arc::new(KafkaProducer::new(&config.brokers, "events")?); + + let auth_provider = Arc::new(Auth0Provider::new(&config.auth_url)); let reflection = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(dmtri::demeter::ops::v1alpha::FILE_DESCRIPTOR_SET) @@ -34,16 +40,31 @@ pub async fn server(addr: &str, db_path: &str, brokers: &str) -> Result<()> { let port_inner = port::PortServiceImpl::new(project_cache.clone(), event_bridge.clone()); let port_service = PortServiceServer::new(port_inner); - let address = SocketAddr::from_str(addr)?; + let user_inner = user::UserServiceImpl::new( + user_cache.clone(), + auth_provider.clone(), + event_bridge.clone(), + ); + let user_service = UserServiceServer::new(user_inner); + + let address = SocketAddr::from_str(&config.addr)?; - info!(address = addr, "Server running"); + info!(address = config.addr, "Server running"); Server::builder() .add_service(reflection) .add_service(project_service) .add_service(port_service) + .add_service(user_service) .serve(address) .await?; Ok(()) } + +pub struct GrpcConfig { + pub addr: String, + pub db_path: String, + pub brokers: String, + pub auth_url: String, +} diff --git a/src/drivers/grpc/user.rs b/src/drivers/grpc/user.rs new file mode 100644 index 0000000..101886c --- /dev/null +++ b/src/drivers/grpc/user.rs @@ -0,0 +1,58 @@ +use dmtri::demeter::ops::v1alpha as proto; +use std::sync::Arc; +use tonic::{async_trait, Status}; + +use crate::domain::{ + events::EventBridge, + management::{ + self, + user::{AuthProvider, UserCache}, + }, +}; + +pub struct UserServiceImpl { + pub cache: Arc, + pub auth: Arc, + pub event: Arc, +} + +//TODO: remove later +#[allow(dead_code)] +impl UserServiceImpl { + pub fn new( + cache: Arc, + auth: Arc, + event: Arc, + ) -> Self { + Self { cache, auth, event } + } +} + +#[async_trait] +impl proto::user_service_server::UserService for UserServiceImpl { + async fn create_user( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let req = request.into_inner(); + + let result = management::user::create( + self.cache.clone(), + self.auth.clone(), + self.event.clone(), + req.token, + ) + .await; + + if let Err(err) = result { + return Err(Status::failed_precondition(err.to_string())); + } + let user = result.unwrap(); + + let message = proto::CreateUserResponse { + id: user.id, + email: user.email, + }; + Ok(tonic::Response::new(message)) + } +} diff --git a/src/drivers/monitor/mod.rs b/src/drivers/monitor/mod.rs index 1ad0c6b..d795334 100644 --- a/src/drivers/monitor/mod.rs +++ b/src/drivers/monitor/mod.rs @@ -37,10 +37,12 @@ pub async fn subscribe(brokers: &str) -> Result<()> { Event::ProjectCreated(namespace) => { create_namespace(k8s_cluster.clone(), namespace).await?; } - Event::AccountCreated(_) => todo!(), Event::PortCreated(port) => { port::create_port(k8s_cluster.clone(), port).await?; } + _ => { + info!("skip event") + } }; consumer.commit_message(&message, CommitMode::Async)?; }