Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Edited messages appear twice in fulltext search #3363

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions clientapi/routing/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func AdminPurgeRoom(req *http.Request, rsAPI roomserverAPI.ClientRoomserverAPI)
}
}

func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *api.Device, userAPI api.ClientUserAPI) util.JSONResponse {
func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *api.Device, userAPI userapi.ClientUserAPI) util.JSONResponse {
if req.Body == nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
Expand Down Expand Up @@ -423,7 +423,7 @@ func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *api.Device,
}
}

func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI api.ClientKeyAPI) util.JSONResponse {
func AdminMarkAsStale(req *http.Request, cfg *config.ClientAPI, keyAPI userapi.ClientKeyAPI) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
Expand Down
2 changes: 1 addition & 1 deletion clientapi/routing/pushrules.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func GetPushRulesByKind(ctx context.Context, scope, kind string, device *userapi
}
rulesPtr := pushRuleSetKindPointer(ruleSet, pushrules.Kind(kind))
// Even if rulesPtr is not nil, there may not be any rules for this kind
if rulesPtr == nil || (rulesPtr != nil && len(*rulesPtr) == 0) {
if rulesPtr == nil || len(*rulesPtr) == 0 {
return errorResponse(ctx, spec.InvalidParam("invalid push rules kind"), "pushRuleSetKindPointer failed")
}
return util.JSONResponse{
Expand Down
2 changes: 1 addition & 1 deletion clientapi/routing/sendevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func createEvents(eventsJSON []string, roomVer gomatrixserverlib.RoomVersion) ([
for i, eventJSON := range eventsJSON {
pdu, evErr := roomVerImpl.NewEventFromTrustedJSON([]byte(eventJSON), false)
if evErr != nil {
return nil, fmt.Errorf("failed to make event: %s", err.Error())
return nil, fmt.Errorf("failed to make event: %s", evErr.Error())
}
ev := types.HeaderedEvent{PDU: pdu}
events[i] = &ev
Expand Down
17 changes: 17 additions & 0 deletions syncapi/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,23 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio
if err := s.fts.Index(e); err != nil {
return err
}
// If the event is an edited message we remove the original event from the index
// to avoid duplicates in the search results.
relatesTo := gjson.GetBytes(ev.Content(), "m\\.relates_to")
if relatesTo.Exists() {
relatedData := relatesTo.Value().(map[string]interface{})
if _, ok := relatedData["rel_type"]; ok && relatedData["rel_type"] == "m.replace" {
// We remove the original event from the index
if srcEventID, ok := relatedData["event_id"]; ok {
if err := s.fts.Delete(srcEventID.(string)); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"src_id": srcEventID,
}).WithError(err).Error("Failed to delete edited message from the fulltext index")
}
}
}
}
adnull marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
92 changes: 92 additions & 0 deletions syncapi/syncapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/tidwall/gjson"

rstypes "github.com/matrix-org/dendrite/roomserver/types"
Expand Down Expand Up @@ -1324,6 +1327,95 @@ func TestUpdateRelations(t *testing.T) {
})
}

func TestRemoveEditedEventFromSearchIndex(t *testing.T) {
user := test.NewUser(t)
alice := userapi.Device{
ID: "ALICEID",
UserID: user.ID,
AccessToken: "ALICE_BEARER_TOKEN",
DisplayName: "Alice",
AccountType: userapi.AccountTypeUser,
}

routers := httputil.NewRouters()

cfg, processCtx, close := testrig.CreateConfig(t, test.DBTypeSQLite)
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()

// Use an actual roomserver for this
natsInstance := jetstream.NATSInstance{}
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)

rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)

room := test.NewRoom(t, user)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)

if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}

ev1 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "first"})
ev2 := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{
"body": " * first",
"m.new_content": map[string]interface{}{
"body": "first",
adnull marked this conversation as resolved.
Show resolved Hide resolved
"msgtype": "m.text",
},
"m.relates_to": map[string]interface{}{
"event_id": ev1.EventID(),
"rel_type": "m.replace",
},
})
events := []*rstypes.HeaderedEvent{ev1, ev2}

for _, e := range events {
roomEvents := append([]*rstypes.HeaderedEvent{}, e)
if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, roomEvents, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}

syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, e.EventID())

return gjson.Get(syncBody, path).Exists()
})

// We search that event is the only one nad is the exact event we sent
searchResult := searchRequest(t, routers.Client, alice.AccessToken, "first", []string{room.ID})
results := gjson.GetBytes(searchResult, fmt.Sprintf(`search_categories.room_events.groups.room_id.%s.results`, room.ID))
assert.True(t, results.Exists(), "Should be a search response")
assert.Equal(t, 1, len(results.Array()), "Should be exactly one result")
assert.Equal(t, e.EventID(), results.Array()[0].String(), "Should be only found exact event")
}
}

func searchRequest(t *testing.T, router *mux.Router, accessToken, searchTerm string, roomList []string) []byte {
t.Helper()
w := httptest.NewRecorder()
rq := test.NewRequest(t, "POST", "/_matrix/client/v3/search", test.WithQueryParams(map[string]string{
"access_token": accessToken,
}), test.WithJSONBody(t, map[string]interface{}{
"search_categories": map[string]interface{}{
"room_events": map[string]interface{}{
"filters": roomList,
"search_term": searchTerm,
},
},
}))

router.ServeHTTP(w, rq)
assert.Equal(t, 200, w.Code)
defer w.Result().Body.Close()
body, err := io.ReadAll(w.Result().Body)
assert.NoError(t, err)
return body
}
func syncUntil(t *testing.T,
routers httputil.Routers, accessToken string,
skip bool,
Expand Down
1 change: 1 addition & 0 deletions test/testrig/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func CreateConfig(t *testing.T, dbType test.DBType) (*config.Dendrite, *process.
SingleDatabase: false,
})
cfg.Global.ServerName = "test"
cfg.SyncAPI.Fulltext.Enabled = true
cfg.SyncAPI.Fulltext.InMemory = true
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
// the file system event with InMemory=true :(
Expand Down