diff --git a/dockerfiles/synapse/homeserver.yaml b/dockerfiles/synapse/homeserver.yaml
index b2415afc..fab6922e 100644
--- a/dockerfiles/synapse/homeserver.yaml
+++ b/dockerfiles/synapse/homeserver.yaml
@@ -107,3 +107,5 @@ experimental_features:
   msc2403_enabled: true
   # Enable spaces support
   spaces_enabled: true
+  # Enable history backfilling support
+  msc2716_enabled: true
diff --git a/dockerfiles/synapse/workers-shared.yaml b/dockerfiles/synapse/workers-shared.yaml
index 8f9b4c8f..b824557a 100644
--- a/dockerfiles/synapse/workers-shared.yaml
+++ b/dockerfiles/synapse/workers-shared.yaml
@@ -64,5 +64,7 @@ federation_rr_transactions_per_room_per_second: 9999
 experimental_features:
   # Enable knocking support
   msc2403_enabled: true
+  # Enable history backfilling support
+  msc2716_enabled: true
   # Enable spaces support
   spaces_enabled: true
diff --git a/internal/b/hs_with_application_service.go b/internal/b/hs_with_application_service.go
index ba9db923..e3aa059b 100644
--- a/internal/b/hs_with_application_service.go
+++ b/internal/b/hs_with_application_service.go
@@ -2,7 +2,7 @@ package b
 
 // BlueprintHSWithApplicationService who has an application service to interact with
 var BlueprintHSWithApplicationService = MustValidate(Blueprint{
-	Name: "alice",
+	Name: "hs_with_application_service",
 	Homeservers: []Homeserver{
 		{
 			Name: "hs1",
@@ -21,5 +21,14 @@ var BlueprintHSWithApplicationService = MustValidate(Blueprint{
 				},
 			},
 		},
+		{
+			Name: "hs2",
+			Users: []User{
+				{
+					Localpart:   "@charlie",
+					DisplayName: "Charlie",
+				},
+			},
+		},
 	},
 })
diff --git a/internal/client/client.go b/internal/client/client.go
index a59856b5..06125615 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -132,13 +132,13 @@ func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string {
 // Will time out after CSAPI.SyncUntilTimeout.
 func (c *CSAPI) SyncUntilTimelineHas(t *testing.T, roomID string, check func(gjson.Result) bool) {
 	t.Helper()
-	c.SyncUntil(t, "", "rooms.join."+GjsonEscape(roomID)+".timeline.events", check)
+	c.SyncUntil(t, "", "", "rooms.join."+GjsonEscape(roomID)+".timeline.events", check)
 }
 
 // SyncUntil blocks and continually calls /sync until the `check` function returns true.
 // If the `check` function fails the test, the failing event will be automatically logged.
 // Will time out after CSAPI.SyncUntilTimeout.
-func (c *CSAPI) SyncUntil(t *testing.T, since, key string, check func(gjson.Result) bool) {
+func (c *CSAPI) SyncUntil(t *testing.T, since, filter, key string, check func(gjson.Result) bool) {
 	t.Helper()
 	start := time.Now()
 	checkCounter := 0
@@ -152,6 +152,9 @@ func (c *CSAPI) SyncUntil(t *testing.T, since, key string, check func(gjson.Resu
 		if since != "" {
 			query["since"] = []string{since}
 		}
+		if filter != "" {
+			query["filter"] = []string{filter}
+		}
 		res := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "sync"}, WithQueries(query))
 		body := ParseJSON(t, res)
 		since = GetJSONFieldStr(t, body, "next_batch")
@@ -324,7 +327,7 @@ func (c *CSAPI) DoFunc(t *testing.T, method string, paths []string, opts ...Requ
 }
 
 // NewLoggedClient returns an http.Client which logs requests/responses
-func NewLoggedClient(t *testing.T, cli *http.Client) *http.Client {
+func NewLoggedClient(t *testing.T, hsName string, cli *http.Client) *http.Client {
 	t.Helper()
 	if cli == nil {
 		cli = &http.Client{
@@ -335,22 +338,23 @@ func NewLoggedClient(t *testing.T, cli *http.Client) *http.Client {
 	if transport == nil {
 		transport = http.DefaultTransport
 	}
-	cli.Transport = &loggedRoundTripper{t, transport}
+	cli.Transport = &loggedRoundTripper{t, hsName, transport}
 	return cli
 }
 
 type loggedRoundTripper struct {
-	t    *testing.T
-	wrap http.RoundTripper
+	t      *testing.T
+	hsName string
+	wrap   http.RoundTripper
 }
 
 func (t *loggedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	start := time.Now()
	res, err := t.wrap.RoundTrip(req)
	if err != nil {
-		t.t.Logf("%s %s => error: %s (%s)", req.Method, req.URL.Path, err, time.Since(start))
+		t.t.Logf("%s %s%s => error: %s (%s)", req.Method, t.hsName, req.URL.Path, err, time.Since(start))
	} else {
-		t.t.Logf("%s %s => %s (%s)", req.Method, req.URL.Path, res.Status, time.Since(start))
+		t.t.Logf("%s %s%s => %s (%s)", req.Method, t.hsName, req.URL.Path, res.Status, time.Since(start))
	}
	return res, err
 }
@@ -368,6 +372,29 @@ func GetJSONFieldStr(t *testing.T, body []byte, wantKey string) string {
 	return res.Str
 }
 
+func GetJSONFieldStringArray(t *testing.T, body []byte, wantKey string) []string {
+	t.Helper()
+
+	res := gjson.GetBytes(body, wantKey)
+
+	if !res.Exists() {
+		t.Fatalf("GetJSONFieldStringArray: key '%s' missing from %s", wantKey, string(body))
+	}
+
+	arrLength := len(res.Array())
+	arr := make([]string, arrLength)
+	i := 0
+	res.ForEach(func(key, value gjson.Result) bool {
+		arr[i] = value.Str
+
+		// Keep iterating
+		i++
+		return true
+	})
+
+	return arr
+}
+
 // ParseJSON parses a JSON-encoded HTTP Response body into a byte slice
 func ParseJSON(t *testing.T, res *http.Response) []byte {
 	t.Helper()
diff --git a/internal/docker/builder.go b/internal/docker/builder.go
index 894e11e6..c4cc912a 100644
--- a/internal/docker/builder.go
+++ b/internal/docker/builder.go
@@ -476,7 +476,9 @@ func generateASRegistrationYaml(as b.ApplicationService) string {
 		fmt.Sprintf("sender_localpart: %s\n", as.SenderLocalpart) +
 		fmt.Sprintf("rate_limited: %v\n", as.RateLimited) +
 		"namespaces:\n" +
-		"  users: []\n" +
+		"  users:\n" +
+		"  - exclusive: false\n" +
+		"    regex: .*\n" +
 		"  rooms: []\n" +
 		"  aliases: []\n"
 }
diff --git a/internal/docker/deployment.go b/internal/docker/deployment.go
index 6b303afa..0e4082fa 100644
--- a/internal/docker/deployment.go
+++ b/internal/docker/deployment.go
@@ -53,7 +53,7 @@ func (d *Deployment) Client(t *testing.T, hsName, userID string) *client.CSAPI {
 		UserID:           userID,
 		AccessToken:      token,
 		BaseURL:          dep.BaseURL,
-		Client:           client.NewLoggedClient(t, nil),
+		Client:           client.NewLoggedClient(t, hsName, nil),
 		SyncUntilTimeout: 5 * time.Second,
 		Debug:            d.Deployer.debugLogging,
 	}
@@ -69,7 +69,7 @@ func (d *Deployment) RegisterUser(t *testing.T, hsName, localpart, password stri
 	}
 	client := &client.CSAPI{
 		BaseURL:          dep.BaseURL,
-		Client:           client.NewLoggedClient(t, nil),
+		Client:           client.NewLoggedClient(t, hsName, nil),
 		SyncUntilTimeout: 5 * time.Second,
 		Debug:            d.Deployer.debugLogging,
 	}
diff --git a/internal/match/json.go b/internal/match/json.go
index d6de20d0..4a2a1133 100644
--- a/internal/match/json.go
+++ b/internal/match/json.go
@@ -55,25 +55,7 @@ func JSONKeyTypeEqual(wantKey string, wantType gjson.Type) JSON {
 	}
 }
 
-// JSONCheckOff returns a matcher which will loop over `wantKey` and ensure that the items
-// (which can be array elements or object keys)
-// are present exactly once in any order in `wantItems`. If there are unexpected items or items
-// appear more than once then the match fails. This matcher can be used to check off items in
-// an array/object. The `mapper` function should map the item to an interface which will be
-// comparable via `reflect.DeepEqual` with items in `wantItems`. The optional `fn` callback
-// allows more checks to be performed other than checking off the item from the list. It is
-// called with 2 args: the result of the `mapper` function and the element itself (or value if
-// it's an object).
-//
-// Usage: (ensures `events` has these events in any order, with the right event type)
-//  JSONCheckOff("events", []interface{}{"$foo:bar", "$baz:quuz"}, func(r gjson.Result) interface{} {
-//    return r.Get("event_id").Str
-//  }, func(eventID interface{}, eventBody gjson.Result) error {
-//    if eventBody.Get("type").Str != "m.room.message" {
-//      return fmt.Errorf("expected event to be 'm.room.message'")
-//    }
-//  })
-func JSONCheckOff(wantKey string, wantItems []interface{}, mapper func(gjson.Result) interface{}, fn func(interface{}, gjson.Result) error) JSON {
+func jsonCheckOffInternal(wantKey string, wantItems []interface{}, allowUnwantedItems bool, mapper func(gjson.Result) interface{}, fn func(interface{}, gjson.Result) error) JSON {
 	return func(body []byte) error {
 		res := gjson.GetBytes(body, wantKey)
 		if !res.Exists() {
@@ -103,12 +85,15 @@ func JSONCheckOff(wantKey string, wantItems []interface{}, mapper func(gjson.Res
 				break
 			}
 		}
-		if want == -1 {
+		if !allowUnwantedItems && want == -1 {
 			err = fmt.Errorf("JSONCheckOff: unexpected item %s", item)
 			return false
 		}
-		// delete the wanted item
-		wantItems = append(wantItems[:want], wantItems[want+1:]...)
+
+		if want != -1 {
+			// delete the wanted item
+			wantItems = append(wantItems[:want], wantItems[want+1:]...)
+		}
 
 		// do further checks
 		if fn != nil {
@@ -130,6 +115,50 @@ func JSONCheckOff(wantKey string, wantItems []interface{}, mapper func(gjson.Res
 	}
 }
 
+// JSONCheckOffAllowUnwanted returns a matcher which will loop over `wantKey` and ensure that the items
+// (which can be array elements or object keys)
+// in `wantItems` are all present, in any order. Unlike JSONCheckOff, unexpected items, or items
+// that appear more than once, do not fail the match. This matcher can be used to check off items in
+// an array/object. The `mapper` function should map the item to an interface which will be
+// comparable via `reflect.DeepEqual` with items in `wantItems`. The optional `fn` callback
+// allows more checks to be performed other than checking off the item from the list. It is
+// called with 2 args: the result of the `mapper` function and the element itself (or value if
+// it's an object).
+//
+// Usage: (ensures `events` has these events in any order, with the right event type)
+//  JSONCheckOffAllowUnwanted("events", []interface{}{"$foo:bar", "$baz:quuz"}, func(r gjson.Result) interface{} {
+//    return r.Get("event_id").Str
+//  }, func(eventID interface{}, eventBody gjson.Result) error {
+//    if eventBody.Get("type").Str != "m.room.message" {
+//      return fmt.Errorf("expected event to be 'm.room.message'")
+//    }
+//  })
+func JSONCheckOffAllowUnwanted(wantKey string, wantItems []interface{}, mapper func(gjson.Result) interface{}, fn func(interface{}, gjson.Result) error) JSON {
+	return jsonCheckOffInternal(wantKey, wantItems, true, mapper, fn)
+}
+
+// JSONCheckOff returns a matcher which will loop over `wantKey` and ensure that the items
+// (which can be array elements or object keys)
+// are present exactly once in any order in `wantItems`. If there are unexpected items or items
+// appear more than once then the match fails. This matcher can be used to check off items in
+// an array/object. The `mapper` function should map the item to an interface which will be
+// comparable via `reflect.DeepEqual` with items in `wantItems`. The optional `fn` callback
+// allows more checks to be performed other than checking off the item from the list. It is
+// called with 2 args: the result of the `mapper` function and the element itself (or value if
+// it's an object).
+//
+// Usage: (ensures `events` has these events in any order, with the right event type)
+//  JSONCheckOff("events", []interface{}{"$foo:bar", "$baz:quuz"}, func(r gjson.Result) interface{} {
+//    return r.Get("event_id").Str
+//  }, func(eventID interface{}, eventBody gjson.Result) error {
+//    if eventBody.Get("type").Str != "m.room.message" {
+//      return fmt.Errorf("expected event to be 'm.room.message'")
+//    }
+//  })
+func JSONCheckOff(wantKey string, wantItems []interface{}, mapper func(gjson.Result) interface{}, fn func(interface{}, gjson.Result) error) JSON {
+	return jsonCheckOffInternal(wantKey, wantItems, false, mapper, fn)
+}
+
 // JSONArrayEach returns a matcher which will check that `wantKey` is an array then loops over each
 // item calling `fn`. If `fn` returns an error, iterating stops and an error is returned.
 func JSONArrayEach(wantKey string, fn func(gjson.Result) error) JSON {
diff --git a/tests/msc2403_test.go b/tests/msc2403_test.go
index 1fd5dd4e..a3844686 100644
--- a/tests/msc2403_test.go
+++ b/tests/msc2403_test.go
@@ -143,6 +143,7 @@ func knockingBetweenTwoUsersTest(t *testing.T, roomID string, inRoomUser, knocki
 	knockingUser.SyncUntil(
 		t,
 		since,
+		"",
 		"rooms.leave."+client.GjsonEscape(roomID)+".timeline.events",
 		func(ev gjson.Result) bool {
 			if ev.Get("type").Str != "m.room.member" || ev.Get("sender").Str != knockingUser.UserID {
@@ -274,6 +275,7 @@ func knockOnRoomSynced(t *testing.T, c *client.CSAPI, roomID, reason string, ser
 	c.SyncUntil(
 		t,
 		"",
+		"",
 		"rooms.knock."+client.GjsonEscape(roomID)+".knock_state.events",
 		func(ev gjson.Result) bool {
 			// We don't currently define any required state event types to be sent.
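Note on the matcher split above: `JSONCheckOff` keeps its original strict behaviour, while `JSONCheckOffAllowUnwanted` only requires every wanted item to be checked off. The standalone sketch below is illustrative only and not part of the patch; it assumes it is compiled inside this repository (the `internal/` packages are not importable from outside) and relies on the leftover-item check in the unchanged tail of `jsonCheckOffInternal`. The JSON body and event IDs are hypothetical.

package main

import (
	"fmt"

	"github.com/matrix-org/complement/internal/match"
	"github.com/tidwall/gjson"
)

func main() {
	// Hypothetical /messages-style body: three events, two of which we expect.
	body := []byte(`{"chunk": [{"event_id": "$a"}, {"event_id": "$b"}, {"event_id": "$c"}]}`)
	mapper := func(r gjson.Result) interface{} { return r.Get("event_id").Str }

	// Strict matcher: fails with "unexpected item $c" because "$c" is not wanted.
	fmt.Println(match.JSONCheckOff("chunk", []interface{}{"$a", "$b"}, mapper, nil)(body))

	// Lenient matcher: returns nil; "$c" is skipped once "$a" and "$b" are checked off.
	fmt.Println(match.JSONCheckOffAllowUnwanted("chunk", []interface{}{"$a", "$b"}, mapper, nil)(body))
}

Both entry points are trivial wrappers around the shared loop, so their behaviour can only diverge via the allowUnwantedItems flag. A fresh want-list literal is passed to each call because the check-off loop consumes the slice it is given.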
diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go
new file mode 100644
index 00000000..8f99881b
--- /dev/null
+++ b/tests/msc2716_test.go
@@ -0,0 +1,859 @@
+// +build msc2716
+
+// This file contains tests for incrementally importing history to an existing room,
+// a currently experimental feature defined by MSC2716, which you can read here:
+// https://github.com/matrix-org/matrix-doc/pull/2716
+
+package tests
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"testing"
+	"time"
+
+	"github.com/matrix-org/complement/internal/b"
+	"github.com/matrix-org/complement/internal/client"
+	"github.com/matrix-org/complement/internal/match"
+	"github.com/matrix-org/complement/internal/must"
+	"github.com/tidwall/gjson"
+)
+
+type event struct {
+	Type           string
+	Sender         string
+	OriginServerTS uint64
+	StateKey       *string
+	PrevEvents     []string
+	Content        map[string]interface{}
+}
+
+// This is configurable because it can be nice to change it to `time.Second` while
+// inspecting the test results in a Synapse instance
+const timeBetweenMessages = time.Millisecond
+
+var (
+	insertionEventType = "org.matrix.msc2716.insertion"
+	markerEventType    = "org.matrix.msc2716.marker"
+
+	historicalContentField      = "org.matrix.msc2716.historical"
+	nextChunkIDContentField     = "org.matrix.msc2716.next_chunk_id"
+	markerInsertionContentField = "org.matrix.msc2716.marker.insertion"
+)
+
+func TestBackfillingHistory(t *testing.T) {
+	deployment := Deploy(t, b.BlueprintHSWithApplicationService)
+	defer deployment.Destroy(t)
+
+	// Create the application service bridge user that is able to backfill messages
+	asUserID := "@the-bridge-user:hs1"
+	as := deployment.Client(t, "hs1", asUserID)
+
+	// Create the normal user which will send messages in the room
+	userID := "@alice:hs1"
+	alice := deployment.Client(t, "hs1", userID)
+
+	// Create the federated user which will fetch the messages from a remote homeserver
+	remoteUserID := "@charlie:hs2"
+	remoteCharlie := deployment.Client(t, "hs2", remoteUserID)
+
+	virtualUserLocalpart := "maria"
+	virtualUserID := fmt.Sprintf("@%s:hs1", virtualUserLocalpart)
+	// Register the virtual user
+	ensureVirtualUserRegistered(t, as, virtualUserLocalpart)
+
+	t.Run("parallel", func(t *testing.T) {
+		// Test that the message events we insert between A and B come back in the correct order from /messages
+		//
+		// Final timeline output: ( [n] = historical chunk )
+		// (oldest) A, B, [insertion, c, d, e] [insertion, f, g, h, insertion], I, J (newest)
+		//                       chunk 1                  chunk 0
+		t.Run("Backfilled historical events resolve with proper state in correct order", func(t *testing.T) {
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+			alice.JoinRoom(t, roomID, nil)
+
+			// Create some normal messages in the timeline. We're creating them in
+			// two batches so we leave a gap in between that we are going
+			// to backfill.
+			//
+			// Create the first batch including the "live" event we are going to
+			// insert our backfilled events next to.
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2)
+			eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1]
+			timeAfterEventBefore := time.Now()
+
+			// Wait long enough to ensure that the timestamp changes enough for
+			// each of the messages we try to backfill later
+			numHistoricalMessages := 6
+			time.Sleep(time.Duration(numHistoricalMessages) * timeBetweenMessages)
+
+			// Create the second batch of events.
+			// This will also fill up the buffer so we have to scroll back to the
+			// inserted history later.
+			eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2)
+
+			// Insert the most recent chunk of backfilled history
+			batchSendRes := batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore.Add(timeBetweenMessages*3),
+				"",
+				3,
+				// Status
+				200,
+			)
+			batchSendResBody := client.ParseJSON(t, batchSendRes)
+			historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody)
+			nextChunkID := getNextChunkIdFromBatchSendResponseBody(t, batchSendResBody)
+
+			// Insert another older chunk of backfilled history from the same user.
+			// Make sure the meta data and joins still work on the subsequent chunk
+			batchSendRes2 := batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore,
+				nextChunkID,
+				3,
+				// Status
+				200,
+			)
+			batchSendResBody2 := client.ParseJSON(t, batchSendRes2)
+			historicalEventIDs2 := getEventsFromBatchSendResponseBody(t, batchSendResBody2)
+
+			var expectedEventIDOrder []string
+			expectedEventIDOrder = append(expectedEventIDOrder, eventIDsBefore...)
+			expectedEventIDOrder = append(expectedEventIDOrder, historicalEventIDs2...)
+			expectedEventIDOrder = append(expectedEventIDOrder, historicalEventIDs...)
+			expectedEventIDOrder = append(expectedEventIDOrder, eventIDsAfter...)
+			// Order events from newest to oldest
+			expectedEventIDOrder = reversed(expectedEventIDOrder)
+
+			// 2 eventIDsBefore + [1 insertion event + 3 historical events] + [1 insertion event + 3 historical events + 1 base insertion event] + 2 eventIDsAfter
+			//                              ^ chunk 1                                            ^ chunk 0
+			if len(expectedEventIDOrder) != 13 {
+				t.Fatalf("Expected eventID list should be length 13 but saw %d: %s", len(expectedEventIDOrder), expectedEventIDOrder)
+			}
+
+			messagesRes := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir":   []string{"b"},
+				"limit": []string{"100"},
+			}))
+			messageResBody := client.ParseJSON(t, messagesRes)
+			eventDebugStringsFromResponse := getRelevantEventDebugStringsFromMessagesResponse(t, messageResBody)
+			// Since the original body can only be read once, create a new one from the body bytes we just read
+			messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messageResBody))
+
+			// Take a copy of the slice header so we can pop events off it as we iterate in the matcher below.
+			// We save the full untouched `expectedEventIDOrder` for use in the log messages
+			workingExpectedEventIDOrder := expectedEventIDOrder
+
+			must.MatchResponse(t, messagesRes, match.HTTPResponse{
+				JSON: []match.JSON{
+					match.JSONArrayEach("chunk", func(r gjson.Result) error {
+						// Find all events in order
+						if isRelevantEvent(r) {
+							// Pop the next message off the expected list
+							nextEventIdInOrder := workingExpectedEventIDOrder[0]
+							workingExpectedEventIDOrder = workingExpectedEventIDOrder[1:]
+
+							if r.Get("event_id").Str != nextEventIdInOrder {
+								return fmt.Errorf("Next event found was %s but expected %s\nActualEvents (%d): %v\nExpectedEvents (%d): %v", r.Get("event_id").Str, nextEventIdInOrder, len(eventDebugStringsFromResponse), eventDebugStringsFromResponse, len(expectedEventIDOrder), expectedEventIDOrder)
+							}
+						}
+
+						return nil
+					}),
+				},
+			})
+
+			if len(workingExpectedEventIDOrder) != 0 {
+				t.Fatalf("Expected all events to be matched in message response but there were some left-over events: %s", workingExpectedEventIDOrder)
+			}
+		})
+
+		t.Run("Backfilled historical events from multiple users in the same chunk", func(t *testing.T) {
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+			alice.JoinRoom(t, roomID, nil)
+
+			// Create the "live" event we are going to insert our backfilled events next to
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+			eventIdBefore := eventIDsBefore[0]
+			timeAfterEventBefore := time.Now()
+
+			// Register and join the other virtual users
+			virtualUserID2 := "@ricky:hs1"
+			ensureVirtualUserRegistered(t, as, "ricky")
+			virtualUserID3 := "@carol:hs1"
+			ensureVirtualUserRegistered(t, as, "carol")
+
+			// Insert a backfilled event
+			batchSendRes := batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID, virtualUserID2, virtualUserID3},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore,
+				"",
+				3,
+				// Status
+				200,
+			)
+			batchSendResBody := client.ParseJSON(t, batchSendRes)
+			historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody)
+
+			messagesRes := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir":   []string{"b"},
+				"limit": []string{"100"},
+			}))
+
+			must.MatchResponse(t, messagesRes, match.HTTPResponse{
+				JSON: []match.JSON{
+					match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
+						return r.Get("event_id").Str
+					}, nil),
+				},
+			})
+		})
+
+		t.Run("Backfilled historical events with m.historical do not come down in an incremental sync", func(t *testing.T) {
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+			alice.JoinRoom(t, roomID, nil)
+
+			// Create the "live" event we are going to insert our backfilled events next to
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+			eventIdBefore := eventIDsBefore[0]
+			timeAfterEventBefore := time.Now()
+
+			// Create some "live" events to saturate and fill up the /sync response
+			createMessagesInRoom(t, alice, roomID, 5)
+
+			// Insert a backfilled event
+			batchSendRes := batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore,
+				"",
+				1,
+				// Status
+				200,
+			)
+			batchSendResBody := client.ParseJSON(t, batchSendRes)
+			historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody)
+			backfilledEventId := historicalEventIDs[0]
+
+			// This is just a dummy event we search for after the backfilledEventId
+			eventIDsAfterBackfill := createMessagesInRoom(t, alice, roomID, 1)
+			eventIdAfterBackfill := eventIDsAfterBackfill[0]
+
+			// Sync until we find the eventIdAfterBackfill. If we're able to see the eventIdAfterBackfill,
+			// which occurs after the backfilledEventId, without seeing the backfilledEventId in between,
+			// we're probably safe to assume it won't sync
+			alice.SyncUntil(t, "", `{ "room": { "timeline": { "limit": 3 } } }`, "rooms.join."+client.GjsonEscape(roomID)+".timeline.events", func(r gjson.Result) bool {
+				if r.Get("event_id").Str == backfilledEventId {
+					t.Fatalf("We should not see the %s backfilled event in /sync response but it was present", backfilledEventId)
+				}
+
+				return r.Get("event_id").Str == eventIdAfterBackfill
+			})
+		})
+
+		t.Run("Unrecognised prev_event ID will throw an error", func(t *testing.T) {
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+
+			batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID},
+				roomID,
+				"$some-non-existent-event-id",
+				time.Now(),
+				"",
+				1,
+				// Status
+				// TODO: Seems like this makes more sense as a 404
+				// But the current Synapse code around unknown prev events will throw ->
+				// `403: No create event in auth events`
+				403,
+			)
+		})
+
+		t.Run("Normal users aren't allowed to backfill messages", func(t *testing.T) {
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+			alice.JoinRoom(t, roomID, nil)
+
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+			eventIdBefore := eventIDsBefore[0]
+			timeAfterEventBefore := time.Now()
+
+			batchSendHistoricalMessages(
+				t,
+				alice,
+				[]string{virtualUserID},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore,
+				"",
+				1,
+				// Status
+				// Normal user alice should not be able to backfill messages
+				403,
+			)
+		})
+
+		t.Run("TODO: Test if historical avatar/display name set back in time are picked up on historical messages", func(t *testing.T) {
+			t.Skip("Skipping until implemented")
+			// TODO: Try adding avatar and displayName and see if historical messages get this info
+		})
+
+		t.Run("Historical messages are visible when joining on federated server - auto-generated base insertion event", func(t *testing.T) {
+			t.Skip("Skipping until federation is implemented")
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+			alice.JoinRoom(t, roomID, nil)
+
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+			eventIdBefore := eventIDsBefore[0]
+			timeAfterEventBefore := time.Now()
+
+			// eventIDsAfter
+			createMessagesInRoom(t, alice, roomID, 3)
+
+			batchSendRes := batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore,
+				"",
+				2,
+				// Status
+				200,
+			)
+			batchSendResBody := client.ParseJSON(t, batchSendRes)
+			historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody)
+
+			// Join the room from a remote homeserver after the backfilled messages were sent
+			remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
+
+			// Make sure all of the events have been backfilled
+			fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool {
+				if ev.Get("event_id").Str == eventIdBefore {
+					return true
+				}
+
+				return false
+			})
+
+			messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir":   []string{"b"},
+				"limit": []string{"100"},
+			}))
+
+			must.MatchResponse(t, messagesRes, match.HTTPResponse{
+				JSON: []match.JSON{
+					match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
+						return r.Get("event_id").Str
+					}, nil),
+				},
+			})
+		})
+
+		t.Run("Historical messages are visible when joining on federated server - pre-made insertion event", func(t *testing.T) {
+			t.Skip("Skipping until federation is implemented")
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+			alice.JoinRoom(t, roomID, nil)
+
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+			eventIdBefore := eventIDsBefore[0]
+			timeAfterEventBefore := time.Now()
+
+			// Create insertion event in the normal DAG
+			chunkId := "mynextchunkid123"
+			insertionEvent := b.Event{
+				Type: insertionEventType,
+				Content: map[string]interface{}{
+					nextChunkIDContentField: chunkId,
+					historicalContentField:  true,
+				},
+			}
+			// We can't use as.SendEventSynced(...) because application services can't use the /sync API
+			insertionSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", insertionEvent.Type, "txn-m123"}, client.WithJSONBody(t, insertionEvent.Content))
+			insertionSendBody := client.ParseJSON(t, insertionSendRes)
+			insertionEventID := client.GetJSONFieldStr(t, insertionSendBody, "event_id")
+			// Make sure the insertion event has reached the homeserver
+			alice.SyncUntilTimelineHas(t, roomID, func(ev gjson.Result) bool {
+				return ev.Get("event_id").Str == insertionEventID
+			})
+
+			// eventIDsAfter
+			createMessagesInRoom(t, alice, roomID, 3)
+
+			batchSendRes := batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore,
+				chunkId,
+				2,
+				// Status
+				200,
+			)
+			batchSendResBody := client.ParseJSON(t, batchSendRes)
+			historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody)
+
+			// Join the room from a remote homeserver after the backfilled messages were sent
+			remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
+
+			// Make sure all of the events have been backfilled
+			fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool {
+				if ev.Get("event_id").Str == eventIdBefore {
+					return true
+				}
+
+				return false
+			})
+
+			messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir":   []string{"b"},
+				"limit": []string{"100"},
+			}))
+
+			must.MatchResponse(t, messagesRes, match.HTTPResponse{
+				JSON: []match.JSON{
+					match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
+						return r.Get("event_id").Str
+					}, nil),
+				},
+			})
+		})
+
+		t.Run("Historical messages are visible when already joined on federated server", func(t *testing.T) {
+			t.Skip("Skipping until federation is implemented")
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+			alice.JoinRoom(t, roomID, nil)
+
+			// Join the room from a remote homeserver before any backfilled messages are sent
+			remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
+
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+			eventIdBefore := eventIDsBefore[0]
+			timeAfterEventBefore := time.Now()
+
+			// eventIDsAfter
+			createMessagesInRoom(t, alice, roomID, 10)
+
+			// Mimic scrollback just through the latest messages
+			remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir": []string{"b"},
+				// Limited so we can only see a few of the latest messages
+				"limit": []string{"5"},
+			}))
+
+			batchSendRes := batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore,
+				"",
+				2,
+				// Status
+				200,
+			)
+			batchSendResBody := client.ParseJSON(t, batchSendRes)
+			historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody)
+			baseInsertionEventID := historicalEventIDs[len(historicalEventIDs)-1]
+
+			// [1 insertion event + 2 historical events + 1 base insertion event]
+			if len(historicalEventIDs) != 4 {
+				t.Fatalf("Expected eventID list should be length 4 but saw %d: %s", len(historicalEventIDs), historicalEventIDs)
+			}
+
+			beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir":   []string{"b"},
+				"limit": []string{"100"},
+			}))
+			beforeMarkerMessageResBody := client.ParseJSON(t, beforeMarkerMessagesRes)
+			eventDebugStringsFromBeforeMarkerResponse := getRelevantEventDebugStringsFromMessagesResponse(t, beforeMarkerMessageResBody)
+			// Since the original body can only be read once, create a new one from the body bytes we just read
+			beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMessageResBody))
+
+			// Make sure the history isn't visible before we expect it to be there.
+			// This guards against the homeserver distributing the historical messages
+			// to other homeservers through some unexpected mechanism.
+			must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{
+				JSON: []match.JSON{
+					match.JSONArrayEach("chunk", func(r gjson.Result) error {
+						// Throw if we find one of the historical events in the message response
+						for _, historicalEventID := range historicalEventIDs {
+							if r.Get("event_id").Str == historicalEventID {
+								return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs)
+							}
+						}
+
+						return nil
+					}),
+				},
+			})
+
+			// Send a marker event to let all of the homeservers know about the
+			// insertion point where all of the historical messages are
+			markerEvent := b.Event{
+				Type: markerEventType,
+				Content: map[string]interface{}{
+					markerInsertionContentField: baseInsertionEventID,
+				},
+			}
+			// We can't use as.SendEventSynced(...) because application services can't use the /sync API
+			markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", markerEvent.Type, "txn-m123"}, client.WithJSONBody(t, markerEvent.Content))
+			markerSendBody := client.ParseJSON(t, markerSendRes)
+			markerEventID := client.GetJSONFieldStr(t, markerSendBody, "event_id")
+
+			// Make sure the marker event has reached the remote homeserver
+			remoteCharlie.SyncUntilTimelineHas(t, roomID, func(ev gjson.Result) bool {
+				return ev.Get("event_id").Str == markerEventID
+			})
+
+			messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir":   []string{"b"},
+				"limit": []string{"100"},
+			}))
+
+			must.MatchResponse(t, messagesRes, match.HTTPResponse{
+				JSON: []match.JSON{
+					match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
+						return r.Get("event_id").Str
+					}, nil),
+				},
+			})
+		})
+
+		t.Run("When messages have already been scrolled back through, new historical messages are visible in next scroll back on federated server", func(t *testing.T) {
+			t.Skip("Skipping until federation is implemented")
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, map[string]interface{}{
+				"preset": "public_chat",
+				"name":   "the hangout spot",
+			})
+			alice.JoinRoom(t, roomID, nil)
+
+			// Join the room from a remote homeserver before any backfilled messages are sent
+			remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
+
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+			eventIdBefore := eventIDsBefore[0]
+			timeAfterEventBefore := time.Now()
+
+			// eventIDsAfter
+			createMessagesInRoom(t, alice, roomID, 3)
+
+			// Mimic scrollback to all of the messages
+			// scrollbackMessagesRes
+			remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir":   []string{"b"},
+				"limit": []string{"100"},
+			}))
+
+			// Historical messages are inserted where we have already scrolled back to
+			batchSendRes := batchSendHistoricalMessages(
+				t,
+				as,
+				[]string{virtualUserID},
+				roomID,
+				eventIdBefore,
+				timeAfterEventBefore,
+				"",
+				2,
+				// Status
+				200,
+			)
+			batchSendResBody := client.ParseJSON(t, batchSendRes)
+			historicalEventIDs := getEventsFromBatchSendResponseBody(t, batchSendResBody)
+
+			// TODO: Send marker event
+
+			messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+				"dir":   []string{"b"},
+				"limit": []string{"100"},
+			}))
+
+			must.MatchResponse(t, messagesRes, match.HTTPResponse{
+				JSON: []match.JSON{
+					match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
+						return r.Get("event_id").Str
+					}, nil),
+				},
+			})
+		})
+	})
+}
+
+func makeInterfaceSlice(slice []string) []interface{} {
+	interfaceSlice := make([]interface{}, len(slice))
+	for i := range slice {
+		interfaceSlice[i] = slice[i]
+	}
+
+	return interfaceSlice
+}
+
+func reversed(in []string) []string {
+	out := make([]string, len(in))
+	for i := 0; i < len(in); i++ {
+		out[i] = in[len(in)-i-1]
+	}
+	return out
+}
+
+func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) {
+	t.Helper()
+	start := time.Now()
+	checkCounter := 0
+	for {
+		if time.Since(start) > c.SyncUntilTimeout {
+			t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", checkCounter)
+		}
+
+		messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+			"dir":   []string{"b"},
+			"limit": []string{"100"},
+		}))
+		messageResBody := client.ParseJSON(t, messagesRes)
+		wantKey := "chunk"
+		keyRes := gjson.GetBytes(messageResBody, wantKey)
+		if !keyRes.Exists() {
+			t.Fatalf("missing key '%s'", wantKey)
+		}
+		if !keyRes.IsArray() {
+			t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type)
+		}
+
+		events := keyRes.Array()
+		for _, ev := range events {
+			if check(ev) {
+				return
+			}
+		}
+
+		checkCounter++
+		// Add a slight delay so we don't hammer the messages endpoint
+		time.Sleep(500 * time.Millisecond)
+	}
+}
+
+func isRelevantEvent(r gjson.Result) bool {
+	return len(r.Get("content").Get("body").Str) > 0 || r.Get("type").Str == insertionEventType || r.Get("type").Str == markerEventType
+}
+
+func getRelevantEventDebugStringsFromMessagesResponse(t *testing.T, body []byte) (eventIDsFromResponse []string) {
+	t.Helper()
+
+	wantKey := "chunk"
+	res := gjson.GetBytes(body, wantKey)
+	if !res.Exists() {
+		t.Fatalf("missing key '%s'", wantKey)
+	}
+	if !res.IsArray() {
+		t.Fatalf("key '%s' is not an array (was %s)", wantKey, res.Type)
+	}
+
+	res.ForEach(func(key, r gjson.Result) bool {
+		if isRelevantEvent(r) {
+			eventIDsFromResponse = append(eventIDsFromResponse, r.Get("event_id").Str+" ("+r.Get("content").Get("body").Str+")")
+		}
+		return true
+	})
+
+	return eventIDsFromResponse
+}
+
+// ensureVirtualUserRegistered makes sure the given virtual user is registered on the homeserver,
+// regardless of whether they were already registered.
+// If registration fails for any reason other than the user already existing, the test is failed.
+func ensureVirtualUserRegistered(t *testing.T, c *client.CSAPI, virtualUserLocalpart string) {
+	res := c.DoFunc(
+		t,
+		"POST",
+		[]string{"_matrix", "client", "r0", "register"},
+		client.WithJSONBody(t, map[string]interface{}{"type": "m.login.application_service", "username": virtualUserLocalpart}),
+		client.WithContentType("application/json"),
+	)
+
+	if res.StatusCode == 200 {
+		return
+	}
+
+	body := client.ParseJSON(t, res)
+	errcode := client.GetJSONFieldStr(t, body, "errcode")
+
+	if res.StatusCode == 400 && errcode == "M_USER_IN_USE" {
+		return
+	}
+
+	errorMessage := client.GetJSONFieldStr(t, body, "error")
+	t.Fatalf("msc2716.ensureVirtualUserRegistered failed to register: (%s) %s", errcode, errorMessage)
+}
+
+func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int) (eventIDs []string) {
+	eventIDs = make([]string, count)
+	for i := 0; i < len(eventIDs); i++ {
+		newEvent := b.Event{
+			Type: "m.room.message",
+			Content: map[string]interface{}{
+				"msgtype": "m.text",
+				"body":    fmt.Sprintf("Message %d", i),
+			},
+		}
+		newEventId := c.SendEventSynced(t, roomID, newEvent)
+		eventIDs[i] = newEventId
+	}
+
+	return eventIDs
+}
+
+var chunkCount int64 = 0
+
+func batchSendHistoricalMessages(
+	t *testing.T,
+	c *client.CSAPI,
+	virtualUserIDs []string,
+	roomID string,
+	insertAfterEventId string,
+	insertTime time.Time,
+	chunkID string,
+	count int,
+	expectedStatus int,
+) (res *http.Response) {
+	// Timestamp in milliseconds
+	insertOriginServerTs := uint64(insertTime.UnixNano() / int64(time.Millisecond))
+
+	timeBetweenMessagesMS := uint64(timeBetweenMessages / time.Millisecond)
+
+	evs := make([]map[string]interface{}, count)
+	for i := 0; i < len(evs); i++ {
+		virtualUserID := virtualUserIDs[i%len(virtualUserIDs)]
+
+		newEvent := map[string]interface{}{
+			"type":             "m.room.message",
+			"sender":           virtualUserID,
+			"origin_server_ts": insertOriginServerTs + (timeBetweenMessagesMS * uint64(i)),
+			"content": map[string]interface{}{
+				"msgtype":              "m.text",
+				"body":                 fmt.Sprintf("Historical %d (chunk=%d)", i, chunkCount),
+				historicalContentField: true,
+			},
+		}
+
+		evs[i] = newEvent
+	}
+
+	stateEvents := make([]map[string]interface{}, len(virtualUserIDs))
+	for i, virtualUserID := range virtualUserIDs {
+		joinEvent := map[string]interface{}{
+			"type":             "m.room.member",
+			"sender":           virtualUserID,
+			"origin_server_ts": insertOriginServerTs,
+			"content": map[string]interface{}{
+				"membership": "join",
+			},
+			"state_key": virtualUserID,
+		}
+
+		stateEvents[i] = joinEvent
+	}
+
+	query := make(url.Values, 2)
+	query.Add("prev_event", insertAfterEventId)
+	// If provided, connect the chunk to the last insertion point
+	if chunkID != "" {
+		query.Add("chunk_id", chunkID)
+	}
+
+	res = c.DoFunc(
+		t,
+		"POST",
+		[]string{"_matrix", "client", "unstable", "org.matrix.msc2716", "rooms", roomID, "batch_send"},
+		client.WithJSONBody(t, map[string]interface{}{
+			"events":                evs,
+			"state_events_at_start": stateEvents,
+		}),
+		client.WithContentType("application/json"),
+		client.WithQueries(query),
+	)
+
+	if res.StatusCode != expectedStatus {
+		t.Fatalf("msc2716.batchSendHistoricalMessages got %d HTTP status code from batch send response but want %d", res.StatusCode, expectedStatus)
+	}
+
+	chunkCount++
+
+	return res
+}
+
+func getEventsFromBatchSendResponseBody(t *testing.T, body []byte) (eventIDs []string) {
+	eventIDs = client.GetJSONFieldStringArray(t, body, "events")
+
+	return eventIDs
+}
+
+func getNextChunkIdFromBatchSendResponseBody(t *testing.T, body []byte) (nextChunkID string) {
+	nextChunkID = client.GetJSONFieldStr(t, body, "next_chunk_id")
+
+	return nextChunkID
+}
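For callers migrating to the widened SyncUntil signature introduced in this patch, a minimal sketch follows. It is illustrative only and not part of the patch; it assumes it lives in the tests package alongside the imports already used there, and `wantedEventID` is a hypothetical event expected to come down /sync.

// exampleSyncUntilWithFilter is a hypothetical helper showing the widened
// SyncUntil signature: the second argument is a `since` token ("" to sync
// from scratch) and the third is an inline JSON filter ("" for none).
func exampleSyncUntilWithFilter(t *testing.T, alice *client.CSAPI, roomID, wantedEventID string) {
	// Trim the timeline to 3 events per room, mirroring the incremental-sync
	// test above that checks backfilled events stay out of /sync.
	filter := `{ "room": { "timeline": { "limit": 3 } } }`
	alice.SyncUntil(t, "", filter, "rooms.join."+client.GjsonEscape(roomID)+".timeline.events", func(ev gjson.Result) bool {
		return ev.Get("event_id").Str == wantedEventID
	})
}

Passing the filter as a query parameter relies on the /sync endpoint accepting an inline JSON filter definition as well as a server-side filter ID, which is what the `query["filter"]` plumbing in client.go forwards verbatim.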