Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make extension scoping work with only one field #149

Merged
merged 16 commits into from
Aug 16, 2023
9 changes: 8 additions & 1 deletion sync3/extensions/account_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ func TestLiveAccountDataAggregation(t *testing.T) {
ext := &AccountDataRequest{
Core: Core{
Enabled: &boolTrue,
Lists: []string{"*"},
Rooms: []string{"*"},
},
}
var res Response
var extCtx Context
var extCtx = Context{
AllSubscribedRooms: []string{roomA},
}
room1 := &caches.RoomAccountDataUpdate{
RoomUpdate: &dummyRoomUpdate{
roomID: roomA,
Expand Down Expand Up @@ -78,6 +82,9 @@ func TestLiveAccountDataAggregation(t *testing.T) {
room1.AccountData[0].Data, room1.AccountData[1].Data,
},
}
if res.AccountData == nil {
t.Fatalf("Didn't get account data: %v", res)
}
if !reflect.DeepEqual(res.AccountData.Rooms, wantRoomAccountData) {
t.Fatalf("got %+v\nwant %+v", res.AccountData.Rooms, wantRoomAccountData)
}
Expand Down
64 changes: 56 additions & 8 deletions sync3/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type GenericRequest interface {
OnlyLists() []string
// Returns the value of the `rooms` JSON key. nil for "not specified".
OnlyRooms() []string
// InterpretAsInitial interprets this as an initial request rather than a delta, and
// overwrites fields accordingly. This can be useful when fields have default
// values, but is a little ugly. Use sparingly.
InterpretAsInitial()
Comment on lines +30 to +33
Copy link
Contributor Author

@DMRobertson DMRobertson Jun 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some of this ugliness comes from the fact that we use the same struct type to represent a blob of data D and an optional change to that blob of data.

// Overwrite fields in the request by side-effecting on this struct.
ApplyDelta(next GenericRequest)
// ProcessInitial provides a means for extensions to return data to clients immediately.
Expand Down Expand Up @@ -77,6 +81,17 @@ func (r *Core) OnlyRooms() []string {
return r.Rooms
}

func (r *Core) InterpretAsInitial() {
// An omitted/nil value for lists and rooms normally means "no change".
// If this extension has never been specified before, nil means "all lists/rooms".
if r.Lists == nil {
r.Lists = []string{"*"}
}
if r.Rooms == nil {
r.Rooms = []string{"*"}
}
}

kegsay marked this conversation as resolved.
Show resolved Hide resolved
func (r *Core) ApplyDelta(gnext GenericRequest) {
if gnext == nil {
return
Expand All @@ -100,22 +115,28 @@ func (r *Core) ApplyDelta(gnext GenericRequest) {
// according to the "core" extension scoping logic. Extensions are free to suppress
// updates for a room based on additional criteria.
func (r *Core) RoomInScope(roomID string, extCtx Context) bool {
// If the extension hasn't had its scope configured, process everything.
if r.Lists == nil && r.Rooms == nil {
return true
// First determine which rooms the extension is monitoring outside of any sliding windows.
roomsToMonitor := r.Rooms
if len(roomsToMonitor) > 0 && roomsToMonitor[0] == "*" {
roomsToMonitor = extCtx.AllSubscribedRooms
}

// If this extension has been explicitly subscribed to this room, process the update.
for _, roomInScope := range r.Rooms {
// Process the update if this room is one of those monitored rooms.
for _, roomInScope := range roomsToMonitor {
if roomInScope == roomID {
return true
}
}

// If the room belongs to one of the lists that this extension should process, process the update.
// Next determine which lists the extension is monitoring.
listsToMonitor := r.Lists
if len(listsToMonitor) > 0 && listsToMonitor[0] == "*" {
kegsay marked this conversation as resolved.
Show resolved Hide resolved
listsToMonitor = extCtx.AllLists
}

// Process the update if the room is visible in one of those lists.
visibleInLists := extCtx.RoomIDsToLists[roomID]
for _, visibleInList := range visibleInLists {
for _, shouldProcessList := range r.Lists {
for _, shouldProcessList := range listsToMonitor {
if visibleInList == shouldProcessList {
return true
}
Expand Down Expand Up @@ -175,6 +196,8 @@ func (r Request) EnabledExtensions() (exts []GenericRequest) {
return
}

// ApplyDelta applies the `next` request as a delta atop the previous Request r, and
// returns the result as a new Request.
func (r Request) ApplyDelta(next *Request) Request {
currFields := r.fields()
nextFields := next.fields()
Expand All @@ -187,6 +210,7 @@ func (r Request) ApplyDelta(next *Request) Request {
}
if isNil(curr) {
// the next field is what we want to apply
next.InterpretAsInitial()
currFields[i] = next
hasChanges = true
} else {
Expand All @@ -202,6 +226,24 @@ func (r Request) ApplyDelta(next *Request) Request {
return r
}

func (r *Request) InterpretAsInitial() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what this is supposed to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment above the declaration of this in the interface. https://github.com/matrix-org/sliding-sync/pull/149/files#r1232127907

To illustrate: suppose I receive a request whose account data extension is configured with

{
   "enabled": "true"
}

If this is a brand-new request, the proxy should process this in two steps:

  1. Use the initial sticky parameters for lists and rooms (i.e. ["*"]).
  2. Set enabled to true

If this is a follow-up to a previous request, the proxy should interpret this as follows

  1. Use the previous sticky values for lists and rooms.
  2. Set enabled to true.

The way I wanted to handle this was something like:

  • On GenericRequest, define a Default() method that returns a new GenericRequest of the same type, filled with initial sticky values.
  • Use this in Request.ApplyDelta to generate curr in the brand-new case when isNil(curr). Then call curr.ApplyDelta(next), just like we already do in the follow-up case.

The problem is that I couldn't figure out a way to do the first bullet. It felt like I was fighting the language. So instead, InterpretAsInitial has some extra logic to mutate next in place. The intention is that
Default().ApplyDelta(next) would be equal to next after running next.InterpretAsInitial().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FTAOD I'm not very happy with InterpretAsInitial. If there is some nice way to implement Default() or similar I'm all ears.

if r.ToDevice != nil {
r.ToDevice.InterpretAsInitial()
}
if r.E2EE != nil {
r.E2EE.InterpretAsInitial()
}
if r.AccountData != nil {
r.AccountData.InterpretAsInitial()
}
if r.Typing != nil {
r.Typing.InterpretAsInitial()
}
if r.Receipts != nil {
r.Receipts.InterpretAsInitial()
}
}

// Response represents the top-level `extensions` key in the JSON response.
//
// To add a new extension, add a field here and in fields().
Expand Down Expand Up @@ -233,6 +275,8 @@ func (r Response) HasData(isInitial bool) bool {
return false
}

// Context is a summary of useful information about the sync3.Request and the state of
// the requester's connection.
type Context struct {
*Handler
// RoomIDToTimeline is a map from room IDs to slices of event IDs. The keys are the
Expand All @@ -253,6 +297,10 @@ type Context struct {
// enclose those sliding windows. Values should be nonnil and nonempty, and may
// contain multiple list names.
RoomIDsToLists map[string][]string
// AllLists is the slice of list names provided to the Sliding Window API.
AllLists []string
// AllSubscribedRooms is the slice of room IDs provided to the Room Subscription API.
AllSubscribedRooms []string
}

type HandlerInterface interface {
Expand Down
2 changes: 2 additions & 0 deletions sync3/extensions/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func TestExtension_ApplyDelta(t *testing.T) {
AccountData: &AccountDataRequest{
Core: Core{
Enabled: &boolTrue,
Lists: []string{"*"},
Rooms: []string{"*"},
},
},
},
Expand Down
6 changes: 5 additions & 1 deletion sync3/extensions/receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ func TestLiveReceiptsAggregation(t *testing.T) {
ext := &ReceiptsRequest{
Core: Core{
Enabled: &boolTrue,
Lists: []string{"*"},
Rooms: []string{"*"},
kegsay marked this conversation as resolved.
Show resolved Hide resolved
},
}
var res Response
var extCtx Context
extCtx := Context{
AllSubscribedRooms: []string{roomA, roomB},
}
receiptA1 := &caches.ReceiptUpdate{
Receipt: internal.Receipt{
RoomID: roomA,
Expand Down
8 changes: 6 additions & 2 deletions sync3/extensions/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ func TestLiveTypingAggregation(t *testing.T) {
ext := &TypingRequest{
Core: Core{
Enabled: &boolTrue,
Lists: []string{"*"},
Rooms: []string{"*"},
},
}
var res Response
var extCtx Context
extCtx := Context{
AllSubscribedRooms: []string{roomA, roomB, roomC},
}
typingA1 := &caches.TypingUpdate{
RoomUpdate: &dummyRoomUpdate{
roomID: roomA,
Expand Down Expand Up @@ -83,6 +87,6 @@ func TestLiveTypingAggregation(t *testing.T) {
ext.AppendLive(ctx, &res, extCtx, eventC1)
want[roomC] = eventC1.GlobalRoomMetadata().TypingEvent
if !reflect.DeepEqual(res.Typing.Rooms, want) {
t.Fatalf("got %+v\nwant %+v", res.Typing.Rooms, want)
t.Fatalf("got %s\nwant %s", res.Typing.Rooms, want)
}
}
24 changes: 20 additions & 4 deletions sync3/handler/connstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,13 @@ func (s *ConnState) onIncomingRequest(reqCtx context.Context, req *sync3.Request
// is being notified about (e.g. for room account data)
extCtx, region := internal.StartSpan(reqCtx, "extensions")
response.Extensions = s.extensionsHandler.Handle(extCtx, s.muxedReq.Extensions, extensions.Context{
UserID: s.userID,
DeviceID: s.deviceID,
RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
IsInitial: isInitial,
UserID: s.userID,
DeviceID: s.deviceID,
RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
IsInitial: isInitial,
RoomIDsToLists: s.lists.ListsByVisibleRoomIDs(s.muxedReq.Lists),
AllSubscribedRooms: keys(s.roomSubscriptions),
AllLists: s.muxedReq.ListKeys(),
})
region.End()

Expand Down Expand Up @@ -758,3 +761,16 @@ func clampSliceRangeToListSize(ctx context.Context, r [2]int64, totalRooms int64
return [2]int64{r[0], lastIndexWithRoom}
}
}

// Returns a slice containing copies of the keys of the given map, in no particular
// order.
func keys[K comparable, V any](m map[K]V) []K {
if m == nil {
return nil
}
output := make([]K, len(m))
for key := range m {
output = append(output, key)
}
return output
}
12 changes: 7 additions & 5 deletions sync3/handler/connstate_live.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,13 @@ func (s *connStateLive) processUpdate(ctx context.Context, update caches.Update,
// pass event to extensions AFTER processing
roomIDsToLists := s.lists.ListsByVisibleRoomIDs(s.muxedReq.Lists)
s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{
IsInitial: false,
RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
UserID: s.userID,
DeviceID: s.deviceID,
RoomIDsToLists: roomIDsToLists,
IsInitial: false,
RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
UserID: s.userID,
DeviceID: s.deviceID,
RoomIDsToLists: roomIDsToLists,
AllSubscribedRooms: keys(s.roomSubscriptions),
AllLists: s.muxedReq.ListKeys(),
})
}

Expand Down
6 changes: 6 additions & 0 deletions sync3/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"sort"
)

// SliceRanges is a slice of integer pairs [a, b]. Each pair represents the integers x
// in the range a <= x <= b (note: closed at both ends). The slice as a whole represents
// the set of integers x in any of the slice's closed intervals.
//
// Within the slice, pairs are arranged in no particular order. Two pairs may represent
// overlapping ranges of integers; use Valid to test for this.
type SliceRanges [][2]int64

func (r SliceRanges) Valid() bool {
Expand Down
10 changes: 10 additions & 0 deletions sync3/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ type RequestListDelta struct {
// request.
func (r *Request) ApplyDelta(nextReq *Request) (result *Request, delta *RequestDelta) {
if r == nil {
nextReq.Extensions.InterpretAsInitial()
result = &Request{
Extensions: nextReq.Extensions,
}
Expand Down Expand Up @@ -470,6 +471,15 @@ func (r *Request) ApplyDelta(nextReq *Request) (result *Request, delta *RequestD
return
}

// ListKeys builds a slice containing the names of the lists this request has defined.
func (r *Request) ListKeys() []string {
listKeys := make([]string, 0, len(r.Lists))
for listKey, _ := range r.Lists {
listKeys = append(listKeys, listKey)
}
return listKeys
}

type RequestFilters struct {
Spaces []string `json:"spaces"`
IsDM *bool `json:"is_dm"`
Expand Down
Loading
Loading