diff --git a/crates/util/cometindex/src/indexer.rs b/crates/util/cometindex/src/indexer.rs
index 27e58acb83..fa21a0af1d 100644
--- a/crates/util/cometindex/src/indexer.rs
+++ b/crates/util/cometindex/src/indexer.rs
@@ -141,7 +141,9 @@ impl Indexer {
             let watermark = current_watermark.unwrap_or(0);
 
-            // Calculate new events count since the last watermark
-            sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM events WHERE rowid > $1")
+            // Estimate the number of new events since the last watermark as the distance to
+            // the highest rowid (exact only if rowids are gap-free). MAX(rowid) is NULL on an
+            // empty events table, so COALESCE to 0 rather than failing to decode NULL as i64.
+            sqlx::query_as::<_, (i64,)>("SELECT COALESCE(MAX(rowid) - $1, 0) FROM events")
                 .bind(watermark)
                 .fetch_one(src_db)
                 .await
@@ -217,28 +219,50 @@ fn read_events(
     watermark: i64,
 ) -> Pin> + Send + '_>> {
     let event_stream = sqlx::query_as::<_, (i64, String, i64, Option, serde_json::Value)>(
+        // This query does some shenanigans to ensure good performance.
+        // The main trick is that we know that each event has 1 block and <= 1 transaction associated
+        // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and
+        // then a sort, and instead work from the events in a linear fashion.
+        // Basically, this query ends up doing:
+        //
+        // for event in events >= id:
+        //   attach attributes
+        //   attach block
+        //   attach transaction?
         r#"
-SELECT
-    events.rowid,
-    events.type,
+SELECT
+    events.rowid,
+    events.type,
     blocks.height AS block_height,
     tx_results.tx_hash,
-    jsonb_object_agg(attributes.key, attributes.value) AS attrs
-FROM
-    events
-LEFT JOIN
-    attributes ON events.rowid = attributes.event_id
-JOIN
-    blocks ON events.block_id = blocks.rowid
-LEFT JOIN
-    tx_results ON events.tx_id = tx_results.rowid
-WHERE
-    events.rowid > $1
-GROUP BY
-    events.rowid,
-    events.type,
-    blocks.height,
-    tx_results.tx_hash
+    events.attrs
+FROM (
+    SELECT
+        rowid,
+        type,
+        block_id,
+        tx_id,
+        jsonb_object_agg(attributes.key, attributes.value) AS attrs
+    FROM
+        events
+    LEFT JOIN
+        attributes ON rowid = attributes.event_id
+    WHERE
+        rowid > $1
+    GROUP BY
+        rowid,
+        type,
+        block_id,
+        tx_id
+) events
+LEFT JOIN LATERAL (
+    SELECT * FROM blocks WHERE blocks.rowid = events.block_id LIMIT 1
+) blocks
+ON TRUE
+LEFT JOIN LATERAL (
+    SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1
+) tx_results
+ON TRUE
 ORDER BY
     events.rowid ASC
         "#,
diff --git a/crates/util/cometindex/vendor/schema.sql b/crates/util/cometindex/vendor/schema.sql
index ce5a241bad..fd78e677e0 100644
--- a/crates/util/cometindex/vendor/schema.sql
+++ b/crates/util/cometindex/vendor/schema.sql
@@ -65,6 +65,9 @@ CREATE TABLE attributes (
   UNIQUE (event_id, key)
 );
 
+-- To make it efficient to fetch the attributes of a given event.
+CREATE INDEX ON attributes(event_id);
+
 -- A joined view of events and their attributes. Events that do not have any
 -- attributes are represented as a single row with empty key and value fields.
 CREATE VIEW event_attributes AS