Skip to content

Commit

Permalink
Make LazyLoadTimelines only load timelines
Browse files Browse the repository at this point in the history
Unit tests
  • Loading branch information
David Robertson committed Oct 2, 2023
1 parent 54aaa0a commit 7e48426
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 51 deletions.
45 changes: 20 additions & 25 deletions sync3/caches/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,18 @@ type UserCacheListener interface {
// Tracks data specific to a given user. Specifically, this is the map of room ID to UserRoomData.
// This data is user-scoped, not global or connection scoped.
type UserCache struct {
LazyRoomDataOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData
UserID string
roomToData map[string]UserRoomData
roomToDataMu *sync.RWMutex
listeners map[int]UserCacheListener
listenersMu *sync.RWMutex
id int
store *state.Storage
globalCache *GlobalCache
txnIDs TransactionIDFetcher
ignoredUsers map[string]struct{}
ignoredUsersMu *sync.RWMutex
LazyLoadTimelinesOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents
UserID string
roomToData map[string]UserRoomData
roomToDataMu *sync.RWMutex
listeners map[int]UserCacheListener
listenersMu *sync.RWMutex
id int
store *state.Storage
globalCache *GlobalCache
txnIDs TransactionIDFetcher
ignoredUsers map[string]struct{}
ignoredUsersMu *sync.RWMutex
}

func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage, txnIDs TransactionIDFetcher) *UserCache {
Expand Down Expand Up @@ -306,34 +306,29 @@ func (c *UserCache) OnRegistered(ctx context.Context) error {
return nil
}

// Load timelines from the database. Uses cached UserRoomData for metadata purposes only.
func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
// LazyLoadTimelines loads up to `maxTimelineEvents` from the database, plus other
// timeline-related data. Events from senders ignored by this user are dropped.
// Returns nil on error.
func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
_, span := internal.StartSpan(ctx, "LazyLoadTimelines")
defer span.End()
if c.LazyRoomDataOverride != nil {
return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents)
if c.LazyLoadTimelinesOverride != nil {
return c.LazyLoadTimelinesOverride(loadPos, roomIDs, maxTimelineEvents)
}
result := make(map[string]UserRoomData)
result := make(map[string]state.LatestEvents)
roomIDToLatestEvents, err := c.store.LatestEventsInRooms(c.UserID, roomIDs, loadPos, maxTimelineEvents)
if err != nil {
logger.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return nil
}
c.roomToDataMu.Lock()
for _, requestedRoomID := range roomIDs {
latestEvents := roomIDToLatestEvents[requestedRoomID]
urd, ok := c.roomToData[requestedRoomID]
if !ok {
urd = NewUserRoomData()
}
if latestEvents != nil {
latestEvents.DiscardIgnoredMessages(c.ShouldIgnore)
urd.RequestedLatestEvents = *latestEvents
result[requestedRoomID] = *latestEvents
}
result[requestedRoomID] = urd
}
c.roomToDataMu.Unlock()
return result
}

Expand Down
19 changes: 10 additions & 9 deletions sync3/handler/connstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,27 +562,28 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
// room A has a position of 6 and B has 7 (so the highest is 7) does not mean that this connection
// has seen 6, as concurrent room updates cause A and B to race. This is why we then go through the
// response to this call to assign new load positions for each room.
roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))
roomMetadatas := s.globalCache.LoadRooms(ctx, roomIDs...)
userRoomDatas := s.userCache.LoadRooms(roomIDs...)
timelines := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))

// 1. Prepare lazy loading data structures, txn IDs.
roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData))
roomToUsersInTimeline := make(map[string][]string, len(timelines))
roomToTimeline := make(map[string][]json.RawMessage)
for roomID, urd := range roomIDToUserRoomData {
for roomID, latestEvents := range timelines {
senders := make(map[string]struct{})
for _, ev := range urd.RequestedLatestEvents.Timeline {
for _, ev := range latestEvents.Timeline {
senders[gjson.GetBytes(ev, "sender").Str] = struct{}{}
}
roomToUsersInTimeline[roomID] = keys(senders)
roomToTimeline[roomID] = urd.RequestedLatestEvents.Timeline
roomToTimeline[roomID] = latestEvents.Timeline
// remember what we just loaded so if we see these events down the live stream we know to ignore them.
// This means that requesting a direct room subscription causes the connection to jump ahead to whatever
// is in the database at the time of the call, rather than gradually converging by consuming live data.
// This is fine, so long as we jump ahead on a per-room basis. We need to make sure (ideally) that the
// room state is also pinned to the load position here, else you could see weird things in individual
// responses such as an updated room.name without the associated m.room.name event (though this will
// come through on the next request -> it converges to the right state so it isn't critical).
s.loadPositions[roomID] = urd.RequestedLatestEvents.LatestNID
s.loadPositions[roomID] = latestEvents.LatestNID
}
roomToTimeline = s.userCache.AnnotateWithTransactionIDs(ctx, s.userID, s.deviceID, roomToTimeline)

Expand All @@ -600,7 +601,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
// since we'll be using the invite_state only.
loadRoomIDs := make([]string, 0, len(roomIDs))
for _, roomID := range roomIDs {
userRoomData, ok := roomIDToUserRoomData[roomID]
userRoomData, ok := userRoomDatas[roomID]
if !ok || !userRoomData.IsInvite {
loadRoomIDs = append(loadRoomIDs, roomID)
}
Expand All @@ -617,7 +618,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
// 3. Build sync3.Room structs to return to clients.
rooms := make(map[string]sync3.Room, len(roomIDs))
for _, roomID := range roomIDs {
userRoomData, ok := roomIDToUserRoomData[roomID]
userRoomData, ok := userRoomDatas[roomID]
if !ok {
userRoomData = caches.NewUserRoomData()
}
Expand Down Expand Up @@ -683,7 +684,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
IsDM: userRoomData.IsDM,
JoinedCount: metadata.JoinCount,
InvitedCount: &metadata.InviteCount,
PrevBatch: userRoomData.RequestedLatestEvents.PrevBatch,
PrevBatch: timelines[roomID].PrevBatch,
Timestamp: maxTs,
}
if roomSub.IncludeHeroes() && calculated {
Expand Down
35 changes: 18 additions & 17 deletions sync3/handler/connstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/state"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -44,12 +45,12 @@ func newRoomMetadata(roomID string, lastMsgTimestamp spec.Timestamp) internal.Ro
return *m
}

func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
result := make(map[string]caches.UserRoomData)
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
u := caches.NewUserRoomData()
u.RequestedLatestEvents.Timeline = []json.RawMessage{[]byte(`{}`)}
result[roomID] = u
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{[]byte(`{}`)},
}
}
return result
}
Expand Down Expand Up @@ -98,12 +99,12 @@ func TestConnStateInitial(t *testing.T) {
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
result := make(map[string]caches.UserRoomData)
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
u := caches.NewUserRoomData()
u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]}
result[roomID] = u
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{timeline[roomID]},
}
}
return result
}
Expand Down Expand Up @@ -269,7 +270,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
return 1, roomMetadata, joinTimings, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = mockLazyRoomOverride
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
Expand Down Expand Up @@ -448,7 +449,7 @@ func TestBumpToOutsideRange(t *testing.T) {

}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = mockLazyRoomOverride
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
Expand Down Expand Up @@ -551,12 +552,12 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
}, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
result := make(map[string]caches.UserRoomData)
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
u := caches.NewUserRoomData()
u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]}
result[roomID] = u
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{timeline[roomID]},
}
}
return result
}
Expand Down

0 comments on commit 7e48426

Please sign in to comment.