From e0010d14c10a4466fffffe68c07c38f750a436c8 Mon Sep 17 00:00:00 2001 From: 0xZensh Date: Mon, 25 Sep 2023 09:45:22 +0800 Subject: [PATCH] feat: implement message. --- Cargo.lock | 172 +++++++------ Cargo.toml | 2 +- cql/schema_table.cql | 262 +++++++++++++++++++- src/api/message.rs | 250 +++++++++++++++++++ src/api/mod.rs | 1 + src/db/mod.rs | 8 + src/db/model_creation.rs | 17 +- src/db/model_message.rs | 482 ++++++++++++++++++++++++++++++++++++ src/db/model_publication.rs | 33 +-- src/db/scylladb.rs | 9 +- src/router.rs | 12 + 11 files changed, 1128 insertions(+), 120 deletions(-) create mode 100644 src/api/message.rs create mode 100644 src/db/model_message.rs diff --git a/Cargo.lock b/Cargo.lock index 7e09b51..e403b77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,7 +104,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -173,7 +173,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -269,9 +269,9 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" @@ -377,6 +377,16 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -437,9 +447,9 @@ dependencies = [ [[package]] name = "curl-sys" -version = "0.4.66+curl-8.3.0" +version = "0.4.67+curl-8.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70c44a72e830f0e40ad90dda8a6ab6ed6314d39776599a58a2e5e37fbc6db5b9" +checksum = "3cc35d066510b197a0f72de863736641539957628c8a42e70e27c66849e77c34" dependencies = [ "cc", "libc", @@ -515,14 +525,14 @@ dependencies = [ [[package]] name = "enum-as-inner" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116" +checksum = "5ffccbb6966c05b32ef8fbac435df276c4ae4d3dc55a8cd0eb9745e6c12f546a" dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.38", ] [[package]] @@ -660,7 +670,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -929,17 +939,6 @@ dependencies = [ "cc", ] -[[package]] -name = "idna" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" -dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "0.4.0" @@ -1120,9 +1119,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.148" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libflate" @@ -1216,12 +1215,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" -[[package]] -name = "matches" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" - [[package]] name = "matchit" version = "0.7.3" @@ -1344,9 +1337,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", ] @@ -1379,7 +1372,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1497,7 +1490,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1546,7 +1539,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -1625,9 +1618,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.67" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -1726,9 +1719,9 @@ checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "async-compression", "base64 0.21.4", @@ -1753,6 +1746,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tokio-rustls", "tokio-util", @@ -1958,13 +1952,13 @@ dependencies = [ [[package]] name = "scylla-macros" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d777dadbf7163d1524ea4f5a095146298d263a686febb96d022cf46d06df32" +checksum = "5757ded3dfb10967ca7d1ff1084d072d565b5e10b2b21c286d5335c245425a7e" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.38", ] [[package]] @@ -1989,7 +1983,7 @@ dependencies = [ "proc-macro2", "quote", "scylla-orm", - "syn 2.0.37", + "syn 2.0.38", "xid", ] @@ -2019,7 +2013,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -2210,15 +2204,15 @@ dependencies = [ [[package]] name = "sval" -version = "2.9.2" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f9150edabce0ada1e9b44f98d52817ba0fba9d572898da47e354a14a3eb406d" +checksum = "e55089b73dfa822e1eb6b635f8795215512cca94bfae11aee3a1a06228bc88bb" [[package]] name = "sval_buffer" -version = "2.9.2" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb08e361c8fbbc37fb3d08dc067a98207062d083ee5ef0b21e3739b16e69892" +checksum = "df307823073d63f1fb126895439fead41afc493ea35d636cceedef9f6b32ba81" dependencies = [ "sval", "sval_ref", @@ -2226,18 +2220,18 @@ dependencies = [ [[package]] name = "sval_dynamic" -version = "2.9.2" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae417f3812ea4403cd0cc0819628427ef6e099d5f482d80ed4e0f92836c51a85" +checksum = "e5f8e4c4d6d028d3cbff66c2bb3d98181d031d312b7df4550eea7142d7036f37" dependencies = [ "sval", ] [[package]] name = "sval_fmt" -version = "2.9.2" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a898ac59b0f7a0344d0ac0f408908f545d422ffbfe46522a5cdff3ed391650" +checksum = "ad53f8eb502b0a3051fea001ae2e3723044699868ebfe06ea81b45545db392c2" dependencies = [ "itoa", "ryu", @@ -2246,9 +2240,9 @@ dependencies = [ [[package]] name = "sval_json" -version = "2.9.2" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c63eda4f68a4df3d58f0c9805983560c1de8bf414800a990be25e433d1cccc8c" +checksum = "f913253c9f6cd27645ba9a0b6788039b5d4338eae0833c64b42ef178168d2862" dependencies = [ "itoa", "ryu", @@ -2257,18 +2251,18 @@ dependencies = [ [[package]] name = "sval_ref" -version = "2.9.2" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87e59d69dac5af4c6b87c79b52581a5b9ab9cc2d019775dea318967ea3c3effd" +checksum = "66a9661412d06740ebe81512a527b3d9220460eb7685f4399232c0e670108cb7" dependencies = [ "sval", ] [[package]] name = "sval_serde" -version = "2.9.2" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93d72e44618c14d0f8aff885af8184a3579ffa0d9ad5b1fbffc59f85d6739982" +checksum = "b8d077e98c1c8dfa466837ae0ec1e03c78138d42ac75662dac05e1bf0aebae20" dependencies = [ "serde", "sval", @@ -2289,9 +2283,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.37" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -2340,6 +2334,27 @@ dependencies = [ "walkdir", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "thiserror" version = "1.0.49" @@ -2357,7 +2372,7 @@ checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -2405,9 +2420,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -2430,7 +2445,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -2554,7 +2569,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", ] [[package]] @@ -2578,9 +2593,9 @@ dependencies = [ [[package]] name = "trust-dns-proto" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f7f83d1e4a0e4358ac54c5c3681e5d7da5efc5a7a632c90bb6d6669ddd9bc26" +checksum = "0dc775440033cb114085f6f2437682b194fa7546466024b1037e82a48a052a69" dependencies = [ "async-trait", "cfg-if", @@ -2589,9 +2604,9 @@ dependencies = [ "futures-channel", "futures-io", "futures-util", - "idna 0.2.3", + "idna", "ipnet", - "lazy_static", + "once_cell", "rand", "smallvec", "thiserror", @@ -2603,16 +2618,17 @@ dependencies = [ [[package]] name = "trust-dns-resolver" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aff21aa4dcefb0a1afbfac26deb0adc93888c7d295fb63ab273ef276ba2b7cfe" +checksum = "2dff7aed33ef3e8bf2c9966fccdfed93f93d46f432282ea875cd66faabc6ef2f" dependencies = [ "cfg-if", "futures-util", "ipconfig", - "lazy_static", "lru-cache", + "once_cell", "parking_lot", + "rand", "resolv-conf", "smallvec", "thiserror", @@ -2689,7 +2705,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" dependencies = [ "form_urlencoded", - "idna 0.4.0", + "idna", "percent-encoding", ] @@ -2709,7 +2725,7 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b92f40481c04ff1f4f61f304d61793c7b56ff76ac1469f1beb199b1445b253bd" dependencies = [ - "idna 0.4.0", + "idna", "lazy_static", "regex", "serde", @@ -2845,7 +2861,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", "wasm-bindgen-shared", ] @@ -2879,7 +2895,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.37", + "syn 2.0.38", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3020,9 +3036,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.15" +version = "0.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc" +checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907" dependencies = [ "memchr", ] @@ -3048,7 +3064,7 @@ dependencies = [ [[package]] name = "writing" -version = "1.1.6" +version = "1.2.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 6f1a0c7..320516c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "writing" -version = "1.1.6" +version = "1.2.0" edition = "2021" rust-version = "1.64" description = "" diff --git a/cql/schema_table.cql b/cql/schema_table.cql index 21b7343..df85e27 100644 --- a/cql/schema_table.cql +++ b/cql/schema_table.cql @@ -92,7 +92,7 @@ CREATE INDEX publication_url ON publication (original_url); CREATE INDEX publication_gid_status ON publication ((gid), status); CREATE TABLE IF NOT EXISTS pub_index ( - day INT, -- publication day + day INT, -- publication birthday cid BLOB, -- creation id language TEXT, -- publication's language, ISO 639-3 original BOOLEAN, -- is original @@ -109,6 +109,85 @@ CREATE TABLE IF NOT EXISTS pub_index ( CREATE INDEX pub_index_day_gid ON pub_index ((day), gid); CREATE INDEX pub_index_gid ON pub_index (gid); +CREATE TABLE IF NOT EXISTS collection ( + gid BLOB, -- group id, publication belong to + id BLOB, -- collection id, 12 bytes XID + status TINYINT, -- int8, -1: Archived, 0: Internal, 1: Subscription, 2: Public + maintainer BLOB, -- user id who maintainer the collection. + message BLOB, -- message id, xid, collection's title and summary + updated_at BIGINT, -- update at, unix time with second precision. + collections SMALLINT, -- children collections count, max to 10000 + publications SMALLINT, -- children collections count, max to 10000 + PRIMARY KEY (gid, id) +) WITH CLUSTERING ORDER BY (id DESC) + AND caching = {'enabled': 'true'} + AND comment = 'collections' + AND compaction = {'class': 'SizeTieredCompactionStrategy'} + AND compression = {'sstable_compression': 'LZ4Compressor'} + AND default_time_to_live = 0; + +CREATE TABLE IF NOT EXISTS collection_collection ( + id BLOB, -- collection id, 12 bytes XID + cid BLOB, -- child collection id + gid BLOB, -- group id, child collection belong to + ord FLOAT, -- order value + PRIMARY KEY (id, cid) +) WITH CLUSTERING ORDER BY (cid DESC) + AND caching = {'enabled': 'true'} + AND comment = 'collection child collections' + AND compaction = {'class': 'SizeTieredCompactionStrategy'} + AND compression = {'sstable_compression': 'LZ4Compressor'} + AND default_time_to_live = 0; + +CREATE INDEX collection_collection_cid ON collection_collection (cid); + +CREATE TABLE IF NOT EXISTS collection_publication ( + id BLOB, -- collection id, 12 bytes XID + cid BLOB, -- child publication id + gid BLOB, -- group id, child publication belong to + ord FLOAT, -- order value + PRIMARY KEY (id, cid) +) WITH CLUSTERING ORDER BY (cid DESC) + AND caching = {'enabled': 'true'} + AND comment = 'collection child publications' + AND compaction = {'class': 'SizeTieredCompactionStrategy'} + AND compression = {'sstable_compression': 'LZ4Compressor'} + AND default_time_to_live = 0; + +CREATE INDEX collection_publication_cid ON collection_publication (cid); + +CREATE TABLE IF NOT EXISTS collection_subscription ( + id BLOB, -- collection id, 12 bytes XID + uid BLOB, -- user id who subscribe the collection + created_at BIGINT, -- create at, unix time, ms + expire_at BIGINT, -- subscription expire at, unix time, ms + txn BLOB, -- latest subscription transaction id, 12 bytes XID + PRIMARY KEY (id, uid) +) WITH CLUSTERING ORDER BY (uid DESC) + AND caching = {'enabled': 'true'} + AND comment = 'collection''s subscription' + AND compaction = {'class': 'SizeTieredCompactionStrategy'} + AND compression = {'sstable_compression': 'LZ4Compressor'} + AND default_time_to_live = 0; + +CREATE INDEX collection_subscription_uid ON collection_subscription (uid); + +CREATE TABLE IF NOT EXISTS publication_subscription ( + cid BLOB, -- publication id, 12 bytes XID + uid BLOB, -- user id who subscribe the publication + created_at BIGINT, -- create at, unix time, ms + expire_at BIGINT, -- subscription expire at, unix time, ms + txn BLOB, -- latest subscription transaction id, 12 bytes XID + PRIMARY KEY (cid, uid) +) WITH CLUSTERING ORDER BY (uid DESC) + AND caching = {'enabled': 'true'} + AND comment = 'publication''s subscription' + AND compaction = {'class': 'SizeTieredCompactionStrategy'} + AND compression = {'sstable_compression': 'LZ4Compressor'} + AND default_time_to_live = 0; + +CREATE INDEX publication_subscription_uid ON publication_subscription (uid); + CREATE TABLE IF NOT EXISTS bookmark ( uid BLOB, -- user id who create the bookmark, 12 bytes XID id BLOB, -- bookmark id, 12 bytes XID @@ -120,15 +199,17 @@ CREATE TABLE IF NOT EXISTS bookmark ( updated_at BIGINT, -- update at, unix time with second precision. title TEXT, -- title labels LIST, -- labels for bookmarks management + payload BLOB, -- payload in CBOR format PRIMARY KEY (uid, id) ) WITH CLUSTERING ORDER BY (id DESC) - AND caching = {'enabled': 'false'} + AND caching = {'enabled': 'true'} AND comment = 'user''s bookmarks' AND compaction = {'class': 'SizeTieredCompactionStrategy'} AND compression = {'sstable_compression': 'LZ4Compressor'} AND default_time_to_live = 0; CREATE INDEX bookmark_uid_cid ON bookmark ((uid), cid); +CREATE INDEX bookmark_cid ON bookmark (cid); CREATE TABLE IF NOT EXISTS deleted_creation ( gid BLOB, -- group id, creation belong to @@ -185,3 +266,180 @@ CREATE TABLE IF NOT EXISTS deleted_publication ( AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': 10} AND compression = {'sstable_compression': 'LZ4Compressor'} AND default_time_to_live = 17280000; + +CREATE TABLE IF NOT EXISTS message ( + day INT, -- message birthday + id BLOB, -- message id, 12 bytes XID + attach_to BLOB, -- attach to, group id, collection id, etc. + kind TEXT, -- attach kind: group, collection, etc. + created_at BIGINT, -- created at, unix time, ms + updated_at BIGINT, -- updated at, unix time, ms + context TEXT, -- contextual definition for translation + language TEXT, -- message's language, ISO 639-3 + version SMALLINT, -- creation version + message BLOB, -- original message in CBOR format + abk BLOB, -- Abkhazian message + aar BLOB, -- Afar message + afr BLOB, -- Afrikaans message + aka BLOB, -- Akan message + sqi BLOB, -- Albanian message + amh BLOB, -- Amharic message + ara BLOB, -- Arabic message + arg BLOB, -- Aragonese message + hye BLOB, -- Armenian message + asm BLOB, -- Assamese message + ava BLOB, -- Avaric message + aze BLOB, -- Azerbaijani message + bam BLOB, -- Bambara message + bak BLOB, -- Bashkir message + eus BLOB, -- Basque message + bel BLOB, -- Belarusian message + ben BLOB, -- Bengali message + bis BLOB, -- Bislama message + bos BLOB, -- Bosnian message + bre BLOB, -- Breton message + bul BLOB, -- Bulgarian message + mya BLOB, -- Burmese message + cat BLOB, -- Catalan message + cha BLOB, -- Chamorro message + che BLOB, -- Chechen message + zho BLOB, -- Chinese message + chu BLOB, -- Church Slavic message + chv BLOB, -- Chuvash message + cor BLOB, -- Cornish message + cos BLOB, -- Corsican message + hrv BLOB, -- Croatian message + ces BLOB, -- Czech message + dan BLOB, -- Danish message + div BLOB, -- Dhivehi message + nld BLOB, -- Dutch message + dzo BLOB, -- Dzongkha message + eng BLOB, -- English message + epo BLOB, -- Esperanto message + est BLOB, -- Estonian message + ewe BLOB, -- Ewe message + fao BLOB, -- Faroese message + fin BLOB, -- Finnish message + fra BLOB, -- French message + ful BLOB, -- Fulah message + glg BLOB, -- Galician message + lug BLOB, -- Ganda message + kat BLOB, -- Georgian message + deu BLOB, -- German message + guj BLOB, -- Gujarati message + hat BLOB, -- Haitian message + hau BLOB, -- Hausa message + heb BLOB, -- Hebrew message + hin BLOB, -- Hindi message + hun BLOB, -- Hungarian message + isl BLOB, -- Icelandic message + ibo BLOB, -- Igbo message + ind BLOB, -- Indonesian message + ina BLOB, -- Interlingua message + iku BLOB, -- Inuktitut message + gle BLOB, -- Irish message + ita BLOB, -- Italian message + jpn BLOB, -- Japanese message + jav BLOB, -- Javanese message + kal BLOB, -- Kalaallisut message + kan BLOB, -- Kannada message + kas BLOB, -- Kashmiri message + kaz BLOB, -- Kazakh message + khm BLOB, -- Khmer message + kik BLOB, -- Kikuyu message + kin BLOB, -- Kinyarwanda message + kir BLOB, -- Kirghiz message + kor BLOB, -- Korean message + kua BLOB, -- Kuanyama message + kur BLOB, -- Kurdish message + lao BLOB, -- Lao message + lav BLOB, -- Latvian message + lim BLOB, -- Limburgan message + lin BLOB, -- Lingala message + lit BLOB, -- Lithuanian message + lub BLOB, -- Luba-Katanga message + ltz BLOB, -- Luxembourgish message + mkd BLOB, -- Macedonian message + mlg BLOB, -- Malagasy message + msa BLOB, -- Malay message + mal BLOB, -- Malayalam message + mlt BLOB, -- Maltese message + glv BLOB, -- Manx message + mri BLOB, -- Maori message + mar BLOB, -- Marathi message + ell BLOB, -- Modern Greek message + mon BLOB, -- Mongolian message + nav BLOB, -- Navajo message + nep BLOB, -- Nepali message + nde BLOB, -- North Ndebele message + sme BLOB, -- Northern Sami message + nor BLOB, -- Norwegian message + nno BLOB, -- Norwegian Nynorsk message + nya BLOB, -- Nyanja message + oci BLOB, -- Occitan message + ori BLOB, -- Oriya message + orm BLOB, -- Oromo message + oss BLOB, -- Ossetian message + pan BLOB, -- Panjabi message + fas BLOB, -- Persian message + pol BLOB, -- Polish message + por BLOB, -- Portuguese message + pus BLOB, -- Pushto message + que BLOB, -- Quechua message + ron BLOB, -- Romanian message + roh BLOB, -- Romansh message + run BLOB, -- Rundi message + rus BLOB, -- Russian message + smo BLOB, -- Samoan message + sag BLOB, -- Sango message + san BLOB, -- Sanskrit message + gla BLOB, -- Scottish Gaelic message + srp BLOB, -- Serbian message + sna BLOB, -- Shona message + iii BLOB, -- Sichuan Yi message + snd BLOB, -- Sindhi message + sin BLOB, -- Sinhala message + slk BLOB, -- Slovak message + slv BLOB, -- Slovenian message + som BLOB, -- Somali message + nbl BLOB, -- South Ndebele message + sot BLOB, -- Southern Sotho message + spa BLOB, -- Spanish message + sun BLOB, -- Sundanese message + swa BLOB, -- Swahili message + ssw BLOB, -- Swati message + swe BLOB, -- Swedish message + tgl BLOB, -- Tagalog message + tah BLOB, -- Tahitian message + tgk BLOB, -- Tajik message + tam BLOB, -- Tamil message + tat BLOB, -- Tatar message + tel BLOB, -- Telugu message + tha BLOB, -- Thai message + bod BLOB, -- Tibetan message + tir BLOB, -- Tigrinya message + ton BLOB, -- Tonga message + tso BLOB, -- Tsonga message + tsn BLOB, -- Tswana message + tur BLOB, -- Turkish message + tuk BLOB, -- Turkmen message + uig BLOB, -- Uighur message + ukr BLOB, -- Ukrainian message + urd BLOB, -- Urdu message + uzb BLOB, -- Uzbek message + ven BLOB, -- Venda message + vie BLOB, -- Vietnamese message + cym BLOB, -- Welsh message + fry BLOB, -- Western Frisian message + wol BLOB, -- Wolof message + xho BLOB, -- Xhosa message + yid BLOB, -- Yiddish message + yor BLOB, -- Yoruba message + zul BLOB, -- Zulu message + PRIMARY KEY (day, id) +) WITH CLUSTERING ORDER BY (id DESC) + AND caching = {'enabled': 'true'} + AND comment = 'phrases and sentences' + AND compaction = {'class': 'SizeTieredCompactionStrategy'} + AND compression = {'sstable_compression': 'LZ4Compressor'} + AND default_time_to_live = 0; diff --git a/src/api/message.rs b/src/api/message.rs new file mode 100644 index 0000000..65b5000 --- /dev/null +++ b/src/api/message.rs @@ -0,0 +1,250 @@ +use axum::{ + extract::{Query, State}, + Extension, +}; +use isolang::Language; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, convert::From, sync::Arc}; +use validator::Validate; + +use axum_web::context::ReqContext; +use axum_web::erring::{valid_user, HTTPError, SuccessResponse}; +use axum_web::object::PackObject; +use scylla_orm::ColumnsMap; + +use super::{get_fields, AppState, QueryId}; +use crate::db; + +#[derive(Debug, Deserialize, Serialize, Validate)] +pub struct CreateMessageInput { + pub attach_to: PackObject, + pub kind: String, + pub context: String, + pub language: PackObject, + pub message: PackObject>, +} + +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct MessageOutput { + pub id: PackObject, + pub i18n_messages: HashMap>>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub attach_to: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub kind: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub language: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub updated_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub context: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option>>, +} + +impl MessageOutput { + fn from(val: db::Message, to: &PackObject) -> Self { + let mut rt = Self { + id: to.with(val.id), + ..Default::default() + }; + + for v in val._fields { + match v.as_str() { + "attach_to" => rt.attach_to = Some(to.with(val.attach_to)), + "kind" => rt.kind = Some(val.kind.clone()), + "language" => rt.language = Some(to.with(val.language)), + "version" => rt.version = Some(val.version), + "created_at" => rt.created_at = Some(val.created_at), + "updated_at" => rt.updated_at = Some(val.updated_at), + "context" => rt.context = Some(val.context.to_owned()), + "message" => rt.message = Some(to.with(val.message.to_owned())), + _ => {} + } + } + + if !val._i18n_messages.is_empty() { + rt.i18n_messages = val + ._i18n_messages + .iter() + .map(|(k, v)| (k.to_owned(), to.with(v.to_owned()))) + .collect(); + } + + rt + } +} + +pub async fn create( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject, +) -> Result>, HTTPError> { + let (to, input) = to.unpack(); + input.validate()?; + valid_user(ctx.user)?; + + let attach_to = input.attach_to.unwrap(); + let language = input.language.unwrap(); + + let mut doc = db::Message { + id: xid::new(), + attach_to, + kind: input.kind, + language, + context: input.context, + message: input.message.unwrap(), + ..Default::default() + }; + + ctx.set_kvs(vec![ + ("action", "create_message".into()), + ("attach_to", doc.attach_to.to_string().into()), + ("kind", doc.kind.clone().into()), + ("language", doc.language.to_name().into()), + ]) + .await; + + doc.save(&app.scylla).await?; + Ok(to.with(SuccessResponse::new(MessageOutput::from(doc, &to)))) +} + +pub async fn get( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, +) -> Result>, HTTPError> { + input.validate()?; + valid_user(ctx.user)?; + + let id = *input.id.to_owned(); + + ctx.set_kvs(vec![ + ("action", "get_message".into()), + ("id", id.to_string().into()), + ]) + .await; + + let mut doc = db::Message::with_pk(id); + doc.get_one(&app.scylla, get_fields(input.fields.clone())) + .await?; + Ok(to.with(SuccessResponse::new(MessageOutput::from(doc, &to)))) +} + +#[derive(Debug, Deserialize, Validate)] +pub struct UpdateMessageInput { + pub id: PackObject, + #[validate(range(min = 1, max = 32767))] + pub version: i16, + #[validate(length(min = 0, max = 1024))] + pub context: Option, + pub message: Option>>, +} + +impl UpdateMessageInput { + fn into(self) -> anyhow::Result { + let mut cols = ColumnsMap::new(); + if let Some(context) = self.context { + cols.set_as("context", &context); + } + if let Some(message) = self.message { + cols.set_as("message", &message.unwrap()); + } + + if cols.is_empty() { + return Err(HTTPError::new(400, "No fields to update".to_string()).into()); + } + + Ok(cols) + } +} + +pub async fn update( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject, +) -> Result>, HTTPError> { + let (to, input) = to.unpack(); + input.validate()?; + valid_user(ctx.user)?; + + let id = *input.id.to_owned(); + let version = input.version; + let mut doc = db::Message::with_pk(id); + let cols = input.into()?; + ctx.set_kvs(vec![ + ("action", "update_message".into()), + ("id", doc.id.to_string().into()), + ]) + .await; + + let ok = doc.update(&app.scylla, cols, version).await?; + ctx.set("updated", ok.into()).await; + doc._fields = vec!["updated_at".to_string(), "version".to_string()]; + Ok(to.with(SuccessResponse::new(MessageOutput::from(doc, &to)))) +} + +#[derive(Debug, Deserialize, Validate)] +pub struct UpdateI18nMessageInput { + pub id: PackObject, + #[validate(range(min = 1, max = 32767))] + pub version: i16, + pub language: PackObject, + pub message: PackObject>, +} + +pub async fn update_i18n( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject, +) -> Result>, HTTPError> { + let (to, input) = to.unpack(); + input.validate()?; + valid_user(ctx.user)?; + + let id = *input.id.to_owned(); + let version = input.version; + let language = input.language.to_639_3().to_string(); + let mut doc = db::Message::with_pk(id); + ctx.set_kvs(vec![ + ("action", "update_i18n_message".into()), + ("id", doc.id.to_string().into()), + ("language", language.clone().into()), + ]) + .await; + + let ok = doc + .update_i18n(&app.scylla, language, input.message.unwrap(), version) + .await?; + ctx.set("updated", ok.into()).await; + doc._fields = vec!["updated_at".to_string()]; + Ok(to.with(SuccessResponse::new(MessageOutput::from(doc, &to)))) +} + +pub async fn delete( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, +) -> Result>, HTTPError> { + input.validate()?; + valid_user(ctx.user)?; + + let id = *input.id.to_owned(); + + ctx.set_kvs(vec![ + ("action", "delete_bookmark".into()), + ("id", id.to_string().into()), + ]) + .await; + + let mut doc = db::Bookmark::with_pk(ctx.user, id); + let res = doc.delete(&app.scylla).await?; + Ok(to.with(SuccessResponse::new(res))) +} diff --git a/src/api/mod.rs b/src/api/mod.rs index b290ef7..ae39279 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -10,6 +10,7 @@ use crate::db; pub mod bookmark; pub mod creation; +pub mod message; pub mod publication; pub mod search; diff --git a/src/db/mod.rs b/src/db/mod.rs index a1c24ff..8a63254 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,6 +1,7 @@ mod model_bookmark; mod model_content; mod model_creation; +mod model_message; mod model_publication; use model_content::Content; @@ -10,6 +11,7 @@ pub mod scylladb; pub use model_bookmark::Bookmark; pub use model_creation::{Creation, CreationIndex}; +pub use model_message::Message; pub use model_publication::{Publication, PublicationIndex}; pub static USER_JARVIS: &str = "0000000000000jarvis0"; // system user @@ -17,3 +19,9 @@ pub static USER_ANON: &str = "000000000000000anon0"; // anonymous user pub static DEFAULT_MODEL: &str = "gpt-3.5"; // default model pub static MAX_ID: xid::Id = xid::Id([255; 12]); pub static MIN_ID: xid::Id = xid::Id([0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255]); + +pub fn xid_day(xid: xid::Id) -> i32 { + let raw = xid.as_bytes(); + let unix_ts = u32::from_be_bytes([raw[0], raw[1], raw[2], raw[3]]); + (unix_ts / (3600 * 24)) as i32 +} diff --git a/src/db/model_creation.rs b/src/db/model_creation.rs index 701ff96..d01de4b 100644 --- a/src/db/model_creation.rs +++ b/src/db/model_creation.rs @@ -616,22 +616,17 @@ impl Creation { None => MAX_ID, }; - let rows = if status.is_none() { + let rows = if let Some(status) = status { let query = format!( - "SELECT {} FROM creation WHERE gid=? AND id=0 LIMIT ? ALLOW FILTERING USING TIMEOUT 3s", + "SELECT {} FROM creation WHERE gid=? AND status=? AND id=0 LIMIT ? ALLOW FILTERING USING TIMEOUT 3s", + fields.clone().join(",")); + let params = (gid.to_cql(), token.to_cql(), page_size as i32); db.execute_iter(query, params).await? }; diff --git a/src/db/model_message.rs b/src/db/model_message.rs new file mode 100644 index 0000000..68aec0e --- /dev/null +++ b/src/db/model_message.rs @@ -0,0 +1,482 @@ +use isolang::Language; +use std::collections::HashMap; + +use axum_web::context::unix_ms; +use axum_web::erring::HTTPError; +use scylla_orm::{ColumnsMap, CqlValue, ToCqlVal}; +use scylla_orm_macros::CqlOrm; + +use crate::db::{scylladb, scylladb::extract_applied, xid_day}; + +pub static LANGUAGES: [&str; 158] = [ + "abk", "aar", "afr", "aka", "sqi", "amh", "ara", "arg", "hye", "asm", "ava", "aze", "bam", + "bak", "eus", "bel", "ben", "bis", "bos", "bre", "bul", "mya", "cat", "cha", "che", "zho", + "chu", "chv", "cor", "cos", "hrv", "ces", "dan", "div", "nld", "dzo", "eng", "epo", "est", + "ewe", "fao", "fin", "fra", "ful", "glg", "lug", "kat", "deu", "guj", "hat", "hau", "heb", + "hin", "hun", "isl", "ibo", "ind", "ina", "iku", "gle", "ita", "jpn", "jav", "kal", "kan", + "kas", "kaz", "khm", "kik", "kin", "kir", "kor", "kua", "kur", "lao", "lav", "lim", "lin", + "lit", "lub", "ltz", "mkd", "mlg", "msa", "mal", "mlt", "glv", "mri", "mar", "ell", "mon", + "nav", "nep", "nde", "sme", "nor", "nno", "nya", "oci", "ori", "orm", "oss", "pan", "fas", + "pol", "por", "pus", "que", "ron", "roh", "run", "rus", "smo", "sag", "san", "gla", "srp", + "sna", "iii", "snd", "sin", "slk", "slv", "som", "nbl", "sot", "spa", "sun", "swa", "ssw", + "swe", "tgl", "tah", "tgk", "tam", "tat", "tel", "tha", "bod", "tir", "ton", "tso", "tsn", + "tur", "tuk", "uig", "ukr", "urd", "uzb", "ven", "vie", "cym", "fry", "wol", "xho", "yid", + "yor", "zul", +]; + +#[derive(Debug, Default, Clone, CqlOrm, PartialEq)] +pub struct Message { + pub day: i32, + pub id: xid::Id, + pub attach_to: xid::Id, + pub kind: String, + pub created_at: i64, + pub updated_at: i64, + pub context: String, + pub language: Language, + pub version: i16, + pub message: Vec, + + pub _i18n_messages: HashMap>, + pub _fields: Vec, // selected fields,`_` 前缀字段会被 CqlOrm 忽略 +} + +impl Message { + pub fn with_pk(id: xid::Id) -> Self { + Self { + day: xid_day(id), + id, + ..Default::default() + } + } + + pub fn select_fields(select_fields: Vec, with_pk: bool) -> anyhow::Result> { + if select_fields.is_empty() { + return Ok(Self::fields()); + } + + let fields = Self::fields(); + let mut select_fields = select_fields; + if let Some(i) = select_fields.iter().position(|s| s == "i18n") { + select_fields.remove(i); + let field = "message".to_string(); + if !select_fields.contains(&field) { + select_fields.push(field); + } + for l in LANGUAGES { + select_fields.push(l.to_string()); + } + } + + for field in &select_fields { + if !fields.contains(field) && !LANGUAGES.contains(&field.as_str()) { + return Err(HTTPError::new(400, format!("Invalid field: {}", field)).into()); + } + } + + let field = "language".to_string(); + if !select_fields.contains(&field) { + select_fields.push(field); + } + let field = "version".to_string(); + if !select_fields.contains(&field) { + select_fields.push(field); + } + + if with_pk { + let field = "day".to_string(); + if !select_fields.contains(&field) { + select_fields.push(field); + } + let field = "id".to_string(); + if !select_fields.contains(&field) { + select_fields.push(field); + } + return Ok(select_fields); + } + + Ok(select_fields) + } + + pub fn fill_languages(&mut self, cols: &scylla_orm::ColumnsMap) { + for lang in LANGUAGES.iter() { + if cols.has(lang) { + self._i18n_messages + .insert(lang.to_string(), cols.get_as(lang).unwrap_or_default()); + } + } + } + + pub async fn get_one( + &mut self, + db: &scylladb::ScyllaDB, + select_fields: Vec, + ) -> anyhow::Result<()> { + let fields = Self::select_fields(select_fields, false)?; + self._fields = fields.clone(); + self.day = xid_day(self.id); + + let query = format!( + "SELECT {} FROM message WHERE day=? AND id=? LIMIT 1", + fields.join(",") + ); + let params = (self.day, self.id.to_cql()); + let res = db.execute(query, params).await?.single_row()?; + + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(res, &fields)?; + self.fill(&cols); + self.fill_languages(&cols); + + Ok(()) + } + + pub async fn batch_get( + db: &scylladb::ScyllaDB, + list: Vec, + select_fields: Vec, + ) -> anyhow::Result> { + let fields = Self::select_fields(select_fields, false)?; + + let query = format!( + "SELECT {} FROM message WHERE day=? AND id=? LIMIT 1", + fields.join(",") + ); + let mut res: Vec = Vec::with_capacity(list.len()); + for v in list { + let mut doc = Self::with_pk(v); + let row = db + .execute(query.as_str(), (doc.day, doc.id.to_cql())) + .await? + .single_row()?; + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(row, &fields)?; + doc.fill(&cols); + doc.fill_languages(&cols); + doc._fields = fields.clone(); + res.push(doc); + } + + Ok(res) + } + + pub async fn save(&mut self, db: &scylladb::ScyllaDB) -> anyhow::Result { + let fields = Self::fields(); + self._fields = fields.clone(); + self.updated_at = unix_ms() as i64; + self.created_at = self.updated_at; + self.version = 1; + self.day = xid_day(self.id); + + let mut cols_name: Vec<&str> = Vec::with_capacity(fields.len()); + let mut vals_name: Vec<&str> = Vec::with_capacity(fields.len()); + let mut params: Vec<&CqlValue> = Vec::with_capacity(fields.len()); + let cols = self.to(); + + for field in &fields { + cols_name.push(field); + vals_name.push("?"); + params.push(cols.get(field).unwrap()); + } + + let query = format!( + "INSERT INTO message ({}) VALUES ({}) IF NOT EXISTS", + cols_name.join(","), + vals_name.join(",") + ); + + let res = db.execute(query, params).await?; + if !extract_applied(res) { + return Err( + HTTPError::new(409, "Message save failed, please try again".to_string()).into(), + ); + } + + Ok(true) + } + + pub async fn update( + &mut self, + db: &scylladb::ScyllaDB, + cols: ColumnsMap, + version: i16, + ) -> anyhow::Result { + let valid_fields = ["context", "message"]; + let update_fields = cols.keys(); + for field in &update_fields { + if !valid_fields.contains(&field.as_str()) { + return Err(HTTPError::new(400, format!("Invalid field: {}", field)).into()); + } + } + + self.get_one(db, vec!["version".to_string()]).await?; + if self.version != version { + return Err(HTTPError::new( + 409, + format!( + "Message version conflict, expected version {}, got {}", + self.version, version + ), + ) + .into()); + } + if version == 32767 { + return Err( + HTTPError::new(400, format!("Message version overflow, got {}", version)).into(), + ); + } + + self.day = xid_day(self.id); + let mut set_fields: Vec = Vec::with_capacity(update_fields.len() + 2); + let mut params: Vec = Vec::with_capacity(update_fields.len() + 2 + 3); + + let new_updated_at = unix_ms() as i64; + set_fields.push("updated_at=?".to_string()); + set_fields.push("version=?".to_string()); + params.push(new_updated_at.to_cql()); + params.push((version + 1).to_cql()); + for field in &update_fields { + set_fields.push(format!("{}=?", field)); + params.push(cols.get(field).unwrap().to_owned()); + } + + let query = format!( + "UPDATE message SET {} WHERE day=? AND id=? IF version=?", + set_fields.join(",") + ); + params.push(self.day.to_cql()); + params.push(self.id.to_cql()); + params.push(version.to_cql()); + + let res = db.execute(query, params).await?; + if !extract_applied(res) { + return Err( + HTTPError::new(409, "Message update failed, please try again".to_string()).into(), + ); + } + + self.updated_at = new_updated_at; + self.version += 1; + Ok(true) + } + + pub async fn update_i18n( + &mut self, + db: &scylladb::ScyllaDB, + lang: String, + message: Vec, + version: i16, + ) -> anyhow::Result { + if !LANGUAGES.contains(&lang.as_str()) { + return Err(HTTPError::new(400, format!("Invalid language: {}", lang)).into()); + } + + self.get_one(db, vec!["version".to_string()]).await?; + if self.version != version { + return Err(HTTPError::new( + 409, + format!( + "Message version conflict, expected version {}, got {}", + self.version, version + ), + ) + .into()); + } + + self.day = xid_day(self.id); + let query = format!( + "UPDATE message SET updated_at=?,{}=? WHERE day=? AND id=? IF version=?", + lang + ); + let mut params: Vec = Vec::with_capacity(4); + + let new_updated_at = unix_ms() as i64; + params.push(new_updated_at.to_cql()); + params.push(message.to_cql()); + params.push(self.day.to_cql()); + params.push(self.id.to_cql()); + params.push(version.to_cql()); + + let res = db.execute(query, params).await?; + if !extract_applied(res) { + return Err( + HTTPError::new(409, "Message update failed, please try again".to_string()).into(), + ); + } + + self.updated_at = new_updated_at; + Ok(true) + } + + pub async fn delete(&mut self, db: &scylladb::ScyllaDB) -> anyhow::Result { + let res = self.get_one(db, vec!["version".to_string()]).await; + if res.is_err() { + return Ok(false); // already deleted + } + + let query = "DELETE FROM message WHERE day=? AND id=?"; + let params = (self.day, self.id.to_cql()); + let _ = db.execute(query, params).await?; + + Ok(true) + } +} + +#[cfg(test)] +mod tests { + use ciborium::cbor; + + use crate::conf; + use crate::db; + use axum_web::{erring, object::cbor_to_vec}; + use tokio::sync::OnceCell; + + use super::*; + + static DB: OnceCell = OnceCell::const_new(); + + async fn get_db() -> &'static db::scylladb::ScyllaDB { + DB.get_or_init(|| async { + let cfg = conf::Conf::new().unwrap_or_else(|err| panic!("config error: {}", err)); + let res = db::scylladb::ScyllaDB::new(cfg.scylla, "writing_test").await; + res.unwrap() + }) + .await + } + + #[tokio::test(flavor = "current_thread")] + #[ignore] + async fn test_all() { + message_model_works().await; + } + + // #[tokio::test(flavor = "current_thread")] + async fn message_model_works() { + let db = get_db().await; + let id = xid::new(); + let gid = xid::new(); + let message: Vec = cbor_to_vec( + &cbor!([{ + "id" => "title", + "texts" => ["Hello World"], + }]) + .unwrap(), + ) + .unwrap(); + + // create + { + let mut doc = Message::with_pk(id); + doc.attach_to = gid; + doc.kind = "group.message".to_string(); + doc.language = Language::Eng; + doc.version = 1; + doc.context = "Hello World".to_string(); + doc.message = message.clone(); + + let res = doc.get_one(db, vec![]).await; + assert!(res.is_err()); + let err: erring::HTTPError = res.unwrap_err().into(); + assert_eq!(err.code, 404); + + assert!(doc.save(db).await.unwrap()); + let res = doc.save(db).await; + assert!(res.is_err()); + let err: erring::HTTPError = res.unwrap_err().into(); // can not insert twice + assert_eq!(err.code, 409); + + let mut doc2 = Message::with_pk(id); + doc2.get_one(db, vec![]).await.unwrap(); + + assert_eq!(doc2.attach_to, gid); + assert_eq!(doc2.context.as_str(), "Hello World"); + assert_eq!(doc2.version, 1); + assert_eq!(doc2.language, Language::Eng); + assert_eq!(doc2.message, message); + + let mut doc3 = Message::with_pk(id); + doc3.get_one(db, vec!["language".to_string()]) + .await + .unwrap(); + assert_eq!(doc3.id, id); + assert_eq!(doc3.context.as_str(), ""); + assert_eq!(doc3.version, 1); + assert_eq!(doc3.language, Language::Eng); + } + + // update + { + let mut doc = Message::with_pk(id); + let mut cols = ColumnsMap::new(); + cols.set_as("version", &2i16); + let res = doc.update(db, cols, 0).await; + assert!(res.is_err()); + let err: erring::HTTPError = res.unwrap_err().into(); + assert_eq!(err.code, 400); // status is not updatable + + let mut cols = ColumnsMap::new(); + cols.set_as("context", &"update context 1".to_string()); + let res = doc.update(db, cols, 2).await; + assert!(res.is_err()); + let err: erring::HTTPError = res.unwrap_err().into(); + assert_eq!(err.code, 409); // version not match + + let mut cols = ColumnsMap::new(); + cols.set_as("context", &"context 1".to_string()); + let res = doc.update(db, cols, 1).await.unwrap(); + assert!(res); + assert_eq!(doc.version, 2); + + let mut cols = ColumnsMap::new(); + cols.set_as("context", &"context 2".to_string()); + + let res = doc.update(db, cols, doc.version).await.unwrap(); + assert!(res); + } + + // update_i18n + { + let mut doc = Message::with_pk(id); + doc.get_one(db, vec!["i18n".to_string()]).await.unwrap(); + assert_eq!(doc.id, id); + assert_eq!(doc.context.as_str(), ""); + assert_eq!(doc.version, 3); + assert_eq!(doc.language, Language::Eng); + assert_eq!(doc.message, message); + assert!(doc._fields.contains(&"zho".to_string())); + assert!(doc._i18n_messages.is_empty()); + + let res = doc + .update_i18n(db, "zho".to_string(), message.clone(), 1) + .await; + assert!(res.is_err()); + let err: erring::HTTPError = res.unwrap_err().into(); + assert_eq!(err.code, 409); // version not match + + doc.update_i18n(db, "zho".to_string(), message.clone(), 3) + .await + .unwrap(); + + let mut doc2 = Message::with_pk(id); + doc2.get_one(db, vec!["i18n".to_string()]).await.unwrap(); + assert_eq!(doc2.id, id); + assert_eq!(doc2.context.as_str(), ""); + assert_eq!(doc2.version, 3); + assert_eq!(doc2.language, Language::Eng); + assert_eq!(doc2.message, message); + assert!(doc2._fields.contains(&"zho".to_string())); + assert!(doc2._i18n_messages.len() == 1); + assert_eq!(doc2._i18n_messages.get("zho").unwrap(), &message); + } + + // delete + { + let mut doc = Message::with_pk(id); + let res = doc.delete(db).await.unwrap(); + assert!(res); + + let res = doc.delete(db).await.unwrap(); + assert!(!res); // already deleted + } + } +} diff --git a/src/db/model_publication.rs b/src/db/model_publication.rs index 0286a83..2a884eb 100644 --- a/src/db/model_publication.rs +++ b/src/db/model_publication.rs @@ -7,7 +7,7 @@ use scylla_orm_macros::CqlOrm; use crate::db::{ meili, scylladb::{self, extract_applied}, - Content, Creation, DEFAULT_MODEL, MAX_ID, MIN_ID, + xid_day, Content, Creation, DEFAULT_MODEL, MAX_ID, MIN_ID, }; use axum_web::context::unix_ms; use axum_web::erring::HTTPError; @@ -29,12 +29,6 @@ impl From for Publication { } } -pub fn xid_day(xid: xid::Id) -> i32 { - let raw = xid.as_bytes(); - let unix_ts = u32::from_be_bytes([raw[0], raw[1], raw[2], raw[3]]); - (unix_ts / (3600 * 24)) as i32 -} - impl PublicationIndex { pub fn with_pk(cid: xid::Id, language: Language) -> Self { Self { @@ -340,11 +334,9 @@ impl PublicationIndex { let rows = db.execute_iter(query, params).await?; let mut cids: HashSet = HashSet::new(); for row in rows { - if let Some(v) = row.columns.first() { - if let Some(v) = v { - let cid = xid::Id::from_cql(v)?; - cids.insert(cid); - } + if let Some(Some(v)) = row.columns.first() { + let cid = xid::Id::from_cql(v)?; + cids.insert(cid); } } @@ -1064,16 +1056,11 @@ impl Publication { let mut docs_set: HashSet<(xid::Id, Language, i16)> = HashSet::new(); 'label: loop { - let mut rows = if status.is_none() { - let params = (gid.to_cql(), token.to_cql(), page_size as i32); + let mut rows = if let Some(status) = status { + let params = (gid.to_cql(), status, token.to_cql(), page_size as i32); db.execute_iter(query.as_str(), params).await? } else { - let params = ( - gid.to_cql(), - status.unwrap(), - token.to_cql(), - page_size as i32, - ); + let params = (gid.to_cql(), token.to_cql(), page_size as i32); db.execute_iter(query.as_str(), params).await? }; @@ -1088,11 +1075,11 @@ impl Publication { let row = rows.pop().unwrap(); cols.fill(row, &fields)?; doc.fill(&cols); - let tail_rows = if status.is_none() { - let params = (gid.to_cql(), doc.cid.to_cql()); + let tail_rows = if let Some(status) = status { + let params = (gid.to_cql(), doc.cid.to_cql(), status); db.execute_iter(tail_query.as_str(), params).await? } else { - let params = (gid.to_cql(), doc.cid.to_cql(), status.unwrap()); + let params = (gid.to_cql(), doc.cid.to_cql()); db.execute_iter(tail_query.as_str(), params).await? }; rows.extend(tail_rows); diff --git a/src/db/scylladb.rs b/src/db/scylladb.rs index a64d38b..58880a2 100644 --- a/src/db/scylladb.rs +++ b/src/db/scylladb.rs @@ -132,12 +132,11 @@ pub async fn exec_cqls(db: &ScyllaDB, cqls: &str) -> anyhow::Result<()> { .execute(cql.clone(), &[]) .await .map_err(|err| anyhow::anyhow!("\ncql: {}\nerror: {}", &cql, &err)); - if res.is_err() { - let res = res.unwrap_err(); - if res.to_string().contains("Index already exists") { - println!("WARN: {}", res); + if let Err(err) = res { + if err.to_string().contains("Index already exists") { + println!("WARN: {}", err); } else { - return Err(res); + return Err(err); } } } diff --git a/src/router.rs b/src/router.rs index 2d9fc49..5760980 100644 --- a/src/router.rs +++ b/src/router.rs @@ -135,6 +135,18 @@ pub async fn new(cfg: conf::Conf) -> anyhow::Result<(Arc, Router) ) .route("/list_latest", routing::post(api::publication::list_latest)), ) + .nest( + "/v1/message", + Router::new() + .route( + "/", + routing::post(api::message::create) + .get(api::message::get) + .patch(api::message::update) + .delete(api::message::delete), + ) + .route("/update_i18n", routing::post(api::message::update_i18n)), + ) .nest( "/v1/bookmark", Router::new()