Skip to content

Commit

Permalink
Merge pull request #162 from matrix-org/kegan/more-uts
Browse files Browse the repository at this point in the history
Add structures for unit testing handler2
  • Loading branch information
kegsay committed Jun 15, 2023
2 parents 409431f + eefbe33 commit 86531a4
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 8 deletions.
11 changes: 5 additions & 6 deletions sync2/handler2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"
"hash/fnv"
"os"
"sync"

"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"

"github.com/getsentry/sentry-go"

"github.com/matrix-org/sliding-sync/internal"
Expand All @@ -30,12 +31,11 @@ var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.C
// processing v2 data (as a sync2.V2DataReceiver) and publishing updates (pubsub.Payload to V2Listeners);
// and receiving and processing EnsurePolling events.
type Handler struct {
pMap *sync2.PollerMap
pMap sync2.IPollerMap
v2Store *sync2.Storage
Store *state.Storage
v2Pub pubsub.Notifier
v3Sub *pubsub.V3Sub
client sync2.Client
unreadMap map[string]struct {
Highlight int
Notif int
Expand All @@ -48,13 +48,12 @@ type Handler struct {
}

func NewHandler(
connStr string, pMap *sync2.PollerMap, v2Store *sync2.Storage, store *state.Storage, client sync2.Client,
pMap sync2.IPollerMap, v2Store *sync2.Storage, store *state.Storage,
pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool,
) (*Handler, error) {
h := &Handler{
pMap: pMap,
v2Store: v2Store,
client: client,
Store: store,
subSystem: "poller",
unreadMap: make(map[string]struct {
Expand Down
164 changes: 164 additions & 0 deletions sync2/handler2/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package handler2_test

import (
"os"
"reflect"
"sync"
"testing"
"time"

"github.com/matrix-org/sliding-sync/pubsub"
"github.com/matrix-org/sliding-sync/state"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync2/handler2"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/rs/zerolog"
)

var postgresURI string

func TestMain(m *testing.M) {
postgresURI = testutils.PrepareDBConnectionString()
exitCode := m.Run()
os.Exit(exitCode)
}

type pollInfo struct {
pid sync2.PollerID
accessToken string
v2since string
isStartup bool
}

type mockPollerMap struct {
calls []pollInfo
}

func (p *mockPollerMap) NumPollers() int {
return 0
}
func (p *mockPollerMap) Terminate() {}

func (p *mockPollerMap) EnsurePolling(pid sync2.PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) {
p.calls = append(p.calls, pollInfo{
pid: pid,
accessToken: accessToken,
v2since: v2since,
isStartup: isStartup,
})
}
func (p *mockPollerMap) assertCallExists(t *testing.T, pi pollInfo) {
for _, c := range p.calls {
if reflect.DeepEqual(pi, c) {
return
}
}
t.Fatalf("assertCallExists: did not find %+v", pi)
}

type mockPub struct {
calls []pubsub.Payload
mu *sync.Mutex
waiters map[string][]chan struct{}
}

func newMockPub() *mockPub {
return &mockPub{
mu: &sync.Mutex{},
waiters: make(map[string][]chan struct{}),
}
}

// Notify chanName that there is a new payload p. Return an error if we failed to send the notification.
func (p *mockPub) Notify(chanName string, payload pubsub.Payload) error {
p.calls = append(p.calls, payload)
p.mu.Lock()
for _, ch := range p.waiters[payload.Type()] {
close(ch)
}
p.waiters[payload.Type()] = nil // don't re-notify for 2nd+ payload
p.mu.Unlock()
return nil
}

func (p *mockPub) WaitForPayloadType(t string) chan struct{} {
ch := make(chan struct{})
p.mu.Lock()
p.waiters[t] = append(p.waiters[t], ch)
p.mu.Unlock()
return ch
}

func (p *mockPub) DoWait(t *testing.T, errMsg string, ch chan struct{}) {
select {
case <-ch:
return
case <-time.After(time.Second):
t.Fatalf("DoWait: timed out waiting: %s", errMsg)
}
}

// Close is called when we should stop listening.
func (p *mockPub) Close() error { return nil }

type mockSub struct{}

// Begin listening on this channel with this callback starting from this position. Blocks until Close() is called.
func (s *mockSub) Listen(chanName string, fn func(p pubsub.Payload)) error { return nil }

// Close the listener. No more callbacks should fire.
func (s *mockSub) Close() error { return nil }

func assertNoError(t *testing.T, err error) {
t.Helper()
if err == nil {
return
}
t.Fatalf("assertNoError: %v", err)
}

// Test that if you call EnsurePolling you get back V2InitialSyncComplete down pubsub and the poller
// map is called correctly
func TestHandlerFreshEnsurePolling(t *testing.T) {
store := state.NewStorage(postgresURI)
v2Store := sync2.NewStore(postgresURI, "secret")
pMap := &mockPollerMap{}
pub := newMockPub()
sub := &mockSub{}
h, err := handler2.NewHandler(pMap, v2Store, store, pub, sub, false)
assertNoError(t, err)
alice := "@alice:localhost"
deviceID := "ALICE"
token := "aliceToken"

// the device and token needs to already exist prior to EnsurePolling
err = v2Store.DevicesTable.InsertDevice(alice, deviceID)
assertNoError(t, err)
tok, err := v2Store.TokensTable.Insert(token, alice, deviceID, time.Now())
assertNoError(t, err)

payloadInitialSyncComplete := pubsub.V2InitialSyncComplete{
UserID: alice,
DeviceID: deviceID,
}
ch := pub.WaitForPayloadType(payloadInitialSyncComplete.Type())
// ask the handler to start polling
h.EnsurePolling(&pubsub.V3EnsurePolling{
UserID: alice,
DeviceID: deviceID,
AccessTokenHash: tok.AccessTokenHash,
})
pub.DoWait(t, "didn't see V2InitialSyncComplete", ch)

// make sure we polled with the token i.e it did a db hit
pMap.assertCallExists(t, pollInfo{
pid: sync2.PollerID{
UserID: alice,
DeviceID: deviceID,
},
accessToken: token,
v2since: "",
isStartup: false,
})

}
9 changes: 8 additions & 1 deletion sync2/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/getsentry/sentry-go"
"runtime/debug"
"sync"
"sync/atomic"
"time"

"github.com/getsentry/sentry-go"

"github.com/matrix-org/sliding-sync/internal"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -55,6 +56,12 @@ type V2DataReceiver interface {
OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
}

type IPollerMap interface {
EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger)
NumPollers() int
Terminate()
}

// PollerMap is a map of device ID to Poller
type PollerMap struct {
v2Client Client
Expand Down
2 changes: 1 addition & 1 deletion v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han

pMap := sync2.NewPollerMap(v2Client, opts.AddPrometheusMetrics)
// create v2 handler
h2, err := handler2.NewHandler(postgresURI, pMap, storev2, store, v2Client, pubSub, pubSub, opts.AddPrometheusMetrics)
h2, err := handler2.NewHandler(pMap, storev2, store, pubSub, pubSub, opts.AddPrometheusMetrics)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 86531a4

Please sign in to comment.