diff --git a/state/migrations/20231108122539_clear_stuck_invites.go b/state/migrations/20231108122539_clear_stuck_invites.go new file mode 100644 index 00000000..acd4afc8 --- /dev/null +++ b/state/migrations/20231108122539_clear_stuck_invites.go @@ -0,0 +1,83 @@ +package migrations + +import ( + "context" + "database/sql" + "fmt" + + "github.com/lib/pq" + "github.com/pressly/goose/v3" +) + +func init() { + goose.AddMigrationContext(upClearStuckInvites, downClearStuckInvites) +} + +// The purpose of this migration is to find users who have rooms which have +// not been properly processed by the proxy and invalidate their since token +// so they will do an initial sync on the next poller startup. This is specifically +// targeting stuck invites, where there is an invite in the invites table but +// the room is already joined. This is usually (always?) due to missing a create +// event when the room was joined, caused by a synapse bug outlined in +// https://github.com/matrix-org/sliding-sync/issues/367 +// This isn't exclusively a problem with invites, though it manifests more clearly there. +func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error { + // The syncv3_unread table is updated any time A) a room is in rooms.join and B) the unread count has changed, + // where nil != 0. Therefore, we can use this table as a proxy for "have we seen a v2 response which has put this + // room into rooms.join"? For every room in rooms.join, we should have seen a create event for it, and hence have + // an entry in syncv3_rooms. If we do not have an entry in syncv3_rooms but do have an entry in syncv3_unread, this + // implies we failed to properly store this joined room and therefore the user who the unread marker is for should be + // reset to force an initial sync. On matrix.org, of the users using sliding sync, this will catch around ~1.82% of users + rows, err := tx.QueryContext(ctx, ` + SELECT distinct(user_id) FROM syncv3_unread + WHERE room_id NOT IN ( + SELECT room_id + FROM syncv3_rooms + ) + `) + defer rows.Close() + if err != nil { + return fmt.Errorf("failed to select bad users: %w", err) + } + + var usersToInvalidate []string + for rows.Next() { + var userID string + err = rows.Scan(&userID) + if err != nil { + return fmt.Errorf("failed to scan user: %w", err) + } + usersToInvalidate = append(usersToInvalidate, userID) + } + logger.Info().Int("len_invalidate_users", len(usersToInvalidate)).Msg("invalidating users") + if len(usersToInvalidate) < 50 { + logger.Info().Strs("invalidate_users", usersToInvalidate).Msg("invalidating users") + } + + // for each user: + // - reset their since token for all devices + // - remove any outstanding invites (we'll be told about them again when they initial sync) + res, err := tx.ExecContext(ctx, ` + UPDATE syncv3_sync2_devices SET since='' WHERE user_id=ANY($1) + `, pq.StringArray(usersToInvalidate)) + if err != nil { + return fmt.Errorf("failed to invalidate since tokens: %w", err) + } + ra, _ := res.RowsAffected() + logger.Info().Int64("num_devices", ra).Msg("reset since tokens") + + res, err = tx.ExecContext(ctx, ` + DELETE FROM syncv3_invites WHERE user_id=ANY($1) + `, pq.StringArray(usersToInvalidate)) + if err != nil { + return fmt.Errorf("failed to remove outstanding invites: %w", err) + } + ra, _ = res.RowsAffected() + logger.Info().Int64("num_invites", ra).Msg("reset invites") + return nil +} + +func downClearStuckInvites(ctx context.Context, tx *sql.Tx) error { + // we can't roll this back + return nil +} diff --git a/state/migrations/20231108122539_clear_stuck_invites_test.go b/state/migrations/20231108122539_clear_stuck_invites_test.go new file mode 100644 index 00000000..616fd62e --- /dev/null +++ b/state/migrations/20231108122539_clear_stuck_invites_test.go @@ -0,0 +1,232 @@ +package migrations + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/jmoiron/sqlx" + "github.com/matrix-org/sliding-sync/sqlutil" + "github.com/matrix-org/sliding-sync/state" + "github.com/matrix-org/sliding-sync/sync2" +) + +func TestClearStuckInvites(t *testing.T) { + db, close := connectToDB(t) + defer close() + roomsTable := state.NewRoomsTable(db) + inviteTable := state.NewInvitesTable(db) + unreadTable := state.NewUnreadTable(db) + deviceTable := sync2.NewDevicesTable(db) + tokensTable := sync2.NewTokensTable(db, "secret") + + zero := 0 + device1 := "TEST_1" + device2 := "TEST_2" + roomA := "!TestClearStuckInvites_a:localhost" + roomB := "!TestClearStuckInvites_b:localhost" + roomC := "!TestClearStuckInvites_c:localhost" + roomD := "!TestClearStuckInvites_d:localhost" + roomE := "!TestClearStuckInvites_e:localhost" + roomF := "!TestClearStuckInvites_f:localhost" + roomG := "!TestClearStuckInvites_g:localhost" + alice := "@TestClearStuckInvites_alice:localhost" + bob := "@TestClearStuckInvites_bob:localhost" + charlie := "@TestClearStuckInvites_charlie:localhost" + doris := "@TestClearStuckInvites_doris:localhost" + users := []string{ + alice, bob, charlie, doris, + } + + // Test cases: + // Room | In Invite Table? | In Unread Table? | In Room Table? | Comment + // A Y Y Y OK, Genuine invite, proxy in room + // B Y Y N BAD, Stuck invite, proxy never saw join + // C Y N Y OK, Genuine invite, proxy in room, no unread counts (unusual but valid) + // D Y N N OK, Genuine invite, proxy not in room + // E N Y Y OK, Genuine joined room + // F N Y N BAD, Stuck joined room, proxy never saw join + // G N N Y OK, Genuine joined room, no unread counts (unusual but valid) + // - N N N Impossible, room id isn't in any table! + roomToInfo := map[string]struct { + invitedUser string + unreadUser string + inRoomTable bool + }{ + roomA: { + invitedUser: alice, + unreadUser: bob, + inRoomTable: true, + }, + roomB: { + invitedUser: bob, + unreadUser: bob, + inRoomTable: false, + }, + roomC: { + invitedUser: charlie, + inRoomTable: true, + }, + roomD: { + invitedUser: doris, + inRoomTable: false, + }, + roomE: { + unreadUser: alice, + inRoomTable: true, + }, + roomF: { + unreadUser: doris, + }, + roomG: { + inRoomTable: true, + }, + } + + err := sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error { + for roomID, info := range roomToInfo { + if info.inRoomTable { + err := roomsTable.Upsert(txn, state.RoomInfo{ + ID: roomID, + }, 0, 0) + if err != nil { + return fmt.Errorf("Upsert room: %s", err) + } + } + if info.invitedUser != "" { + err := inviteTable.InsertInvite(info.invitedUser, roomID, []json.RawMessage{json.RawMessage(`{}`)}) + if err != nil { + return fmt.Errorf("InsertInvite: %s", err) + } + } + if info.unreadUser != "" { + err := unreadTable.UpdateUnreadCounters(info.unreadUser, roomID, &zero, &zero) + if err != nil { + return fmt.Errorf("UpdateUnreadCounters: %s", err) + } + } + } + for _, userID := range users { + for _, deviceID := range []string{device1, device2} { + // each user has 2 devices + if err := deviceTable.InsertDevice(txn, userID, deviceID); err != nil { + return fmt.Errorf("InsertDevice: %s", err) + } + _, err := tokensTable.Insert(txn, userID+deviceID, userID, deviceID, time.Now()) + if err != nil { + return fmt.Errorf("TokensTable.Insert: %s", err) + } + } + } + return nil + }) + if err != nil { + t.Fatalf("failed to set up test configuration: %s", err) + } + // set since tokens (this is done without a txn hence cannot be bundled in as the UPDATE would fail) + for _, userID := range users { + for i, deviceID := range []string{device1, device2} { + // each user has 2 devices + since := fmt.Sprintf("since_%d", i) + if err := deviceTable.UpdateDeviceSince(userID, deviceID, since); err != nil { + t.Fatalf("UpdateDeviceSince: %s", err) + } + } + } + + t.Log("Run the migration.") + tx, err := db.Beginx() + if err != nil { + t.Fatal(err) + } + if err := upClearStuckInvites(context.Background(), tx.Tx); err != nil { + t.Fatalf("upClearStuckInvites: %s", err) + } + tx.Commit() + + // make a new txn for assertions + tx, err = db.Beginx() + if err != nil { + t.Fatal(err) + } + + // users in room B (bob) and F (doris) should be reset. + tokens, err := tokensTable.TokenForEachDevice(tx) + if err != nil { + t.Fatalf("TokenForEachDevice: %s", err) + } + wantResults := map[[2]string]struct { + wantSinceReset bool + }{ + {bob, device1}: { + wantSinceReset: true, + }, + {bob, device2}: { + wantSinceReset: true, + }, + {doris, device1}: { + wantSinceReset: true, + }, + {doris, device2}: { + wantSinceReset: true, + }, + // everyone else should NOT have since reset + {alice, device1}: { + wantSinceReset: false, + }, + {alice, device2}: { + wantSinceReset: false, + }, + {charlie, device1}: { + wantSinceReset: false, + }, + {charlie, device2}: { + wantSinceReset: false, + }, + } + for _, tok := range tokens { + key := [2]string{tok.UserID, tok.DeviceID} + want, ok := wantResults[key] + if !ok { + continue // different user in another test? + } + if want.wantSinceReset && tok.Since != "" { + t.Errorf("%s want since reset, got %+v", key, tok) + } + if !want.wantSinceReset && tok.Since == "" { + t.Errorf("%s did not want since reset, got %+v", key, tok) + } + } + // invites for Bob and Doris are gone + for _, userID := range []string{bob, doris} { + got, err := inviteTable.SelectAllInvitesForUser(userID) + if err != nil { + t.Fatalf("SelectAllInvitesForUser: %s", err) + } + if len(got) > 0 { + t.Fatalf("SelectAllInvitesForUser got invites for %s, wanted none: %+v", userID, got) + } + } + // ensure other invites remain + wantInvites := map[string][]string{ + alice: {roomA}, + charlie: {roomC}, + } + for userID, wantInvitesRooms := range wantInvites { + got, err := inviteTable.SelectAllInvitesForUser(userID) + if err != nil { + t.Fatalf("SelectAllInvitesForUser: %s", err) + } + if len(got) != len(wantInvitesRooms) { + t.Fatalf("SelectAllInvitesForUser got %d invites for %s, wanted %d", len(got), userID, len(wantInvitesRooms)) + } + for _, wantRoom := range wantInvitesRooms { + _, exists := got[wantRoom] + if !exists { + t.Fatalf("SelectAllInvitesForUser wanted invite for %s in %s, but it's missing", userID, wantRoom) + } + } + } +}