Skip to content

Commit

Permalink
Cometindex performance improvements (#4851)
Browse files Browse the repository at this point in the history
## Describe your changes

This improves the performance of cometindex significantly, especially
when many events need to be indexed.

Two accidental problems:
1. we forgot to add an index between attributes and events, to
efficiently get the attributes associated with an event

This alone caused quadratic query performance, which is really bad.

2. Postgres was doing sorting and hashing to join each attribute + event
with the blocks and transactions, before grouping attributes.

First of all, we only want to join the blocks and transactions after
already having grouped the attributes together, to avoid adding a
constant factor overhead, since some events may have a handful of
attributes.
Second of all, we shouldn't be sorting or hash merging at all.
The query should be linear and streaming in complexity, and operate by
scanning the events table in order, and then selectively plucking other
tables columns using their indices references the event id.
This PR amends the query to make Postgres actually do this, mainly by
informing it that only a single block or transaction will get joined
with a transaction.

### Some performance evidence

Previously, when starting up pindexer from scratch, it would take 200
seconds before being able to start processing events.
Now it takes milliseconds.

old query:
```
penumbra_raw=# EXPLAIN SELECT
    events.rowid,
    events.type,
    blocks.height AS block_height,
    tx_results.tx_hash,
    jsonb_object_agg(attributes.key, attributes.value) AS attrs
FROM
    events
LEFT JOIN
    attributes ON events.rowid = attributes.event_id
JOIN
    blocks ON events.block_id = blocks.rowid
LEFT JOIN
    tx_results ON events.tx_id = tx_results.rowid
WHERE
    events.rowid > 1000
GROUP BY
    events.rowid,
    events.type,
    blocks.height,
    tx_results.tx_hash
ORDER BY
    events.rowid ASC;
                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=1444517.14..4825338.21 rows=27193816 width=162)
   Group Key: events.rowid, blocks.height, tx_results.tx_hash
   ->  Merge Left Join  (cost=1444517.14..4213477.35 rows=27193816 width=187)
         Merge Cond: (events.rowid = attributes.event_id)
         ->  Gather Merge  (cost=1444516.70..2690182.07 rows=10695484 width=130)
               Workers Planned: 2
               ->  Sort  (cost=1443516.68..1454657.81 rows=4456452 width=130)
                     Sort Key: events.rowid, blocks.height, tx_results.tx_hash
                     ->  Parallel Hash Left Join  (cost=28256.44..342071.06 rows=4456452 width=130)
                           Hash Cond: (events.tx_id = tx_results.rowid)
                           ->  Parallel Hash Join  (cost=19944.07..322060.42 rows=4456452 width=72)
                                 Hash Cond: (events.block_id = blocks.rowid)
                                 ->  Parallel Seq Scan on events  (cost=0.00..183912.12 rows=4456452 width=72)
                                       Filter: (rowid > 1000)
                                 ->  Parallel Hash  (cost=12626.92..12626.92 rows=420892 width=16)
                                       ->  Parallel Seq Scan on blocks  (cost=0.00..12626.92 rows=420892 width=16)
                           ->  Parallel Hash  (cost=7950.50..7950.50 rows=28950 width=74)
                                 ->  Parallel Seq Scan on tx_results  (cost=0.00..7950.50 rows=28950 width=74)
         ->  Index Scan using attributes_event_id_idx on attributes  (cost=0.44..1156627.19 rows=27196491 width=65)
 JIT:
   Functions: 28
   Options: Inlining true, Optimization true, Expressions true, Deforming true
```

new query:

```
penumbra_raw=# EXPLAIN SELECT
    events.rowid,
    events.type,
    blocks.height AS block_height,
    tx_results.tx_hash,
    events.attrs
FROM (
    SELECT
        rowid,
        type,
        block_id,
        tx_id,
        jsonb_object_agg(attributes.key, attributes.value) AS attrs
    FROM
        events
    LEFT JOIN
        attributes ON rowid = attributes.event_id
    WHERE
        rowid > 1000
    GROUP BY
        rowid,
        type,
        block_id,
        tx_id
) events
LEFT JOIN LATERAL (
    SELECT * FROM blocks WHERE blocks.rowid = events.block_id LIMIT 1
) blocks
ON TRUE
LEFT JOIN LATERAL (
    SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1
) tx_results
ON TRUE
ORDER BY
    events.rowid ASC;
                                                           QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=1.59..181924467.06 rows=10694733 width=162)
   ->  Nested Loop Left Join  (cost=1.30..92837341.17 rows=10694733 width=104)
         ->  GroupAggregate  (cost=0.87..2226215.83 rows=10694733 width=104)
               Group Key: events.rowid
               ->  Merge Left Join  (cost=0.87..1956570.52 rows=27192229 width=129)
                     Merge Cond: (events.rowid = attributes.event_id)
                     ->  Index Scan using events_pkey on events  (cost=0.43..433368.16 rows=10694733 width=72)
                           Index Cond: (rowid > 1000)
                     ->  Index Scan using attributes_event_id_idx on attributes  (cost=0.44..1156555.97 rows=27194904 width=65)
         ->  Limit  (cost=0.42..8.44 rows=1 width=56)
               ->  Index Scan using blocks_pkey on blocks  (cost=0.42..8.44 rows=1 width=56)
                     Index Cond: (rowid = events.block_id)
   ->  Limit  (cost=0.29..8.31 rows=1 width=126)
         ->  Index Scan using tx_results_pkey on tx_results  (cost=0.29..8.31 rows=1 width=126)
               Index Cond: (rowid = events.tx_id)
 JIT:
   Functions: 24
   Options: Inlining true, Optimization true, Expressions true, Deforming true
 ```

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: indexing only.

  > REPLACE THIS TEXT WITH RATIONALE (CAN BE BRIEF)
  • Loading branch information
cronokirby committed Sep 12, 2024
1 parent 5717176 commit 3beea92
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 20 deletions.
62 changes: 42 additions & 20 deletions crates/util/cometindex/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Indexer {
let watermark = current_watermark.unwrap_or(0);

// Calculate new events count since the last watermark
sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM events WHERE rowid > $1")
sqlx::query_as::<_, (i64,)>("SELECT MAX(rowid) - $1 FROM events")
.bind(watermark)
.fetch_one(src_db)
.await
Expand Down Expand Up @@ -217,28 +217,50 @@ fn read_events(
watermark: i64,
) -> Pin<Box<dyn Stream<Item = Result<ContextualizedEvent>> + Send + '_>> {
let event_stream = sqlx::query_as::<_, (i64, String, i64, Option<String>, serde_json::Value)>(
// This query does some shenanigans to ensure good performance.
// The main trick is that we know that each event has 1 block and <= 1 transaction associated
// with it, so we can "encourage" (force) Postgres to avoid doing a hash join and
// then a sort, and instead work from the events in a linear fashion.
// Basically, this query ends up doing:
//
// for event in events >= id:
// attach attributes
// attach block
// attach transaction?
r#"
SELECT
events.rowid,
events.type,
SELECT
events.rowid,
events.type,
blocks.height AS block_height,
tx_results.tx_hash,
jsonb_object_agg(attributes.key, attributes.value) AS attrs
FROM
events
LEFT JOIN
attributes ON events.rowid = attributes.event_id
JOIN
blocks ON events.block_id = blocks.rowid
LEFT JOIN
tx_results ON events.tx_id = tx_results.rowid
WHERE
events.rowid > $1
GROUP BY
events.rowid,
events.type,
blocks.height,
tx_results.tx_hash
events.attrs
FROM (
SELECT
rowid,
type,
block_id,
tx_id,
jsonb_object_agg(attributes.key, attributes.value) AS attrs
FROM
events
LEFT JOIN
attributes ON rowid = attributes.event_id
WHERE
rowid > $1
GROUP BY
rowid,
type,
block_id,
tx_id
) events
LEFT JOIN LATERAL (
SELECT * FROM blocks WHERE blocks.rowid = events.block_id LIMIT 1
) blocks
ON TRUE
LEFT JOIN LATERAL (
SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1
) tx_results
ON TRUE
ORDER BY
events.rowid ASC
"#,
Expand Down
3 changes: 3 additions & 0 deletions crates/util/cometindex/vendor/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ CREATE TABLE attributes (
UNIQUE (event_id, key)
);

-- To make it efficient to fetch the attributes of a given event.
CREATE INDEX ON attributes(event_id);

-- A joined view of events and their attributes. Events that do not have any
-- attributes are represented as a single row with empty key and value fields.
CREATE VIEW event_attributes AS
Expand Down

0 comments on commit 3beea92

Please sign in to comment.