Skip to content

Commit

Permalink
Merge pull request #369 from matrix-org/kegan/migrate-stuck-invites
Browse files Browse the repository at this point in the history
Migrate stuck invites
  • Loading branch information
kegsay committed Nov 9, 2023
2 parents bb003c1 + 03f6287 commit d04ce7b
Show file tree
Hide file tree
Showing 2 changed files with 315 additions and 0 deletions.
83 changes: 83 additions & 0 deletions state/migrations/20231108122539_clear_stuck_invites.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package migrations

import (
"context"
"database/sql"
"fmt"

"github.com/lib/pq"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigrationContext(upClearStuckInvites, downClearStuckInvites)
}

// The purpose of this migration is to find users who have rooms which have
// not been properly processed by the proxy and invalidate their since token
// so they will do an initial sync on the next poller startup. This is specifically
// targeting stuck invites, where there is an invite in the invites table but
// the room is already joined. This is usually (always?) due to missing a create
// event when the room was joined, caused by a synapse bug outlined in
// https://github.com/matrix-org/sliding-sync/issues/367
// This isn't exclusively a problem with invites, though it manifests more clearly there.
func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error {
// The syncv3_unread table is updated any time A) a room is in rooms.join and B) the unread count has changed,
// where nil != 0. Therefore, we can use this table as a proxy for "have we seen a v2 response which has put this
// room into rooms.join"? For every room in rooms.join, we should have seen a create event for it, and hence have
// an entry in syncv3_rooms. If we do not have an entry in syncv3_rooms but do have an entry in syncv3_unread, this
// implies we failed to properly store this joined room and therefore the user who the unread marker is for should be
// reset to force an initial sync. On matrix.org, of the users using sliding sync, this will catch around ~1.82% of users
rows, err := tx.QueryContext(ctx, `
SELECT distinct(user_id) FROM syncv3_unread
WHERE room_id NOT IN (
SELECT room_id
FROM syncv3_rooms
)
`)
defer rows.Close()
if err != nil {
return fmt.Errorf("failed to select bad users: %w", err)
}

var usersToInvalidate []string
for rows.Next() {
var userID string
err = rows.Scan(&userID)
if err != nil {
return fmt.Errorf("failed to scan user: %w", err)
}
usersToInvalidate = append(usersToInvalidate, userID)
}
logger.Info().Int("len_invalidate_users", len(usersToInvalidate)).Msg("invalidating users")
if len(usersToInvalidate) < 50 {
logger.Info().Strs("invalidate_users", usersToInvalidate).Msg("invalidating users")
}

// for each user:
// - reset their since token for all devices
// - remove any outstanding invites (we'll be told about them again when they initial sync)
res, err := tx.ExecContext(ctx, `
UPDATE syncv3_sync2_devices SET since='' WHERE user_id=ANY($1)
`, pq.StringArray(usersToInvalidate))
if err != nil {
return fmt.Errorf("failed to invalidate since tokens: %w", err)
}
ra, _ := res.RowsAffected()
logger.Info().Int64("num_devices", ra).Msg("reset since tokens")

res, err = tx.ExecContext(ctx, `
DELETE FROM syncv3_invites WHERE user_id=ANY($1)
`, pq.StringArray(usersToInvalidate))
if err != nil {
return fmt.Errorf("failed to remove outstanding invites: %w", err)
}
ra, _ = res.RowsAffected()
logger.Info().Int64("num_invites", ra).Msg("reset invites")
return nil
}

func downClearStuckInvites(ctx context.Context, tx *sql.Tx) error {
// we can't roll this back
return nil
}
232 changes: 232 additions & 0 deletions state/migrations/20231108122539_clear_stuck_invites_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package migrations

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/matrix-org/sliding-sync/state"
"github.com/matrix-org/sliding-sync/sync2"
)

func TestClearStuckInvites(t *testing.T) {
db, close := connectToDB(t)
defer close()
roomsTable := state.NewRoomsTable(db)
inviteTable := state.NewInvitesTable(db)
unreadTable := state.NewUnreadTable(db)
deviceTable := sync2.NewDevicesTable(db)
tokensTable := sync2.NewTokensTable(db, "secret")

zero := 0
device1 := "TEST_1"
device2 := "TEST_2"
roomA := "!TestClearStuckInvites_a:localhost"
roomB := "!TestClearStuckInvites_b:localhost"
roomC := "!TestClearStuckInvites_c:localhost"
roomD := "!TestClearStuckInvites_d:localhost"
roomE := "!TestClearStuckInvites_e:localhost"
roomF := "!TestClearStuckInvites_f:localhost"
roomG := "!TestClearStuckInvites_g:localhost"
alice := "@TestClearStuckInvites_alice:localhost"
bob := "@TestClearStuckInvites_bob:localhost"
charlie := "@TestClearStuckInvites_charlie:localhost"
doris := "@TestClearStuckInvites_doris:localhost"
users := []string{
alice, bob, charlie, doris,
}

// Test cases:
// Room | In Invite Table? | In Unread Table? | In Room Table? | Comment
// A Y Y Y OK, Genuine invite, proxy in room
// B Y Y N BAD, Stuck invite, proxy never saw join
// C Y N Y OK, Genuine invite, proxy in room, no unread counts (unusual but valid)
// D Y N N OK, Genuine invite, proxy not in room
// E N Y Y OK, Genuine joined room
// F N Y N BAD, Stuck joined room, proxy never saw join
// G N N Y OK, Genuine joined room, no unread counts (unusual but valid)
// - N N N Impossible, room id isn't in any table!
roomToInfo := map[string]struct {
invitedUser string
unreadUser string
inRoomTable bool
}{
roomA: {
invitedUser: alice,
unreadUser: bob,
inRoomTable: true,
},
roomB: {
invitedUser: bob,
unreadUser: bob,
inRoomTable: false,
},
roomC: {
invitedUser: charlie,
inRoomTable: true,
},
roomD: {
invitedUser: doris,
inRoomTable: false,
},
roomE: {
unreadUser: alice,
inRoomTable: true,
},
roomF: {
unreadUser: doris,
},
roomG: {
inRoomTable: true,
},
}

err := sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
for roomID, info := range roomToInfo {
if info.inRoomTable {
err := roomsTable.Upsert(txn, state.RoomInfo{
ID: roomID,
}, 0, 0)
if err != nil {
return fmt.Errorf("Upsert room: %s", err)
}
}
if info.invitedUser != "" {
err := inviteTable.InsertInvite(info.invitedUser, roomID, []json.RawMessage{json.RawMessage(`{}`)})
if err != nil {
return fmt.Errorf("InsertInvite: %s", err)
}
}
if info.unreadUser != "" {
err := unreadTable.UpdateUnreadCounters(info.unreadUser, roomID, &zero, &zero)
if err != nil {
return fmt.Errorf("UpdateUnreadCounters: %s", err)
}
}
}
for _, userID := range users {
for _, deviceID := range []string{device1, device2} {
// each user has 2 devices
if err := deviceTable.InsertDevice(txn, userID, deviceID); err != nil {
return fmt.Errorf("InsertDevice: %s", err)
}
_, err := tokensTable.Insert(txn, userID+deviceID, userID, deviceID, time.Now())
if err != nil {
return fmt.Errorf("TokensTable.Insert: %s", err)
}
}
}
return nil
})
if err != nil {
t.Fatalf("failed to set up test configuration: %s", err)
}
// set since tokens (this is done without a txn hence cannot be bundled in as the UPDATE would fail)
for _, userID := range users {
for i, deviceID := range []string{device1, device2} {
// each user has 2 devices
since := fmt.Sprintf("since_%d", i)
if err := deviceTable.UpdateDeviceSince(userID, deviceID, since); err != nil {
t.Fatalf("UpdateDeviceSince: %s", err)
}
}
}

t.Log("Run the migration.")
tx, err := db.Beginx()
if err != nil {
t.Fatal(err)
}
if err := upClearStuckInvites(context.Background(), tx.Tx); err != nil {
t.Fatalf("upClearStuckInvites: %s", err)
}
tx.Commit()

// make a new txn for assertions
tx, err = db.Beginx()
if err != nil {
t.Fatal(err)
}

// users in room B (bob) and F (doris) should be reset.
tokens, err := tokensTable.TokenForEachDevice(tx)
if err != nil {
t.Fatalf("TokenForEachDevice: %s", err)
}
wantResults := map[[2]string]struct {
wantSinceReset bool
}{
{bob, device1}: {
wantSinceReset: true,
},
{bob, device2}: {
wantSinceReset: true,
},
{doris, device1}: {
wantSinceReset: true,
},
{doris, device2}: {
wantSinceReset: true,
},
// everyone else should NOT have since reset
{alice, device1}: {
wantSinceReset: false,
},
{alice, device2}: {
wantSinceReset: false,
},
{charlie, device1}: {
wantSinceReset: false,
},
{charlie, device2}: {
wantSinceReset: false,
},
}
for _, tok := range tokens {
key := [2]string{tok.UserID, tok.DeviceID}
want, ok := wantResults[key]
if !ok {
continue // different user in another test?
}
if want.wantSinceReset && tok.Since != "" {
t.Errorf("%s want since reset, got %+v", key, tok)
}
if !want.wantSinceReset && tok.Since == "" {
t.Errorf("%s did not want since reset, got %+v", key, tok)
}
}
// invites for Bob and Doris are gone
for _, userID := range []string{bob, doris} {
got, err := inviteTable.SelectAllInvitesForUser(userID)
if err != nil {
t.Fatalf("SelectAllInvitesForUser: %s", err)
}
if len(got) > 0 {
t.Fatalf("SelectAllInvitesForUser got invites for %s, wanted none: %+v", userID, got)
}
}
// ensure other invites remain
wantInvites := map[string][]string{
alice: {roomA},
charlie: {roomC},
}
for userID, wantInvitesRooms := range wantInvites {
got, err := inviteTable.SelectAllInvitesForUser(userID)
if err != nil {
t.Fatalf("SelectAllInvitesForUser: %s", err)
}
if len(got) != len(wantInvitesRooms) {
t.Fatalf("SelectAllInvitesForUser got %d invites for %s, wanted %d", len(got), userID, len(wantInvitesRooms))
}
for _, wantRoom := range wantInvitesRooms {
_, exists := got[wantRoom]
if !exists {
t.Fatalf("SelectAllInvitesForUser wanted invite for %s in %s, but it's missing", userID, wantRoom)
}
}
}
}

0 comments on commit d04ce7b

Please sign in to comment.