diff --git a/state/event_table.go b/state/event_table.go index 1f152969..6bea7114 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -317,6 +317,25 @@ func (t *EventTable) LatestEventInRooms(txn *sqlx.Tx, roomIDs []string, highestN return } +func (t *EventTable) LatestEventNIDInRooms(roomIDs []string, highestNID int64) (roomToNID map[string]int64, err error) { + // the position (event nid) may be for a random different room, so we need to find the highest nid <= this position for this room + var events []Event + err = t.db.Select( + &events, + `SELECT event_nid, room_id FROM syncv3_events + WHERE event_nid IN (SELECT max(event_nid) FROM syncv3_events WHERE event_nid <= $1 AND room_id = ANY($2) GROUP BY room_id)`, + highestNID, pq.StringArray(roomIDs), + ) + if err == sql.ErrNoRows { + err = nil + } + roomToNID = make(map[string]int64) + for _, ev := range events { + roomToNID[ev.RoomID] = ev.NID + } + return +} + func (t *EventTable) SelectEventsBetween(txn *sqlx.Tx, roomID string, lowerExclusive, upperInclusive int64, limit int) ([]Event, error) { var events []Event err := txn.Select(&events, `SELECT event_nid, event FROM syncv3_events WHERE event_nid > $1 AND event_nid <= $2 AND room_id = $3 ORDER BY event_nid ASC LIMIT $4`, diff --git a/state/event_table_test.go b/state/event_table_test.go index 246c2b76..13eb1c7e 100644 --- a/state/event_table_test.go +++ b/state/event_table_test.go @@ -4,8 +4,10 @@ import ( "bytes" "database/sql" "fmt" + "reflect" "testing" + "github.com/jmoiron/sqlx" "github.com/tidwall/gjson" "github.com/matrix-org/sliding-sync/sqlutil" @@ -871,6 +873,93 @@ func TestRemoveUnsignedTXNID(t *testing.T) { } } +func TestLatestEventNIDInRooms(t *testing.T) { + db, close := connectToDB(t) + defer close() + table := NewEventTable(db) + + var result map[string]int64 + var err error + // Insert the following: + // - Room FIRST: [N] + // - Room SECOND: [N+1, N+2, N+3] (replace) + // - Room THIRD: [N+4] (max) + first := "!FIRST" + second := 
"!SECOND" + third := "!THIRD" + err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error { + result, err = table.Insert(txn, []Event{ + { + ID: "$N", + Type: "message", + RoomID: first, + JSON: []byte(`{}`), + }, + { + ID: "$N+1", + Type: "message", + RoomID: second, + JSON: []byte(`{}`), + }, + { + ID: "$N+2", + Type: "message", + RoomID: second, + JSON: []byte(`{}`), + }, + { + ID: "$N+3", + Type: "message", + RoomID: second, + JSON: []byte(`{}`), + }, + { + ID: "$N+4", + Type: "message", + RoomID: third, + JSON: []byte(`{}`), + }, + }, false) + return err + }) + assertNoError(t, err) + + testCases := []struct { + roomIDs []string + highestNID int64 + wantMap map[string]string + }{ + // We should see FIRST=N, SECOND=N+3, THIRD=N+4 when querying LatestEventNIDInRooms with N+4 + { + roomIDs: []string{first, second, third}, + highestNID: result["$N+4"], + wantMap: map[string]string{ + first: "$N", second: "$N+3", third: "$N+4", + }, + }, + // We should see FIRST=N, SECOND=N+2 when querying LatestEventNIDInRooms with N+2 + { + roomIDs: []string{first, second, third}, + highestNID: result["$N+2"], + wantMap: map[string]string{ + first: "$N", second: "$N+2", + }, + }, + } + for _, tc := range testCases { + gotRoomToNID, err := table.LatestEventNIDInRooms(tc.roomIDs, int64(tc.highestNID)) + assertNoError(t, err) + want := make(map[string]int64) // map event IDs to nids + for roomID, eventID := range tc.wantMap { + want[roomID] = int64(result[eventID]) + } + if !reflect.DeepEqual(gotRoomToNID, want) { + t.Errorf("%+v: got %v want %v", tc, gotRoomToNID, want) + } + } + +} + func TestEventTableSelectUnknownEventIDs(t *testing.T) { db, close := connectToDB(t) defer close() diff --git a/state/storage.go b/state/storage.go index d18c60f3..5c4dad14 100644 --- a/state/storage.go +++ b/state/storage.go @@ -32,6 +32,12 @@ type StartupSnapshot struct { AllJoinedMembers map[string][]string // room_id -> [user_id] } +type LatestEvents struct { + Timeline []json.RawMessage + 
PrevBatch string + LatestNID int64 +} + type Storage struct { Accumulator *Accumulator EventsTable *EventTable @@ -584,16 +590,16 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str return } -func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string][]json.RawMessage, map[string]string, error) { +func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*LatestEvents, error) { roomIDToRanges, err := s.visibleEventNIDsBetweenForRooms(userID, roomIDs, 0, to) if err != nil { - return nil, nil, err + return nil, err } - result := make(map[string][]json.RawMessage, len(roomIDs)) - prevBatches := make(map[string]string, len(roomIDs)) + result := make(map[string]*LatestEvents, len(roomIDs)) err = sqlutil.WithTransaction(s.Accumulator.db, func(txn *sqlx.Tx) error { for roomID, ranges := range roomIDToRanges { var earliestEventNID int64 + var latestEventNID int64 var roomEvents []json.RawMessage // start at the most recent range as we want to return the most recent `limit` events for i := len(ranges) - 1; i >= 0; i-- { @@ -608,6 +614,9 @@ func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, } // keep pushing to the front so we end up with A,B,C for _, ev := range events { + if latestEventNID == 0 { // set first time and never again + latestEventNID = ev.NID + } roomEvents = append([]json.RawMessage{ev.JSON}, roomEvents...) 
earliestEventNID = ev.NID if len(roomEvents) >= limit { @@ -615,19 +624,23 @@ func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, } } } + latestEvents := LatestEvents{ + LatestNID: latestEventNID, + Timeline: roomEvents, + } if earliestEventNID != 0 { // the oldest event needs a prev batch token, so find one now prevBatch, err := s.EventsTable.SelectClosestPrevBatch(roomID, earliestEventNID) if err != nil { return fmt.Errorf("failed to select prev_batch for room %s : %s", roomID, err) } - prevBatches[roomID] = prevBatch + latestEvents.PrevBatch = prevBatch } - result[roomID] = roomEvents + result[roomID] = &latestEvents } return nil }) - return result, prevBatches, err + return result, err } func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []string, from, to int64) (map[string][][2]int64, error) { @@ -641,7 +654,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin return nil, fmt.Errorf("VisibleEventNIDsBetweenForRooms.SelectEventsWithTypeStateKeyInRooms: %s", err) } } - joinTimingsByRoomID, err := s.determineJoinedRoomsFromMemberships(membershipEvents) + joinTimingsAtFromByRoomID, err := s.determineJoinedRoomsFromMemberships(membershipEvents) if err != nil { return nil, fmt.Errorf("failed to work out joined rooms for %s at pos %d: %s", userID, from, err) } @@ -652,7 +665,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin return nil, fmt.Errorf("failed to load membership events: %s", err) } - return s.visibleEventNIDsWithData(joinTimingsByRoomID, membershipEvents, userID, from, to) + return s.visibleEventNIDsWithData(joinTimingsAtFromByRoomID, membershipEvents, userID, from, to) } // Work out the NID ranges to pull events from for this user. 
Given a from and to event nid stream position, @@ -682,7 +695,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin // - For Room E: from=1, to=15 returns { RoomE: [ [3,3], [13,15] ] } (tests invites) func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][][2]int64, error) { // load *ALL* joined rooms for this user at from (inclusive) - joinTimingsByRoomID, err := s.JoinedRoomsAfterPosition(userID, from) + joinTimingsAtFromByRoomID, err := s.JoinedRoomsAfterPosition(userID, from) if err != nil { return nil, fmt.Errorf("failed to work out joined rooms for %s at pos %d: %s", userID, from, err) } @@ -693,10 +706,10 @@ func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[st return nil, fmt.Errorf("failed to load membership events: %s", err) } - return s.visibleEventNIDsWithData(joinTimingsByRoomID, membershipEvents, userID, from, to) + return s.visibleEventNIDsWithData(joinTimingsAtFromByRoomID, membershipEvents, userID, from, to) } -func (s *Storage) visibleEventNIDsWithData(joinTimingsByRoomID map[string]internal.EventMetadata, membershipEvents []Event, userID string, from, to int64) (map[string][][2]int64, error) { +func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]internal.EventMetadata, membershipEvents []Event, userID string, from, to int64) (map[string][][2]int64, error) { // load membership events in order and bucket based on room ID roomIDToLogs := make(map[string][]membershipEvent) for _, ev := range membershipEvents { @@ -758,7 +771,7 @@ func (s *Storage) visibleEventNIDsWithData(joinTimingsByRoomID map[string]intern // For each joined room, perform the algorithm and delete the logs afterwards result := make(map[string][][2]int64) - for joinedRoomID, _ := range joinTimingsByRoomID { + for joinedRoomID, _ := range joinTimingsAtFromByRoomID { roomResult := calculateVisibleEventNIDs(true, from, to, roomIDToLogs[joinedRoomID]) 
result[joinedRoomID] = roomResult delete(roomIDToLogs, joinedRoomID) diff --git a/sync3/caches/global.go b/sync3/caches/global.go index 28e9fd72..36c2867a 100644 --- a/sync3/caches/global.go +++ b/sync3/caches/global.go @@ -58,7 +58,7 @@ var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.C // Dispatcher for new events. type GlobalCache struct { // LoadJoinedRoomsOverride allows tests to mock out the behaviour of LoadJoinedRooms. - LoadJoinedRoomsOverride func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, err error) + LoadJoinedRoomsOverride func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, latestNIDs map[string]int64, err error) // inserts are done by v2 poll loops, selects are done by v3 request threads // there are lots of overlapping keys as many users (threads) can be joined to the same room (key) @@ -135,23 +135,37 @@ func (c *GlobalCache) copyRoom(roomID string) *internal.RoomMetadata { // The two maps returned by this function have exactly the same set of keys. Each is nil // iff a non-nil error is returned. // TODO: remove with LoadRoomState? 
+// FIXME: return args are a mess func (c *GlobalCache) LoadJoinedRooms(ctx context.Context, userID string) ( - pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimingByRoomID map[string]internal.EventMetadata, err error, + pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimingByRoomID map[string]internal.EventMetadata, + latestNIDs map[string]int64, err error, ) { if c.LoadJoinedRoomsOverride != nil { return c.LoadJoinedRoomsOverride(userID) } initialLoadPosition, err := c.store.LatestEventNID() if err != nil { - return 0, nil, nil, err + return 0, nil, nil, nil, err } joinTimingByRoomID, err = c.store.JoinedRoomsAfterPosition(userID, initialLoadPosition) if err != nil { - return 0, nil, nil, err + return 0, nil, nil, nil, err } + roomIDs := make([]string, len(joinTimingByRoomID)) + i := 0 + for roomID := range joinTimingByRoomID { + roomIDs[i] = roomID + i++ + } + + latestNIDs, err = c.store.EventsTable.LatestEventNIDInRooms(roomIDs, initialLoadPosition) + if err != nil { + return 0, nil, nil, nil, err + } + // TODO: no guarantee that this state is the same as latest unless called in a dispatcher loop rooms := c.LoadRoomsFromMap(ctx, joinTimingByRoomID) - return initialLoadPosition, rooms, joinTimingByRoomID, nil + return initialLoadPosition, rooms, joinTimingByRoomID, latestNIDs, nil } func (c *GlobalCache) LoadStateEvent(ctx context.Context, roomID string, loadPosition int64, evType, stateKey string) json.RawMessage { diff --git a/sync3/caches/user.go b/sync3/caches/user.go index ee259e6c..efd7aed8 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -42,9 +42,9 @@ type UserRoomData struct { HighlightCount int Invite *InviteData - // these fields are set by LazyLoadTimelines and are per-function call, and are not persisted in-memory. - RequestedPrevBatch string - RequestedTimeline []json.RawMessage + // this field is set by LazyLoadTimelines and is per-function call, and is not persisted in-memory. 
+ // The zero value of this is safe to use (0 latest nid, no prev batch, no timeline). + RequestedLatestEvents state.LatestEvents // TODO: should Canonicalised really be in RoomConMetadata? It's only set in SetRoom AFAICS CanonicalisedName string // stripped leading symbols like #, all in lower case @@ -218,7 +218,7 @@ func (c *UserCache) Unsubscribe(id int) { func (c *UserCache) OnRegistered(ctx context.Context) error { // select all spaces the user is a part of to seed the cache correctly. This has to be done in // the OnRegistered callback which has locking guarantees. This is why... - _, joinedRooms, joinTimings, err := c.globalCache.LoadJoinedRooms(ctx, c.UserID) + _, joinedRooms, joinTimings, _, err := c.globalCache.LoadJoinedRooms(ctx, c.UserID) if err != nil { return fmt.Errorf("failed to load joined rooms: %s", err) } @@ -295,7 +295,7 @@ func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomID return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents) } result := make(map[string]UserRoomData) - roomIDToEvents, roomIDToPrevBatch, err := c.store.LatestEventsInRooms(c.UserID, roomIDs, loadPos, maxTimelineEvents) + roomIDToLatestEvents, err := c.store.LatestEventsInRooms(c.UserID, roomIDs, loadPos, maxTimelineEvents) if err != nil { logger.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) @@ -303,16 +303,14 @@ } c.roomToDataMu.Lock() for _, requestedRoomID := range roomIDs { - events := roomIDToEvents[requestedRoomID] + latestEvents := roomIDToLatestEvents[requestedRoomID] urd, ok := c.roomToData[requestedRoomID] if !ok { urd = NewUserRoomData() } - urd.RequestedTimeline = events - if len(events) > 0 { - urd.RequestedPrevBatch = roomIDToPrevBatch[requestedRoomID] + if latestEvents != nil { + urd.RequestedLatestEvents = *latestEvents } - result[requestedRoomID] = 
urd } c.roomToDataMu.Unlock() diff --git a/sync3/handler/connstate.go b/sync3/handler/connstate.go index 6c8c1a2e..7c92205c 100644 --- a/sync3/handler/connstate.go +++ b/sync3/handler/connstate.go @@ -30,8 +30,15 @@ type ConnState struct { // "is the user joined to this room?" whereas subscriptions in muxedReq are untrusted. roomSubscriptions map[string]sync3.RoomSubscription // room_id -> subscription - // TODO: remove this as it is unreliable when you have concurrent updates - loadPosition int64 + // This is some event NID which is used to anchor any requests for room data from the database + // to their per-room latest NIDs. It does this by selecting the latest NID for each requested room + // where the NID is <= this anchor value. Note that there are no ordering guarantees here: it's + // possible for the anchor to be higher than room X's latest NID and for this connection to have + // not yet seen room X's latest NID (it'll be sitting in the live buffer). This is why it's important + // that ConnState DOES NOT ignore events based on this value - it must ignore events based on the real + // load position for the room. + // If this value is negative or 0, it means that this connection has not been loaded yet. + anchorLoadPosition int64 // roomID -> latest load pos loadPositions map[string]int64 @@ -58,7 +65,7 @@ func NewConnState( userCache: userCache, userID: userID, deviceID: deviceID, - loadPosition: -1, + anchorLoadPosition: -1, loadPositions: make(map[string]int64), roomSubscriptions: make(map[string]sync3.RoomSubscription), lists: sync3.NewInternalRequestLists(), @@ -71,6 +78,8 @@ func NewConnState( ConnState: cs, updates: make(chan caches.Update, maxPendingEventUpdates), } + // subscribe for updates before loading. We risk seeing dupes but that's fine as load positions + // will stop us double-processing. 
cs.userCacheID = cs.userCache.Subsribe(cs) return cs } @@ -87,10 +96,13 @@ func NewConnState( // - load() bases its current state based on the latest position, which includes processing of these N events. // - post load() we read N events, processing them a 2nd time. func (s *ConnState) load(ctx context.Context, req *sync3.Request) error { - initialLoadPosition, joinedRooms, joinTimings, err := s.globalCache.LoadJoinedRooms(ctx, s.userID) + initialLoadPosition, joinedRooms, joinTimings, loadPositions, err := s.globalCache.LoadJoinedRooms(ctx, s.userID) if err != nil { return err } + for roomID, pos := range loadPositions { + s.loadPositions[roomID] = pos + } rooms := make([]sync3.RoomConnMetadata, len(joinedRooms)) i := 0 for _, metadata := range joinedRooms { @@ -143,16 +155,21 @@ func (s *ConnState) load(ctx context.Context, req *sync3.Request) error { for _, r := range rooms { s.lists.SetRoom(r) } - s.loadPosition = initialLoadPosition + s.anchorLoadPosition = initialLoadPosition return nil } // OnIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go) func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request, isInitial bool) (*sync3.Response, error) { - if s.loadPosition == -1 { + if s.anchorLoadPosition <= 0 { // load() needs no ctx so drop it _, region := internal.StartSpan(ctx, "load") - s.load(ctx, req) + err := s.load(ctx, req) + if err != nil { + // in practice this means DB hit failures. If we try again later maybe it'll work, and we will because + // anchorLoadPosition is unset. 
+ logger.Err(err).Str("conn", cid.String()).Msg("failed to load initial data") + } region.End() } return s.onIncomingRequest(ctx, req, isInitial) @@ -488,7 +505,7 @@ func (s *ConnState) lazyLoadTypingMembers(ctx context.Context, response *sync3.R continue } // load the state event - memberEvent := s.globalCache.LoadStateEvent(ctx, roomID, s.loadPosition, "m.room.member", typingUserID.Str) + memberEvent := s.globalCache.LoadStateEvent(ctx, roomID, s.loadPositions[roomID], "m.room.member", typingUserID.Str) if memberEvent != nil { room.RequiredState = append(room.RequiredState, memberEvent) s.lazyCache.AddUser(roomID, typingUserID.Str) @@ -505,15 +522,20 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu ctx, span := internal.StartSpan(ctx, "getInitialRoomData") defer span.End() rooms := make(map[string]sync3.Room, len(roomIDs)) - // We want to grab the user room data and the room metadata for each room ID. - roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.loadPosition, roomIDs, int(roomSub.TimelineLimit)) + // We want to grab the user room data and the room metadata for each room ID. We use the globally + // highest NID we've seen to act as an anchor for the request. This anchor does not guarantee that + // events returned here have already been seen - the position is not globally ordered - so because + // room A has a position of 6 and B has 7 (so the highest is 7) does not mean that this connection + // has seen 6, as concurrent room updates cause A and B to race. This is why we then go through the + // response to this call to assign new load positions for each room. + roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit)) roomMetadatas := s.globalCache.LoadRooms(ctx, roomIDs...) 
// prepare lazy loading data structures, txn IDs roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData)) roomToTimeline := make(map[string][]json.RawMessage) for roomID, urd := range roomIDToUserRoomData { set := make(map[string]struct{}) - for _, ev := range urd.RequestedTimeline { + for _, ev := range urd.RequestedLatestEvents.Timeline { set[gjson.GetBytes(ev, "sender").Str] = struct{}{} } userIDs := make([]string, len(set)) @@ -523,11 +545,22 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu i++ } roomToUsersInTimeline[roomID] = userIDs - roomToTimeline[roomID] = urd.RequestedTimeline + roomToTimeline[roomID] = urd.RequestedLatestEvents.Timeline + // remember what we just loaded so if we see these events down the live stream we know to ignore them. + // This means that requesting a direct room subscription causes the connection to jump ahead to whatever + // is in the database at the time of the call, rather than gradually converging by consuming live data. + // This is fine, so long as we jump ahead on a per-room basis. We need to make sure (ideally) that the + // room state is also pinned to the load position here, else you could see weird things in individual + // responses such as an updated room.name without the associated m.room.name event (though this will + // come through on the next request -> it converges to the right state so it isn't critical). 
+ s.loadPositions[roomID] = urd.RequestedLatestEvents.LatestNID } roomToTimeline = s.userCache.AnnotateWithTransactionIDs(ctx, s.userID, s.deviceID, roomToTimeline) rsm := roomSub.RequiredStateMap(s.userID) - roomIDToState := s.globalCache.LoadRoomState(ctx, roomIDs, s.loadPosition, rsm, roomToUsersInTimeline) + // by reusing the same global load position anchor here, we can be sure that the state returned here + // matches the timeline we loaded earlier - the race conditions happen around pubsub updates and not + // the events table itself, so whatever position is picked based on this anchor is immutable. + roomIDToState := s.globalCache.LoadRoomState(ctx, roomIDs, s.anchorLoadPosition, rsm, roomToUsersInTimeline) if roomIDToState == nil { // e.g no required_state roomIDToState = make(map[string][]json.RawMessage) } @@ -565,7 +598,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu IsDM: userRoomData.IsDM, JoinedCount: metadata.JoinCount, InvitedCount: metadata.InviteCount, - PrevBatch: userRoomData.RequestedPrevBatch, + PrevBatch: userRoomData.RequestedLatestEvents.PrevBatch, } } diff --git a/sync3/handler/connstate_live.go b/sync3/handler/connstate_live.go index 648f650c..6e0a3703 100644 --- a/sync3/handler/connstate_live.go +++ b/sync3/handler/connstate_live.go @@ -117,8 +117,13 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, roomEventUpdate, _ := up.(*caches.RoomEventUpdate) // if this is a room event update we may not want to process this if the event nid is < loadPos, // as that means we have already taken it into account - if roomEventUpdate != nil && !roomEventUpdate.EventData.AlwaysProcess && roomEventUpdate.EventData.NID < s.loadPosition { - return false + if roomEventUpdate != nil && !roomEventUpdate.EventData.AlwaysProcess { + // check if we should skip this update. Do we know of this room (lp > 0) and if so, is this event + // behind what we've processed before? 
+ lp := s.loadPositions[roomEventUpdate.RoomID()] + if lp > 0 && roomEventUpdate.EventData.NID < lp { + return false + } } // for initial rooms e.g a room comes into the window or a subscription now exists @@ -149,9 +154,6 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, rooms := s.buildRooms(ctx, builder.BuildSubscriptions()) for roomID, room := range rooms { response.Rooms[roomID] = room - // remember what point we snapshotted this room, incase we see live events which we have - // already snapshotted here. - s.loadPositions[roomID] = s.loadPosition } // TODO: find a better way to determine if the triggering event should be included e.g ask the lists? @@ -183,7 +185,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, sender := roomEventUpdate.EventData.Sender if s.lazyCache.IsLazyLoading(roomID) && !s.lazyCache.IsSet(roomID, sender) { // load the state event - memberEvent := s.globalCache.LoadStateEvent(context.Background(), roomID, s.loadPosition, "m.room.member", sender) + memberEvent := s.globalCache.LoadStateEvent(context.Background(), roomID, s.loadPositions[roomID], "m.room.member", sender) if memberEvent != nil { r.RequiredState = append(r.RequiredState, memberEvent) s.lazyCache.AddUser(roomID, sender) @@ -284,12 +286,9 @@ func (s *connStateLive) processGlobalUpdates(ctx context.Context, builder *Rooms }) } - if isRoomEventUpdate { - // TODO: we should do this check before lists.SetRoom - if roomEventUpdate.EventData.NID <= s.loadPosition { - return // if this update is in the past then ignore it - } - s.loadPosition = roomEventUpdate.EventData.NID + // update the anchor for this new event + if isRoomEventUpdate && roomEventUpdate.EventData.NID > s.anchorLoadPosition { + s.anchorLoadPosition = roomEventUpdate.EventData.NID } return } diff --git a/sync3/handler/connstate_test.go b/sync3/handler/connstate_test.go index 9e8c2d72..bed2beff 100644 --- a/sync3/handler/connstate_test.go +++ 
b/sync3/handler/connstate_test.go @@ -48,7 +48,7 @@ func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int result := make(map[string]caches.UserRoomData) for _, roomID := range roomIDs { u := caches.NewUserRoomData() - u.RequestedTimeline = []json.RawMessage{[]byte(`{}`)} + u.RequestedLatestEvents.Timeline = []json.RawMessage{[]byte(`{}`)} result[roomID] = u } return result @@ -84,7 +84,7 @@ func TestConnStateInitial(t *testing.T) { roomB.RoomID: {userID}, roomC.RoomID: {userID}, }) - globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, err error) { + globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, loadPositions map[string]int64, err error) { return 1, map[string]*internal.RoomMetadata{ roomA.RoomID: &roomA, roomB.RoomID: &roomB, @@ -93,7 +93,7 @@ func TestConnStateInitial(t *testing.T) { roomA.RoomID: {NID: 123, Timestamp: 123}, roomB.RoomID: {NID: 456, Timestamp: 456}, roomC.RoomID: {NID: 780, Timestamp: 789}, - }, nil + }, nil, nil } userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{}) dispatcher.Register(context.Background(), userCache.UserID, userCache) @@ -102,7 +102,7 @@ func TestConnStateInitial(t *testing.T) { result := make(map[string]caches.UserRoomData) for _, roomID := range roomIDs { u := caches.NewUserRoomData() - u.RequestedTimeline = []json.RawMessage{timeline[roomID]} + u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]} result[roomID] = u } return result @@ -256,7 +256,7 @@ func TestConnStateMultipleRanges(t *testing.T) { roomID: {userID}, }) } - globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, err error) { + 
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, loadPositions map[string]int64, err error) { roomMetadata := make(map[string]*internal.RoomMetadata) joinTimings = make(map[string]internal.EventMetadata) for i, r := range rooms { @@ -266,7 +266,7 @@ func TestConnStateMultipleRanges(t *testing.T) { Timestamp: 123456, } } - return 1, roomMetadata, joinTimings, nil + return 1, roomMetadata, joinTimings, nil, nil } userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{}) userCache.LazyRoomDataOverride = mockLazyRoomOverride @@ -433,7 +433,7 @@ func TestBumpToOutsideRange(t *testing.T) { roomC.RoomID: {userID}, roomD.RoomID: {userID}, }) - globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, err error) { + globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, loadPositions map[string]int64, err error) { return 1, map[string]*internal.RoomMetadata{ roomA.RoomID: &roomA, roomB.RoomID: &roomB, @@ -444,7 +444,7 @@ func TestBumpToOutsideRange(t *testing.T) { roomB.RoomID: {NID: 2, Timestamp: 2}, roomC.RoomID: {NID: 3, Timestamp: 3}, roomD.RoomID: {NID: 4, Timestamp: 4}, - }, nil + }, nil, nil } userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{}) @@ -537,7 +537,7 @@ func TestConnStateRoomSubscriptions(t *testing.T) { roomC.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "c"}), roomD.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "d"}), } - globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, err 
error) { + globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, loadPositions map[string]int64, err error) { return 1, map[string]*internal.RoomMetadata{ roomA.RoomID: &roomA, roomB.RoomID: &roomB, @@ -548,14 +548,14 @@ func TestConnStateRoomSubscriptions(t *testing.T) { roomB.RoomID: {NID: 2, Timestamp: 2}, roomC.RoomID: {NID: 3, Timestamp: 3}, roomD.RoomID: {NID: 4, Timestamp: 4}, - }, nil + }, nil, nil } userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{}) userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData { result := make(map[string]caches.UserRoomData) for _, roomID := range roomIDs { u := caches.NewUserRoomData() - u.RequestedTimeline = []json.RawMessage{timeline[roomID]} + u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]} result[roomID] = u } return result