Skip to content

Commit

Permalink
feat: refactor publication_index
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Oct 3, 2023
1 parent fa74ef9 commit 5f51ce4
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 48 deletions.
9 changes: 2 additions & 7 deletions cmd/sync-to-publication-index/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::stream::StreamExt;
use scylla_orm::{ColumnsMap, CqlValue, ToCqlVal};
use scylla_orm::{ColumnsMap, ToCqlVal};
use structured_logger::{async_json::new_writer, Builder};
use tokio::io;
use writing::{conf, db};
Expand Down Expand Up @@ -66,12 +66,7 @@ async fn main() -> anyhow::Result<()> {
let res = idoc.upsert(&sess).await?;
if res {
synced += 1;
println!(
"doc: {} {} {}",
idoc.cid.to_string(),
idoc.language.to_string(),
idoc.version
);
println!("doc: {} {} {}", idoc.cid, idoc.language, idoc.version);
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions cql/schema_table.cql
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,23 @@ CREATE INDEX publication_cid ON publication (cid);
CREATE INDEX publication_url ON publication (original_url);
CREATE INDEX publication_gid_status ON publication ((gid), status);

CREATE TABLE IF NOT EXISTS publication_index (
CREATE TABLE IF NOT EXISTS pub_index (
day INT, -- publication day
cid BLOB, -- creation id
language TEXT, -- publication's language, ISO 639-3
original BOOLEAN, -- is original
version SMALLINT, -- creation version
gid BLOB, -- group id that the publication belongs to
PRIMARY KEY (cid, language)
) WITH CLUSTERING ORDER BY (language ASC)
PRIMARY KEY (day, cid, language)
) WITH CLUSTERING ORDER BY (cid DESC, language ASC)
AND caching = {'enabled': 'true'}
AND comment = 'published publications index'
AND compaction = {'class': 'SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'LZ4Compressor'}
AND default_time_to_live = 0;

CREATE INDEX publication_index_gid ON publication_index (gid);
CREATE INDEX pub_index_day_gid ON pub_index ((day), gid);
CREATE INDEX pub_index_gid ON pub_index (gid);

CREATE TABLE IF NOT EXISTS bookmark (
uid BLOB, -- id of the user who created the bookmark, 12 bytes XID
Expand Down
6 changes: 3 additions & 3 deletions src/api/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,9 @@ pub async fn implicit_get_beta(
return Err(HTTPError::new(451, "Can not view publication".to_string()));
}

let doc_i =
db::PublicationIndex::get_implicit_published(&app.scylla, gid, cid, language).await?;
let mut doc: db::Publication = doc_i.into();
let idoc =
db::PublicationIndex::get_implicit_published(&app.scylla, cid, gid, language).await?;
let mut doc: db::Publication = idoc.into();
doc.get_one(&app.scylla, get_fields(input.fields.clone()))
.await?;
doc._rating = index.rating;
Expand Down
74 changes: 40 additions & 34 deletions src/db/model_publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use axum_web::erring::HTTPError;

#[derive(Debug, Default, Clone, CqlOrm)]
pub struct PublicationIndex {
pub day: i32,
pub cid: xid::Id,
pub language: Language,
pub original: bool,
Expand All @@ -28,9 +29,16 @@ impl From<PublicationIndex> for Publication {
}
}

/// Returns the day number (whole days since the Unix epoch) encoded in `xid`.
///
/// The leading four bytes of the id are read as a big-endian `u32`, treated as a
/// Unix timestamp in seconds, and divided by the number of seconds in a day.
pub fn xid_day(xid: xid::Id) -> i32 {
    const SECS_PER_DAY: u32 = 3600 * 24;

    let bytes = xid.as_bytes();
    let mut ts_be = [0u8; 4];
    ts_be.copy_from_slice(&bytes[..4]);
    let days = u32::from_be_bytes(ts_be) / SECS_PER_DAY;
    // u32::MAX / 86_400 == 49_710, so the quotient always fits in an i32.
    days as i32
}

impl PublicationIndex {
pub fn with_pk(cid: xid::Id, language: Language) -> Self {
Self {
day: xid_day(cid),
cid,
language,
..Default::default()
Expand All @@ -39,12 +47,13 @@ impl PublicationIndex {

pub async fn get_one(&mut self, db: &scylladb::ScyllaDB) -> anyhow::Result<()> {
self._fields = Self::fields();
self.day = xid_day(self.cid);

let query = format!(
"SELECT {} FROM publication_index WHERE cid=? AND language=? LIMIT 1",
"SELECT {} FROM pub_index WHERE day=? AND cid=? AND language=? LIMIT 1",
self._fields.join(",")
);
let params = (self.cid.to_cql(), self.language.to_cql());
let params = (self.day, self.cid.to_cql(), self.language.to_cql());
let res = db.execute(query, params).await?.single_row()?;

let mut cols = ColumnsMap::with_capacity(self._fields.len());
Expand All @@ -57,6 +66,7 @@ impl PublicationIndex {
pub async fn upsert(&mut self, db: &scylladb::ScyllaDB) -> anyhow::Result<bool> {
let fields = Self::fields();
self._fields = fields.clone();
self.day = xid_day(self.cid);

let mut cols_name: Vec<&str> = Vec::with_capacity(fields.len());
let mut vals_name: Vec<&str> = Vec::with_capacity(fields.len());
Expand All @@ -70,7 +80,7 @@ impl PublicationIndex {
}

let query = format!(
"INSERT INTO publication_index ({}) VALUES ({}) IF NOT EXISTS",
"INSERT INTO pub_index ({}) VALUES ({}) IF NOT EXISTS",
cols_name.join(","),
vals_name.join(",")
);
Expand All @@ -80,11 +90,11 @@ impl PublicationIndex {
return Ok(true);
}

let query =
"UPDATE publication_index SET version=?,gid=? WHERE cid=? AND language=? IF version<?";
let query = "UPDATE pub_index SET version=?,gid=? WHERE day=? AND cid=? AND language=? IF version<?";
let params = (
self.version,
self.gid.to_cql(),
self.day,
self.cid.to_cql(),
self.language.to_cql(),
self.version,
Expand All @@ -105,35 +115,26 @@ impl PublicationIndex {
) -> anyhow::Result<(Vec<PublicationIndex>, Option<xid::Id>)> {
let fields = Self::fields();

let secs: u32 = 3600 * 24;
let mut res: Vec<PublicationIndex> = Vec::new();
let query = format!(
"SELECT {} FROM publication_index WHERE gid=? AND cid>=? AND cid<? LIMIT 1000 USING TIMEOUT 3s",
fields.clone().join(","));
"SELECT {} FROM pub_index WHERE day=? AND gid=? LIMIT 1000 USING TIMEOUT 3s",
fields.clone().join(",")
);

let mut end_id = if let Some(cid) = page_token {
cid
let mut day = if let Some(cid) = page_token {
xid_day(cid) - 1
} else {
let mut unix_ts = (unix_ms() / 1000) as u32;
unix_ts = unix_ts - (unix_ts % 600);
let mut end_id = xid::Id::default();
end_id.0[0..=3].copy_from_slice(&unix_ts.to_be_bytes());
end_id
(unix_ms() / (1000 * 3600 * 24)) as i32
};

let mut i = 0i8;
while i < 14 {
let raw = end_id.as_bytes();
let unix_ts = u32::from_be_bytes([raw[0], raw[1], raw[2], raw[3]]);
let mut start_id = xid::Id::default();
start_id.0[0..=3].copy_from_slice(&(unix_ts - secs).to_be_bytes());

while day > 0 && i < 30 {
for gid in gids.iter() {
if gid <= &MIN_ID {
continue;
}

let params = (gid.to_cql(), start_id.to_cql(), end_id.to_cql());
let params = (day, gid.to_cql());
let rows = db.execute_iter(query.as_str(), params).await?;
for row in rows {
let mut doc = PublicationIndex::default();
Expand Down Expand Up @@ -163,15 +164,20 @@ impl PublicationIndex {
// the result count should be >= 6 for the first page.
if (page_token.is_none() && res.len() >= 6) || (page_token.is_some() && res.len() >= 3)
{
let next_id = res.last().unwrap().cid;
res.sort_by(|a, b| b.cid.partial_cmp(&a.cid).unwrap());
return Ok((res, Some(start_id)));
return Ok((res, Some(next_id)));
}

i += 1;
end_id = start_id;
day -= 1;
}

let next = if res.is_empty() { None } else { Some(end_id) };
let next = if res.is_empty() {
None
} else {
Some(res.last().unwrap().cid)
};
res.sort_by(|a, b| b.cid.partial_cmp(&a.cid).unwrap());
Ok((res, next))
}
Expand All @@ -183,10 +189,10 @@ impl PublicationIndex {
let fields = Self::fields();

let query = format!(
"SELECT {} FROM publication_index WHERE cid=? LIMIT 200 USING TIMEOUT 3s",
"SELECT {} FROM pub_index WHERE day=? AND cid=? LIMIT 200 USING TIMEOUT 3s",
fields.clone().join(",")
);
let params = (cid.to_cql(),);
let params = (xid_day(cid), cid.to_cql());
let rows = db.execute_iter(query, params).await?;

let mut docs: Vec<PublicationIndex> = Vec::with_capacity(rows.len());
Expand All @@ -212,16 +218,16 @@ impl PublicationIndex {

let rows = if gid <= MIN_ID {
let query = format!(
"SELECT {} FROM publication_index WHERE cid=? LIMIT 200 USING TIMEOUT 3s",
"SELECT {} FROM pub_index WHERE day=? AND cid=? LIMIT 200 USING TIMEOUT 3s",
fields.clone().join(",")
);
let params = (cid.to_cql(),);
let params = (xid_day(cid), cid.to_cql());
db.execute_iter(query, params).await?
} else {
let query = format!(
"SELECT {} FROM publication_index WHERE cid=? AND gid=? LIMIT 200 ALLOW FILTERING USING TIMEOUT 3s",
"SELECT {} FROM pub_index WHERE day=? AND cid=? AND gid=? LIMIT 200 ALLOW FILTERING USING TIMEOUT 3s",
fields.clone().join(","));
let params = (cid.to_cql(), gid.to_cql());
let params = (xid_day(cid), cid.to_cql(), gid.to_cql());
db.execute_iter(query, params).await?
};

Expand All @@ -245,7 +251,7 @@ impl PublicationIndex {
if res.is_empty() {
return Err(HTTPError::new(
404,
format!("Publication not found, gid: {}, cid: {}", gid, cid),
format!("Publication not found, cid: {},gid: {}", cid, gid),
)
.into());
}
Expand All @@ -262,7 +268,7 @@ impl PublicationIndex {
return Ok(0);
}

let query = "SELECT cid FROM publication_index WHERE gid=? GROUP BY cid USING TIMEOUT 3s";
let query = "SELECT cid FROM pub_index WHERE gid=? GROUP BY cid USING TIMEOUT 3s";
let params = (gid.to_cql(),);
let rows = db.execute_iter(query, params).await?;
Ok(rows.len())
Expand Down Expand Up @@ -1153,7 +1159,7 @@ impl Publication {
let status_cond = if from_status == 1 {
"status=1"
} else {
"status>=0 AND status<2"
"status IN (0,1)"
};

let query = format!(
Expand Down

0 comments on commit 5f51ce4

Please sign in to comment.