From 5f51ce444a8218680761a44c403ee44a6aa7444d Mon Sep 17 00:00:00 2001 From: 0xZensh Date: Tue, 3 Oct 2023 10:24:22 +0800 Subject: [PATCH] feat: refactor publication_index --- cmd/sync-to-publication-index/src/main.rs | 9 +-- cql/schema_table.cql | 10 +-- src/api/publication.rs | 6 +- src/db/model_publication.rs | 74 ++++++++++++----------- 4 files changed, 51 insertions(+), 48 deletions(-) diff --git a/cmd/sync-to-publication-index/src/main.rs b/cmd/sync-to-publication-index/src/main.rs index bb8c9bf..47395ee 100644 --- a/cmd/sync-to-publication-index/src/main.rs +++ b/cmd/sync-to-publication-index/src/main.rs @@ -1,5 +1,5 @@ use futures::stream::StreamExt; -use scylla_orm::{ColumnsMap, CqlValue, ToCqlVal}; +use scylla_orm::{ColumnsMap, ToCqlVal}; use structured_logger::{async_json::new_writer, Builder}; use tokio::io; use writing::{conf, db}; @@ -66,12 +66,7 @@ async fn main() -> anyhow::Result<()> { let res = idoc.upsert(&sess).await?; if res { synced += 1; - println!( - "doc: {} {} {}", - idoc.cid.to_string(), - idoc.language.to_string(), - idoc.version - ); + println!("doc: {} {} {}", idoc.cid, idoc.language, idoc.version); } } } diff --git a/cql/schema_table.cql b/cql/schema_table.cql index 637cfef..21b7343 100644 --- a/cql/schema_table.cql +++ b/cql/schema_table.cql @@ -91,21 +91,23 @@ CREATE INDEX publication_cid ON publication (cid); CREATE INDEX publication_url ON publication (original_url); CREATE INDEX publication_gid_status ON publication ((gid), status); -CREATE TABLE IF NOT EXISTS publication_index ( +CREATE TABLE IF NOT EXISTS pub_index ( + day INT, -- publication day cid BLOB, -- creation id language TEXT, -- publication's language, ISO 639-3 original BOOLEAN, -- is original version SMALLINT, -- creation version gid BLOB, -- group id, publication belong to - PRIMARY KEY (cid, language) -) WITH CLUSTERING ORDER BY (language ASC) + PRIMARY KEY (day, cid, language) +) WITH CLUSTERING ORDER BY (cid DESC, language ASC) AND caching = {'enabled': 'true'} AND comment = 'published publications index' AND compaction = {'class': 'SizeTieredCompactionStrategy'} AND compression = {'sstable_compression': 'LZ4Compressor'} AND default_time_to_live = 0; -CREATE INDEX publication_index_gid ON publication_index (gid); +CREATE INDEX pub_index_day_gid ON pub_index ((day), gid); +CREATE INDEX pub_index_gid ON pub_index (gid); CREATE TABLE IF NOT EXISTS bookmark ( uid BLOB, -- user id who create the bookmark, 12 bytes XID diff --git a/src/api/publication.rs b/src/api/publication.rs index 70e948c..c826094 100644 --- a/src/api/publication.rs +++ b/src/api/publication.rs @@ -348,9 +348,9 @@ pub async fn implicit_get_beta( return Err(HTTPError::new(451, "Can not view publication".to_string())); } - let doc_i = - db::PublicationIndex::get_implicit_published(&app.scylla, gid, cid, language).await?; - let mut doc: db::Publication = doc_i.into(); + let idoc = + db::PublicationIndex::get_implicit_published(&app.scylla, cid, gid, language).await?; + let mut doc: db::Publication = idoc.into(); doc.get_one(&app.scylla, get_fields(input.fields.clone())) .await?; doc._rating = index.rating; diff --git a/src/db/model_publication.rs b/src/db/model_publication.rs index 7e744c0..8dc3103 100644 --- a/src/db/model_publication.rs +++ b/src/db/model_publication.rs @@ -14,6 +14,7 @@ use axum_web::erring::HTTPError; #[derive(Debug, Default, Clone, CqlOrm)] pub struct PublicationIndex { + pub day: i32, pub cid: xid::Id, pub language: Language, pub original: bool, @@ -28,9 +29,16 @@ impl From for Publication { } } +pub fn xid_day(xid: xid::Id) -> i32 { + let raw = xid.as_bytes(); + let unix_ts = u32::from_be_bytes([raw[0], raw[1], raw[2], raw[3]]); + (unix_ts / (3600 * 24)) as i32 +} + impl PublicationIndex { pub fn with_pk(cid: xid::Id, language: Language) -> Self { Self { + day: xid_day(cid), cid, language, ..Default::default() @@ -39,12 +47,13 @@ impl PublicationIndex { pub async fn get_one(&mut self, db: &scylladb::ScyllaDB) -> anyhow::Result<()> { self._fields = Self::fields(); + self.day = xid_day(self.cid); let query = format!( - "SELECT {} FROM publication_index WHERE cid=? AND language=? LIMIT 1", + "SELECT {} FROM pub_index WHERE day=? AND cid=? AND language=? LIMIT 1", self._fields.join(",") ); - let params = (self.cid.to_cql(), self.language.to_cql()); + let params = (self.day, self.cid.to_cql(), self.language.to_cql()); let res = db.execute(query, params).await?.single_row()?; let mut cols = ColumnsMap::with_capacity(self._fields.len()); @@ -57,6 +66,7 @@ impl PublicationIndex { pub async fn upsert(&mut self, db: &scylladb::ScyllaDB) -> anyhow::Result { let fields = Self::fields(); self._fields = fields.clone(); + self.day = xid_day(self.cid); let mut cols_name: Vec<&str> = Vec::with_capacity(fields.len()); let mut vals_name: Vec<&str> = Vec::with_capacity(fields.len()); @@ -70,7 +80,7 @@ impl PublicationIndex { } let query = format!( - "INSERT INTO publication_index ({}) VALUES ({}) IF NOT EXISTS", + "INSERT INTO pub_index ({}) VALUES ({}) IF NOT EXISTS", cols_name.join(","), vals_name.join(",") ); @@ -80,11 +90,11 @@ impl PublicationIndex { return Ok(true); } - let query = - "UPDATE publication_index SET version=?,gid=? WHERE cid=? AND language=? IF version anyhow::Result<(Vec, Option)> { let fields = Self::fields(); - let secs: u32 = 3600 * 24; let mut res: Vec = Vec::new(); let query = format!( - "SELECT {} FROM publication_index WHERE gid=? AND cid>=? AND cid 0 && i < 30 { for gid in gids.iter() { if gid <= &MIN_ID { continue; } - let params = (gid.to_cql(), start_id.to_cql(), end_id.to_cql()); + let params = (day, gid.to_cql()); let rows = db.execute_iter(query.as_str(), params).await?; for row in rows { let mut doc = PublicationIndex::default(); @@ -163,15 +164,20 @@ impl PublicationIndex { // result should >= 6 for first page. if (page_token.is_none() && res.len() >= 6) || (page_token.is_some() && res.len() >= 3) { + let next_id = res.last().unwrap().cid; res.sort_by(|a, b| b.cid.partial_cmp(&a.cid).unwrap()); - return Ok((res, Some(start_id))); + return Ok((res, Some(next_id))); } i += 1; - end_id = start_id; + day -= 1; } - let next = if res.is_empty() { None } else { Some(end_id) }; + let next = if res.is_empty() { + None + } else { + Some(res.last().unwrap().cid) + }; res.sort_by(|a, b| b.cid.partial_cmp(&a.cid).unwrap()); Ok((res, next)) } @@ -183,10 +189,10 @@ impl PublicationIndex { let fields = Self::fields(); let query = format!( - "SELECT {} FROM publication_index WHERE cid=? LIMIT 200 USING TIMEOUT 3s", + "SELECT {} FROM pub_index WHERE day=? AND cid=? LIMIT 200 USING TIMEOUT 3s", fields.clone().join(",") ); - let params = (cid.to_cql(),); + let params = (xid_day(cid), cid.to_cql()); let rows = db.execute_iter(query, params).await?; let mut docs: Vec = Vec::with_capacity(rows.len()); @@ -212,16 +218,16 @@ impl PublicationIndex { let rows = if gid <= MIN_ID { let query = format!( - "SELECT {} FROM publication_index WHERE cid=? LIMIT 200 USING TIMEOUT 3s", + "SELECT {} FROM pub_index WHERE day=? AND cid=? LIMIT 200 USING TIMEOUT 3s", fields.clone().join(",") ); - let params = (cid.to_cql(),); + let params = (xid_day(cid), cid.to_cql()); db.execute_iter(query, params).await? } else { let query = format!( - "SELECT {} FROM publication_index WHERE cid=? AND gid=? LIMIT 200 ALLOW FILTERING USING TIMEOUT 3s", + "SELECT {} FROM pub_index WHERE day=? AND cid=? AND gid=? LIMIT 200 ALLOW FILTERING USING TIMEOUT 3s", fields.clone().join(",")); - let params = (cid.to_cql(), gid.to_cql()); + let params = (xid_day(cid), cid.to_cql(), gid.to_cql()); db.execute_iter(query, params).await? }; @@ -245,7 +251,7 @@ impl PublicationIndex { if res.is_empty() { return Err(HTTPError::new( 404, - format!("Publication not found, gid: {}, cid: {}", gid, cid), + format!("Publication not found, cid: {},gid: {}", cid, gid), ) .into()); } @@ -262,7 +268,7 @@ impl PublicationIndex { return Ok(0); } - let query = "SELECT cid FROM publication_index WHERE gid=? GROUP BY cid USING TIMEOUT 3s"; + let query = "SELECT cid FROM pub_index WHERE gid=? GROUP BY cid USING TIMEOUT 3s"; let params = (gid.to_cql(),); let rows = db.execute_iter(query, params).await?; Ok(rows.len()) @@ -1153,7 +1159,7 @@ impl Publication { let status_cond = if from_status == 1 { "status=1" } else { - "status>=0 AND status<2" + "status IN (0,1)" }; let query = format!(