Skip to content

Commit

Permalink
database: return IDs to discard on DatabaseIndexes::bulk_index
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Dec 6, 2023
1 parent a8cf64f commit 01eb461
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 13 deletions.
18 changes: 9 additions & 9 deletions crates/nostr-database/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,22 +226,22 @@ impl DatabaseIndexes {

/// Bulk index
#[tracing::instrument(skip_all)]
pub async fn bulk_index<I>(&self, events: I)
pub async fn bulk_index<I>(&self, events: I) -> HashSet<EventId>
where
I: IntoIterator<Item = RawEvent>,
{
let mut index = self.index.write().await;
let mut deleted = self.deleted.write().await;

let mut to_discard: HashSet<EventId> = HashSet::new();
let now = Timestamp::now();

// Sort ASC to prevent issues during index
let events: BTreeSet<WrappedRawEvent> = events
.into_iter()
.map(|raw| WrappedRawEvent { raw })
.collect();

let mut index = self.index.write().await;
let mut deleted = self.deleted.write().await;

let mut to_discard: HashSet<EventId> = HashSet::new();
let now = Timestamp::now();

events
.into_iter()
.map(|w| w.raw)
Expand All @@ -253,10 +253,10 @@ impl DatabaseIndexes {
// Remove events
if !to_discard.is_empty() {
index.retain(|e| !to_discard.contains(&e.event_id));
deleted.extend(to_discard);
deleted.extend(to_discard.iter());
}

// TODO: return to_discard events?
to_discard
}

fn index_raw_event(
Expand Down
13 changes: 11 additions & 2 deletions crates/nostr-indexeddb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl WebDatabase {
tracing::debug!("Building database indexes...");
let tx = self
.db
.transaction_on_one_with_mode(EVENTS_CF, IdbTransactionMode::Readonly)?;
.transaction_on_one_with_mode(EVENTS_CF, IdbTransactionMode::Readwrite)?;
let store = tx.object_store(EVENTS_CF)?;
let events = store
.get_all()?
Expand All @@ -200,7 +200,16 @@ impl WebDatabase {
let bytes = hex::decode(v).ok()?;
RawEvent::decode(&bytes).ok()
});
self.indexes.bulk_index(events).await;

// Build indexes
let to_discard: HashSet<EventId> = self.indexes.bulk_index(events).await;

// Discard events
for event_id in to_discard.into_iter() {
let key = JsValue::from(event_id.to_hex());
store.delete(&key)?;
}

tracing::info!("Database indexes loaded");
Ok(())
}
Expand Down
19 changes: 18 additions & 1 deletion crates/nostr-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,24 @@ impl RocksDatabase {
.full_iterator_cf(&cf, IteratorMode::Start)
.flatten()
.filter_map(|(_, value)| RawEvent::decode(&value).ok());
self.indexes.bulk_index(events).await;

// Build indexes
let to_discard: HashSet<EventId> = self.indexes.bulk_index(events).await;

// Discard events
if !to_discard.is_empty() {
// Prepare write batch
let mut batch = WriteBatchWithTransaction::default();

// Discard events no longer needed
for event_id in to_discard.into_iter() {
batch.delete_cf(&cf, event_id);
}

// Write batch changes
self.db.write(batch).map_err(DatabaseError::backend)?;
}

Ok(())
}
}
Expand Down
21 changes: 20 additions & 1 deletion crates/nostr-sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,26 @@ impl SQLiteDatabase {
Ok::<HashSet<RawEvent>, Error>(events)
})
.await??;
self.indexes.bulk_index(events).await;

// Build indexes
let to_discard = self.indexes.bulk_index(events).await;

// Discard events
if !to_discard.is_empty() {
let conn = self.acquire().await?;
conn.interact(move |conn| {
let delete_query = format!(
"DELETE FROM events WHERE {};",
to_discard
.iter()
.map(|id| format!("event_id = '{id}'"))
.collect::<Vec<_>>()
.join(" AND ")
);
conn.execute(&delete_query, [])
})
.await??;
}
Ok(())
}
}
Expand Down

0 comments on commit 01eb461

Please sign in to comment.