diff --git a/enostr/src/relay/pool.rs b/enostr/src/relay/pool.rs index f4aab0ce..e9745d5c 100644 --- a/enostr/src/relay/pool.rs +++ b/enostr/src/relay/pool.rs @@ -80,6 +80,7 @@ impl RelayPool { } pub fn unsubscribe(&mut self, subid: String) { + // TODO(jb55): switch to &str for relay in &mut self.relays { relay.relay.send(&ClientMessage::close(subid.clone())); } diff --git a/src/actionbar.rs b/src/actionbar.rs index 593045de..6e018bec 100644 --- a/src/actionbar.rs +++ b/src/actionbar.rs @@ -1,9 +1,8 @@ use crate::{ - note::NoteRef, + note::{NoteRef, RootNoteId}, notecache::NoteCache, - notes_holder::{NotesHolder, NotesHolderStorage}, route::{Route, Router}, - thread::Thread, + timeline::{CachedTimeline, TimelineCache, TimelineCacheKey}, }; use enostr::{NoteId, Pubkey, RelayPool}; use nostrdb::{Ndb, Transaction}; @@ -22,11 +21,11 @@ pub struct NoteActionResponse { } pub struct NewNotes { - pub id: [u8; 32], + pub id: TimelineCacheKey, pub notes: Vec, } -pub enum NotesHolderResult { +pub enum BarResult { NewNotes(NewNotes), } @@ -41,13 +40,22 @@ fn open_thread( router: &mut Router, note_cache: &mut NoteCache, pool: &mut RelayPool, - threads: &mut NotesHolderStorage, + timeline_cache: &mut TimelineCache, selected_note: &[u8; 32], -) -> Option { - router.route_to(Route::thread(NoteId::new(selected_note.to_owned()))); +) -> Option { + let root_id_raw = + crate::note::root_note_id_from_selected_id(ndb, note_cache, txn, selected_note); + let root_id = RootNoteId::new_unsafe(root_id_raw); - let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, txn, selected_note); - Thread::open(ndb, note_cache, txn, pool, threads, root_id) + router.route_to(Route::thread(root_id.clone())); + + timeline_cache.open( + ndb, + note_cache, + txn, + pool, + &TimelineCacheKey::thread(root_id), + ) } impl BarAction { @@ -56,11 +64,11 @@ impl BarAction { self, ndb: &Ndb, router: &mut Router, - threads: &mut NotesHolderStorage, + timeline_cache: &mut TimelineCache, note_cache: &mut NoteCache, pool: &mut RelayPool, txn: &Transaction, - ) -> Option { + ) -> Option { match self { BarAction::Reply(note_id) => { router.route_to(Route::reply(note_id)); @@ -68,9 +76,15 @@ impl BarAction { None } - BarAction::OpenThread(note_id) => { - open_thread(ndb, txn, router, note_cache, pool, threads, note_id.bytes()) - } + BarAction::OpenThread(note_id) => open_thread( + ndb, + txn, + router, + note_cache, + pool, + timeline_cache, + note_id.bytes(), + ), BarAction::Quote(note_id) => { router.route_to(Route::quote(note_id)); @@ -85,51 +99,54 @@ impl BarAction { self, ndb: &Ndb, router: &mut Router, - threads: &mut NotesHolderStorage, + timeline_cache: &mut TimelineCache, note_cache: &mut NoteCache, pool: &mut RelayPool, txn: &Transaction, ) { - if let Some(br) = self.execute(ndb, router, threads, note_cache, pool, txn) { - br.process(ndb, note_cache, txn, threads); + if let Some(br) = self.execute(ndb, router, timeline_cache, note_cache, pool, txn) { + br.process(ndb, note_cache, txn, timeline_cache); } } } -impl NotesHolderResult { - pub fn new_notes(notes: Vec, id: [u8; 32]) -> Self { - NotesHolderResult::NewNotes(NewNotes::new(notes, id)) +impl BarResult { + pub fn new_notes(notes: Vec, id: TimelineCacheKey) -> Self { + Self::NewNotes(NewNotes::new(notes, id)) } - pub fn process( + pub fn process( &self, ndb: &Ndb, note_cache: &mut NoteCache, txn: &Transaction, - storage: &mut NotesHolderStorage, + timeline_cache: &mut TimelineCache, ) { match self { // update the thread for next render if we have new notes - NotesHolderResult::NewNotes(new_notes) => { - let holder = storage - .notes_holder_mutated(ndb, note_cache, txn, &new_notes.id) + Self::NewNotes(new_notes) => { + let notes = timeline_cache + .notes(ndb, note_cache, txn, &new_notes.id) .get_ptr(); - new_notes.process(holder); + new_notes.process(notes); } } } } impl NewNotes { - pub fn new(notes: Vec, id: [u8; 32]) -> Self { + pub fn new(notes: Vec, id: TimelineCacheKey) -> Self { NewNotes { notes, id } } /// Simple helper for processing a NewThreadNotes result. It simply /// inserts/merges the notes into the thread cache - pub fn process(&self, thread: &mut N) { + pub fn process(&self, thread: &mut CachedTimeline) { // threads are chronological, ie reversed from reverse-chronological, the default. let reversed = true; - thread.get_view().insert(&self.notes, reversed); + thread + .timeline() + .get_current_view_mut() + .insert(&self.notes, reversed); } } diff --git a/src/app.rs b/src/app.rs index 0f475170..38e5b7ff 100644 --- a/src/app.rs +++ b/src/app.rs @@ -16,10 +16,10 @@ use crate::{ notes_holder::NotesHolderStorage, profile::Profile, storage::{Directory, FileKeyStorage, KeyStorageType}, - subscriptions::{SubKind, Subscriptions}, + subscriptions::{Subscriptions, SubKind}, support::Support, thread::Thread, - timeline::{Timeline, TimelineId, TimelineKind, ViewFilter}, + timeline::{Timeline, TimelineCache, TimelineId, TimelineKind, ViewFilter}, ui::{self, DesktopSidePanel}, unknowns::UnknownIds, view_state::ViewState, @@ -43,7 +43,6 @@ use tracing::{debug, error, info, trace, warn}; pub enum DamusState { Initializing, Initialized, - NewTimelineSub(TimelineId), } /// We derive Deserialize/Serialize so we can persist app state on shutdown. @@ -57,13 +56,12 @@ pub struct Damus { pub view_state: ViewState, pub unknown_ids: UnknownIds, pub drafts: Drafts, - pub threads: NotesHolderStorage, - pub profiles: NotesHolderStorage, + pub timeline_cache: TimelineCache, pub img_cache: ImageCache, pub accounts: AccountManager, - pub subscriptions: Subscriptions, pub app_rect_handler: AppSizeHandler, pub support: Support, + pub subscriptions: Subscriptions, frame_history: crate::frame_history::FrameHistory, @@ -101,7 +99,7 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) { fn send_initial_timeline_filter( ndb: &Ndb, can_since_optimize: bool, - subs: &mut Subscriptions, + subs: &mut RemoteSubscriptions, pool: &mut RelayPool, timeline: &mut Timeline, to: &str, @@ -155,6 +153,7 @@ fn send_initial_timeline_filter( //let sub_id = damus.gen_subid(&SubKind::Initial); let sub_id = Uuid::new_v4().to_string(); subs.subs.insert(sub_id.clone(), SubKind::Initial); + timeline.subscription.set_remote_subid(sub_id.clone()); let cmd = ClientMessage::req(sub_id, new_filters); pool.send_to(&cmd, to); @@ -361,7 +360,7 @@ fn setup_initial_timeline( note_cache: &mut NoteCache, filters: &[Filter], ) -> Result<()> { - timeline.subscription = Some(ndb.subscribe(filters)?); + timeline.subscription.subscribe_local(); let txn = Transaction::new(ndb)?; debug!( "querying nostrdb sub {:?} {:?}", @@ -477,33 +476,6 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { .expect("home subscription failed"); } - DamusState::NewTimelineSub(new_timeline_id) => { - info!("adding new timeline {}", new_timeline_id); - setup_new_nostrdb_sub( - &damus.ndb, - &mut damus.note_cache, - &mut damus.columns, - new_timeline_id, - ) - .expect("new timeline subscription failed"); - - if let Some(filter) = { - let timeline = damus - .columns - .find_timeline(new_timeline_id) - .expect("timeline"); - match &timeline.filter { - FilterState::Ready(filters) => Some(filters.clone()), - _ => None, - } - } { - let subid = Uuid::new_v4().to_string(); - damus.pool.subscribe(subid, filter); - - damus.state = DamusState::Initialized; - } - } - DamusState::Initialized => (), }; @@ -744,10 +716,9 @@ impl Damus { pool, debug, unknown_ids: UnknownIds::default(), - subscriptions: Subscriptions::default(), + subscriptions: RemoteSubscriptions::default(), since_optimize: parsed_args.since_optimize, - threads: NotesHolderStorage::default(), - profiles: NotesHolderStorage::default(), + timeline_cache: TimelineCache::default(), drafts: Drafts::default(), state: DamusState::Initializing, img_cache: ImageCache::new(imgcache_dir.into()), @@ -807,10 +778,6 @@ impl Damus { } } - pub fn subscribe_new_timeline(&mut self, timeline_id: TimelineId) { - self.state = DamusState::NewTimelineSub(timeline_id); - } - pub fn mock>(data_path: P) -> Self { let mut columns = Columns::new(); let filter = Filter::from_json(include_str!("../queries/global.json")).unwrap(); @@ -828,10 +795,9 @@ impl Damus { Self { debug, unknown_ids: UnknownIds::default(), - subscriptions: Subscriptions::default(), + subscriptions: RemoteSubscriptions::default(), since_optimize: true, - threads: NotesHolderStorage::default(), - profiles: NotesHolderStorage::default(), + timeline_cache: TimelineCache::default(), drafts: Drafts::default(), state: DamusState::Initializing, pool: RelayPool::new(), @@ -852,6 +818,7 @@ impl Damus { &mut self.subscriptions.subs } + /* pub fn note_cache_mut(&mut self) -> &mut NoteCache { &mut self.note_cache } @@ -864,13 +831,14 @@ impl Damus { &self.threads } - pub fn threads_mut(&mut self) -> &mut NotesHolderStorage { + pub fn timeline_cache_mut(&mut self) -> &mut TimelineCache { &mut self.threads } pub fn note_cache(&self) -> &NoteCache { &self.note_cache } + */ } /* diff --git a/src/column.rs b/src/column.rs index 0b986323..02de883b 100644 --- a/src/column.rs +++ b/src/column.rs @@ -1,6 +1,8 @@ use crate::route::{Route, Router}; use crate::timeline::{Timeline, TimelineId}; +use enostr::RelayPool; use indexmap::IndexMap; +use nostrdb::Ndb; use std::iter::Iterator; use std::sync::atomic::{AtomicU32, Ordering}; use tracing::warn; @@ -27,16 +29,15 @@ impl Column { #[derive(Default)] pub struct Columns { /// Columns are simply routers into settings, timelines, etc - columns: IndexMap, + columns: Vec, /// Timeline state is not tied to routing logic separately, so that /// different columns can navigate to and from settings to timelines, /// etc. - pub timelines: IndexMap, + pub timelines: Vec, /// The selected column for key navigation selected: i32, - should_delete_column_at_index: Option, } static UIDS: AtomicU32 = AtomicU32::new(0); @@ -46,34 +47,28 @@ impl Columns { } pub fn add_new_timeline_column(&mut self, timeline: Timeline) { - let id = Self::get_new_id(); let routes = vec![Route::timeline(timeline.id)]; - self.timelines.insert(id, timeline); - self.columns.insert(id, Column::new(routes)); + self.timelines.push(timeline); + self.columns.push(Column::new(routes)); } pub fn add_timeline_to_column(&mut self, col: usize, timeline: Timeline) { - let col_id = self.get_column_id_at_index(col); self.column_mut(col) .router_mut() .route_to_replaced(Route::timeline(timeline.id)); - self.timelines.insert(col_id, timeline); + self.timelines.push(timeline); } pub fn new_column_picker(&mut self) { self.add_column(Column::new(vec![Route::AddColumn])); } - fn get_new_id() -> u32 { - UIDS.fetch_add(1, Ordering::Relaxed) - } - pub fn add_column(&mut self, column: Column) { - self.columns.insert(Self::get_new_id(), column); + self.columns.push(column); } - pub fn columns_mut(&mut self) -> Vec<&mut Column> { - self.columns.values_mut().collect() + pub fn columns_mut(&mut self) -> &mut Vec { + &mut self.columns } pub fn num_columns(&self) -> usize { @@ -87,51 +82,37 @@ impl Columns { self.new_column_picker(); } self.columns - .get_index_mut(0) + .get_mut(0) .expect("There should be at least one column") - .1 .router_mut() } pub fn timeline_mut(&mut self, timeline_ind: usize) -> &mut Timeline { self.timelines - .get_index_mut(timeline_ind) + .get_mut(timeline_ind) .expect("expected index to be in bounds") - .1 } pub fn column(&self, ind: usize) -> &Column { - self.columns - .get_index(ind) - .expect("Expected index to be in bounds") - .1 + &self.columns[ind] } - pub fn columns(&self) -> Vec<&Column> { - self.columns.values().collect() - } - - pub fn get_column_id_at_index(&self, ind: usize) -> u32 { - *self - .columns - .get_index(ind) - .expect("expected index to be within bounds") - .0 + pub fn columns(&self) -> &Vec { + &self.columns } pub fn selected(&mut self) -> &mut Column { self.columns - .get_index_mut(self.selected as usize) + .get_mut(self.selected as usize) .expect("Expected selected index to be in bounds") - .1 } - pub fn timelines_mut(&mut self) -> Vec<&mut Timeline> { - self.timelines.values_mut().collect() + pub fn timelines_mut(&mut self) -> &mut Vec { + &mut self.timelines } - pub fn timelines(&self) -> Vec<&Timeline> { - self.timelines.values().collect() + pub fn timelines(&self) -> &Vec { + &self.timelines } pub fn find_timeline_mut(&mut self, id: TimelineId) -> Option<&mut Timeline> { @@ -144,14 +125,8 @@ impl Columns { pub fn column_mut(&mut self, ind: usize) -> &mut Column { self.columns - .get_index_mut(ind) + .get_mut(ind) .expect("Expected index to be in bounds") - .1 - } - - pub fn find_timeline_for_column_index(&self, ind: usize) -> Option<&Timeline> { - let col_id = self.get_column_id_at_index(ind); - self.timelines.get(&col_id) } pub fn select_down(&mut self) { @@ -176,22 +151,57 @@ impl Columns { self.selected += 1; } - pub fn request_deletion_at_index(&mut self, index: usize) { - self.should_delete_column_at_index = Some(index); - } + /// Remove a column and unsubscribe from any subscriptions active in it + pub fn remove_column(&mut self, col_ind: usize, ndb: &Ndb, pool: &mut RelayPool) { + // If we have a timeline in this column, then we need to make + // sure to unsubscribe from it + if let Some(timeline_id) = self + .column(col_ind) + .router() + .routes() + .iter() + .find_map(|r| r.timeline_id()) + { + let timeline_id = *timeline_id; + let mut timelines: i32 = 0; + // We may have multiple of the same timeline in different + // columns. We shouldn't unsubscribe the timeline in this + // case, we only want to unsubscribe if its the last one. + // Albeit this is probably a weird case, but we should still + // handle it properly. Let's count and make sure we only have 1 + + // Traverse all columns + for column in self.columns { + // Look at each route in each column + for route in column.router().routes() { + // Does the column's timeline we're removing exist in + // this column? + if let Some(tid) = route.timeline_id() { + if *tid == timeline_id { + // if so, we increase the count + timelines += 1; + } + } + } + } - pub fn attempt_perform_deletion_request(&mut self) { - if let Some(index) = self.should_delete_column_at_index { - if let Some((key, _)) = self.columns.get_index_mut(index) { - self.timelines.shift_remove(key); + // We only have one timeline in all of the columns, so we can + // unsubscribe + if timelines == 1 { + let timeline = self.find_timeline(timeline_id).expect("timeline"); + timeline.unsubscribe(ndb, pool); + self.timelines = self + .timelines + .iter() + .filter(|tl| tl.id != timeline_id) + .collect(); } + } - self.columns.shift_remove_index(index); - self.should_delete_column_at_index = None; + self.columns.remove(col_ind); - if self.columns.is_empty() { - self.new_column_picker(); - } + if self.columns.is_empty() { + self.new_column_picker(); } } } diff --git a/src/lib.rs b/src/lib.rs index 4ed39bfd..50b61c55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,11 +20,9 @@ mod images; mod imgcache; mod key_parsing; pub mod login_manager; -mod multi_subscriber; mod nav; mod note; mod notecache; -mod notes_holder; mod post; mod profile; pub mod relay_pool_manager; @@ -33,7 +31,6 @@ mod route; mod subscriptions; mod support; mod test_data; -mod thread; mod time; mod timecache; mod timeline; diff --git a/src/multi_subscriber.rs b/src/multi_subscriber.rs deleted file mode 100644 index 301dc7c6..00000000 --- a/src/multi_subscriber.rs +++ /dev/null @@ -1,133 +0,0 @@ -use enostr::{Filter, RelayPool}; -use nostrdb::{Ndb, Note, Transaction}; -use tracing::{debug, error, info}; -use uuid::Uuid; - -use crate::{filter::UnifiedSubscription, note::NoteRef, Error}; - -pub struct MultiSubscriber { - filters: Vec, - sub: Option, - subscribers: u32, -} - -impl MultiSubscriber { - pub fn new(filters: Vec) -> Self { - Self { - filters, - sub: None, - subscribers: 0, - } - } - - fn real_subscribe( - ndb: &Ndb, - pool: &mut RelayPool, - filters: Vec, - ) -> Option { - let subid = Uuid::new_v4().to_string(); - let sub = ndb.subscribe(&filters).ok()?; - - pool.subscribe(subid.clone(), filters); - - Some(UnifiedSubscription { - local: sub, - remote: subid, - }) - } - - pub fn unsubscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { - if self.subscribers == 0 { - error!("No subscribers to unsubscribe from"); - return; - } - - self.subscribers -= 1; - if self.subscribers == 0 { - let sub = match self.sub { - Some(ref sub) => sub, - None => { - error!("No remote subscription to unsubscribe from"); - return; - } - }; - let local_sub = &sub.local; - if let Err(e) = ndb.unsubscribe(*local_sub) { - error!( - "failed to unsubscribe from object: {e}, subid:{}, {} active subscriptions", - local_sub.id(), - ndb.subscription_count() - ); - } else { - info!( - "Unsubscribed from object subid:{}. {} active subscriptions", - local_sub.id(), - ndb.subscription_count() - ); - } - - // unsub from remote - pool.unsubscribe(sub.remote.clone()); - self.sub = None; - } else { - info!( - "Locally unsubscribing. {} active ndb subscriptions. {} active subscriptions for this object", - ndb.subscription_count(), - self.subscribers, - ); - } - } - - pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { - self.subscribers += 1; - if self.subscribers == 1 { - if self.sub.is_some() { - error!("Object is first subscriber, but it already had remote subscription"); - return; - } - - self.sub = Self::real_subscribe(ndb, pool, self.filters.clone()); - info!( - "Remotely subscribing to object. {} total active subscriptions, {} on this object", - ndb.subscription_count(), - self.subscribers, - ); - - if self.sub.is_none() { - error!("Error subscribing remotely to object"); - } - } else { - info!( - "Locally subscribing. {} total active subscriptions, {} for this object", - ndb.subscription_count(), - self.subscribers, - ) - } - } - - pub fn poll_for_notes(&mut self, ndb: &Ndb, txn: &Transaction) -> Result, Error> { - let sub = self.sub.as_ref().ok_or(Error::no_active_sub())?; - let new_note_keys = ndb.poll_for_notes(sub.local, 500); - - if new_note_keys.is_empty() { - return Ok(vec![]); - } else { - debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys); - } - - let mut notes: Vec> = Vec::with_capacity(new_note_keys.len()); - for key in new_note_keys { - let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { - note - } else { - continue; - }; - - notes.push(note); - } - - let note_refs: Vec = notes.iter().map(|n| NoteRef::from_note(n)).collect(); - - Ok(note_refs) - } -} diff --git a/src/nav.rs b/src/nav.rs index 6aea6a8b..fe42d4e1 100644 --- a/src/nav.rs +++ b/src/nav.rs @@ -2,14 +2,11 @@ use crate::{ account_manager::render_accounts_route, app_style::{get_font_size, NotedeckTextStyle}, fonts::NamedFontFamily, - notes_holder::NotesHolder, - profile::Profile, relay_pool_manager::RelayPoolManager, route::Route, - thread::Thread, timeline::{ route::{render_profile_route, render_timeline_route, AfterRouteExecution, TimelineRoute}, - Timeline, + CachedTimeline, Timeline, }, ui::{ self, @@ -28,7 +25,6 @@ use nostrdb::{Ndb, Transaction}; use tracing::{error, info}; pub fn render_nav(col: usize, app: &mut Damus, ui: &mut egui::Ui) { - let col_id = app.columns.get_column_id_at_index(col); // TODO(jb55): clean up this router_mut mess by using Router in egui-nav directly let routes = app .columns() @@ -41,7 +37,7 @@ pub fn render_nav(col: usize, app: &mut Damus, ui: &mut egui::Ui) { let nav_response = Nav::new(routes) .navigating(app.columns_mut().column_mut(col).router_mut().navigating) .returning(app.columns_mut().column_mut(col).router_mut().returning) - .id_source(egui::Id::new(col_id)) + .id_source(egui::Id::new(col)) .title(48.0, title_bar) .show_mut(ui, |ui, nav| { let column = app.columns.column_mut(col); @@ -53,7 +49,7 @@ pub fn render_nav(col: usize, app: &mut Damus, ui: &mut egui::Ui) { &mut app.drafts, &mut app.img_cache, &mut app.note_cache, - &mut app.threads, + &mut app.timeline_cache, &mut app.accounts, *tlr, col, @@ -111,25 +107,12 @@ pub fn render_nav(col: usize, app: &mut Damus, ui: &mut egui::Ui) { AddColumnResponse::Timeline(timeline) => { let id = timeline.id; app.columns_mut().add_timeline_to_column(col, timeline); - app.subscribe_new_timeline(id); } }; } None } - Route::Profile(pubkey) => render_profile_route( - pubkey, - &app.ndb, - &mut app.columns, - &mut app.profiles, - &mut app.pool, - &mut app.img_cache, - &mut app.note_cache, - &mut app.threads, - col, - ui, - ), Route::Support => { SupportView::new(&mut app.support).show(ui); None @@ -137,9 +120,14 @@ pub fn render_nav(col: usize, app: &mut Damus, ui: &mut egui::Ui) { } }); - if let Some(after_route_execution) = nav_response.inner { + if let Some(our_response) = nav_response.inner { // start returning when we're finished posting - match after_route_execution { + match our_response { + AfterRouteExecution::DeleteColumn(ind) => { + app.columns_mut() + .remove_column(ind, &app.ndb, &mut app.pool) + } + AfterRouteExecution::Post(resp) => { if let Some(action) = resp.action { match action { @@ -154,53 +142,27 @@ pub fn render_nav(col: usize, app: &mut Damus, ui: &mut egui::Ui) { app.columns .column_mut(col) .router_mut() - .route_to(Route::Profile(pubkey)); + .route_to(Route::Timeline(TimelineRoute::Profile(pubkey))); let txn = Transaction::new(&app.ndb).expect("txn"); - if let Some(res) = Profile::open( + if let Some(res) = CachedTimeline::open( &app.ndb, &mut app.note_cache, &txn, &mut app.pool, - &mut app.profiles, + &mut app.timeline_cache, pubkey.bytes(), ) { - res.process(&app.ndb, &mut app.note_cache, &txn, &mut app.profiles); + res.process(&app.ndb, &mut app.note_cache, &txn, &mut app.timeline_cache); } } } } if let Some(NavAction::Returned) = nav_response.action { - let r = app.columns_mut().column_mut(col).router_mut().pop(); - let txn = Transaction::new(&app.ndb).expect("txn"); - if let Some(Route::Timeline(TimelineRoute::Thread(id))) = r { - let root_id = { - crate::note::root_note_id_from_selected_id( - &app.ndb, - &mut app.note_cache, - &txn, - id.bytes(), - ) - }; - Thread::unsubscribe_locally( - &txn, - &app.ndb, - &mut app.note_cache, - &mut app.threads, - &mut app.pool, - root_id, - ); - } - - if let Some(Route::Profile(pubkey)) = r { - Profile::unsubscribe_locally( - &txn, - &app.ndb, - &mut app.note_cache, - &mut app.profiles, - &mut app.pool, - pubkey.bytes(), - ); + if let Some(r) = app.columns_mut().column_mut(col).router_mut().pop() { + if let Some(sub) = r.subscription() { + sub.unsubscribe(); + } } } else if let Some(NavAction::Navigated) = nav_response.action { let cur_router = app.columns_mut().column_mut(col).router_mut(); @@ -213,30 +175,13 @@ pub fn render_nav(col: usize, app: &mut Damus, ui: &mut egui::Ui) { if let Some(title_response) = nav_response.title_response { match title_response { TitleResponse::RemoveColumn => { - app.columns_mut().request_deletion_at_index(col); - let tl = app.columns().find_timeline_for_column_index(col); - if let Some(timeline) = tl { - unsubscribe_timeline(app.ndb(), timeline); - } + app.columns_mut() + .remove_column(col, &app.ndb, &mut app.pool); } } } } -fn unsubscribe_timeline(ndb: &Ndb, timeline: &Timeline) { - if let Some(sub_id) = timeline.subscription { - if let Err(e) = ndb.unsubscribe(sub_id) { - error!("unsubscribe error: {}", e); - } else { - info!( - "successfully unsubscribed from timeline {} with sub id {}", - timeline.id, - sub_id.id() - ); - } - } -} - fn title_bar( ui: &mut egui::Ui, allocated_response: egui::Response, diff --git a/src/note.rs b/src/note.rs index a462a7b4..5abf1511 100644 --- a/src/note.rs +++ b/src/note.rs @@ -1,5 +1,6 @@ use crate::notecache::NoteCache; use nostrdb::{Ndb, Note, NoteKey, QueryResult, Transaction}; +use enostr::NoteId; use std::cmp::Ordering; #[derive(Debug, Eq, PartialEq, Copy, Clone)] @@ -8,6 +9,27 @@ pub struct NoteRef { pub created_at: u64, } +#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash)] +pub struct RootNoteId([u8; 32]); + +impl RootNoteId { + pub fn to_note_id(self) -> NoteId { + NoteId::new(self.0) + } + + pub fn bytes(&self) -> &[u8; 32] { + &self.0 + } + + pub fn new(ndb: &Ndb, note_cache: &mut NoteCache, txn: &Transaction, id: &[u8; 32]) -> Self { + RootNoteId(*root_note_id_from_selected_id(ndb, note_cache, txn, id)) + } + + pub fn new_unsafe(id: &[u8; 32]) -> Self { + RootNoteId(*id) + } +} + impl NoteRef { pub fn new(key: NoteKey, created_at: u64) -> Self { NoteRef { key, created_at } diff --git a/src/notes_holder.rs b/src/notes_holder.rs deleted file mode 100644 index 8217c243..00000000 --- a/src/notes_holder.rs +++ /dev/null @@ -1,211 +0,0 @@ -use std::collections::HashMap; - -use enostr::{Filter, RelayPool}; -use nostrdb::{Ndb, Transaction}; -use tracing::{debug, info, warn}; - -use crate::{ - actionbar::NotesHolderResult, multi_subscriber::MultiSubscriber, note::NoteRef, - notecache::NoteCache, timeline::TimelineTab, Error, Result, -}; - -pub struct NotesHolderStorage { - pub id_to_object: HashMap<[u8; 32], M>, -} - -impl Default for NotesHolderStorage { - fn default() -> Self { - NotesHolderStorage { - id_to_object: HashMap::new(), - } - } -} - -pub enum Vitality<'a, M> { - Fresh(&'a mut M), - Stale(&'a mut M), -} - -impl<'a, M> Vitality<'a, M> { - pub fn get_ptr(self) -> &'a mut M { - match self { - Self::Fresh(ptr) => ptr, - Self::Stale(ptr) => ptr, - } - } - - pub fn is_stale(&self) -> bool { - match self { - Self::Fresh(_ptr) => false, - Self::Stale(_ptr) => true, - } - } -} - -impl NotesHolderStorage { - pub fn notes_holder_expected_mut(&mut self, id: &[u8; 32]) -> &mut M { - self.id_to_object - .get_mut(id) - .expect("notes_holder_expected_mut used but there was no NotesHolder") - } - - pub fn notes_holder_mutated<'a>( - &'a mut self, - ndb: &Ndb, - note_cache: &mut NoteCache, - txn: &Transaction, - id: &[u8; 32], - ) -> Vitality<'a, M> { - // we can't use the naive hashmap entry API here because lookups - // require a copy, wait until we have a raw entry api. We could - // also use hashbrown? - - if self.id_to_object.contains_key(id) { - return Vitality::Stale(self.notes_holder_expected_mut(id)); - } - - // we don't have the note holder, query for it! - let filters = M::filters(id); - - let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) { - results - .into_iter() - .map(NoteRef::from_query_result) - .collect() - } else { - debug!( - "got no results from NotesHolder lookup for {}", - hex::encode(id) - ); - vec![] - }; - - if notes.is_empty() { - warn!("NotesHolder query returned 0 notes? ") - } else { - info!("found NotesHolder with {} notes", notes.len()); - } - - self.id_to_object.insert( - id.to_owned(), - M::new_notes_holder(txn, ndb, note_cache, id, M::filters(id), notes), - ); - Vitality::Fresh(self.id_to_object.get_mut(id).unwrap()) - } -} - -pub trait NotesHolder { - fn get_multi_subscriber(&mut self) -> Option<&mut MultiSubscriber>; - fn set_multi_subscriber(&mut self, subscriber: MultiSubscriber); - fn get_view(&mut self) -> &mut TimelineTab; - fn filters(for_id: &[u8; 32]) -> Vec; - fn filters_since(for_id: &[u8; 32], since: u64) -> Vec; - fn new_notes_holder( - txn: &Transaction, - ndb: &Ndb, - note_cache: &mut NoteCache, - id: &[u8; 32], - filters: Vec, - notes: Vec, - ) -> Self; - - #[must_use = "UnknownIds::update_from_note_refs should be used on this result"] - fn poll_notes_into_view(&mut self, txn: &Transaction, ndb: &Ndb) -> Result<()> { - if let Some(multi_subscriber) = self.get_multi_subscriber() { - let reversed = true; - let note_refs: Vec = multi_subscriber.poll_for_notes(ndb, txn)?; - self.get_view().insert(¬e_refs, reversed); - } else { - return Err(Error::Generic( - "NotesHolder unexpectedly has no MultiSubscriber".to_owned(), - )); - } - - Ok(()) - } - - /// Look for new thread notes since our last fetch - fn new_notes(notes: &[NoteRef], id: &[u8; 32], txn: &Transaction, ndb: &Ndb) -> Vec { - if notes.is_empty() { - return vec![]; - } - - let last_note = notes[0]; - let filters = Self::filters_since(id, last_note.created_at + 1); - - if let Ok(results) = ndb.query(txn, &filters, 1000) { - debug!("got {} results from NotesHolder update", results.len()); - results - .into_iter() - .map(NoteRef::from_query_result) - .collect() - } else { - debug!("got no results from NotesHolder update",); - vec![] - } - } - - /// Local NotesHolder unsubscribe - fn unsubscribe_locally( - txn: &Transaction, - ndb: &Ndb, - note_cache: &mut NoteCache, - notes_holder_storage: &mut NotesHolderStorage, - pool: &mut RelayPool, - id: &[u8; 32], - ) { - let notes_holder = notes_holder_storage - .notes_holder_mutated(ndb, note_cache, txn, id) - .get_ptr(); - - if let Some(multi_subscriber) = notes_holder.get_multi_subscriber() { - multi_subscriber.unsubscribe(ndb, pool); - } - } - - fn open( - ndb: &Ndb, - note_cache: &mut NoteCache, - txn: &Transaction, - pool: &mut RelayPool, - storage: &mut NotesHolderStorage, - id: &[u8; 32], - ) -> Option { - let vitality = storage.notes_holder_mutated(ndb, note_cache, txn, id); - - let (holder, result) = match vitality { - Vitality::Stale(holder) => { - // The NotesHolder is stale, let's update it - let notes = M::new_notes(&holder.get_view().notes, id, txn, ndb); - let holder_result = if notes.is_empty() { - None - } else { - Some(NotesHolderResult::new_notes(notes, id.to_owned())) - }; - - // - // we can't insert and update the VirtualList now, because we - // are already borrowing it mutably. Let's pass it as a - // result instead - // - // holder.get_view().insert(¬es); <-- no - // - (holder, holder_result) - } - - Vitality::Fresh(thread) => (thread, None), - }; - - let multi_subscriber = if let Some(multi_subscriber) = holder.get_multi_subscriber() { - multi_subscriber - } else { - let filters = M::filters(id); - holder.set_multi_subscriber(MultiSubscriber::new(filters)); - holder.get_multi_subscriber().unwrap() - }; - - multi_subscriber.subscribe(ndb, pool); - - result - } -} diff --git a/src/profile.rs b/src/profile.rs index 662e4719..34d46893 100644 --- a/src/profile.rs +++ b/src/profile.rs @@ -4,10 +4,8 @@ use nostrdb::{FilterBuilder, Ndb, ProfileRecord, Transaction}; use crate::{ app::copy_notes_into_timeline, filter::{self, FilterState}, - multi_subscriber::MultiSubscriber, note::NoteRef, notecache::NoteCache, - notes_holder::NotesHolder, timeline::{PubkeySource, Timeline, TimelineKind}, }; @@ -49,11 +47,7 @@ pub fn get_profile_name<'a>(record: &'a ProfileRecord) -> Option } } -pub struct Profile { - pub timeline: Timeline, - pub multi_subscriber: Option, -} - +/* impl Profile { pub fn new( txn: &Transaction, @@ -82,48 +76,4 @@ impl Profile { } } -impl NotesHolder for Profile { - fn get_multi_subscriber(&mut self) -> Option<&mut MultiSubscriber> { - self.multi_subscriber.as_mut() - } - - fn get_view(&mut self) -> &mut crate::timeline::TimelineTab { - self.timeline.current_view_mut() - } - - fn filters(for_id: &[u8; 32]) -> Vec { - Profile::filters_raw(for_id) - .into_iter() - .map(|mut f| f.build()) - .collect() - } - - fn filters_since(for_id: &[u8; 32], since: u64) -> Vec { - Profile::filters_raw(for_id) - .into_iter() - .map(|f| f.since(since).build()) - .collect() - } - - fn new_notes_holder( - txn: &Transaction, - ndb: &Ndb, - note_cache: &mut NoteCache, - id: &[u8; 32], - filters: Vec, - notes: Vec, - ) -> Self { - Profile::new( - txn, - ndb, - note_cache, - PubkeySource::Explicit(Pubkey::new(*id)), - filters, - notes, - ) - } - - fn set_multi_subscriber(&mut self, subscriber: MultiSubscriber) { - self.multi_subscriber = Some(subscriber); - } -} +*/ diff --git a/src/route.rs b/src/route.rs index 27923fb3..91bdf04a 100644 --- a/src/route.rs +++ b/src/route.rs @@ -1,11 +1,14 @@ use enostr::{NoteId, Pubkey}; -use nostrdb::Ndb; +use nostrdb::{Ndb, Transaction}; use std::fmt::{self}; use crate::{ account_manager::AccountsRoute, column::Columns, - timeline::{TimelineId, TimelineRoute}, + note::RootNoteId, + notecache::NoteCache, + subscriptions::SubRefs, + timeline::{TimelineCache, TimelineId, TimelineRoute}, ui::profile::preview::{get_note_users_displayname_string, get_profile_displayname_string}, }; @@ -17,7 +20,6 @@ pub enum Route { Relays, ComposeNote, AddColumn, - Profile(Pubkey), Support, } @@ -38,6 +40,26 @@ impl Route { Route::Timeline(TimelineRoute::Timeline(timeline_id)) } + pub fn subscriptions<'a>( + &self, + ndb: &Ndb, + note_cache: &mut NoteCache, + txn: &Transaction, + columns: &'a Columns, + timeline_cache: &'a TimelineCache, + ) -> Option> { + match self { + Route::Timeline(tlr) => { + tlr.subscriptions(ndb, note_cache, txn, columns, timeline_cache) + } + Route::Accounts(_) => None, + Route::Relays => None, + Route::ComposeNote => None, + Route::AddColumn => None, + Route::Support => None, + } + } + pub fn timeline_id(&self) -> Option<&TimelineId> { if let Route::Timeline(TimelineRoute::Timeline(tid)) = self { Some(tid) @@ -50,7 +72,7 @@ impl Route { Route::Relays } - pub fn thread(thread_root: NoteId) -> Self { + pub fn thread(thread_root: RootNoteId) -> Self { Route::Timeline(TimelineRoute::Thread(thread_root)) } @@ -80,7 +102,10 @@ impl Route { timeline.kind.to_title(ndb) } TimelineRoute::Thread(id) => { - format!("{}'s Thread", get_note_users_displayname_string(ndb, id)) + format!( + "{}'s Thread", + get_note_users_displayname_string(ndb, &id.to_note_id()) + ) } TimelineRoute::Reply(id) => { format!("{}'s Reply", get_note_users_displayname_string(ndb, id)) @@ -198,6 +223,7 @@ impl fmt::Display for Route { TimelineRoute::Thread(_id) => write!(f, "Thread"), TimelineRoute::Reply(_id) => write!(f, "Reply"), TimelineRoute::Quote(_id) => write!(f, "Quote"), + TimelineRoute::Profile(_) => write!(f, "Profile"), }, Route::Relays => write!(f, "Relays"), @@ -209,7 +235,7 @@ impl fmt::Display for Route { Route::ComposeNote => write!(f, "Compose Note"), Route::AddColumn => write!(f, "Add Column"), - Route::Profile(_) => write!(f, "Profile"), + Route::Support => write!(f, "Support"), } } diff --git a/src/subscriptions.rs b/src/subscriptions.rs index d006bf42..e3c10382 100644 --- a/src/subscriptions.rs +++ b/src/subscriptions.rs @@ -1,5 +1,7 @@ use crate::timeline::{TimelineId, TimelineKind}; use std::collections::HashMap; +use nostrdb::{Subscription, Ndb}; +use enostr::RelayPool; #[derive(Debug, Clone)] pub enum SubKind { @@ -18,10 +20,85 @@ pub enum SubKind { FetchingContactList(TimelineId), } +pub struct RemoteSub { + /// Remote subscription id + id: String, + + /// Do we have a corresponding local subscription? + local: Option, + + /// What kind of subscription is this? + kind: SubKind +} + /// Subscriptions that need to be tracked at various stages. Sometimes we /// need to do A, then B, then C. Tracking requests at various stages by /// mapping uuid subids to explicit states happens here. #[derive(Default)] pub struct Subscriptions { - pub subs: HashMap, + pub active: Vec } + +impl Subscriptions { + /// Find a remote subscription given a local subscription id. This + /// is for subscriptions that have a one-to-one subscription mapping, which + /// may not always be the case + pub fn find_remote_sub(&self, sub_id: Subscription) -> Option<&RemoteSub> { + for sub in &self.active { + if let Some(local) = sub.local { + if local == sub_id { + return Some(sub); + } + } + } + + None + } + +} + +/// References to the remote and local parts of a subscription +pub struct SubRefs<'a> { + pub remote: Option<&'a str>, + pub local: Option, +} + +/// Owned version of SubRefs +pub struct SubRefsBuf { + pub remote: Option, + pub local: Option, +} + +impl SubRefsBuf { + pub fn new(local: Option, remote: Option<&str>) -> Self { + let remote = remote.map(|x| x.to_owned()); + Self { remote, local } + } + + pub fn borrow<'a>(&'a self) -> SubRefs<'a> { + SubRefs::new(self.local, self.remote.as_deref()) + } +} + +impl<'a> SubRefs<'a> { + pub fn new(local: Option, remote: Option<&'a str>) -> Self { + Self { + remote, local + } + } + + pub fn to_owned(&self) -> SubRefsBuf { + SubRefsBuf::new(self.local, self.remote) + } + + pub fn unsubscribe(&self, ndb: &Ndb, pool: &mut RelayPool) { + if let Some(r) = self.remote { + pool.unsubscribe(r.to_string()); + } + + if let Some(local) = self.local { + ndb.unsubscribe(local); + } + } +} + diff --git a/src/thread.rs b/src/thread.rs deleted file mode 100644 index 4a883dfd..00000000 --- a/src/thread.rs +++ /dev/null @@ -1,92 +0,0 @@ -use crate::{ - multi_subscriber::MultiSubscriber, - note::NoteRef, - notecache::NoteCache, - notes_holder::NotesHolder, - timeline::{TimelineTab, ViewFilter}, -}; -use nostrdb::{Filter, FilterBuilder, Ndb, Transaction}; - -#[derive(Default)] -pub struct Thread { - view: TimelineTab, - pub multi_subscriber: Option, -} - -impl Thread { - pub fn new(notes: Vec) -> Self { - let mut cap = ((notes.len() as f32) * 1.5) as usize; - if cap == 0 { - cap = 25; - } - let mut view = TimelineTab::new_with_capacity(ViewFilter::NotesAndReplies, cap); - view.notes = notes; - - Thread { - view, - multi_subscriber: None, - } - } - - pub fn view(&self) -> &TimelineTab { - &self.view - } - - pub fn view_mut(&mut self) -> &mut TimelineTab { - &mut self.view - } - - fn filters_raw(root: &[u8; 32]) -> Vec { - vec![ - nostrdb::Filter::new().kinds([1]).event(root), - nostrdb::Filter::new().ids([root]).limit(1), - ] - } - - pub fn filters_since(root: &[u8; 32], since: u64) -> Vec { - Self::filters_raw(root) - .into_iter() - .map(|fb| fb.since(since).build()) - .collect() - } - - pub fn filters(root: &[u8; 32]) -> Vec { - Self::filters_raw(root) - .into_iter() - .map(|mut fb| fb.build()) - .collect() - } -} - -impl NotesHolder for Thread { - fn get_multi_subscriber(&mut self) -> Option<&mut MultiSubscriber> { - self.multi_subscriber.as_mut() - } - - fn filters(for_id: &[u8; 32]) -> Vec { - Thread::filters(for_id) - } - - fn new_notes_holder( - _: &Transaction, - _: &Ndb, - _: &mut NoteCache, - _: &[u8; 32], - _: Vec, - notes: Vec, - ) -> Self { - Thread::new(notes) - } - - fn get_view(&mut self) -> &mut TimelineTab { - &mut self.view - } - - fn filters_since(for_id: &[u8; 32], since: u64) -> Vec { - Thread::filters_since(for_id, since) - } - - fn set_multi_subscriber(&mut self, subscriber: MultiSubscriber) { - self.multi_subscriber = Some(subscriber); - } -} diff --git a/src/timeline/cache.rs b/src/timeline/cache.rs new file mode 100644 index 00000000..eab8c76e --- /dev/null +++ b/src/timeline/cache.rs @@ -0,0 +1,264 @@ +use crate::{ + actionbar::BarResult, + error::Error, + note::{NoteRef, RootNoteId}, + notecache::NoteCache, + subscriptions::SubRefs, + timeline::{Timeline, TimelineTab, ViewFilter}, +}; +use enostr::{Pubkey, RelayPool}; +use nostrdb::{Filter, FilterBuilder, Ndb, Transaction}; +use std::collections::HashMap; +use tracing::{debug, info, warn}; + +#[derive(Default)] +pub struct TimelineCache { + pub columns: Vec, + pub threads: HashMap, + pub profiles: HashMap, +} + +pub struct CachedTimeline { + timeline: Timeline, +} + +pub enum Vitality<'a, M> { + Fresh(&'a mut M), + Stale(&'a mut M), +} + +impl<'a, M> Vitality<'a, M> { + pub fn get_ptr(self) -> &'a mut M { + match self { + Self::Fresh(ptr) => ptr, + Self::Stale(ptr) => ptr, + } + } + + pub fn is_stale(&self) -> bool { + match self { + Self::Fresh(_ptr) => false, + Self::Stale(_ptr) => true, + } + } +} + +#[derive(Hash, Debug)] +pub enum TimelineCacheKey { + Profile(Pubkey), + Thread(RootNoteId), +} + +impl TimelineCacheKey { + pub fn profile(pubkey: &[u8; 32]) -> Self { + Self::Profile(Pubkey::new(*pubkey)) + } + + pub fn thread(root_id: RootNoteId) -> Self { + Self::Thread(RootNoteId::new(root_id)) + } + + pub fn bytes(&self) -> &[u8; 32] { + match self { + Self::Profile(pk) => pk.bytes(), + Self::Thread(root_id) => root_id.bytes(), + } + } +} + +impl TimelineCacheKey { + /// The filters used to update our timeline cache + pub fn filters_raw(&self) -> Vec { + match self { + TimelineCacheKey::Thread(root_id) => vec![ + nostrdb::Filter::new().kinds([1]).event(root_id.bytes()), + nostrdb::Filter::new().ids([root_id.bytes()]).limit(1), + ], + + TimelineCacheKey::Profile(pubkey) => vec![Filter::new() + .authors([pubkey.bytes()]) + .kinds([1]) + .limit(filter::default_limit())], + } + } + + pub fn filters_since(&self, since: u64) -> Vec { + self.filters_raw() + .into_iter() + .map(|fb| fb.since(since).build()) + .collect() + } + + pub fn filters(&self) -> Vec { + self.filters_raw() + .into_iter() + .map(|mut fb| fb.build()) + .collect() + } +} + +impl TimelineCache { + fn contains_key(&self, key: &TimelineCacheKey) -> bool { + match key { + TimelineCacheKey::Profile(pubkey) => self.profiles.contains_key(pubkey), + TimelineCacheKey::Thread(root_id) => self.threads.contains_key(root_id), + } + } + + fn insert( + &mut self, + key: &TimelineCacheKey, + timeline: CachedTimeline, + ) -> Option { + match key { + TimelineCacheKey::Profile(pubkey) => self.profiles.insert(pubkey.to_owned(), timeline), + TimelineCacheKey::Thread(root_id) => self.threads.insert(root_id.to_owned(), timeline), + } + } + + fn get_expected_mut(&mut self, key: &TimelineCacheKey) -> &mut CachedTimeline { + match key { + TimelineCacheKey::Profile(pubkey) => self.profiles.get_mut(pubkey), + TimelineCacheKey::Thread(root_id) => self.threads.get_mut(root_id), + } + .expect("expected notes in timline cache") + } + + /// Get and/or update the notes associated with this timeline + pub fn notes<'a>( + &'a mut self, + ndb: &Ndb, + note_cache: &mut NoteCache, + txn: &Transaction, + id: &TimelineCacheKey, + ) -> Vitality<'a, CachedTimeline> { + // we can't use the naive hashmap entry API here because lookups + // require a copy, wait until we have a raw entry api. We could + // also use hashbrown? + + if self.contains_key(id) { + return Vitality::Stale(self.get_expected_mut(id)); + } + + let filters = id.filters(); + let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) { + results + .into_iter() + .map(NoteRef::from_query_result) + .collect() + } else { + debug!("got no results from NotesHolder lookup for {?:}", id); + vec![] + }; + + if notes.is_empty() { + warn!("NotesHolder query returned 0 notes? ") + } else { + info!("found NotesHolder with {} notes", notes.len()); + } + + self.insert( + id.to_owned(), + Self::new(txn, ndb, note_cache, id, filters, notes), + ); + Vitality::Fresh(self.get_expected_mut(id)) + } + + pub fn open( + &mut self, + ndb: &Ndb, + note_cache: &mut NoteCache, + txn: &Transaction, + pool: &mut RelayPool, + id: &TimelineCacheKey, + ) -> Option { + let vitality = self.notes(ndb, note_cache, txn, id); + + let (cached_timeline, result) = match vitality { + Vitality::Stale(cached_timeline) => { + // The timeline cache is stale, let's update it + let notes = + CachedTimeline::new_notes(&cached_timeline.get_view().notes, id, txn, ndb); + let cached_timeline_result = if notes.is_empty() { + None + } else { + Some(BarResult::new_notes(notes, *id)) + }; + + // + // we can't insert and update the VirtualList now, because we + // are already borrowing it mutably. Let's pass it as a + // result instead + // + // holder.get_view().insert(¬es); <-- no + // + (cached_timeline, cached_timeline_result) + } + + Vitality::Fresh(thread) => (thread, None), + }; + + let subscription = + if let Some(multi_subscriber) = cached_timeline.subscription { + multi_subscriber + } else { + cached_timeline.subscription = MultiSubscriber::new(id.filters()); + cached_timeline.multi_subscriber + }; + + multi_subscriber.subscribe(ndb, pool); + + result + } +} + +impl CachedTimeline { + pub fn new(notes: Vec, timeline: Timeline) -> Self { + Self { timeline } + } + + pub fn subscriptions<'a>(&'a self) -> Option> { + self.multi_subscriber.and_then(|ms| ms.to_subscriptions()) + } + + #[must_use = "UnknownIds::update_from_note_refs should be used on this result"] + pub fn poll_notes_into_view(&mut self, txn: &Transaction, ndb: &Ndb) -> Result> { + if let Some(multi_subscriber) = self.get_multi_subscriber() { + let reversed = true; + let note_refs: Vec = multi_subscriber.poll_for_notes(ndb, txn)?; + self.get_view().insert(¬e_refs, reversed); + + Ok(note_refs) + } else { + Err(Error::Generic( + "NotesHolder unexpectedly has no MultiSubscriber".to_owned(), + )) + } + } + + /// Look for new thread notes since our last fetch + fn new_notes( + notes: &[NoteRef], + id: &TimelineCacheKey, + txn: &Transaction, + ndb: &Ndb, + ) -> Vec { + if notes.is_empty() { + return vec![]; + } + + let last_note = notes[0]; + let filters = Self::filters_since(id, last_note.created_at + 1); + + if let Ok(results) = ndb.query(txn, &filters, 1000) { + debug!("got {} results from NotesHolder update", results.len()); + results + .into_iter() + .map(NoteRef::from_query_result) + .collect() + } else { + debug!("got no results from NotesHolder update",); + vec![] + } + } +} diff --git a/src/timeline/mod.rs b/src/timeline/mod.rs index 72593375..2775daa3 100644 --- a/src/timeline/mod.rs +++ b/src/timeline/mod.rs @@ -1,23 +1,30 @@ -use crate::error::Error; -use crate::note::NoteRef; -use crate::notecache::{CachedNote, NoteCache}; -use crate::unknowns::UnknownIds; -use crate::Result; -use crate::{filter, filter::FilterState}; +use crate::{ + error::Error, + filter::{self, FilterState}, + note::NoteRef, + notecache::{CachedNote, NoteCache}, + unknowns::UnknownIds, + subscriptions::SubRefs, + Result, +}; + use std::fmt; use std::sync::atomic::{AtomicU32, Ordering}; use egui_virtual_list::VirtualList; +use enostr::RelayPool; use nostrdb::{Ndb, Note, Subscription, Transaction}; use std::cell::RefCell; use std::hash::Hash; use std::rc::Rc; -use tracing::{debug, error}; +use tracing::{debug, error, info, warn}; +pub mod cache; pub mod kind; pub mod route; +pub use cache::{CachedTimeline, TimelineCache, TimelineCacheKey}; pub use kind::{PubkeySource, TimelineKind}; pub use route::TimelineRoute; @@ -174,10 +181,41 @@ pub struct Timeline { pub selected_view: i32, /// Our nostrdb subscription - pub subscription: Option, + pub subscription: Option } impl Timeline { + pub fn subscriptions<'a>(&'a self) -> SubRefs<'a> { + SubRefs::new(self.subscription, self.remote_subid.as_deref()) + } + + pub fn unsubscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { + let sub_id = if let Some(sub_id) = self.subscription { + sub_id + } else { + warn!("could not unsubscribe from timeline, no subscription id"); + return; + }; + + // unsubscribe from remote + if let Some(sub_ids) = &self.subscription.ids { + pool.unsubscribe(sub_ids.remote.clone()); + self.remote_subid = None; + } + + if let Err(e) = ndb.unsubscribe(sub_id) { + warn!("timeline unsubscribe error"); + } else { + info!( + "successfully unsubscribed from timeline {} with sub id {}", + self.id, + sub_id.id() + ); + } + + self.subscription = None; + } + /// Create a timeline from a contact list pub fn contact_list(contact_list: &Note, pk_src: PubkeySource) -> Result { let filter = filter::filter_from_tags(contact_list)?.into_follow_filter(); @@ -238,15 +276,15 @@ impl Timeline { } pub fn poll_notes_into_view( - timeline_idx: usize, - mut timelines: Vec<&mut Timeline>, + timeline_ind: usize, + timelines: &mut Vec, ndb: &Ndb, txn: &Transaction, unknown_ids: &mut UnknownIds, note_cache: &mut NoteCache, ) -> Result<()> { let timeline = timelines - .get_mut(timeline_idx) + .get_mut(timeline_ind) .ok_or(Error::TimelineNotFound)?; let sub = timeline.subscription.ok_or(Error::no_active_sub())?; diff --git a/src/timeline/route.rs b/src/timeline/route.rs index 073479c6..b24d082d 100644 --- a/src/timeline/route.rs +++ b/src/timeline/route.rs @@ -3,11 +3,10 @@ use crate::{ column::Columns, draft::Drafts, imgcache::ImageCache, + note::RootNoteId, notecache::NoteCache, - notes_holder::NotesHolderStorage, - profile::Profile, - thread::Thread, - timeline::TimelineId, + subscriptions::SubRefs, + timeline::{TimelineCache, TimelineCacheKey, TimelineId}, ui::{ self, note::{ @@ -24,9 +23,49 @@ use nostrdb::{Ndb, Transaction}; #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub enum TimelineRoute { Timeline(TimelineId), - Thread(NoteId), + Thread(RootNoteId), Reply(NoteId), Quote(NoteId), + Profile(Pubkey), +} + +impl TimelineRoute { + // TODO(jb55): remove this and centralize subscriptions + pub fn subscriptions<'a>( + &self, + ndb: &Ndb, + note_cache: &mut NoteCache, + txn: &Transaction, + columns: &'a Columns, + timeline_cache: &'a TimelineCache, + ) -> Option> { + match self { + TimelineRoute::Reply(_) => None, + TimelineRoute::Quote(_) => None, + + TimelineRoute::Profile(pubkey) => timeline_cache + .notes( + ndb, + note_cache, + txn, + &TimelineCacheKey::pubkey(pubkey), + ) + .get_ptr() + .subscriptions(), + + TimelineRoute::Timeline(tlid) => Some(columns.find_timeline(*tlid)?.subscriptions()), + + TimelineRoute::Thread(root_note_id) => timeline_cache + .notes( + ndb, + note_cache, + txn, + &TimelineCacheKey::thread(*root_note_id), + ) + .get_ptr() + .subscriptions(), + } + } } pub enum AfterRouteExecution { @@ -48,7 +87,7 @@ pub fn render_timeline_route( drafts: &mut Drafts, img_cache: &mut ImageCache, note_cache: &mut NoteCache, - threads: &mut NotesHolderStorage, + timeline_cache: &mut TimelineCache, accounts: &mut AccountManager, route: TimelineRoute, col: usize, @@ -65,7 +104,14 @@ pub fn render_timeline_route( let mut cur_column = columns.columns_mut(); let router = cur_column[col].router_mut(); - bar_action.execute_and_process_result(ndb, router, threads, note_cache, pool, &txn); + bar_action.execute_and_process_result( + ndb, + router, + timeline_cache, + note_cache, + pool, + &txn, + ); } timeline_response @@ -73,16 +119,41 @@ pub fn render_timeline_route( .map(AfterRouteExecution::OpenProfile) } + TimelineRoute::Profile(pubkey) => render_profile_route( + &pubkey, + ndb, + columns, + pool, + img_cache, + note_cache, + timeline_cache, + col, + ui, + ), + TimelineRoute::Thread(id) => { - let timeline_response = - ui::ThreadView::new(threads, ndb, note_cache, img_cache, id.bytes(), textmode) - .id_source(egui::Id::new(("threadscroll", col))) - .ui(ui); + let timeline_response = ui::ThreadView::new( + timeline_cache, + ndb, + note_cache, + img_cache, + id.bytes(), + textmode, + ) + .id_source(egui::Id::new(("threadscroll", col))) + .ui(ui); if let Some(bar_action) = timeline_response.bar_action { let txn = Transaction::new(ndb).expect("txn"); let mut cur_column = columns.columns_mut(); let router = cur_column[col].router_mut(); - bar_action.execute_and_process_result(ndb, router, threads, note_cache, pool, &txn); + bar_action.execute_and_process_result( + ndb, + router, + timeline_cache, + note_cache, + pool, + &txn, + ); } timeline_response @@ -160,22 +231,21 @@ pub fn render_profile_route( pubkey: &Pubkey, ndb: &Ndb, columns: &mut Columns, - profiles: &mut NotesHolderStorage, pool: &mut RelayPool, img_cache: &mut ImageCache, note_cache: &mut NoteCache, - threads: &mut NotesHolderStorage, + timeline_cache: &mut TimelineCache, col: usize, ui: &mut egui::Ui, ) -> Option { let timeline_response = - ProfileView::new(pubkey, col, profiles, ndb, note_cache, img_cache).ui(ui); + ProfileView::new(pubkey, col, timeline_cache, ndb, note_cache, img_cache).ui(ui); if let Some(bar_action) = timeline_response.bar_action { let txn = nostrdb::Transaction::new(ndb).expect("txn"); let mut cur_column = columns.columns_mut(); let router = cur_column[col].router_mut(); - bar_action.execute_and_process_result(ndb, router, threads, note_cache, pool, &txn); + bar_action.execute_and_process_result(ndb, router, timeline_cache, note_cache, pool, &txn); } timeline_response diff --git a/src/ui/add_column.rs b/src/ui/add_column.rs index 298addcc..29dcb455 100644 --- a/src/ui/add_column.rs +++ b/src/ui/add_column.rs @@ -31,9 +31,11 @@ impl AddColumnOption { AddColumnOption::Universe => TimelineKind::Universe .into_timeline(ndb, None) .map(AddColumnResponse::Timeline), + AddColumnOption::Notification(pubkey) => TimelineKind::Notifications(pubkey) .into_timeline(ndb, cur_account.map(|a| a.pubkey.bytes())) .map(AddColumnResponse::Timeline), + AddColumnOption::Home(pubkey) => { let tlk = TimelineKind::contact_list(pubkey); tlk.into_timeline(ndb, cur_account.map(|a| a.pubkey.bytes())) diff --git a/src/ui/profile/mod.rs b/src/ui/profile/mod.rs index f9de10cb..8a10218c 100644 --- a/src/ui/profile/mod.rs +++ b/src/ui/profile/mod.rs @@ -8,8 +8,10 @@ pub use picture::ProfilePic; pub use preview::ProfilePreview; use crate::{ - actionbar::NoteActionResponse, imgcache::ImageCache, notecache::NoteCache, - notes_holder::NotesHolderStorage, profile::Profile, + actionbar::NoteActionResponse, + imgcache::ImageCache, + notecache::NoteCache, + timeline::{TimelineCache, TimelineCacheKey}, }; use super::timeline::{tabs_ui, TimelineTabView}; @@ -17,7 +19,7 @@ use super::timeline::{tabs_ui, TimelineTabView}; pub struct ProfileView<'a> { pubkey: &'a Pubkey, col_id: usize, - profiles: &'a mut NotesHolderStorage, + timeline_cache: &'a mut TimelineCache, ndb: &'a Ndb, note_cache: &'a mut NoteCache, img_cache: &'a mut ImageCache, @@ -27,7 +29,7 @@ impl<'a> ProfileView<'a> { pub fn new( pubkey: &'a Pubkey, col_id: usize, - profiles: &'a mut NotesHolderStorage, + timeline_cache: &'a mut TimelineCache, ndb: &'a Ndb, note_cache: &'a mut NoteCache, img_cache: &'a mut ImageCache, @@ -35,7 +37,7 @@ impl<'a> ProfileView<'a> { ProfileView { pubkey, col_id, - profiles, + timeline_cache, ndb, note_cache, img_cache, @@ -53,14 +55,19 @@ impl<'a> ProfileView<'a> { ProfilePreview::new(&profile, self.img_cache).ui(ui); } let profile = self - .profiles - .notes_holder_mutated(self.ndb, self.note_cache, &txn, self.pubkey.bytes()) + .timeline_cache + .notes( + self.ndb, + self.note_cache, + &txn, + &TimelineCacheKey::pubkey(self.pubkey.bytes()), + ) .get_ptr(); - profile.timeline.selected_view = tabs_ui(ui); + profile.timeline_mut().selected_view = tabs_ui(ui); TimelineTabView::new( - profile.timeline.current_view(), + profile.timeline().current_view(), false, false, &txn, diff --git a/src/ui/thread.rs b/src/ui/thread.rs index 6cf6b48e..d7524f20 100644 --- a/src/ui/thread.rs +++ b/src/ui/thread.rs @@ -1,9 +1,9 @@ use crate::{ actionbar::NoteActionResponse, imgcache::ImageCache, + note::RootNoteId, notecache::NoteCache, - notes_holder::{NotesHolder, NotesHolderStorage}, - thread::Thread, + timeline::{TimelineCache, TimelineCacheKey}, }; use nostrdb::{Ndb, NoteKey, Transaction}; use tracing::error; @@ -11,7 +11,7 @@ use tracing::error; use super::timeline::TimelineTabView; pub struct ThreadView<'a> { - threads: &'a mut NotesHolderStorage, + timeline_cache: &'a mut TimelineCache, ndb: &'a Ndb, note_cache: &'a mut NoteCache, img_cache: &'a mut ImageCache, @@ -23,7 +23,7 @@ pub struct ThreadView<'a> { impl<'a> ThreadView<'a> { #[allow(clippy::too_many_arguments)] pub fn new( - threads: &'a mut NotesHolderStorage, + timeline_cache: &'a mut TimelineCache, ndb: &'a Ndb, note_cache: &'a mut NoteCache, img_cache: &'a mut ImageCache, @@ -32,7 +32,7 @@ impl<'a> ThreadView<'a> { ) -> Self { let id_source = egui::Id::new("threadscroll_threadview"); ThreadView { - threads, + timeline_cache, ndb, note_cache, img_cache, @@ -83,16 +83,21 @@ impl<'a> ThreadView<'a> { .note_cache .cached_note_or_insert(selected_note_key, ¬e); - cached_note + RootNoteId::new_unsafe(cached_note .reply .borrow(note.tags()) .root() - .map_or_else(|| self.selected_note_id, |nr| nr.id) + .map_or_else(|| self.selected_note_id, |nr| nr.id)) }; let thread = self - .threads - .notes_holder_mutated(self.ndb, self.note_cache, &txn, root_id) + .timeline_cache + .notes( + self.ndb, + self.note_cache, + &txn, + &TimelineCacheKey::thread(root_id), + ) .get_ptr(); // TODO(jb55): skip poll if ThreadResult is fresh?