From fcab9a8412967e537f997c8b83292005aae04931 Mon Sep 17 00:00:00 2001
From: Nick Barrett <nick@beeper.com>
Date: Sat, 4 Feb 2023 16:45:29 +0100
Subject: [PATCH 1/4] Use `receipts_linearized.event_stream_ordering` column

This removes a bunch of joins against the events table.
---
 .../databases/main/event_push_actions.py      | 12 +++++------
 synapse/storage/databases/main/receipts.py    | 20 +++++++++----------
 2 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 3a0c370fde1d..689a662af813 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1005,9 +1005,8 @@ def _get_receipts_by_room_txn(
         )
 
         sql = f"""
-            SELECT room_id, thread_id, MAX(stream_ordering)
+            SELECT room_id, thread_id, MAX(event_stream_ordering)
             FROM receipts_linearized
-            INNER JOIN events USING (room_id, event_id)
             WHERE {receipt_types_clause}
             AND user_id = ?
             GROUP BY room_id, thread_id
@@ -1486,11 +1485,10 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
         )
 
         sql = """
-            SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
-            FROM receipts_linearized AS r
-            INNER JOIN events AS e USING (event_id)
-            WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
-            ORDER BY r.stream_id ASC
+            SELECT stream_id, room_id, user_id, thread_id, event_stream_ordering
+            FROM receipts_linearized
+            WHERE ? < stream_id AND stream_id <= ? AND user_id LIKE ?
+            ORDER BY stream_id ASC
             LIMIT ?
         """
 
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 29972d520413..07d36d093460 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -34,6 +34,7 @@
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
+from synapse.storage.databases.main.stream import StreamWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.engines._base import IsolationLevel
 from synapse.storage.util.id_generators import (
@@ -52,7 +53,7 @@
 logger = logging.getLogger(__name__)
 
 
-class ReceiptsWorkerStore(SQLBaseStore):
+class ReceiptsWorkerStore(StreamWorkerStore, SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -144,14 +145,13 @@ def get_last_unthreaded_receipt_for_user_txn(
         )
 
         sql = f"""
-            SELECT event_id, stream_ordering
+            SELECT event_id, event_stream_ordering
             FROM receipts_linearized
-            INNER JOIN events USING (room_id, event_id)
             WHERE {clause}
             AND user_id = ?
             AND room_id = ?
             AND thread_id IS NULL
-            ORDER BY stream_ordering DESC
+            ORDER BY event_stream_ordering DESC
             LIMIT 1
         """
 
@@ -632,16 +632,15 @@ def _insert_linearized_receipt_txn(
         # have to compare orderings of existing receipts
         if stream_ordering is not None:
             if thread_id is None:
-                thread_clause = "r.thread_id IS NULL"
+                thread_clause = "thread_id IS NULL"
                 thread_args: Tuple[str, ...] = ()
             else:
-                thread_clause = "r.thread_id = ?"
+                thread_clause = "thread_id = ?"
                 thread_args = (thread_id,)
 
             sql = f"""
-            SELECT stream_ordering, event_id FROM events
-            INNER JOIN receipts_linearized AS r USING (event_id, room_id)
-            WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
+            SELECT event_stream_ordering, event_id FROM receipts_linearized
+            WHERE room_id = ? AND receipt_type = ? AND user_id = ? AND {thread_clause}
             """
             txn.execute(
                 sql,
@@ -654,7 +653,8 @@ def _insert_linearized_receipt_txn(
             )
 
             for so, eid in txn:
-                if int(so) >= stream_ordering:
+                # Guard against old receipts with no `event_stream_ordering`
+                if so and int(so) >= stream_ordering:
                     logger.debug(
                         "Ignoring new receipt for %s in favour of existing "
                         "one for later event %s",

From c5584dc07576436fd767da504b27af5cb4e78051 Mon Sep 17 00:00:00 2001
From: Nick Barrett <nick@beeper.com>
Date: Sat, 4 Feb 2023 16:46:11 +0100
Subject: [PATCH 2/4] Reject local user receipts for events we don't have

This should be impossible, check is here to pevent bad actors from breaking
synapse with receipts for non-events.
---
 synapse/storage/databases/main/receipts.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 07d36d093460..3b92d2c85eab 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -625,6 +625,16 @@ def _insert_linearized_receipt_txn(
             allow_none=True,
         )
 
+        # Local user receipts must have an event_stream_ordering so that push action
+        # summarisation works correctly. This should always be the case because any
+        # local user should only receive events from this server. This exception
+        # protects against bad actors sending dodgy receipts.
+        if res is None and self.hs.is_mine_id(user_id):
+            raise ValueError(
+                "Local users cannot send receipts for unknown events, "
+                f"roomID={room_id}, eventID={event_id}",
+            )
+
         stream_ordering = int(res["stream_ordering"]) if res else None
         rx_ts = res["received_ts"] if res else 0
 

From a0862bb6c89aa32dd0297e87ca6a0c52f62f0574 Mon Sep 17 00:00:00 2001
From: Nick Barrett <nick@beeper.com>
Date: Sat, 4 Feb 2023 16:46:30 +0100
Subject: [PATCH 3/4] Update tests to always send local user receipts for real
 events

---
 tests/handlers/test_appservice.py  | 14 ++++++++++----
 tests/rest/client/test_receipts.py |  9 +++++++--
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index a7495ab21a41..410cc7323a9b 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -588,7 +588,7 @@ def test_sending_read_receipt_batches_to_application_services(self) -> None:
                 ],
                 ApplicationService.NS_ROOMS: [
                     {
-                        "regex": "!fakeroom_.*",
+                        "regex": ".*",
                         "exclusive": True,
                     }
                 ],
@@ -597,16 +597,22 @@ def test_sending_read_receipt_batches_to_application_services(self) -> None:
 
         # Now, pretend that we receive a large burst of read receipts (300 total) that
         # all come in at once.
-        for i in range(300):
+        for _ in range(300):
+            room_id = self.helper.create_room_as(
+                self.local_user, tok=self.local_user_token
+            )
+            resp = self.helper.send(room_id, tok=self.local_user_token)
+            event_id = resp["event_id"]
+
             self.get_success(
                 # Insert a fake read receipt into the database
                 self.hs.get_datastores().main.insert_receipt(
                     # We have to use unique room ID + user ID combinations here, as the db query
                     # is an upsert.
-                    room_id=f"!fakeroom_{i}:test",
+                    room_id=room_id,
                     receipt_type="m.read",
                     user_id=self.local_user,
-                    event_ids=[f"$eventid_{i}"],
+                    event_ids=[event_id],
                     thread_id=None,
                     data={},
                 )
diff --git a/tests/rest/client/test_receipts.py b/tests/rest/client/test_receipts.py
index 2a7fcea38607..0f09566c2d70 100644
--- a/tests/rest/client/test_receipts.py
+++ b/tests/rest/client/test_receipts.py
@@ -14,7 +14,7 @@
 from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
-from synapse.rest.client import login, receipts, register
+from synapse.rest.client import login, receipts, register, room
 from synapse.server import HomeServer
 from synapse.util import Clock
 
@@ -26,6 +26,7 @@ class ReceiptsTestCase(unittest.HomeserverTestCase):
         login.register_servlets,
         register.register_servlets,
         receipts.register_servlets,
+        room.register_servlets,
         synapse.rest.admin.register_servlets,
     ]
 
@@ -34,9 +35,13 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.owner_tok = self.login("owner", "pass")
 
     def test_send_receipt(self) -> None:
+        room_id = self.helper.create_room_as(self.owner, tok=self.owner_tok)
+        resp = self.helper.send(room_id, tok=self.owner_tok)
+        event_id = resp["event_id"]
+
         channel = self.make_request(
             "POST",
-            "/rooms/!abc:beep/receipt/m.read/$def",
+            f"/rooms/!abc:beep/receipt/m.read/{event_id}",
             content={},
             access_token=self.owner_tok,
         )

From 5eb0ef2f122b56f7b7ecbd8c08dd7f37a70afaa1 Mon Sep 17 00:00:00 2001
From: Nick Barrett <nick@beeper.com>
Date: Sat, 4 Feb 2023 16:46:34 +0100
Subject: [PATCH 4/4] Add changelog file

---
 changelog.d/13918.misc | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/13918.misc

diff --git a/changelog.d/13918.misc b/changelog.d/13918.misc
new file mode 100644
index 000000000000..b03f6f42e5fd
--- /dev/null
+++ b/changelog.d/13918.misc
@@ -0,0 +1 @@
+Use new receipts column to optimise receipt and push action SQL queries. Contributed by Nick @ Beeper (@fizzadar).