diff --git a/Cargo.lock b/Cargo.lock index 8edacc81..ff9d6e98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1192,7 +1192,7 @@ dependencies = [ [[package]] name = "egui_nav" version = "0.1.0" -source = "git+https://github.com/damus-io/egui-nav?rev=6ba42de2bae384d10e35c532f3856b81d2e9f645#6ba42de2bae384d10e35c532f3856b81d2e9f645" +source = "git+https://github.com/damus-io/egui-nav?rev=956338a90e09c7cda951d554626483e0cdbc7825#956338a90e09c7cda951d554626483e0cdbc7825" dependencies = [ "egui", "egui_extras", diff --git a/Cargo.toml b/Cargo.toml index e0d60cb6..350bf89e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ eframe = { git = "https://github.com/emilk/egui", rev = "fcb7764e48ce00f8f8e58da egui_extras = { git = "https://github.com/emilk/egui", rev = "fcb7764e48ce00f8f8e58da10f937410d65b0bfb", package = "egui_extras", features = ["all_loaders"] } ehttp = "0.2.0" egui_tabs = { git = "https://github.com/damus-io/egui-tabs", branch = "egui-0.28" } -egui_nav = { git = "https://github.com/damus-io/egui-nav", rev = "6ba42de2bae384d10e35c532f3856b81d2e9f645" } +egui_nav = { git = "https://github.com/damus-io/egui-nav", rev = "956338a90e09c7cda951d554626483e0cdbc7825" } egui_virtual_list = { git = "https://github.com/jb55/hello_egui", branch = "egui-0.28", package = "egui_virtual_list" } reqwest = { version = "0.12.4", default-features = false, features = [ "rustls-tls-native-roots" ] } image = { version = "0.25", features = ["jpeg", "png", "webp"] } diff --git a/enostr/src/relay/pool.rs b/enostr/src/relay/pool.rs index de4e1efc..ed8d37aa 100644 --- a/enostr/src/relay/pool.rs +++ b/enostr/src/relay/pool.rs @@ -18,6 +18,20 @@ pub struct PoolEvent<'a> { pub event: ewebsock::WsEvent, } +impl<'a> PoolEvent<'a> { + pub fn into_owned(self) -> PoolEventBuf { + PoolEventBuf { + relay: self.relay.to_owned(), + event: self.event, + } + } +} + +pub struct PoolEventBuf { + pub relay: String, + pub event: ewebsock::WsEvent, +} + pub struct PoolRelay { pub relay: Relay, pub last_ping: Instant, diff --git a/src/app.rs b/src/app.rs index d886b5d5..6c9f4620 100644 --- a/src/app.rs +++ b/src/app.rs @@ -6,20 +6,18 @@ use crate::{ args::Args, column::Columns, draft::Drafts, - error::{Error, FilterError}, - filter::{self, FilterState}, + filter::FilterState, frame_history::FrameHistory, imgcache::ImageCache, nav, - note::NoteRef, - notecache::{CachedNote, NoteCache}, + notecache::NoteCache, notes_holder::NotesHolderStorage, profile::Profile, storage::{self, DataPath, DataPathType, Directory, FileKeyStorage, KeyStorageType}, subscriptions::{SubKind, Subscriptions}, support::Support, thread::Thread, - timeline::{Timeline, TimelineId, TimelineKind, ViewFilter}, + timeline::{self, Timeline, TimelineKind}, ui::{self, DesktopSidePanel}, unknowns::UnknownIds, view_state::ViewState, @@ -32,18 +30,17 @@ use uuid::Uuid; use egui::{Context, Frame, Style}; use egui_extras::{Size, StripBuilder}; -use nostrdb::{Config, Filter, Ndb, Note, Transaction}; +use nostrdb::{Config, Filter, Ndb, Transaction}; use std::collections::HashMap; use std::path::Path; use std::time::Duration; -use tracing::{debug, error, info, trace, warn}; +use tracing::{error, info, trace, warn}; #[derive(Debug, Eq, PartialEq, Clone)] pub enum DamusState { Initializing, Initialized, - NewTimelineSub(TimelineId), } /// We derive Deserialize/Serialize so we can persist app state on shutdown. @@ -99,98 +96,6 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) { } } -fn send_initial_timeline_filter( - ndb: &Ndb, - can_since_optimize: bool, - subs: &mut Subscriptions, - pool: &mut RelayPool, - timeline: &mut Timeline, - to: &str, -) { - let filter_state = timeline.filter.clone(); - - match filter_state { - FilterState::Broken(err) => { - error!( - "FetchingRemote state in broken state when sending initial timeline filter? {err}" - ); - } - - FilterState::FetchingRemote(_unisub) => { - error!("FetchingRemote state when sending initial timeline filter?"); - } - - FilterState::GotRemote(_sub) => { - error!("GotRemote state when sending initial timeline filter?"); - } - - FilterState::Ready(filter) => { - let filter = filter.to_owned(); - let new_filters = filter.into_iter().map(|f| { - // limit the size of remote filters - let default_limit = filter::default_remote_limit(); - let mut lim = f.limit().unwrap_or(default_limit); - let mut filter = f; - if lim > default_limit { - lim = default_limit; - filter = filter.limit_mut(lim); - } - - let notes = timeline.notes(ViewFilter::NotesAndReplies); - - // Should we since optimize? Not always. For example - // if we only have a few notes locally. One way to - // determine this is by looking at the current filter - // and seeing what its limit is. If we have less - // notes than the limit, we might want to backfill - // older notes - if can_since_optimize && filter::should_since_optimize(lim, notes.len()) { - filter = filter::since_optimize_filter(filter, notes); - } else { - warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter); - } - - filter - }).collect(); - - //let sub_id = damus.gen_subid(&SubKind::Initial); - let sub_id = Uuid::new_v4().to_string(); - subs.subs.insert(sub_id.clone(), SubKind::Initial); - - let cmd = ClientMessage::req(sub_id, new_filters); - pool.send_to(&cmd, to); - } - - // we need some data first - FilterState::NeedsRemote(filter) => { - let sub_kind = SubKind::FetchingContactList(timeline.id); - //let sub_id = damus.gen_subid(&sub_kind); - let sub_id = Uuid::new_v4().to_string(); - let local_sub = ndb.subscribe(&filter).expect("sub"); - - timeline.filter = FilterState::fetching_remote(sub_id.clone(), local_sub); - - subs.subs.insert(sub_id.clone(), sub_kind); - - pool.subscribe(sub_id, filter.to_owned()); - } - } -} - -fn send_initial_filters(damus: &mut Damus, relay_url: &str) { - info!("Sending initial filters to {}", relay_url); - for timeline in damus.columns.timelines_mut() { - send_initial_timeline_filter( - &damus.ndb, - damus.since_optimize, - &mut damus.subscriptions, - &mut damus.pool, - timeline, - relay_url, - ); - } -} - fn handle_key_events(input: &egui::InputState, _pixels_per_point: f32, columns: &mut Columns) { for event in &input.raw.events { if let egui::Event::Key { @@ -226,17 +131,31 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> { }; damus.pool.keepalive_ping(wakeup); - // pool stuff - while let Some(ev) = damus.pool.try_recv() { - let relay = ev.relay.to_owned(); + // NOTE: we don't use the while let loop due to borrow issues + #[allow(clippy::while_let_loop)] + loop { + let ev = if let Some(ev) = damus.pool.try_recv() { + ev.into_owned() + } else { + break; + }; match (&ev.event).into() { - RelayEvent::Opened => send_initial_filters(damus, &relay), + RelayEvent::Opened => { + timeline::send_initial_timeline_filters( + &damus.ndb, + damus.since_optimize, + &mut damus.columns, + &mut damus.subscriptions, + &mut damus.pool, + &ev.relay, + ); + } // TODO: handle reconnects - RelayEvent::Closed => warn!("{} connection closed", &relay), - RelayEvent::Error(e) => error!("{}: {}", &relay, e), + RelayEvent::Closed => warn!("{} connection closed", &ev.relay), + RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e), RelayEvent::Other(msg) => trace!("other event {:?}", &msg), - RelayEvent::Message(msg) => process_message(damus, &relay, &msg), + RelayEvent::Message(msg) => process_message(damus, &ev.relay, &msg), } } @@ -244,9 +163,11 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> { for timeline_ind in 0..n_timelines { let is_ready = { let timeline = &mut damus.columns.timelines[timeline_ind]; - matches!( - is_timeline_ready(&damus.ndb, &mut damus.pool, &mut damus.note_cache, timeline), - Ok(true) + timeline::is_timeline_ready( + &damus.ndb, + &mut damus.pool, + &mut damus.note_cache, + timeline, ) }; @@ -286,183 +207,11 @@ fn unknown_id_send(damus: &mut Damus) { damus.pool.send(&msg); } -/// Check our timeline filter and see if we have any filter data ready. -/// Our timelines may require additional data before it is functional. For -/// example, when we have to fetch a contact list before we do the actual -/// following list query. -fn is_timeline_ready( - ndb: &Ndb, - pool: &mut RelayPool, - note_cache: &mut NoteCache, - timeline: &mut Timeline, -) -> Result { - let sub = match &timeline.filter { - FilterState::GotRemote(sub) => *sub, - FilterState::Ready(_f) => return Ok(true), - _ => return Ok(false), - }; - - // We got at least one eose for our filter request. Let's see - // if nostrdb is done processing it yet. - let res = ndb.poll_for_notes(sub, 1); - if res.is_empty() { - debug!( - "check_timeline_filter_state: no notes found (yet?) for timeline {:?}", - timeline - ); - return Ok(false); - } - - info!("notes found for contact timeline after GotRemote!"); - - let note_key = res[0]; - - let filter = { - let txn = Transaction::new(ndb).expect("txn"); - let note = ndb.get_note_by_key(&txn, note_key).expect("note"); - filter::filter_from_tags(¬e).map(|f| f.into_follow_filter()) - }; - - // TODO: into_follow_filter is hardcoded to contact lists, let's generalize - match filter { - Err(Error::Filter(e)) => { - error!("got broken when building filter {e}"); - timeline.filter = FilterState::broken(e); - } - Err(err) => { - error!("got broken when building filter {err}"); - timeline.filter = FilterState::broken(FilterError::EmptyContactList); - return Err(err); - } - Ok(filter) => { - // we just switched to the ready state, we should send initial - // queries and setup the local subscription - info!("Found contact list! Setting up local and remote contact list query"); - setup_initial_timeline(ndb, timeline, note_cache, &filter).expect("setup init"); - timeline.filter = FilterState::ready(filter.clone()); - - //let ck = &timeline.kind; - //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); - let subid = Uuid::new_v4().to_string(); - pool.subscribe(subid, filter) - } - } - - Ok(true) -} - #[cfg(feature = "profiling")] fn setup_profiling() { puffin::set_scopes_on(true); // tell puffin to collect data } -fn setup_initial_timeline( - ndb: &Ndb, - timeline: &mut Timeline, - note_cache: &mut NoteCache, - filters: &[Filter], -) -> Result<()> { - timeline.subscription = Some(ndb.subscribe(filters)?); - let txn = Transaction::new(ndb)?; - debug!( - "querying nostrdb sub {:?} {:?}", - timeline.subscription, timeline.filter - ); - let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32; - let notes = ndb - .query(&txn, filters, lim)? - .into_iter() - .map(NoteRef::from_query_result) - .collect(); - - copy_notes_into_timeline(timeline, &txn, ndb, note_cache, notes); - - Ok(()) -} - -pub fn copy_notes_into_timeline( - timeline: &mut Timeline, - txn: &Transaction, - ndb: &Ndb, - note_cache: &mut NoteCache, - notes: Vec, -) { - let filters = { - let views = &timeline.views; - let filters: Vec bool> = - views.iter().map(|v| v.filter.filter()).collect(); - filters - }; - - for note_ref in notes { - for (view, filter) in filters.iter().enumerate() { - if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { - if filter( - note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), - ¬e, - ) { - timeline.views[view].notes.push(note_ref) - } - } - } - } -} - -fn setup_initial_nostrdb_subs( - ndb: &Ndb, - note_cache: &mut NoteCache, - columns: &mut Columns, -) -> Result<()> { - for timeline in columns.timelines_mut() { - setup_nostrdb_sub(ndb, note_cache, timeline)? - } - - Ok(()) -} - -fn setup_nostrdb_sub(ndb: &Ndb, note_cache: &mut NoteCache, timeline: &mut Timeline) -> Result<()> { - match &timeline.filter { - FilterState::Ready(filters) => { - { setup_initial_timeline(ndb, timeline, note_cache, &filters.clone()) }? - } - - FilterState::Broken(err) => { - error!("FetchingRemote state broken in setup_initial_nostr_subs: {err}") - } - FilterState::FetchingRemote(_) => { - error!("FetchingRemote state in setup_initial_nostr_subs") - } - FilterState::GotRemote(_) => { - error!("GotRemote state in setup_initial_nostr_subs") - } - FilterState::NeedsRemote(_filters) => { - // can't do anything yet, we defer to first connect to send - // remote filters - } - } - - Ok(()) -} - -fn setup_new_nostrdb_sub( - ndb: &Ndb, - note_cache: &mut NoteCache, - columns: &mut Columns, - new_timeline_id: TimelineId, -) -> Result<()> { - if let Some(timeline) = columns.find_timeline_mut(new_timeline_id) { - info!("Setting up timeline sub for {}", timeline.id); - if let FilterState::Ready(filters) = &timeline.filter { - for filter in filters { - info!("Setting up filter {:?}", filter.json()); - } - } - setup_nostrdb_sub(ndb, note_cache, timeline)? - } - - Ok(()) -} - fn update_damus(damus: &mut Damus, ctx: &egui::Context) { match damus.state { DamusState::Initializing => { @@ -474,34 +223,12 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { damus .subscriptions() .insert("unknownids".to_string(), SubKind::OneShot); - setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns) - .expect("home subscription failed"); - } - - DamusState::NewTimelineSub(new_timeline_id) => { - info!("adding new timeline {}", new_timeline_id); - setup_new_nostrdb_sub( + if let Err(err) = timeline::setup_initial_nostrdb_subs( &damus.ndb, &mut damus.note_cache, &mut damus.columns, - new_timeline_id, - ) - .expect("new timeline subscription failed"); - - if let Some(filter) = { - let timeline = damus - .columns - .find_timeline(new_timeline_id) - .expect("timeline"); - match &timeline.filter { - FilterState::Ready(filters) => Some(filters.clone()), - _ => None, - } - } { - let subid = Uuid::new_v4().to_string(); - damus.pool.subscribe(subid, filter); - - damus.state = DamusState::Initialized; + ) { + warn!("update_damus init: {err}"); } } @@ -573,10 +300,12 @@ fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> { return Ok(()); }; + let filter_state = timeline.filter.get(relay_url); + // If this request was fetching a contact list, our filter // state should be "FetchingRemote". We look at the local // subscription for that filter state and get the subscription id - let local_sub = if let FilterState::FetchingRemote(unisub) = &timeline.filter { + let local_sub = if let FilterState::FetchingRemote(unisub) = filter_state { unisub.local } else { // TODO: we could have multiple contact list results, we need @@ -588,10 +317,17 @@ fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> { return Ok(()); }; + info!( + "got contact list from {}, updating filter_state to got_remote", + relay_url + ); + // We take the subscription id and pass it to the new state of // "GotRemote". This will let future frames know that it can try // to look for the contact list in nostrdb. - timeline.filter = FilterState::got_remote(local_sub); + timeline + .filter + .set_relay_state(relay_url.to_string(), FilterState::got_remote(local_sub)); } } @@ -819,10 +555,6 @@ impl Damus { } } - pub fn subscribe_new_timeline(&mut self, timeline_id: TimelineId) { - self.state = DamusState::NewTimelineSub(timeline_id); - } - pub fn mock>(data_path: P) -> Self { let mut columns = Columns::new(); let filter = Filter::from_json(include_str!("../queries/global.json")).unwrap(); diff --git a/src/filter.rs b/src/filter.rs index 85a08eed..1b470a2a 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -2,6 +2,7 @@ use crate::error::{Error, FilterError}; use crate::note::NoteRef; use crate::Result; use nostrdb::{Filter, FilterBuilder, Note, Subscription}; +use std::collections::HashMap; use tracing::{debug, warn}; /// A unified subscription has a local and remote component. The remote subid @@ -12,6 +13,74 @@ pub struct UnifiedSubscription { pub remote: String, } +/// Each relay can have a different filter state. For example, some +/// relays may have the contact list, some may not. Let's capture all of +/// these states so that some relays don't stop the states of other +/// relays. +#[derive(Debug)] +pub struct FilterStates { + pub initial_state: FilterState, + pub states: HashMap, +} + +impl FilterStates { + pub fn get(&mut self, relay: &str) -> &FilterState { + // if our initial state is ready, then just use that + if let FilterState::Ready(_) = self.initial_state { + &self.initial_state + } else { + // otherwise we look at relay states + if !self.states.contains_key(relay) { + self.states + .insert(relay.to_string(), self.initial_state.clone()); + } + self.states.get(relay).unwrap() + } + } + + pub fn get_any_gotremote(&self) -> Option<(&str, Subscription)> { + for (k, v) in self.states.iter() { + if let FilterState::GotRemote(sub) = v { + return Some((k, *sub)); + } + } + + None + } + + pub fn get_any_ready(&self) -> Option<&Vec> { + if let FilterState::Ready(fs) = &self.initial_state { + Some(fs) + } else { + for (_k, v) in self.states.iter() { + if let FilterState::Ready(ref fs) = v { + return Some(fs); + } + } + + None + } + } + + pub fn new(initial_state: FilterState) -> Self { + Self { + initial_state, + states: HashMap::new(), + } + } + + pub fn set_relay_state(&mut self, relay: String, state: FilterState) { + if self.states.contains_key(&relay) { + let current_state = self.states.get(&relay).unwrap(); + debug!( + "set_relay_state: {:?} -> {:?} on {}", + current_state, state, &relay, + ); + } + self.states.insert(relay, state); + } +} + /// We may need to fetch some data from relays before our filter is ready. /// [`FilterState`] tracks this. #[derive(Debug, Clone)] diff --git a/src/nav.rs b/src/nav.rs index 9dcf6ed5..c4ebfc7d 100644 --- a/src/nav.rs +++ b/src/nav.rs @@ -42,8 +42,9 @@ pub fn render_nav(col: usize, app: &mut Damus, ui: &mut egui::Ui) -> bool { let nav_response = Nav::new(routes) .navigating(app.columns_mut().column_mut(col).router_mut().navigating) .returning(app.columns_mut().column_mut(col).router_mut().returning) + .id_source(egui::Id::new(col_id)) .title(48.0, title_bar) - .show_mut(col_id, ui, |ui, nav| { + .show_mut(ui, |ui, nav| { let column = app.columns.column_mut(col); match &nav.top().route { Route::Timeline(tlr) => render_timeline_route( diff --git a/src/profile.rs b/src/profile.rs index 662e4719..3bfb3a7c 100644 --- a/src/profile.rs +++ b/src/profile.rs @@ -2,13 +2,12 @@ use enostr::{Filter, Pubkey}; use nostrdb::{FilterBuilder, Ndb, ProfileRecord, Transaction}; use crate::{ - app::copy_notes_into_timeline, filter::{self, FilterState}, multi_subscriber::MultiSubscriber, note::NoteRef, notecache::NoteCache, notes_holder::NotesHolder, - timeline::{PubkeySource, Timeline, TimelineKind}, + timeline::{copy_notes_into_timeline, PubkeySource, Timeline, TimelineKind}, }; pub enum DisplayName<'a> { diff --git a/src/subscriptions.rs b/src/subscriptions.rs index d006bf42..5396c17f 100644 --- a/src/subscriptions.rs +++ b/src/subscriptions.rs @@ -1,5 +1,6 @@ use crate::timeline::{TimelineId, TimelineKind}; use std::collections::HashMap; +use uuid::Uuid; #[derive(Debug, Clone)] pub enum SubKind { @@ -25,3 +26,7 @@ pub enum SubKind { pub struct Subscriptions { pub subs: HashMap, } + +pub fn new_sub_id() -> String { + Uuid::new_v4().to_string() +} diff --git a/src/timeline/mod.rs b/src/timeline/mod.rs index d92e0f97..cf1e864e 100644 --- a/src/timeline/mod.rs +++ b/src/timeline/mod.rs @@ -1,20 +1,26 @@ -use crate::error::Error; -use crate::note::NoteRef; -use crate::notecache::{CachedNote, NoteCache}; -use crate::unknowns::UnknownIds; -use crate::Result; -use crate::{filter, filter::FilterState}; +use crate::{ + column::Columns, + error::{Error, FilterError}, + filter::{self, FilterState, FilterStates}, + note::NoteRef, + notecache::{CachedNote, NoteCache}, + subscriptions::{self, SubKind, Subscriptions}, + unknowns::UnknownIds, + Result, +}; + use std::fmt; use std::sync::atomic::{AtomicU32, Ordering}; use egui_virtual_list::VirtualList; -use nostrdb::{Ndb, Note, Subscription, Transaction}; +use enostr::{Relay, RelayPool}; +use nostrdb::{Filter, Ndb, Note, Subscription, Transaction}; use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::hash::Hash; use std::rc::Rc; -use tracing::{debug, error}; +use tracing::{debug, error, info, warn}; pub mod kind; pub mod route; @@ -170,7 +176,7 @@ pub struct Timeline { pub kind: TimelineKind, // We may not have the filter loaded yet, so let's make it an option so // that codepaths have to explicitly handle it - pub filter: FilterState, + pub filter: FilterStates, pub views: Vec, pub selected_view: i32, @@ -209,10 +215,11 @@ impl Timeline { Timeline::make_view_id(self.id, self.selected_view) } - pub fn new(kind: TimelineKind, filter: FilterState) -> Self { + pub fn new(kind: TimelineKind, filter_state: FilterState) -> Self { // global unique id for all new timelines static UIDS: AtomicU32 = AtomicU32::new(0); + let filter = FilterStates::new(filter_state); let subscription: Option = None; let notes = TimelineTab::new(ViewFilter::Notes); let replies = TimelineTab::new(ViewFilter::NotesAndReplies); @@ -370,3 +377,304 @@ pub fn merge_sorted_vecs(vec1: &[T], vec2: &[T]) -> (Vec, Merg (merged, result.unwrap_or(MergeKind::FrontInsert)) } + +/// When adding a new timeline, we may have a situation where the +/// FilterState is NeedsRemote. This can happen if we don't yet have the +/// contact list, etc. For these situations, we query all of the relays +/// with the same sub_id. We keep track of this sub_id and update the +/// filter with the latest version of the returned filter (ie contact +/// list) when they arrive. +/// +/// We do this by maintaining this sub_id in the filter state, even when +/// in the ready state. See: [`FilterReady`] +pub fn setup_new_timeline( + timeline: &mut Timeline, + ndb: &Ndb, + subs: &mut Subscriptions, + pool: &mut RelayPool, + note_cache: &mut NoteCache, + since_optimize: bool, +) { + // if we're ready, setup local subs + if is_timeline_ready(ndb, pool, note_cache, timeline) { + if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline) { + error!("setup_new_timeline: {err}"); + } + } + + for relay in &mut pool.relays { + send_initial_timeline_filter(ndb, since_optimize, subs, &mut relay.relay, timeline); + } +} + +/// Send initial filters for a specific relay. This typically gets called +/// when we first connect to a new relay for the first time. For +/// situations where you are adding a new timeline, use +/// setup_new_timeline. +pub fn send_initial_timeline_filters( + ndb: &Ndb, + since_optimize: bool, + columns: &mut Columns, + subs: &mut Subscriptions, + pool: &mut RelayPool, + relay_id: &str, +) -> Option<()> { + info!("Sending initial filters to {}", relay_id); + let relay = &mut pool + .relays + .iter_mut() + .find(|r| r.relay.url == relay_id)? + .relay; + + for timeline in columns.timelines_mut() { + send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline); + } + + Some(()) +} + +pub fn send_initial_timeline_filter( + ndb: &Ndb, + can_since_optimize: bool, + subs: &mut Subscriptions, + relay: &mut Relay, + timeline: &mut Timeline, +) { + let filter_state = timeline.filter.get(&relay.url); + + match filter_state { + FilterState::Broken(err) => { + error!( + "FetchingRemote state in broken state when sending initial timeline filter? {err}" + ); + } + + FilterState::FetchingRemote(_unisub) => { + error!("FetchingRemote state when sending initial timeline filter?"); + } + + FilterState::GotRemote(_sub) => { + error!("GotRemote state when sending initial timeline filter?"); + } + + FilterState::Ready(filter) => { + let filter = filter.to_owned(); + let new_filters = filter.into_iter().map(|f| { + // limit the size of remote filters + let default_limit = filter::default_remote_limit(); + let mut lim = f.limit().unwrap_or(default_limit); + let mut filter = f; + if lim > default_limit { + lim = default_limit; + filter = filter.limit_mut(lim); + } + + let notes = timeline.notes(ViewFilter::NotesAndReplies); + + // Should we since optimize? Not always. For example + // if we only have a few notes locally. One way to + // determine this is by looking at the current filter + // and seeing what its limit is. If we have less + // notes than the limit, we might want to backfill + // older notes + if can_since_optimize && filter::should_since_optimize(lim, notes.len()) { + filter = filter::since_optimize_filter(filter, notes); + } else { + warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter); + } + + filter + }).collect(); + + //let sub_id = damus.gen_subid(&SubKind::Initial); + let sub_id = subscriptions::new_sub_id(); + subs.subs.insert(sub_id.clone(), SubKind::Initial); + + relay.subscribe(sub_id, new_filters); + } + + // we need some data first + FilterState::NeedsRemote(filter) => { + fetch_contact_list(filter.to_owned(), ndb, subs, relay, timeline) + } + } +} + +fn fetch_contact_list( + filter: Vec, + ndb: &Ndb, + subs: &mut Subscriptions, + relay: &mut Relay, + timeline: &mut Timeline, +) { + let sub_kind = SubKind::FetchingContactList(timeline.id); + let sub_id = subscriptions::new_sub_id(); + let local_sub = ndb.subscribe(&filter).expect("sub"); + + timeline.filter.set_relay_state( + relay.url.clone(), + FilterState::fetching_remote(sub_id.clone(), local_sub), + ); + + subs.subs.insert(sub_id.clone(), sub_kind); + + info!("fetching contact list from {}", &relay.url); + relay.subscribe(sub_id, filter); +} + +fn setup_initial_timeline( + ndb: &Ndb, + timeline: &mut Timeline, + note_cache: &mut NoteCache, + filters: &[Filter], +) -> Result<()> { + timeline.subscription = Some(ndb.subscribe(filters)?); + let txn = Transaction::new(ndb)?; + debug!( + "querying nostrdb sub {:?} {:?}", + timeline.subscription, timeline.filter + ); + let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32; + let notes = ndb + .query(&txn, filters, lim)? + .into_iter() + .map(NoteRef::from_query_result) + .collect(); + + copy_notes_into_timeline(timeline, &txn, ndb, note_cache, notes); + + Ok(()) +} + +pub fn copy_notes_into_timeline( + timeline: &mut Timeline, + txn: &Transaction, + ndb: &Ndb, + note_cache: &mut NoteCache, + notes: Vec, +) { + let filters = { + let views = &timeline.views; + let filters: Vec bool> = + views.iter().map(|v| v.filter.filter()).collect(); + filters + }; + + for note_ref in notes { + for (view, filter) in filters.iter().enumerate() { + if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { + if filter( + note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), + ¬e, + ) { + timeline.views[view].notes.push(note_ref) + } + } + } + } +} + +pub fn setup_initial_nostrdb_subs( + ndb: &Ndb, + note_cache: &mut NoteCache, + columns: &mut Columns, +) -> Result<()> { + for timeline in columns.timelines_mut() { + setup_timeline_nostrdb_sub(ndb, note_cache, timeline)?; + } + + Ok(()) +} + +fn setup_timeline_nostrdb_sub( + ndb: &Ndb, + note_cache: &mut NoteCache, + timeline: &mut Timeline, +) -> Result<()> { + let filter_state = timeline + .filter + .get_any_ready() + .ok_or(Error::empty_contact_list())? + .to_owned(); + + setup_initial_timeline(ndb, timeline, note_cache, &filter_state)?; + + Ok(()) +} + +/// Check our timeline filter and see if we have any filter data ready. +/// Our timelines may require additional data before it is functional. For +/// example, when we have to fetch a contact list before we do the actual +/// following list query. +pub fn is_timeline_ready( + ndb: &Ndb, + pool: &mut RelayPool, + note_cache: &mut NoteCache, + timeline: &mut Timeline, +) -> bool { + // TODO: we should debounce the filter states a bit to make sure we have + // seen all of the different contact lists from each relay + if let Some(_f) = timeline.filter.get_any_ready() { + return true; + } + + let (relay_id, sub) = if let Some((relay_id, sub)) = timeline.filter.get_any_gotremote() { + (relay_id.to_string(), sub) + } else { + return false; + }; + + // We got at least one eose for our filter request. Let's see + // if nostrdb is done processing it yet. + let res = ndb.poll_for_notes(sub, 1); + if res.is_empty() { + debug!( + "check_timeline_filter_state: no notes found (yet?) for timeline {:?}", + timeline + ); + return false; + } + + info!("notes found for contact timeline after GotRemote!"); + + let note_key = res[0]; + + let filter = { + let txn = Transaction::new(ndb).expect("txn"); + let note = ndb.get_note_by_key(&txn, note_key).expect("note"); + filter::filter_from_tags(¬e).map(|f| f.into_follow_filter()) + }; + + // TODO: into_follow_filter is hardcoded to contact lists, let's generalize + match filter { + Err(Error::Filter(e)) => { + error!("got broken when building filter {e}"); + timeline + .filter + .set_relay_state(relay_id, FilterState::broken(e)); + false + } + Err(err) => { + error!("got broken when building filter {err}"); + timeline + .filter + .set_relay_state(relay_id, FilterState::broken(FilterError::EmptyContactList)); + false + } + Ok(filter) => { + // we just switched to the ready state, we should send initial + // queries and setup the local subscription + info!("Found contact list! Setting up local and remote contact list query"); + setup_initial_timeline(ndb, timeline, note_cache, &filter).expect("setup init"); + timeline + .filter + .set_relay_state(relay_id, FilterState::ready(filter.clone())); + + //let ck = &timeline.kind; + //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); + let subid = subscriptions::new_sub_id(); + pool.subscribe(subid, filter); + true + } + } +} diff --git a/src/ui/add_column.rs b/src/ui/add_column.rs index 308186a2..2ca30bae 100644 --- a/src/ui/add_column.rs +++ b/src/ui/add_column.rs @@ -361,10 +361,16 @@ pub fn render_add_column_routes( if let Some(resp) = resp { match resp { - AddColumnResponse::Timeline(timeline) => { - let id = timeline.id; + AddColumnResponse::Timeline(mut timeline) => { + crate::timeline::setup_new_timeline( + &mut timeline, + &app.ndb, + &mut app.subscriptions, + &mut app.pool, + &mut app.note_cache, + app.since_optimize, + ); app.columns_mut().add_timeline_to_column(col, timeline); - app.subscribe_new_timeline(id); } AddColumnResponse::UndecidedNotification => { app.columns_mut().column_mut(col).router_mut().route_to(