From 6e5015d0569f637f518f9945b696c2aab96b00ae Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 25 Apr 2023 11:24:35 +0200 Subject: [PATCH 1/3] wip --- Cargo.lock | 229 +++++++++---- rust-toolchain.toml | 2 +- sqld/Cargo.toml | 1 + sqld/src/database/backbone/mod.rs | 334 +++++++++++++++++++ sqld/src/database/libsql.rs | 2 +- sqld/src/database/mod.rs | 3 + sqld/src/lib.rs | 123 +++++-- sqld/src/main.rs | 8 + sqld/src/replication/mod.rs | 2 +- sqld/src/replication/primary/frame_stream.rs | 8 +- sqld/src/replication/primary/logger.rs | 123 ++++--- sqld/src/replication/replica/hook.rs | 21 +- sqld/src/replication/replica/injector.rs | 51 ++- sqld/src/replication/replica/meta.rs | 12 +- sqld/src/replication/replica/mod.rs | 3 + sqld/src/replication/replica/replicator.rs | 24 +- sqld/src/rpc/mod.rs | 134 +++++--- sqld/src/rpc/replication_log.rs | 7 +- 18 files changed, 856 insertions(+), 231 deletions(-) create mode 100644 sqld/src/database/backbone/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 04965a0b..32439139 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,9 +30,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.20" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04" dependencies = [ "memchr", ] @@ -63,9 +63,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e579a7752471abc2a8268df8b20005e3eadd975f585398f17efcfd8d4927371" +checksum = "6342bd4f5a1205d7f41e94a41a901f5647c938cdfa96036338e8533c9d6c2450" dependencies = [ "anstyle", "anstyle-parse", @@ -102,9 +102,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bcd8291a340dd8ac70e18878bc4501dd7b4ff970cfa21c207d36ece51ea88fd" +checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -533,9 +533,9 @@ dependencies = [ [[package]] name = "axum" -version = "0.6.15" +version = "0.6.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b32c5ea3aabaf4deb5f5ced2d688ec0844c881c9e6c696a8b769a05fc691e62" +checksum = "b70caf9f1b0c045f7da350636435b775a9733adf2df56e8aa2a29210fbc335d4" dependencies = [ "async-trait", "axum-core", @@ -665,9 +665,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.1.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c70beb79cbb5ce9c4f8e20849978f34225931f665bb49efa6982875a4d5facb3" +checksum = "24a6904aef64d73cf10ab17ebace7befb918b82164785cb89907993be7f83813" [[package]] name = "blake3" @@ -759,9 +759,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.12.0" +version = "3.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +checksum = "9b1ce199063694f33ffb7dd4e0ee620741495c32833cde5aa08f02a0bf96f0c8" [[package]] name = "bytecount" @@ -904,9 +904,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.2.2" +version = "4.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b802d85aaf3a1cdb02b224ba472ebdea62014fccfcb269b95a4d76443b5ee5a" +checksum = "8a1f23fa97e1d1641371b51f35535cb26959b8e27ab50d167a8b996b5bada819" dependencies = [ "clap_builder", "clap_derive", @@ -915,9 +915,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.2.2" +version = "4.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14a1a858f532119338887a4b8e1af9c60de8249cd7bafd68036a489e261e37b6" +checksum = "0fdc5d93c358224b4d6867ef1356d740de2303e9892edc06c5340daeccd96bab" dependencies = [ "anstream", "anstyle", @@ -1034,9 +1034,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "280a9f2d8b3a38871a3c8a46fb80db65e5e5ed97da80c4d08bf27fb63e35e181" +checksum = "3e4c1eaa2012c47becbbad2ab175484c2a84d1185b566fb2cc5b8707343dfe58" dependencies = [ "libc", ] @@ -1773,9 +1773,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66b91535aa35fea1523ad1b86cb6b53c28e0ae566ba4a460f4457e936cad7c6f" +checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21" dependencies = [ "bytes 1.4.0", "fnv", @@ -2090,7 +2090,7 @@ checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ "hermit-abi 0.3.1", "io-lifetimes 1.0.10", - "rustix 0.37.11", + "rustix 0.37.15", "windows-sys 0.48.0", ] @@ -2181,9 +2181,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.141" +version = "0.2.142" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5" +checksum = "6a987beff54b60ffa6d51982e1aa1146bc42f19bd26be28b0586f252fccf5317" [[package]] name = "libgit2-sys" @@ -2215,9 +2215,9 @@ checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" [[package]] name = "libmimalloc-sys" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a558e3d911bc3c7bfc8c78bc580b404d6e51c1cefbf656e176a94b49b0df40" +checksum = "f4ac0e912c8ef1b735e92369695618dc5b1819f5a7bf3f167301a3ba1cea515e" dependencies = [ "cc", "libc", @@ -2294,9 +2294,9 @@ checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d" [[package]] name = "linux-raw-sys" -version = "0.3.1" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59d8c75012853d2e872fb56bc8a2e53718e2cafe1a4c823143141c6d90c322f" +checksum = "2e8776872cdc2f073ccaab02e336fa321328c1e02646ebcb9d2108d0baab480d" [[package]] name = "lock_api" @@ -2374,7 +2374,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffc89ccdc6e10d6907450f753537ebc5c5d3460d2e4e62ea74bd571db62c0f9e" dependencies = [ - "rustix 0.37.11", + "rustix 0.37.15", ] [[package]] @@ -2416,9 +2416,9 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.36" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d88dad3f985ec267a3fcb7a1726f5cb1a7e8cad8b646e70a84f967210df23da" +checksum = "4e2894987a3459f3ffb755608bd82188f8ed00d0ae077f1edea29c068d639d98" dependencies = [ "libmimalloc-sys", ] @@ -2648,6 +2648,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "numtoa" version = "0.1.0" @@ -2709,9 +2730,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.50" +version = "0.10.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e30d8bc91859781f0a943411186324d580f2bbeb71b452fe91ae344806af3f1" +checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56" dependencies = [ "bitflags 1.3.2", "cfg-if", @@ -2741,9 +2762,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.85" +version = "0.9.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d3d193fb1488ad46ffe3aaabc912cc931d02ee8518fe2959aea8ef52718b0c0" +checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e" dependencies = [ "cc", "libc", @@ -3025,6 +3046,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "proc-macro-crate" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" +dependencies = [ + "once_cell", + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3073,7 +3104,7 @@ dependencies = [ "rand", "rand_chacha", "rand_xorshift", - "regex-syntax", + "regex-syntax 0.6.29", "rusty-fork", "tempfile", "unarray", @@ -3260,6 +3291,34 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdkafka" +version = "0.29.0" +source = "git+https://github.com/fede1024/rust-rdkafka.git?rev=611aeea#611aeea5012787ad0f1a3ff74af32b64d41142f8" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.3.0+1.9.2" +source = "git+https://github.com/fede1024/rust-rdkafka.git?rev=611aeea#611aeea5012787ad0f1a3ff74af32b64d41142f8" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -3312,13 +3371,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.3" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d" +checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.1", ] [[package]] @@ -3327,7 +3386,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" dependencies = [ - "regex-syntax", + "regex-syntax 0.6.29", ] [[package]] @@ -3336,6 +3395,12 @@ version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" +[[package]] +name = "regex-syntax" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" + [[package]] name = "reqwest" version = "0.11.16" @@ -3420,7 +3485,7 @@ name = "rusqlite" version = "0.29.0" source = "git+https://github.com/psarna/rusqlite?rev=a6332e530f30dc2d47110#a6332e530f30dc2d471103eed96a650407a73c7a" dependencies = [ - "bitflags 2.1.0", + "bitflags 2.2.1", "fallible-iterator", "fallible-streaming-iterator", "hashlink", @@ -3465,15 +3530,15 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.11" +version = "0.37.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85597d61f83914ddeba6a47b3b8ffe7365107221c2e557ed94426489fefb5f77" +checksum = "a0661814f891c57c930a610266415528da53c4933e6dea5fb350cbfe048a9ece" dependencies = [ "bitflags 1.3.2", "errno 0.3.1", "io-lifetimes 1.0.10", "libc", - "linux-raw-sys 0.3.1", + "linux-raw-sys 0.3.5", "windows-sys 0.48.0", ] @@ -3903,6 +3968,7 @@ dependencies = [ "prost", "prost-build", "rand", + "rdkafka", "regex", "rusqlite", "serde", @@ -4052,9 +4118,9 @@ dependencies = [ [[package]] name = "target-lexicon" -version = "0.12.6" +version = "0.12.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ae9980cab1db3fceee2f6c6f643d5d8de2997c58ee8d25fb0cc8a9e9e7348e5" +checksum = "fd1ba337640d60c3e96bc6f0638a939b9c9a7f2c316a1598c279828b3d1dc8c5" [[package]] name = "tempfile" @@ -4065,7 +4131,7 @@ dependencies = [ "cfg-if", "fastrand", "redox_syscall 0.3.5", - "rustix 0.37.11", + "rustix 0.37.15", "windows-sys 0.45.0", ] @@ -4175,9 +4241,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f" dependencies = [ "autocfg", "bytes 1.4.0", @@ -4189,7 +4255,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -4204,9 +4270,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", @@ -4236,9 +4302,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ "futures-core", "pin-project-lite", @@ -4259,9 +4325,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes 1.4.0", "futures-core", @@ -4280,6 +4346,23 @@ dependencies = [ "serde", ] +[[package]] +name = "toml_datetime" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ab8ed2edee10b50132aed5f331333428b011c99402b5a534154ed15746f9622" + +[[package]] +name = "toml_edit" +version = "0.19.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "239410c8609e8125456927e6707163a3b1fdb40561e4b803bc041f466ccfdc13" +dependencies = [ + "indexmap", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.8.3" @@ -4383,11 +4466,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "cf9cf6a813d3f40c88b0b6b6f29a5c95c6cdbf97c1f9cc53fb820200f5ad814d" dependencies = [ - "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -4396,13 +4478,13 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.15", ] [[package]] @@ -4438,9 +4520,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ "matchers", "nu-ansi-term", @@ -4739,9 +4821,9 @@ checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" [[package]] name = "wasm-encoder" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eff853c4f09eec94d76af527eddad4e9de13b11d6286a1ef7134bc30135a2b7" +checksum = "d05d0b6fcd0aeb98adf16e7975331b3c17222aa815148f5b976370ce589d80ef" dependencies = [ "leb128", ] @@ -4945,9 +5027,9 @@ dependencies = [ [[package]] name = "wast" -version = "56.0.0" +version = "57.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b54185c051d7bbe23757d50fe575880a2426a2f06d2e9f6a10fd9a4a42920c0" +checksum = "6eb0f5ed17ac4421193c7477da05892c2edafd67f9639e3c11a82086416662dc" dependencies = [ "leb128", "memchr", @@ -4957,9 +5039,9 @@ dependencies = [ [[package]] name = "wat" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56681922808216ab86d96bb750f70d500b5a7800e41564290fd46bb773581299" +checksum = "ab9ab0d87337c3be2bb6fc5cd331c4ba9fd6bcb4ee85048a0dd59ed9ecf92e53" dependencies = [ "wast", ] @@ -5234,6 +5316,15 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +[[package]] +name = "winnow" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae8970b36c66498d8ff1d66685dc86b91b29db0c7739899012f63a63814b4b28" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.10.1" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 71066da0..187b4381 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] profile = "default" -channel = "1.67.0" +channel = "1.69.0" diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index 049d1229..22788f30 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -59,6 +59,7 @@ tempfile = "3.3.0" memmap = "0.7.0" mimalloc = "0.1.36" sha256 = "1.1.3" +rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", rev = "611aeea" } [dev-dependencies] proptest = "1.0.0" diff --git a/sqld/src/database/backbone/mod.rs b/sqld/src/database/backbone/mod.rs new file mode 100644 index 00000000..70961161 --- /dev/null +++ b/sqld/src/database/backbone/mod.rs @@ -0,0 +1,334 @@ +#![allow(dead_code)] +use std::collections::HashMap; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, oneshot, watch}; +use tokio::task::JoinSet; +use tonic::transport::ClientTlsConfig; + +use crate::replication::frame::{Frame, FrameHeader}; +use crate::replication::primary::logger::WalPage; +use crate::replication::{FrameNo, ReplicationLogger, CRC_64_GO_ISO, WAL_PAGE_SIZE}; +use crate::stats::Stats; +use crate::Result; +use crate::{auth::Authenticated, error::Error, query::QueryResult, query_analysis::State}; + +use self::init::InitState; +use self::primary::PrimaryState; +use self::replica::ReplicaState; + +use super::{factory::DbFactory, Database, Program}; + +mod init; +mod primary; +mod replica; + +struct BackboneReplicationLogger { + buffer: RwLock>, + current_frame_no: AtomicU64, + current_checksum: AtomicU64, + sender: mpsc::Sender<(Vec, oneshot::Sender>)>, +} +impl BackboneReplicationLogger { + fn new( + current_frame_no: FrameNo, + current_checksum: u64, + sender: mpsc::Sender<(Vec, oneshot::Sender>)>, + ) -> Self { + Self { + buffer: RwLock::new(Vec::new()), + current_frame_no: current_frame_no.into(), + current_checksum: current_checksum.into(), + sender, + } + } +} + +impl ReplicationLogger for BackboneReplicationLogger { + fn write_pages(&self, pages: &[WalPage]) -> anyhow::Result<(u64, u64)> { + let mut buffer = self.buffer.write(); + let mut current_frame_no = + self.current_frame_no.load(Ordering::Relaxed) + buffer.len() as u64; + let mut current_checksum = self.current_checksum.load(Ordering::Relaxed); + for page in pages.iter() { + assert_eq!(page.data.len(), WAL_PAGE_SIZE as usize); + let mut digest = CRC_64_GO_ISO.digest_with_initial(current_checksum); + digest.update(&page.data); + let checksum = digest.finalize(); + let header = FrameHeader { + frame_no: current_frame_no, + checksum, + page_no: page.page_no, + size_after: page.size_after, + }; + + let frame = Frame::from_parts(&header, &page.data); + + buffer.push(frame); + + current_frame_no += 1; + current_checksum = checksum; + } + self.current_frame_no + .store(current_frame_no, Ordering::Relaxed); + + // don't care about those values + Ok((0, 0)) + } + + fn commit( + &self, + new_frame_count: u64, + new_current_checksum: u64, + ) -> anyhow::Result { + // don't care + assert_eq!((0, 0), (new_frame_count, new_current_checksum)); + + let data = std::mem::take(&mut *self.buffer.write()); + let frame_no = data.last().unwrap().header().frame_no; + let (sender, ret) = oneshot::channel(); + self.sender.blocking_send((data, sender))?; + ret.blocking_recv()??; + + Ok(frame_no) + } + + // no-op + fn maybe_compact(&self, _size_after: u32) {} +} + +pub enum Role<'a> { + Init(InitState<'a>), + Primary(PrimaryState<'a>), + Replica(ReplicaState<'a>), +} + +impl<'a> Role<'a> { + pub fn transition( + role: impl Into>, + meta: MetaMessage, + offset: i64, + ) -> anyhow::Result { + let backbone = role.into().backbone(); + backbone.term = meta.term; + if meta.primary_infos.id == backbone.config.node_id { + // we are the new primary + let primary = PrimaryState::new(backbone, offset as _)?; + Ok(Role::Primary(primary)) + } else { + Ok(Role::Replica(ReplicaState::new( + backbone, + meta.primary_infos, + ))) + } + } + + fn backbone(self) -> &'a mut BackboneDatabase { + let role = match self { + Role::Init(i) => i.backbone, + Role::Primary(p) => p.backbone, + Role::Replica(r) => r.backbone, + }; + role + } +} + +impl<'a> From> for Role<'a> { + fn from(state: InitState<'a>) -> Self { + Self::Init(state) + } +} + +impl<'a> From> for Role<'a> { + fn from(state: PrimaryState<'a>) -> Self { + Self::Primary(state) + } +} + +impl<'a> From> for Role<'a> { + fn from(state: ReplicaState<'a>) -> Self { + Self::Replica(state) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct NodeInfo { + /// Id of the node + id: String, + /// address and port on which the node is listening for rpc call + addr: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MetaMessage { + primary_infos: NodeInfo, + term: u64, +} + +struct Connections { + factory: Box, + connections: HashMap>, + futs: JoinSet<()>, +} + +impl Connections { + fn new(factory: Box) -> Self { + Self { + factory, + connections: HashMap::new(), + futs: JoinSet::new(), + } + } + + async fn handle_op(&mut self, id: u64, op: Message) -> anyhow::Result<()> { + match op { + Message::Open => { + if self.connections.contains_key(&id) { + todo!("connection already exist"); + } + let db = self.factory.create().await.unwrap(); + self.connections.insert(id, db); + } + Message::Program { pgm, auth, ret } => match self.connections.get(&id) { + Some(conn) => { + let conn = conn.clone(); + self.futs.spawn(async move { + let res = conn.execute_program(pgm, auth).await; + let _ = ret.send(res); + }); + } + None => { + todo!("connection closed"); + } + }, + Message::Close => { + self.connections.remove(&id); + } + } + Ok(()) + } +} + +pub struct BackboneDatabaseConfig { + pub db_path: PathBuf, + pub kafka_bootstrap_servers: Vec, + pub extensions: Vec, + pub stats: Stats, + pub cluster_id: String, + pub node_id: String, + pub rpc_tls_config: Option, + pub rpc_server_addr: String, +} + +pub struct BackboneDatabase { + config: BackboneDatabaseConfig, + db_ops_receiver: mpsc::Receiver<(u64, Message)>, + pub last_frame_no: watch::Sender, + term: u64, +} + +impl BackboneDatabase { + pub fn new(config: BackboneDatabaseConfig) -> (Self, BackboneDbHandleFactory) { + let (sender, db_ops_receiver) = mpsc::channel(256); + let factory = BackboneDbHandleFactory::new(sender); + let (last_frame_no, _) = watch::channel(0); + ( + Self { + config, + db_ops_receiver, + last_frame_no, + term: 0, + }, + factory, + ) + } + + pub async fn run(mut self) -> anyhow::Result<()> { + let mut state = Role::Init(InitState::new(&mut self)?); + loop { + dbg!(); + state = match state { + Role::Init(s) => s.run().await?, + Role::Primary(s) => match s.run().await? { + Some(s) => s, + None => return Ok(()), + }, + Role::Replica(s) => s.run().await?, + } + } + } +} + +pub struct BackboneDbHandleFactory { + sender: mpsc::Sender<(u64, Message)>, + next_id: AtomicU64, +} + +impl BackboneDbHandleFactory { + fn new(sender: mpsc::Sender<(u64, Message)>) -> Self { + Self { + sender, + next_id: 0.into(), + } + } +} + +enum Message { + Open, + Program { + pgm: Program, + auth: Authenticated, + ret: oneshot::Sender>, State)>>, + }, + Close, +} + +#[async_trait::async_trait] +impl DbFactory for BackboneDbHandleFactory { + async fn create(&self) -> std::result::Result, Error> { + let id = self.next_id.fetch_add(1, Ordering::SeqCst); + let sender = self.sender.clone(); + sender + .send((id, Message::Open)) + .await + .map_err(|_| Error::Internal("Failed to open database connection".into()))?; + Ok(Arc::new(BackboneDbHandle { id, sender })) + } +} + +pub struct BackboneDbHandle { + sender: mpsc::Sender<(u64, Message)>, + id: u64, +} + +impl Drop for BackboneDbHandle { + fn drop(&mut self) { + // drop in a separate task + let sender = self.sender.clone(); + let id = self.id; + tokio::spawn(async move { + let _ = sender.send((id, Message::Close)).await; + }); + } +} + +#[async_trait::async_trait] +impl Database for BackboneDbHandle { + async fn execute_program( + &self, + pgm: Program, + auth: Authenticated, + ) -> Result<(Vec>, State)> { + let (ret, rcv) = oneshot::channel(); + let _ = self + .sender + .send((self.id, Message::Program { pgm, auth, ret })) + .await; + rcv.await.unwrap() + } +} diff --git a/sqld/src/database/libsql.rs b/sqld/src/database/libsql.rs index 103a4a82..8de40b79 100644 --- a/sqld/src/database/libsql.rs +++ b/sqld/src/database/libsql.rs @@ -289,7 +289,7 @@ impl Connection { .map_err(Error::LibSqlInvalidQueryParams)?; let mut qresult = stmt.raw_query(); - while let Some(row) = qresult.next()? { + while let Some(row) = dbg!(qresult.next())? { let mut values = vec![]; for (i, _) in columns.iter().enumerate() { values.push(row.get::(i)?.into()); diff --git a/sqld/src/database/mod.rs b/sqld/src/database/mod.rs index f96fc0c5..a6fee56f 100644 --- a/sqld/src/database/mod.rs +++ b/sqld/src/database/mod.rs @@ -1,3 +1,4 @@ +use std::ffi::c_int; use std::sync::Arc; use crate::auth::Authenticated; @@ -5,12 +6,14 @@ use crate::query::{Params, Query, QueryResult}; use crate::query_analysis::{State, Statement}; use crate::Result; +pub mod backbone; pub mod dump; pub mod factory; pub mod libsql; pub mod write_proxy; const TXN_TIMEOUT_SECS: u64 = 5; +pub const SQLITE_MUST_ROLLBACK: c_int = 0xdead; #[derive(Debug, Clone)] pub struct Program { diff --git a/sqld/src/lib.rs b/sqld/src/lib.rs index 49828d91..4373e750 100644 --- a/sqld/src/lib.rs +++ b/sqld/src/lib.rs @@ -6,6 +6,7 @@ use std::sync::Mutex; use std::time::Duration; use anyhow::Context as AnyhowContext; +use database::backbone::{BackboneDatabase, BackboneDatabaseConfig}; use database::dump::loader::DumpLoader; use database::factory::DbFactory; use database::libsql::LibSqlDb; @@ -13,12 +14,14 @@ use database::write_proxy::WriteProxyDbFactory; use once_cell::sync::Lazy; #[cfg(feature = "mwal_backend")] use once_cell::sync::OnceCell; -use replication::{ReplicationLogger, ReplicationLoggerHook}; -use rpc::run_rpc_server; +use replication::primary::logger::FileReplicationLogger; +use replication::ReplicationLoggerHook; +use rpc::{LoggerServiceConfig, ProxyServiceConfig, RpcServerBuilder, TlsConfig}; use tokio::sync::{mpsc, Notify}; use tokio::task::JoinSet; -use tonic::transport::Channel; +use tonic::transport::{Channel, ClientTlsConfig}; use utils::services::idle_shutdown::IdleShutdownLayer; +use uuid::Uuid; use crate::auth::Auth; use crate::error::Error; @@ -90,6 +93,9 @@ pub struct Config { pub idle_shutdown_timeout: Option, pub load_from_dump: Option, pub max_log_size: u64, + + pub backbone_addrs: Vec, + pub cluster_id: String, } async fn run_service( @@ -189,8 +195,7 @@ async fn hard_reset( Ok(()) } -fn configure_rpc(config: &Config) -> anyhow::Result<(Channel, tonic::transport::Uri)> { - let mut endpoint = Channel::from_shared(config.writer_rpc_addr.clone().unwrap())?; +fn make_rpc_tls_config(config: &Config) -> anyhow::Result> { if config.writer_rpc_tls { let cert_pem = std::fs::read_to_string(config.writer_rpc_cert.clone().unwrap())?; let key_pem = std::fs::read_to_string(config.writer_rpc_key.clone().unwrap())?; @@ -203,11 +208,25 @@ fn configure_rpc(config: &Config) -> anyhow::Result<(Channel, tonic::transport:: .identity(identity) .ca_certificate(ca_cert) .domain_name("sqld"); + + Ok(Some(tls_config)) + } else { + Ok(None) + } +} + +pub fn configure_rpc( + remote_addr: String, + tls_config: Option, +) -> anyhow::Result<(Channel, tonic::transport::Uri)> { + let mut endpoint = Channel::from_shared(remote_addr.clone())?; + + let uri = tonic::transport::Uri::from_maybe_shared(remote_addr)?; + if let Some(tls_config) = tls_config { endpoint = endpoint.tls_config(tls_config)?; } let channel = endpoint.connect_lazy(); - let uri = tonic::transport::Uri::from_maybe_shared(config.writer_rpc_addr.clone().unwrap())?; Ok((channel, uri)) } @@ -218,7 +237,8 @@ async fn start_replica( idle_shutdown_layer: Option, stats: Stats, ) -> anyhow::Result<()> { - let (channel, uri) = configure_rpc(config)?; + let rpc_tls_config = make_rpc_tls_config(config)?; + let (channel, uri) = configure_rpc(config.writer_rpc_addr.clone().unwrap(), rpc_tls_config)?; let replicator = Replicator::new(config.db_path.clone(), channel.clone(), uri.clone()); let applied_frame_no_receiver = replicator.current_frame_no_notifier.subscribe(); @@ -302,11 +322,10 @@ async fn start_primary( stats: Stats, ) -> anyhow::Result<()> { let is_fresh_db = check_fresh_db(&config.db_path); - let logger = Arc::new(ReplicationLogger::open( + let logger = Arc::new(FileReplicationLogger::open( &config.db_path, config.max_log_size, )?); - let logger_clone = logger.clone(); let path_clone = config.db_path.clone(); #[cfg(feature = "bottomless")] let enable_bottomless = config.enable_bottomless_replication; @@ -342,24 +361,75 @@ async fn start_primary( } }); - if let Some(ref addr) = config.rpc_server_addr { - join_set.spawn(run_rpc_server( - *addr, - config.rpc_server_tls, - config.rpc_server_cert.clone(), - config.rpc_server_key.clone(), - config.rpc_server_ca_cert.clone(), - db_factory.clone(), - logger_clone, - idle_shutdown_layer.clone(), - )); + let mut builder = + RpcServerBuilder::new(config.rpc_server_addr.context("missing rpc server addr")?); + if config.rpc_server_tls { + builder.with_tls(TlsConfig { + cert_path: config.rpc_server_cert.clone().unwrap(), + key_path: config.rpc_server_key.clone().unwrap(), + ca_cert_path: config.rpc_server_ca_cert.clone().unwrap(), + }); } + builder.with_proxy_service(ProxyServiceConfig { + factory: db_factory.clone(), + frame_notifier: logger.new_frame_notifier.subscribe(), + }); + builder.with_replication_logger_service(LoggerServiceConfig { logger }); + idle_shutdown_layer + .clone() + .map(|l| builder.with_idle_shutdown(l.clone())); + join_set.spawn(builder.serve()); run_service(db_factory, config, join_set, idle_shutdown_layer, stats).await?; Ok(()) } +async fn start_backbone( + config: &Config, + join_set: &mut JoinSet>, + idle_shutdown_layer: Option, + stats: Stats, +) -> anyhow::Result<()> { + let extensions = validate_extensions(config.extensions_path.clone())?; + + let backbone_config = BackboneDatabaseConfig { + db_path: config.db_path.clone(), + extensions, + stats: stats.clone(), + cluster_id: config.cluster_id.clone(), + node_id: Uuid::new_v4().to_string(), + rpc_tls_config: make_rpc_tls_config(config)?, + rpc_server_addr: format!("http://{}", config.rpc_server_addr.clone().unwrap()), + kafka_bootstrap_servers: config.backbone_addrs.clone(), + }; + let (backbone, factory) = BackboneDatabase::new(backbone_config); + let factory = Arc::new(factory); + + let mut builder = RpcServerBuilder::new(config.rpc_server_addr.clone().unwrap()); + if config.rpc_server_tls { + builder.with_tls(TlsConfig { + cert_path: config.rpc_server_cert.clone().unwrap(), + key_path: config.rpc_server_key.clone().unwrap(), + ca_cert_path: config.rpc_server_ca_cert.clone().unwrap(), + }); + } + builder.with_proxy_service(ProxyServiceConfig { + factory: factory.clone(), + frame_notifier: backbone.last_frame_no.subscribe(), + }); + idle_shutdown_layer + .clone() + .map(|l| builder.with_idle_shutdown(l.clone())); + + join_set.spawn(builder.serve()); + join_set.spawn(backbone.run()); + + run_service(factory, config, join_set, idle_shutdown_layer, stats).await?; + + Ok(()) +} + pub async fn run_server(config: Config) -> anyhow::Result<()> { tracing::trace!("Backend: {:?}", config.backend); @@ -395,9 +465,15 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> { let stats = Stats::new(&config.db_path)?; - match config.writer_rpc_addr { - Some(_) => start_replica(&config, &mut join_set, idle_shutdown_layer, stats).await?, - None => start_primary(&config, &mut join_set, idle_shutdown_layer, stats).await?, + if !config.backbone_addrs.is_empty() { + start_backbone(&config, &mut join_set, idle_shutdown_layer, stats).await?; + } else { + match config.writer_rpc_addr { + Some(_) => { + start_replica(&config, &mut join_set, idle_shutdown_layer, stats).await? + } + None => start_primary(&config, &mut join_set, idle_shutdown_layer, stats).await?, + } } let reset = HARD_RESET.clone(); @@ -408,6 +484,7 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> { break; }, _ = shutdown_notify.notified() => { + join_set.shutdown().await; return Ok(()) } Some(res) = join_set.join_next() => { diff --git a/sqld/src/main.rs b/sqld/src/main.rs index 88905e7a..b90a8839 100644 --- a/sqld/src/main.rs +++ b/sqld/src/main.rs @@ -133,6 +133,12 @@ struct Cli { #[clap(subcommand)] utils: Option, + + #[clap(long, env = "SQLD_CLUSTER_ID")] + cluster_id: Option, + + #[clap(long)] + backbone_addrs: Vec, } #[derive(clap::Subcommand, Debug)] @@ -231,6 +237,8 @@ fn config_from_args(args: Cli) -> Result { idle_shutdown_timeout: args.idle_shutdown_timeout_s.map(Duration::from_secs), load_from_dump: args.load_from_dump, max_log_size: args.max_log_size, + backbone_addrs: args.backbone_addrs, + cluster_id: args.cluster_id.unwrap(), }) } diff --git a/sqld/src/replication/mod.rs b/sqld/src/replication/mod.rs index 7a212d30..bfab49d4 100644 --- a/sqld/src/replication/mod.rs +++ b/sqld/src/replication/mod.rs @@ -8,7 +8,7 @@ pub use primary::logger::{LogReadError, ReplicationLogger, ReplicationLoggerHook pub const WAL_PAGE_SIZE: i32 = 4096; pub const WAL_MAGIC: u64 = u64::from_le_bytes(*b"SQLDWAL\0"); -const CRC_64_GO_ISO: Crc = Crc::::new(&crc::CRC_64_GO_ISO); +pub const CRC_64_GO_ISO: Crc = Crc::::new(&crc::CRC_64_GO_ISO); /// The frame uniquely identifying, monotonically increasing number pub type FrameNo = u64; diff --git a/sqld/src/replication/primary/frame_stream.rs b/sqld/src/replication/primary/frame_stream.rs index d9b69e7f..eac957cc 100644 --- a/sqld/src/replication/primary/frame_stream.rs +++ b/sqld/src/replication/primary/frame_stream.rs @@ -6,19 +6,21 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::Stream; -use crate::replication::{FrameNo, LogReadError, ReplicationLogger}; +use crate::replication::{FrameNo, LogReadError}; + +use super::logger::FileReplicationLogger; /// Streams frames from the replication log starting at `current_frame_no`. /// Only stops if the current frame is not in the log anymore. pub struct FrameStream { current_frame_no: FrameNo, max_available_frame_no: FrameNo, - logger: Arc, + logger: Arc, state: FrameStreamState, } impl FrameStream { - pub fn new(logger: Arc, current_frameno: FrameNo) -> Self { + pub fn new(logger: Arc, current_frameno: FrameNo) -> Self { let max_available_frame_no = *logger.new_frame_notifier.subscribe().borrow(); Self { current_frame_no: current_frameno, diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs index 5b4f2a99..16726a70 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -14,6 +14,7 @@ use rusqlite::ffi::SQLITE_ERROR; use tokio::sync::watch; use uuid::Uuid; +use crate::database::SQLITE_MUST_ROLLBACK; use crate::libsql::ffi::{ types::{XWalFrameFn, XWalUndoFn}, PgHdr, Wal, @@ -28,7 +29,7 @@ use crate::replication::{FrameNo, CRC_64_GO_ISO, WAL_MAGIC, WAL_PAGE_SIZE}; #[derive(Clone)] pub struct ReplicationLoggerHook { buffer: Vec, - logger: Arc, + logger: Arc, } /// This implementation of WalHook intercepts calls to `on_frame`, and writes them to a @@ -81,19 +82,13 @@ unsafe impl WalHook for ReplicationLoggerHook { if is_commit != 0 && rc == 0 { if let Some((count, checksum)) = commit_info { - self.commit(count, checksum); + if let Err(e) = self.commit(count, checksum) { + tracing::error!("failed to commit: {e}"); + return SQLITE_MUST_ROLLBACK; + } } - self.logger - .log_file - .write() - .maybe_compact( - self.logger.compactor.clone(), - ntruncate, - &self.logger.db_path, - self.logger.current_checksum.load(Ordering::Relaxed), - ) - .unwrap(); + self.logger.maybe_compact(ntruncate); } rc @@ -112,15 +107,15 @@ unsafe impl WalHook for ReplicationLoggerHook { } #[derive(Clone)] -struct WalPage { - page_no: u32, +pub struct WalPage { + pub page_no: u32, /// 0 for non-commit frames - size_after: u32, - data: Bytes, + pub size_after: u32, + pub data: Bytes, } impl ReplicationLoggerHook { - pub fn new(logger: Arc) -> Self { + pub fn new(logger: Arc) -> Self { Self { buffer: Default::default(), logger, @@ -149,9 +144,9 @@ impl ReplicationLoggerHook { } } - fn commit(&self, new_count: u64, new_checksum: u64) { - let new_frame_no = self.logger.commit(new_count, new_checksum); - let _ = self.logger.new_frame_notifier.send(new_frame_no); + fn commit(&self, new_count: u64, new_checksum: u64) -> anyhow::Result<()> { + self.logger.commit(new_count, new_checksum)?; + Ok(()) } fn rollback(&mut self) { @@ -447,7 +442,16 @@ impl Generation { } } -pub struct ReplicationLogger { +pub trait ReplicationLogger: 'static + Send + Sync { + /// Write pages to the log, without updating the file header. + /// Returns the new frame count and checksum to commit + fn write_pages(&self, pages: &[WalPage]) -> anyhow::Result<(u64, u64)>; + /// commit the current transaction and returns the new top frame number + fn commit(&self, new_frame_count: u64, new_current_checksum: u64) -> anyhow::Result; + fn maybe_compact(&self, size_after: u32); +} + +pub struct FileReplicationLogger { current_checksum: AtomicU64, pub generation: Generation, pub log_file: RwLock, @@ -458,7 +462,7 @@ pub struct ReplicationLogger { pub new_frame_notifier: watch::Sender, } -impl ReplicationLogger { +impl FileReplicationLogger { pub fn open(db_path: &Path, max_log_size: u64) -> anyhow::Result { let log_path = db_path.join("wallog"); let file = OpenOptions::new() @@ -494,8 +498,32 @@ impl ReplicationLogger { Ok(Uuid::from_u128((self.log_file.read()).header().db_id)) } - /// Write pages to the log, without updating the file header. - /// Returns the new frame count and checksum to commit + fn compute_checksum(wal_header: &LogFileHeader, log_file: &LogFile) -> anyhow::Result { + tracing::debug!("computing WAL log running checksum..."); + let mut iter = log_file.frames_iter()?; + iter.try_fold(wal_header.start_checksum, |sum, frame| { + let frame = frame?; + let mut digest = CRC_64_GO_ISO.digest_with_initial(sum); + digest.update(frame.page()); + let cs = digest.finalize(); + ensure!( + cs == frame.header().checksum, + "invalid WAL file: invalid checksum" + ); + Ok(cs) + }) + } + + pub fn get_snapshot_file(&self, from: FrameNo) -> anyhow::Result> { + find_snapshot_file(&self.db_path, from) + } + + pub fn get_frame(&self, frame_no: FrameNo) -> Result { + self.log_file.read().frame_bytes(frame_no) + } +} + +impl ReplicationLogger for FileReplicationLogger { fn write_pages(&self, pages: &[WalPage]) -> anyhow::Result<(u64, u64)> { let mut log_file = self.log_file.write(); let log_header = *log_file.header(); @@ -528,39 +556,28 @@ impl ReplicationLogger { )) } - fn compute_checksum(wal_header: &LogFileHeader, log_file: &LogFile) -> anyhow::Result { - tracing::debug!("computing WAL log running checksum..."); - let mut iter = log_file.frames_iter()?; - iter.try_fold(wal_header.start_checksum, |sum, frame| { - let frame = frame?; - let mut digest = CRC_64_GO_ISO.digest_with_initial(sum); - digest.update(frame.page()); - let cs = digest.finalize(); - ensure!( - cs == frame.header().checksum, - "invalid WAL file: invalid checksum" - ); - Ok(cs) - }) - } - - /// commit the current transaction and returns the new top frame number - fn commit(&self, new_frame_count: u64, new_current_checksum: u64) -> FrameNo { + fn commit(&self, new_frame_count: u64, new_current_checksum: u64) -> anyhow::Result { let mut log_file = self.log_file.write(); let mut header = *log_file.header(); header.frame_count = new_frame_count; - log_file.write_header(&header).expect("dailed to commit"); + log_file.write_header(&header)?; self.current_checksum .store(new_current_checksum, Ordering::Relaxed); - log_file.header().last_frame_no() - } - - pub fn get_snapshot_file(&self, from: FrameNo) -> anyhow::Result> { - find_snapshot_file(&self.db_path, from) + let new_frame_no = log_file.header().last_frame_no(); + let _ = self.new_frame_notifier.send(new_frame_no); + Ok(new_frame_no) } - pub fn get_frame(&self, frame_no: FrameNo) -> Result { - self.log_file.read().frame_bytes(frame_no) + fn maybe_compact(&self, size_after: u32) { + let mut log_file = self.log_file.write(); + log_file + .maybe_compact( + self.compactor.clone(), + size_after, + &self.db_path, + self.current_checksum.load(Ordering::Relaxed), + ) + .unwrap(); } } @@ -571,7 +588,7 @@ mod test { #[test] fn write_and_read_from_frame_log() { let dir = tempfile::tempdir().unwrap(); - let logger = ReplicationLogger::open(dir.path(), 0).unwrap(); + let logger = FileReplicationLogger::open(dir.path(), 0).unwrap(); let frames = (0..10) .map(|i| WalPage { @@ -599,7 +616,7 @@ mod test { #[test] fn index_out_of_bounds() { let dir = tempfile::tempdir().unwrap(); - let logger = ReplicationLogger::open(dir.path(), 0).unwrap(); + let logger = FileReplicationLogger::open(dir.path(), 0).unwrap(); let log_file = logger.log_file.write(); assert!(matches!(log_file.frame_bytes(1), Err(LogReadError::Ahead))); } @@ -608,7 +625,7 @@ mod test { #[should_panic] fn incorrect_frame_size() { let dir = tempfile::tempdir().unwrap(); - let logger = ReplicationLogger::open(dir.path(), 0).unwrap(); + let logger = FileReplicationLogger::open(dir.path(), 0).unwrap(); let entry = WalPage { page_no: 0, size_after: 0, diff --git a/sqld/src/replication/replica/hook.rs b/sqld/src/replication/replica/hook.rs index 9109369d..c1fd869c 100644 --- a/sqld/src/replication/replica/hook.rs +++ b/sqld/src/replication/replica/hook.rs @@ -9,7 +9,7 @@ use sqld_libsql_bindings::{ffi::types::XWalFrameFn, wal_hook::WalHook}; use crate::replication::frame::{Frame, FrameBorrowed}; use crate::replication::{FrameNo, WAL_PAGE_SIZE}; -use super::meta::WalIndexMeta; +use super::meta::ReplicationMeta; use super::snapshot::TempSnapshot; #[derive(Debug)] @@ -25,6 +25,13 @@ impl Frames { Frames::Snapshot(snap) => make_page_header(snap.iter()), } } + + pub fn is_empty(&self) -> bool { + match self { + Frames::Vec(f) => f.is_empty(), + Frames::Snapshot(_) => false, + } + } } /// The injector hook hijacks a call to xframes, and replace the content of the call with it's own @@ -38,7 +45,7 @@ pub struct InjectorHook { } impl InjectorHook { - pub fn new(meta_file: File, meta: WalIndexMeta) -> Self { + pub fn new(meta_file: File, meta: ReplicationMeta) -> Self { Self { inner: Rc::new(RefCell::new(InjectorHookInner { current_frames: None, @@ -75,7 +82,7 @@ pub struct InjectorHookInner { /// On success, returns the last applied frame_no result: Option>, meta_file: File, - meta: WalIndexMeta, + meta: ReplicationMeta, } impl InjectorHookInner { @@ -136,20 +143,28 @@ unsafe impl WalHook for InjectorHook { orig: XWalFrameFn, ) -> c_int { self.with_inner_mut(|this| { + dbg!(); let Some(ref frames) = this.current_frames.take() else { + dbg!(); return SQLITE_ERROR; }; + dbg!(); let (headers, last_frame_no, size_after) = frames.to_headers(); + dbg!(); + dbg!(); // SAFETY: frame headers are valid for the duration of the call of apply_pages let result = unsafe { this.inject_pages(headers, last_frame_no, size_after, sync_flags, orig, wal) }; + dbg!(); free_page_header(headers); + dbg!(); let result = result.map(|_| last_frame_no); + dbg!(&result); this.result.replace(result); SQLITE_ERROR diff --git a/sqld/src/replication/replica/injector.rs b/sqld/src/replication/replica/injector.rs index ea27da0a..893eb8d0 100644 --- a/sqld/src/replication/replica/injector.rs +++ b/sqld/src/replication/replica/injector.rs @@ -6,11 +6,10 @@ use sqld_libsql_bindings::open_with_regular_wal; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; -use crate::{replication::FrameNo, rpc::replication_log::rpc::HelloResponse, HARD_RESET}; +use crate::replication::FrameNo; -use super::error::ReplicationError; use super::hook::{Frames, InjectorHook}; -use super::meta::WalIndexMeta; +use super::meta::ReplicationMeta; #[derive(Debug)] struct FrameApplyOp { @@ -24,11 +23,16 @@ pub struct FrameInjectorHandle { } impl FrameInjectorHandle { - pub async fn new(db_path: PathBuf, hello: HelloResponse) -> anyhow::Result<(Self, FrameNo)> { + pub async fn new( + db_path: PathBuf, + merger: impl FnOnce(Option) -> anyhow::Result + Send + 'static, + ) -> anyhow::Result<(Self, FrameNo)> { let (sender, mut receiver) = mpsc::channel(16); let (ret, init_ok) = oneshot::channel(); let handle = tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - let mut injector = match FrameInjector::new_from_hello(&db_path, hello) { + let maybe_injector = FrameInjector::new_with_merger(&db_path, merger); + + let mut injector = match maybe_injector { Ok((hook, last_applied_frame_no)) => { ret.send(Ok(last_applied_frame_no)).unwrap(); hook @@ -45,7 +49,6 @@ impl FrameInjectorHandle { anyhow::bail!("frame application result must not be ignored."); } } - Ok(()) }); @@ -68,7 +71,8 @@ impl FrameInjectorHandle { Ok(()) } - pub async fn apply_frames(&mut self, frames: Frames) -> anyhow::Result { + pub async fn inject_frames(&mut self, frames: Frames) -> anyhow::Result { + assert!(!frames.is_empty()); let (ret, rcv) = oneshot::channel(); self.sender.send(FrameApplyOp { frames, ret }).await?; rcv.await? @@ -82,27 +86,17 @@ pub struct FrameInjector { impl FrameInjector { /// returns the replication hook and the currently applied frame_no - pub fn new_from_hello(db_path: &Path, hello: HelloResponse) -> anyhow::Result<(Self, FrameNo)> { - let (meta, file) = WalIndexMeta::read_from_path(db_path)?; - let meta = match meta { - Some(meta) => match meta.merge_from_hello(hello) { - Ok(meta) => meta, - Err(e @ ReplicationError::Lagging) => { - tracing::error!("Replica ahead of primary: hard-reseting replica"); - HARD_RESET.notify_waiters(); - - anyhow::bail!(e); - } - Err(e) => anyhow::bail!(e), - }, - None => WalIndexMeta::new_from_hello(hello)?, - }; + pub fn new_with_merger( + db_path: &Path, + merger: impl FnOnce(Option) -> anyhow::Result, + ) -> anyhow::Result<(Self, FrameNo)> { + let (meta, file) = ReplicationMeta::read_from_path(db_path)?; + let meta = merger(meta)?; Ok((Self::init(db_path, file, meta)?, meta.current_frame_no())) } - fn init(db_path: &Path, meta_file: File, meta: WalIndexMeta) -> anyhow::Result { - let hook = InjectorHook::new(meta_file, meta); + pub fn new(db_path: &Path, hook: InjectorHook) -> anyhow::Result { let conn = open_with_regular_wal( db_path, OpenFlags::SQLITE_OPEN_READ_WRITE @@ -116,14 +110,19 @@ impl FrameInjector { Ok(Self { conn, hook }) } + pub fn init(db_path: &Path, meta_file: File, meta: ReplicationMeta) -> anyhow::Result { + let hook = InjectorHook::new(meta_file, meta); + Self::new(db_path, hook) + } + /// sets the injector's frames to the provided frames, trigger a dummy write, and collect the /// injection result. fn inject_frames(&mut self, frames: Frames) -> anyhow::Result { self.hook.set_frames(frames); - let _ = self + let _ = dbg!(self .conn - .execute("create table if not exists __dummy__ (dummy)", ()); + .execute("create table if not exists __dummy__ (dummy)", ())); self.hook.take_result() } diff --git a/sqld/src/replication/replica/meta.rs b/sqld/src/replication/replica/meta.rs index 819e6409..e5e9a796 100644 --- a/sqld/src/replication/replica/meta.rs +++ b/sqld/src/replication/replica/meta.rs @@ -15,7 +15,7 @@ use super::error::ReplicationError; #[repr(C)] #[derive(Debug, Pod, Zeroable, Clone, Copy)] -pub struct WalIndexMeta { +pub struct ReplicationMeta { /// This is the anticipated next frame_no to request pub pre_commit_frame_no: FrameNo, /// After we have written the frames back to the wal, we set this value to the same value as @@ -25,12 +25,12 @@ pub struct WalIndexMeta { /// Generation Uuid /// This number is generated on each primary restart. This let's us know that the primary, and /// we need to make sure that we are not ahead of the primary. - generation_id: u128, + pub generation_id: u128, /// Uuid of the database this instance is a replica of - database_id: u128, + pub database_id: u128, } -impl WalIndexMeta { +impl ReplicationMeta { pub fn read_from_path(db_path: &Path) -> anyhow::Result<(Option, File)> { let path = db_path.join("client_wal_index"); let file = OpenOptions::new() @@ -43,7 +43,7 @@ impl WalIndexMeta { } fn read(file: &File) -> anyhow::Result> { - let mut buf = [0; size_of::()]; + let mut buf = [0; size_of::()]; let meta = match file.read_exact_at(&mut buf, 0) { Ok(()) => { file.read_exact_at(&mut buf, 0)?; @@ -86,7 +86,7 @@ impl WalIndexMeta { } } - pub fn new_from_hello(hello: HelloResponse) -> anyhow::Result { + pub fn new_from_hello(hello: HelloResponse) -> anyhow::Result { let database_id = Uuid::from_str(&hello.database_id) .context("invalid database id from primary")? .as_u128(); diff --git a/sqld/src/replication/replica/mod.rs b/sqld/src/replication/replica/mod.rs index 02243866..0594e190 100644 --- a/sqld/src/replication/replica/mod.rs +++ b/sqld/src/replication/replica/mod.rs @@ -5,4 +5,7 @@ mod meta; mod replicator; mod snapshot; +pub use hook::Frames; +pub use injector::{FrameInjector, FrameInjectorHandle}; +pub use meta::ReplicationMeta; pub use replicator::Replicator; diff --git a/sqld/src/replication/replica/replicator.rs b/sqld/src/replication/replica/replicator.rs index e535ab65..023d7c67 100644 --- a/sqld/src/replication/replica/replicator.rs +++ b/sqld/src/replication/replica/replicator.rs @@ -7,12 +7,15 @@ use tokio::sync::watch; use tonic::transport::Channel; use crate::replication::frame::Frame; +use crate::replication::replica::error::ReplicationError; use crate::replication::replica::snapshot::TempSnapshot; +use crate::replication::replica::ReplicationMeta; use crate::replication::FrameNo; use crate::rpc::replication_log::rpc::{ replication_log_client::ReplicationLogClient, HelloRequest, LogOffset, }; use crate::rpc::replication_log::NEED_SNAPSHOT_ERROR_MSG; +use crate::HARD_RESET; use super::hook::Frames; use super::injector::FrameInjectorHandle; @@ -75,7 +78,22 @@ impl Replicator { applicator.shutdown().await?; } let (injector, last_applied_frame_no) = - FrameInjectorHandle::new(self.db_path.clone(), hello).await?; + FrameInjectorHandle::new(self.db_path.clone(), |meta| match meta { + Some(meta) => match meta.merge_from_hello(hello) { + Ok(meta) => Ok(meta), + Err(e @ ReplicationError::Lagging) => { + tracing::error!( + "Replica ahead of primary: hard-reseting replica" + ); + HARD_RESET.notify_waiters(); + + anyhow::bail!(e); + } + Err(e) => anyhow::bail!(e), + }, + None => ReplicationMeta::new_from_hello(hello), + }) + .await?; self.update_current_frame_no(last_applied_frame_no); self.injector.replace(injector); return Ok(()); @@ -144,7 +162,7 @@ impl Replicator { .injector .as_mut() .unwrap() - .apply_frames(Frames::Snapshot(snap)) + .inject_frames(Frames::Snapshot(snap)) .await?; Ok(()) @@ -155,7 +173,7 @@ impl Replicator { .injector .as_mut() .expect("invalid state") - .apply_frames(Frames::Vec(frames)) + .inject_frames(Frames::Vec(frames)) .await?; self.update_current_frame_no(new_frame_no); diff --git a/sqld/src/rpc/mod.rs b/sqld/src/rpc/mod.rs index e5dd13d5..c2908507 100644 --- a/sqld/src/rpc/mod.rs +++ b/sqld/src/rpc/mod.rs @@ -2,10 +2,13 @@ use anyhow::Context; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use tokio::sync::watch; +use tonic::transport::Server; use tower::util::option_layer; use crate::database::factory::DbFactory; -use crate::replication::ReplicationLogger; +use crate::replication::primary::logger::FileReplicationLogger; +use crate::replication::FrameNo; use crate::rpc::proxy::rpc::proxy_server::ProxyServer; use crate::rpc::proxy::ProxyService; use crate::rpc::replication_log::rpc::replication_log_server::ReplicationLogServer; @@ -15,44 +18,97 @@ use crate::utils::services::idle_shutdown::IdleShutdownLayer; pub mod proxy; pub mod replication_log; -#[allow(clippy::too_many_arguments)] -pub async fn run_rpc_server( +pub struct TlsConfig { + pub cert_path: PathBuf, + pub key_path: PathBuf, + pub ca_cert_path: PathBuf, +} + +pub struct ProxyServiceConfig { + pub factory: Arc, + pub frame_notifier: watch::Receiver, +} + +pub struct LoggerServiceConfig { + pub logger: Arc, +} + +pub struct RpcServerBuilder { addr: SocketAddr, - tls: bool, - cert_path: Option, - key_path: Option, - ca_cert_path: Option, - factory: Arc, - logger: Arc, - idle_shutdown_layer: Option, -) -> anyhow::Result<()> { - let proxy_service = ProxyService::new(factory, logger.new_frame_notifier.subscribe()); - let logger_service = ReplicationLogService::new(logger); - - tracing::info!("serving write proxy server at {addr}"); - - let mut builder = tonic::transport::Server::builder(); - if tls { - let cert_pem = std::fs::read_to_string(cert_path.unwrap())?; - let key_pem = std::fs::read_to_string(key_path.unwrap())?; - let identity = tonic::transport::Identity::from_pem(cert_pem, key_pem); - - let ca_cert_pem = std::fs::read_to_string(ca_cert_path.unwrap())?; - let ca_cert = tonic::transport::Certificate::from_pem(ca_cert_pem); - - let tls_config = tonic::transport::ServerTlsConfig::new() - .identity(identity) - .client_ca_root(ca_cert); - builder = builder - .tls_config(tls_config) - .context("Failed to read the TSL config of RPC server")?; + tls_config: Option, + proxy_config: Option, + logger_config: Option, + idle_shutdown: Option, +} + +impl RpcServerBuilder { + pub fn new(addr: SocketAddr) -> Self { + Self { + addr, + tls_config: None, + proxy_config: None, + logger_config: None, + idle_shutdown: None, + } + } + + pub fn with_tls(&mut self, tls_config: TlsConfig) -> &mut Self { + self.tls_config.replace(tls_config); + self + } + + pub fn with_replication_logger_service(&mut self, config: LoggerServiceConfig) -> &mut Self { + self.logger_config.replace(config); + self + } + + pub fn with_proxy_service(&mut self, config: ProxyServiceConfig) -> &mut Self { + self.proxy_config.replace(config); + self + } + + pub fn with_idle_shutdown(&mut self, layer: IdleShutdownLayer) -> &mut Self { + self.idle_shutdown.replace(layer); + self + } + + fn apply_tls(&self, mut builder: Server) -> anyhow::Result { + if let Some(tls_config) = &self.tls_config { + let cert_pem = std::fs::read_to_string(&tls_config.cert_path)?; + let key_pem = std::fs::read_to_string(&tls_config.key_path)?; + let identity = tonic::transport::Identity::from_pem(cert_pem, key_pem); + + let ca_cert_pem = std::fs::read_to_string(&tls_config.ca_cert_path)?; + let ca_cert = tonic::transport::Certificate::from_pem(ca_cert_pem); + + let tls_config = tonic::transport::ServerTlsConfig::new() + .identity(identity) + .client_ca_root(ca_cert); + builder = builder + .tls_config(tls_config) + .context("Failed to read the TSL config of RPC server")?; + } + + Ok(builder) + } + + pub async fn serve(self) -> anyhow::Result<()> { + tracing::info!("serving write proxy server at {}", self.addr); + let mut builder = tonic::transport::Server::builder(); + builder = self.apply_tls(builder)?; + let proxy_service = self + .proxy_config + .map(|c| ProxyServer::new(ProxyService::new(c.factory, c.frame_notifier))); + let logger_service = self + .logger_config + .map(|c| ReplicationLogServer::new(ReplicationLogService::new(c.logger))); + builder + .layer(&option_layer(self.idle_shutdown)) + .add_optional_service(proxy_service) + .add_optional_service(logger_service) + .serve(self.addr) + .await?; + + Ok(()) } - builder - .layer(&option_layer(idle_shutdown_layer)) - .add_service(ProxyServer::new(proxy_service)) - .add_service(ReplicationLogServer::new(logger_service)) - .serve(addr) - .await?; - - Ok(()) } diff --git a/sqld/src/rpc/replication_log.rs b/sqld/src/rpc/replication_log.rs index f9aacfe2..21918a73 100644 --- a/sqld/src/rpc/replication_log.rs +++ b/sqld/src/rpc/replication_log.rs @@ -15,13 +15,14 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use crate::replication::primary::frame_stream::FrameStream; -use crate::replication::{LogReadError, ReplicationLogger}; +use crate::replication::primary::logger::FileReplicationLogger; +use crate::replication::LogReadError; use self::rpc::replication_log_server::ReplicationLog; use self::rpc::{Frame, HelloRequest, HelloResponse, LogOffset}; pub struct ReplicationLogService { - logger: Arc, + logger: Arc, replicas_with_hello: RwLock>, } @@ -29,7 +30,7 @@ pub const NO_HELLO_ERROR_MSG: &str = "NO_HELLO"; pub const NEED_SNAPSHOT_ERROR_MSG: &str = "NEED_SNAPSHOT"; impl ReplicationLogService { - pub fn new(logger: Arc) -> Self { + pub fn new(logger: Arc) -> Self { Self { logger, replicas_with_hello: RwLock::new(HashSet::::new()), From 70241d21a7a334b4781650a6620affd091d89e1c Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 2 May 2023 20:10:13 +0200 Subject: [PATCH 2/3] wip2 --- sqld/src/database/backbone/init.rs | 61 +++++++ sqld/src/database/backbone/mod.rs | 15 +- sqld/src/database/backbone/primary.rs | 220 ++++++++++++++++++++++++++ sqld/src/database/backbone/replica.rs | 142 +++++++++++++++++ sqld/src/error.rs | 2 + 5 files changed, 433 insertions(+), 7 deletions(-) create mode 100644 sqld/src/database/backbone/init.rs create mode 100644 sqld/src/database/backbone/primary.rs create mode 100644 sqld/src/database/backbone/replica.rs diff --git a/sqld/src/database/backbone/init.rs b/sqld/src/database/backbone/init.rs new file mode 100644 index 00000000..110b24f8 --- /dev/null +++ b/sqld/src/database/backbone/init.rs @@ -0,0 +1,61 @@ +use rdkafka::consumer::{Consumer, StreamConsumer}; +use rdkafka::{ClientConfig, Message}; + +use crate::database::backbone::MetaMessage; + +use super::{BackboneDatabase, Role}; + +pub struct InitState<'a> { + pub backbone: &'a mut BackboneDatabase, + consumer: StreamConsumer, +} + +impl<'a> InitState<'a> { + pub fn new(backbone: &'a mut BackboneDatabase) -> anyhow::Result { + let mut config = ClientConfig::new(); + config + .set("group.id", &backbone.config.node_id) + .set( + "bootstrap.servers", + backbone + .config + .kafka_bootstrap_servers + .first() + .unwrap() + .to_string(), + ) + .set("enable.partition.eof", "false") + .set("session.timeout.ms", "6000") + .set("enable.auto.commit", "false"); + + let consumer: StreamConsumer = + config.clone().set("auto.offset.reset", "latest").create()?; + consumer.subscribe(&[&backbone.config.cluster_id])?; + + Ok(Self { consumer, backbone }) + } + + pub async fn run(self) -> anyhow::Result> { + tracing::info!("entering idle state"); + dbg!(&self.backbone.config.node_id); + dbg!(); + loop { + let msg = self.consumer.recv().await?; + if msg.key() == Some(b"meta") { + match msg.payload() { + Some(payload) => { + let meta: MetaMessage = serde_json::from_slice(payload)?; + if self.backbone.term >= meta.term { + continue; + } + let offset = msg.offset(); + drop(msg); // holding a ref to the message while dropping the consumer causes a + // deadlock in the Drop implementation of StreamConsumer + return Role::transition(self, &meta, offset); + } + None => anyhow::bail!("message with empty payload"), + } + } + } + } +} diff --git a/sqld/src/database/backbone/mod.rs b/sqld/src/database/backbone/mod.rs index 70961161..4bdc788f 100644 --- a/sqld/src/database/backbone/mod.rs +++ b/sqld/src/database/backbone/mod.rs @@ -109,12 +109,13 @@ pub enum Role<'a> { } impl<'a> Role<'a> { - pub fn transition( + pub fn transition<'b>( role: impl Into>, - meta: MetaMessage, + meta: &'b MetaMessage, offset: i64, - ) -> anyhow::Result { + ) -> anyhow::Result> { let backbone = role.into().backbone(); + assert!(backbone.term < meta.term); backbone.term = meta.term; if meta.primary_infos.id == backbone.config.node_id { // we are the new primary @@ -123,7 +124,7 @@ impl<'a> Role<'a> { } else { Ok(Role::Replica(ReplicaState::new( backbone, - meta.primary_infos, + meta.primary_infos.clone(), ))) } } @@ -156,7 +157,7 @@ impl<'a> From> for Role<'a> { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct NodeInfo { /// Id of the node id: String, @@ -164,7 +165,7 @@ pub struct NodeInfo { addr: String, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct MetaMessage { primary_infos: NodeInfo, term: u64, @@ -203,7 +204,7 @@ impl Connections { }); } None => { - todo!("connection closed"); + let _ = ret.send(Err(Error::ConnectionReset)); } }, Message::Close => { diff --git a/sqld/src/database/backbone/primary.rs b/sqld/src/database/backbone/primary.rs new file mode 100644 index 00000000..d93d7e8b --- /dev/null +++ b/sqld/src/database/backbone/primary.rs @@ -0,0 +1,220 @@ +use std::future::ready; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::bail; +use futures::StreamExt; +use rdkafka::consumer::{Consumer, StreamConsumer}; +use rdkafka::producer::{FutureProducer, FutureRecord, Producer}; +use rdkafka::util::Timeout; +use rdkafka::{ClientConfig, Message}; +use tokio::sync::{mpsc, oneshot, watch}; + +use crate::database::backbone::MetaMessage; +use crate::database::libsql::LibSqlDb; +use crate::replication::{frame::Frame, ReplicationLoggerHook}; + +use super::{BackboneDatabase, BackboneReplicationLogger, Connections, Role}; + +pub struct PrimaryState<'a> { + pub backbone: &'a mut BackboneDatabase, + /// Database connections + connections: Connections, + frame_receiver: mpsc::Receiver<(Vec, oneshot::Sender>)>, + current_kafka_offset: u64, +} + +impl<'a> PrimaryState<'a> { + pub fn new( + backbone: &'a mut BackboneDatabase, + current_kafka_offset: u64, + ) -> anyhow::Result { + let (sender, frame_receiver) = mpsc::channel(50); + let logger = BackboneReplicationLogger::new(0, 0, sender); + let hook = ReplicationLoggerHook::new(Arc::new(logger)); + let factory = enclose::enclose!( + (backbone.config.extensions.clone() => ext, + backbone.config.db_path.clone() => db_path, + backbone.config.stats => stats) + move || ready(LibSqlDb::new( + db_path.clone(), + ext.clone(), + hook.clone(), + false, + stats.clone(), + )) + ); + + Ok(Self { + backbone, + connections: Connections::new(Box::new(factory)), + frame_receiver, + current_kafka_offset, + }) + } + + async fn handle_frames( + &mut self, + producer: &mut FutureProducer, + frames: Vec, + mut recv: watch::Receiver<(Option, i64)>, + ) -> anyhow::Result<()> { + for frame in frames { + let key = format!("frame:{}", self.backbone.term); + let record = FutureRecord::to(&self.backbone.config.cluster_id) + .key(&key) + .payload(frame.as_bytes()); + let (_, offset) = producer.send(record, Duration::from_secs(0)).await.unwrap(); + // Since there is only a single primary, no one else should be writing to the queue + // while we are. If we notice that the offset of the message we just wrote increased + // by more than one, it means that someone else wrote to the queue, and we should + // rollback the transaction. + // + // there may be in flight messages, that doesn't mean we are not the leader + // anymore. In order to solve that: + // - if the condition described above is satisfied, proceed with commit + // - else, spawn the consumer on a different task, and wait for this task to catch up + // with the replication offset. Once this is done compare the terms. If the term has + // changed, then, stepdown and rollback. Otherwise, we are still leader, proceed to + // commit and update offset. + if dbg!(offset) as u64 != dbg!(self.current_kafka_offset + 1) { + let r = recv.wait_for(|(_, offset)| offset >= offset).await?; + if let (Some(ref meta), _) = *r { + if meta.term > self.backbone.term { + // new term, we'll have to step_down + bail!("not a leader"); + } else { + continue; + } + } + bail!("offset of the message doesn't match expected offset"); + } + self.current_kafka_offset += 1; + } + + Ok(()) + } + + pub async fn run(mut self) -> anyhow::Result>> { + tracing::info!("entering primary state"); + let mut config = ClientConfig::new(); + config + .set("group.id", &self.backbone.config.node_id) + .set( + "bootstrap.servers", + self.backbone + .config + .kafka_bootstrap_servers + .first() + .unwrap() + .to_string(), + ) + .set("enable.partition.eof", "false") + .set("session.timeout.ms", "6000") + .set("enable.auto.commit", "false"); + let consumer: StreamConsumer = config.create()?; + consumer.subscribe(&[&self.backbone.config.cluster_id])?; + let mut producer: FutureProducer = config.set("transactional.id", "12").create()?; + producer.init_transactions(Timeout::Never)?; + + let (sender, mut recv) = watch::channel((None, 0)); + let consumer_loop_handle = tokio::spawn(async move { + let mut stream = consumer.stream(); + loop { + tokio::select! { + biased; + _ = sender.closed() => { + return Ok(()) + } + Some(msg) = stream.next() => { + match msg { + Ok(msg) => { + if msg.key() == Some(b"meta") { + let meta: MetaMessage = serde_json::from_slice(msg.payload().unwrap())?; + if let Err(_) = sender.send((Some(meta), msg.offset())) { + return Ok(()); + } + // if self.backbone.term >= meta.term { + // continue; + // } + // let offset = msg.offset(); + // drop(msg); // holding a ref to the message while dropping the consumer causes a + // return Role::transition(self, meta, offset).map(Some); + } else { + continue; + } + } + Err(e) => bail!(e), + } + } + } + } + }); + + loop { + tokio::select! { + biased; + maybe_new_term = recv.changed() => { + match maybe_new_term { + Ok(()) => { + let term = { + let meta_ref = recv.borrow_and_update() ; + let (Some(ref meta), _) = *meta_ref else { panic!("watcher updated without new meta") }; + meta.term + }; + dbg!(term); + if dbg!(term) > dbg!(self.backbone.term) { + // this must be the last channel, dropping it will cause the + // consumer loop to exit. + let new_role = { + let meta_ref = recv.borrow_and_update() ; + let (Some(ref meta), offset) = *meta_ref else { panic!("watcher updated without new meta") }; + Role::transition(self, meta, offset).map(Some) + }; + dbg!(); + drop(recv); + let _ = consumer_loop_handle.await; + dbg!(); + return new_role; + } + } + Err(_) => { + todo!() + } + } + } + Some((frames, ret)) = self.frame_receiver.recv() => { + if let Err(e) = producer.begin_transaction() { + let _ = ret.send(Err(anyhow::anyhow!("failed to start transaction: {e}"))); + continue; + } + // increment offset for begin + self.current_kafka_offset += 1; + match self.handle_frames(&mut producer, frames, recv.clone()).await { + Ok(_) => { + // todo: this blocks the executor + if let Err(e) = producer.commit_transaction(Duration::from_secs(1)) { + let _ = ret.send(Err(anyhow::anyhow!("failed to commit transaction: {e}"))); + } else { + // increment offset for commit + self.current_kafka_offset += 1; + let _ = ret.send(Ok(())); + } + }, + Err(e) => { + let _ = ret.send(Err(anyhow::anyhow!("failed to commit transaction: {e}"))); + // todo: this blocks the executor + producer.abort_transaction(Duration::from_secs(2)).unwrap(); + // increment offset for rollback + self.current_kafka_offset += 1; + }, + } + } + Some((id, msg)) = self.backbone.db_ops_receiver.recv() => { + self.connections.handle_op(id, msg).await?; + }, + else => return Ok(None), + } + } + } +} diff --git a/sqld/src/database/backbone/replica.rs b/sqld/src/database/backbone/replica.rs new file mode 100644 index 00000000..2de2d038 --- /dev/null +++ b/sqld/src/database/backbone/replica.rs @@ -0,0 +1,142 @@ +use bytes::Bytes; +use futures::StreamExt; +use rdkafka::consumer::{Consumer, StreamConsumer}; +use rdkafka::{ClientConfig, Message}; +use tokio::sync::watch; + +use crate::configure_rpc; +use crate::database::backbone::MetaMessage; +use crate::database::write_proxy::WriteProxyDbFactory; +use crate::replication::replica::{FrameInjectorHandle, Frames, ReplicationMeta}; +use crate::replication::{frame::Frame, FrameNo}; + +use super::{BackboneDatabase, Connections, NodeInfo, Role}; + +pub struct ReplicaState<'a> { + pub backbone: &'a mut BackboneDatabase, + buffer: Vec, + next_frame_no: FrameNo, + connections: Connections, + frame_no_sender: watch::Sender, + current_primary: NodeInfo, +} + +impl<'a> ReplicaState<'a> { + pub fn new(backbone: &'a mut BackboneDatabase, current_primary: NodeInfo) -> Self { + let config = &backbone.config; + let (channel, uri) = configure_rpc( + dbg!(current_primary.addr.clone()), + config.rpc_tls_config.clone(), + ) + .expect("invalid rpc configuration"); + let (frame_no_sender, frame_no_receiver) = watch::channel(0); + let factory = WriteProxyDbFactory::new( + config.db_path.clone(), + config.extensions.clone(), + channel, + uri, + config.stats.clone(), + frame_no_receiver, + ); + let connections = Connections::new(Box::new(factory)); + + Self { + backbone, + buffer: Vec::new(), + connections, + frame_no_sender, + current_primary, + next_frame_no: 0, + } + } + + pub async fn run(mut self) -> anyhow::Result> { + tracing::info!( + primary = self.current_primary.id, + addr = self.current_primary.addr, + "Started running in replica mode", + ); + + let (mut injector, _) = + FrameInjectorHandle::new(self.backbone.config.db_path.clone(), |meta| { + Ok(meta.unwrap_or_else(|| ReplicationMeta { + pre_commit_frame_no: 0, + post_commit_frame_no: 0, + generation_id: 0, + database_id: 0, + })) + }) + .await?; + + let mut config = ClientConfig::new(); + config + .set("group.id", &self.backbone.config.node_id) + .set( + "bootstrap.servers", + self.backbone + .config + .kafka_bootstrap_servers + .first() + .unwrap() + .to_string(), + ) + .set("enable.partition.eof", "false") + .set("session.timeout.ms", "6000") + .set("enable.auto.commit", "false"); + let consumer: StreamConsumer = config.create()?; + consumer.subscribe(&[&self.backbone.config.cluster_id])?; + let mut stream = consumer.stream(); + + loop { + tokio::select! { + biased; + Some(Ok(msg)) = stream.next() => { + if msg.key() == Some(b"meta") { + let meta: MetaMessage = serde_json::from_slice(msg.payload().unwrap())?; + if self.backbone.term >= meta.term { + continue; + } + let offset = msg.offset(); + drop(msg); + injector.shutdown().await?; + return Role::transition(self, &meta, offset); + } else if let Some(key) = msg.key() { + if key.starts_with(b"frame") { + let key =std::str::from_utf8(key).unwrap(); + let (_key, term) = key.split_once(':').unwrap(); + let term = term.parse::().unwrap(); + assert_eq!(term, self.backbone.term); + if let Some(payload) = msg.payload() { + let bytes = Bytes::copy_from_slice(payload); + let frame = Frame::try_from_bytes(bytes).unwrap(); + // TODO: this is blocking for too long, delegate to a separate thread + // instead + self.handle_frame(frame, &mut injector).await; + } + + } + } + } + Some((id, op)) = self.backbone.db_ops_receiver.recv() => { + self.connections.handle_op(id, op).await?; + } + } + } + } + + async fn handle_frame(&mut self, frame: Frame, injector: &mut FrameInjectorHandle) { + let should_inject = frame.header().size_after != 0; + dbg!(frame.header()); + assert_eq!(self.next_frame_no, frame.header().frame_no); + self.next_frame_no += 1; + self.buffer.push(frame); + dbg!(should_inject); + if should_inject { + // transaction boundary, inject buffer + injector + .inject_frames(Frames::Vec(std::mem::take(&mut self.buffer))) + .await + .unwrap(); + } + } +} diff --git a/sqld/src/error.rs b/sqld/src/error.rs index 89275536..20dfca16 100644 --- a/sqld/src/error.rs +++ b/sqld/src/error.rs @@ -27,6 +27,8 @@ pub enum Error { NotAuthorized(String), #[error("The replicator exited, instance cannot make any progress.")] ReplicatorExited, + #[error("Connection Reset")] + ConnectionReset, } impl From for Error { From cd0f99d9051325e01530b8250b515fba57a44266 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 3 May 2023 17:38:48 +0200 Subject: [PATCH 3/3] checkpoint --- Cargo.lock | 2 - bottomless/src/lib.rs | 6 +- sqld-libsql-bindings/Cargo.toml | 10 +++- sqld-libsql-bindings/src/ffi/types.rs | 2 + sqld-libsql-bindings/src/wal_hook.rs | 4 ++ sqld/Cargo.toml | 8 ++- sqld/src/database/backbone/mod.rs | 2 + sqld/src/database/backbone/primary.rs | 58 ++++++++++-------- sqld/src/database/backbone/replica.rs | 2 + sqld/src/database/libsql.rs | 76 ++++++++++++++++++++++-- sqld/src/replication/primary/logger.rs | 32 ++++++---- sqld/src/replication/replica/hook.rs | 11 +++- sqld/src/replication/replica/injector.rs | 13 +--- 13 files changed, 166 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 32439139..8b8a5bd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2250,7 +2250,6 @@ dependencies = [ [[package]] name = "libsqlite3-sys" version = "0.26.0" -source = "git+https://github.com/psarna/rusqlite?rev=a6332e530f30dc2d47110#a6332e530f30dc2d471103eed96a650407a73c7a" dependencies = [ "bindgen", "cc", @@ -3483,7 +3482,6 @@ dependencies = [ [[package]] name = "rusqlite" version = "0.29.0" -source = "git+https://github.com/psarna/rusqlite?rev=a6332e530f30dc2d47110#a6332e530f30dc2d471103eed96a650407a73c7a" dependencies = [ "bitflags 2.2.1", "fallible-iterator", diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs index e290fdde..a500497c 100644 --- a/bottomless/src/lib.rs +++ b/bottomless/src/lib.rs @@ -9,7 +9,7 @@ pub mod replicator; use crate::ffi::{ bottomless_methods, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PgHdr, Wal, }; -use std::ffi::{c_char, c_void}; +use std::ffi::{c_char, c_int, c_void}; // Just heuristics, but should work for ~100% of cases fn is_regular(vfs: *const sqlite3_vfs) -> bool { @@ -238,6 +238,8 @@ pub extern "C" fn xFrames( size_after: u32, is_commit: i32, sync_flags: i32, + _precommit_cb: Option c_int>, + _precommit_ctx: *mut c_void, ) -> i32 { let mut last_consistent_frame = 0; if !is_local() { @@ -282,6 +284,8 @@ pub extern "C" fn xFrames( size_after, is_commit, sync_flags, + None, + std::ptr::null_mut(), ) }; if is_local() || rc != ffi::SQLITE_OK { diff --git a/sqld-libsql-bindings/Cargo.toml b/sqld-libsql-bindings/Cargo.toml index 17c7023b..7a80445c 100644 --- a/sqld-libsql-bindings/Cargo.toml +++ b/sqld-libsql-bindings/Cargo.toml @@ -9,10 +9,16 @@ edition = "2021" anyhow = "1.0.66" mvfs = { git = "https://github.com/psarna/mvsqlite", branch = "mwal", optional = true } mwal = { git = "https://github.com/psarna/mvsqlite", branch = "mwal", optional = true } -rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [ +# rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [ +# "buildtime_bindgen", +# "bundled-libsql-wasm-experimental", +# "column_decltype" +# ] } +rusqlite = { version = "0.29.0", path = "../../rusqlite", default-features = false, features = [ "buildtime_bindgen", "bundled-libsql-wasm-experimental", - "column_decltype" + "column_decltype", + "load_extension" ] } tracing = "0.1.37" diff --git a/sqld-libsql-bindings/src/ffi/types.rs b/sqld-libsql-bindings/src/ffi/types.rs index 239e9042..989533d7 100644 --- a/sqld-libsql-bindings/src/ffi/types.rs +++ b/sqld-libsql-bindings/src/ffi/types.rs @@ -60,6 +60,8 @@ pub type XWalFrameFn = unsafe extern "C" fn( size_after: u32, is_commit: c_int, sync_flags: c_int, + precommit_cb: Option c_int>, + precommit_ctx: *mut c_void, ) -> c_int; pub type XWalUndoFn = unsafe extern "C" fn( wal: *mut Wal, diff --git a/sqld-libsql-bindings/src/wal_hook.rs b/sqld-libsql-bindings/src/wal_hook.rs index 758a0f13..596c318d 100644 --- a/sqld-libsql-bindings/src/wal_hook.rs +++ b/sqld-libsql-bindings/src/wal_hook.rs @@ -35,6 +35,8 @@ pub unsafe trait WalHook { size_after, is_commit, sync_flags, + None, + std::ptr::null_mut(), ) } } @@ -230,6 +232,8 @@ pub extern "C" fn xFrames( size_after: u32, is_commit: c_int, sync_flags: c_int, + _precommit_cb: Option c_int>, + _precommit_ctx: *mut c_void, ) -> c_int { let orig_methods = unsafe { get_orig_methods(wal) }; let methods = unsafe { get_methods(wal) }; diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index 22788f30..b5c90015 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -32,7 +32,13 @@ pin-project-lite = "0.2.9" postgres-protocol = "0.6.4" prost = "0.11.3" regex = "1.7.0" -rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [ +# rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [ +# "buildtime_bindgen", +# "bundled-libsql-wasm-experimental", +# "column_decltype", +# "load_extension" +# ] } +rusqlite = { version = "0.29.0", path = "../../rusqlite", default-features = false, features = [ "buildtime_bindgen", "bundled-libsql-wasm-experimental", "column_decltype", diff --git a/sqld/src/database/backbone/mod.rs b/sqld/src/database/backbone/mod.rs index 4bdc788f..4777ead4 100644 --- a/sqld/src/database/backbone/mod.rs +++ b/sqld/src/database/backbone/mod.rs @@ -118,6 +118,8 @@ impl<'a> Role<'a> { assert!(backbone.term < meta.term); backbone.term = meta.term; if meta.primary_infos.id == backbone.config.node_id { + // TODO: better parsing + assert_eq!(meta.primary_infos.addr, backbone.config.rpc_server_addr); // we are the new primary let primary = PrimaryState::new(backbone, offset as _)?; Ok(Role::Primary(primary)) diff --git a/sqld/src/database/backbone/primary.rs b/sqld/src/database/backbone/primary.rs index d93d7e8b..224d2a99 100644 --- a/sqld/src/database/backbone/primary.rs +++ b/sqld/src/database/backbone/primary.rs @@ -64,32 +64,40 @@ impl<'a> PrimaryState<'a> { let record = FutureRecord::to(&self.backbone.config.cluster_id) .key(&key) .payload(frame.as_bytes()); - let (_, offset) = producer.send(record, Duration::from_secs(0)).await.unwrap(); - // Since there is only a single primary, no one else should be writing to the queue - // while we are. If we notice that the offset of the message we just wrote increased - // by more than one, it means that someone else wrote to the queue, and we should - // rollback the transaction. - // - // there may be in flight messages, that doesn't mean we are not the leader - // anymore. In order to solve that: - // - if the condition described above is satisfied, proceed with commit - // - else, spawn the consumer on a different task, and wait for this task to catch up - // with the replication offset. Once this is done compare the terms. If the term has - // changed, then, stepdown and rollback. Otherwise, we are still leader, proceed to - // commit and update offset. - if dbg!(offset) as u64 != dbg!(self.current_kafka_offset + 1) { - let r = recv.wait_for(|(_, offset)| offset >= offset).await?; - if let (Some(ref meta), _) = *r { - if meta.term > self.backbone.term { - // new term, we'll have to step_down - bail!("not a leader"); - } else { - continue; + match producer.send(record, Timeout::Never).await { + Ok((_partition, offset)) => { + // Since there is only a single primary, no one else should be writing to the queue + // while we are. If we notice that the offset of the message we just wrote increased + // by more than one, it means that someone else wrote to the queue, and we should + // rollback the transaction. + // + // there may be in flight messages, that doesn't mean we are not the leader + // anymore. In order to solve that: + // - if the condition described above is satisfied, proceed with commit + // - else, spawn the consumer on a different task, and wait for this task to catch up + // with the replication offset. Once this is done compare the terms. If the term has + // changed, then, stepdown and rollback. Otherwise, we are still leader, proceed to + // commit and update offset. + if dbg!(offset) as u64 != dbg!(self.current_kafka_offset + 1) { + let r = recv.wait_for(|(_, offset)| offset >= offset).await?; + if let (Some(ref meta), _) = *r { + if meta.term > self.backbone.term { + // new term, we'll have to step_down + tracing::error!("cannot perform write: not a leader"); + bail!("not a leader"); + } else { + self.current_kafka_offset = offset as _; + continue; + } + } + bail!("offset of the message doesn't match expected offset"); } + self.current_kafka_offset += 1; + } + Err((_e, _msg)) => { + bail!("failed to replicate") } - bail!("offset of the message doesn't match expected offset"); } - self.current_kafka_offset += 1; } Ok(()) @@ -204,7 +212,9 @@ impl<'a> PrimaryState<'a> { Err(e) => { let _ = ret.send(Err(anyhow::anyhow!("failed to commit transaction: {e}"))); // todo: this blocks the executor - producer.abort_transaction(Duration::from_secs(2)).unwrap(); + if let Err(e) = producer.abort_transaction(Duration::from_secs(2)) { + tracing::error!("failed to rollback: {e}"); + } // increment offset for rollback self.current_kafka_offset += 1; }, diff --git a/sqld/src/database/backbone/replica.rs b/sqld/src/database/backbone/replica.rs index 2de2d038..89ca6797 100644 --- a/sqld/src/database/backbone/replica.rs +++ b/sqld/src/database/backbone/replica.rs @@ -68,6 +68,8 @@ impl<'a> ReplicaState<'a> { }) .await?; + dbg!(); + let mut config = ClientConfig::new(); config .set("group.id", &self.backbone.config.node_id) diff --git a/sqld/src/database/libsql.rs b/sqld/src/database/libsql.rs index 8de40b79..213feb6b 100644 --- a/sqld/src/database/libsql.rs +++ b/sqld/src/database/libsql.rs @@ -78,11 +78,12 @@ macro_rules! ok_or_exit { pub fn open_db( path: &Path, - wal_hook: impl WalHook + Send + Clone + 'static, + wal_hook: impl WalHook + Clone + 'static, with_bottomless: bool, ) -> anyhow::Result { let mut retries = 0; - loop { + while retries < 10 { + retries += 1; #[cfg(feature = "mwal_backend")] let conn_result = match crate::VWAL_METHODS.get().unwrap() { Some(ref vwal_methods) => crate::libsql::mwal::open_with_virtual_wal( @@ -128,17 +129,18 @@ pub fn open_db( // For this reason we may not be able to open the database right away, so we // retry a couple of times before giving up. Ok(rusqlite::Error::SqliteFailure(e, _)) - if e.code == rusqlite::ffi::ErrorCode::DatabaseBusy && retries < 10 => + if e.code == rusqlite::ffi::ErrorCode::DatabaseBusy => { std::thread::sleep(Duration::from_millis(10)); - retries += 1; } - Ok(e) => panic!("Unhandled error opening libsql: {e}"), - Err(e) => panic!("Unhandled error opening libsql: {e}"), + Ok(e) => anyhow::bail!("error opening libsql: {e}"), + Err(e) => anyhow::bail!("error opening libsql: {e}"), } } } } + + anyhow::bail!("error opening libsql") } impl LibSqlDb { @@ -399,3 +401,65 @@ impl Database for LibSqlDb { Ok(receiver.await?) } } + +#[cfg(test)] +mod test { + use std::ffi::{c_int, c_void}; + use std::sync::{atomic::AtomicBool, Arc}; + + use tempfile::tempdir; + + use super::*; + + #[test] + fn test_pre_commit_callback() { + #[derive(Clone)] + struct TestHook(Arc); + unsafe impl WalHook for TestHook { + fn on_frames( + &mut self, + wal: *mut rusqlite::ffi::libsql_wal, + page_size: std::ffi::c_int, + page_headers: *mut rusqlite::ffi::libsql_pghdr, + size_after: u32, + is_commit: std::ffi::c_int, + sync_flags: std::ffi::c_int, + orig: sqld_libsql_bindings::ffi::types::XWalFrameFn, + ) -> std::ffi::c_int { + unsafe { + orig( + wal, + page_size, + page_headers, + size_after, + is_commit, + sync_flags, + Some(pre_commit_cb), + self as *mut _ as *mut c_void, + ) + } + } + } + + unsafe extern "C" fn pre_commit_cb(ctx: *mut c_void) -> c_int { + let this = &mut *(ctx as *mut TestHook); + if this.0.load(std::sync::atomic::Ordering::Relaxed) { + 0 + } else { + 1 + } + } + + let hook = TestHook(Arc::new(AtomicBool::new(false))); + let dir = tempdir().unwrap(); + let db = open_db(dir.path(), hook.clone(), false).unwrap(); + + assert!(db.execute("CREATE TABLE test (x);", ()).is_err()); + assert!(db.execute("SELECT * from test;", ()).is_err()); // table doesn't exist + + hook.0.store(true, std::sync::atomic::Ordering::Relaxed); + + assert!(db.execute("CREATE TABLE test (x);", ()).is_ok()); + assert!(db.execute("SELECT * FROM test;", ()).is_ok()); // table created + } +} diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs index 16726a70..8f484aa1 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -14,7 +14,6 @@ use rusqlite::ffi::SQLITE_ERROR; use tokio::sync::watch; use uuid::Uuid; -use crate::database::SQLITE_MUST_ROLLBACK; use crate::libsql::ffi::{ types::{XWalFrameFn, XWalUndoFn}, PgHdr, Wal, @@ -69,6 +68,24 @@ unsafe impl WalHook for ReplicationLoggerHook { None }; + unsafe extern "C" fn pre_commit_cb(ctx: *mut c_void) -> c_int { + let (is_commit, commit_info, ntruncate, this) = + &mut *(ctx as *mut (i32, Option<(u64, u64)>, u32, &mut ReplicationLoggerHook)); + if *is_commit != 0 { + if let Some((count, checksum)) = commit_info { + if let Err(e) = (*this).commit(*count, *checksum) { + tracing::error!("error during pre-commit phase: {e}"); + return SQLITE_ERROR; + } + } + + this.logger.maybe_compact(*ntruncate); + } + + 0 + } + + let mut ctx = (is_commit, commit_info, ntruncate, self); let rc = unsafe { orig( wal, @@ -77,20 +94,11 @@ unsafe impl WalHook for ReplicationLoggerHook { ntruncate, is_commit, sync_flags, + Some(pre_commit_cb), + &mut ctx as *mut _ as *mut c_void, ) }; - if is_commit != 0 && rc == 0 { - if let Some((count, checksum)) = commit_info { - if let Err(e) = self.commit(count, checksum) { - tracing::error!("failed to commit: {e}"); - return SQLITE_MUST_ROLLBACK; - } - } - - self.logger.maybe_compact(ntruncate); - } - rc } diff --git a/sqld/src/replication/replica/hook.rs b/sqld/src/replication/replica/hook.rs index c1fd869c..a717d724 100644 --- a/sqld/src/replication/replica/hook.rs +++ b/sqld/src/replication/replica/hook.rs @@ -97,7 +97,16 @@ impl InjectorHookInner { ) -> anyhow::Result<()> { self.pre_commit(last_frame_no) .expect("failed to write pre-commit frame_no"); - let ret = orig(wal, WAL_PAGE_SIZE, page_headers, size_after, 1, sync_flags); + let ret = orig( + wal, + WAL_PAGE_SIZE, + page_headers, + size_after, + 1, + sync_flags, + None, + std::ptr::null_mut(), + ); if ret == 0 { debug_assert!(all_applied(page_headers)); diff --git a/sqld/src/replication/replica/injector.rs b/sqld/src/replication/replica/injector.rs index 893eb8d0..45b5333f 100644 --- a/sqld/src/replication/replica/injector.rs +++ b/sqld/src/replication/replica/injector.rs @@ -1,11 +1,10 @@ use std::fs::File; use std::path::{Path, PathBuf}; -use rusqlite::OpenFlags; -use sqld_libsql_bindings::open_with_regular_wal; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; +use crate::database::libsql::open_db; use crate::replication::FrameNo; use super::hook::{Frames, InjectorHook}; @@ -97,15 +96,7 @@ impl FrameInjector { } pub fn new(db_path: &Path, hook: InjectorHook) -> anyhow::Result { - let conn = open_with_regular_wal( - db_path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, - hook.clone(), - false, // bottomless replication is not enabled for replicas - )?; + let conn = open_db(db_path, hook.clone(), false)?; Ok(Self { conn, hook }) }