From 01070776418258b121cbbc314bcd7b1ff6ace6bd Mon Sep 17 00:00:00 2001
From: Doug Luce
Date: Mon, 15 Apr 2024 17:29:42 -0700
Subject: [PATCH] Support non-colorized output, logging refactor

Three things are happening here:

1) An environment variable is added to allow for non-colorized text to
be output. This is so /usr/sbin/daemon can direct stdout to syslog
without escaped control sequences:

Apr 15 14:48:52 orla syncv3[41839]: ^[[90m14:48:52^[[0m ^[[32mINF^[[0m Poller: accumulated data ^[[36mdevice [events,changed,left,account]=^[[0m[0,0,0,0] ^[[36mdevice_id=^[[0mQRZLYYCRTO ^[[36mrooms [timeline,state,typing,receipts,invites]=^[[0m[0,0,0,0,0] ^[[36muser_id=^[[0m@doug:yutz.horph.com

I would have preferred a command-line switch instead of the env var but
am following the existing pattern.

2) zerolog use is refactored. Since there really are no different types
of log output and only minimal use of additional context, it makes
sense to use zerolog's existing global logger. This simplifies things a
bit both textually and cyclomatically and allows for central changes to
the logger output based on the new env var.

3) A `go fmt ./...` changed a few very minor things.
---
 cmd/syncv3/main.go                            | 45 +++++++++++----
 internal/errors.go                            | 10 +---
 pubsub/pubsub.go                              |  7 ---
 pubsub/v2.go                                  |  3 +-
 pubsub/v3.go                                  |  6 +-
 sqlutil/sql.go                                | 13 ++---
 state/accumulator.go                          | 15 ++---
 state/event_table.go                          |  5 +-
 .../20230822180807_bogus_snapshot_cleanup.go  | 15 ++---
 .../20231108122539_clear_stuck_invites.go     |  9 +--
 .../20240517104423_device_list_table.go       |  5 +-
 state/storage.go                              | 24 +++-----
 state/to_device_table.go                      |  5 +-
 sync2/handler2/handler.go                     | 56 ++++++++-----------
 sync2/handler2/handler_test.go                |  3 +-
 sync2/poller.go                               | 42 +++++++-------
 sync2/poller_test.go                          | 30 +++++-----
 sync2/storage.go                              | 11 +---
 sync2/tokens_table.go                         |  3 +-
 sync3/caches/global.go                        | 18 ++----
 sync3/caches/user.go                          |  9 +--
 sync3/conn.go                                 | 11 ++--
 sync3/connmap.go                              | 17 +++---
 sync3/dispatcher.go                           | 12 +---
 sync3/extensions/account_data.go              |  7 ++-
 sync3/extensions/extensions.go                |  7 ---
 sync3/extensions/receipts.go                  | 13 +++--
 sync3/extensions/todevice.go                  |  3 +-
 sync3/handler/connstate.go                    | 17 +++---
 sync3/handler/connstate_live.go               | 11 ++--
 sync3/handler/ensure_polling.go               |  3 +-
 sync3/handler/handler.go                      | 28 ++++------
 sync3/handler/rooms_builder.go                | 18 +++---
 sync3/lists.go                                |  3 +-
 sync3/main_test.go                            |  3 +-
 sync3/ops.go                                  |  5 +-
 tests-e2e/gappy_state_test.go                 | 10 ++--
 v3.go                                         | 30 +++++-----
 38 files changed, 250 insertions(+), 282 deletions(-)
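As a condensed, standalone view of the logger setup this patch adds to
cmd/syncv3/main.go (points 1 and 2 above): the SYNCV3_PLAIN_OUTPUT
variable, the ConsoleWriter settings and the formatter overrides below
are taken from the diff that follows, while the main() wrapper and the
sample log call at the end are illustrative only.

package main

import (
    "fmt"
    "os"
    "strings"
    "time"

    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

func main() {
    if os.Getenv("SYNCV3_PLAIN_OUTPUT") != "1" {
        // Default behaviour: colorized console output, as before this patch.
        log.Logger = log.Output(zerolog.ConsoleWriter{
            Out:        os.Stderr,
            TimeFormat: "15:04:05",
        })
    } else {
        // Plain output: override the formatters that would otherwise emit
        // ANSI colour codes, so /usr/sbin/daemon can hand stdout to syslog.
        output := zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339}
        output.FormatTimestamp = func(i interface{}) string { return fmt.Sprintf("%v", i) }
        output.FormatLevel = func(i interface{}) string { return strings.ToUpper(fmt.Sprintf("%s", i)) }
        output.FormatFieldName = func(i interface{}) string { return fmt.Sprintf("%s=", i) }
        log.Logger = zerolog.New(output).With().Timestamp().Logger()
    }

    // Point 2: with the global logger configured once here, the rest of the
    // codebase logs through the zerolog/log package directly.
    log.Info().Str("device_id", "QRZLYYCRTO").Msg("Poller: accumulated data")
}

With SYNCV3_PLAIN_OUTPUT=1 the sample call prints a single plain line
(timestamp, upper-cased level, message, then key=value fields) with no
ANSI escape sequences, so nothing like the escaped control sequences in
the syslog example above reaches the log.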
"SYNCV3_OTLP_PASSWORD" EnvSentryDsn = "SYNCV3_SENTRY_DSN" EnvLogLevel = "SYNCV3_LOG_LEVEL" + EnvPlainOutput = "SYNCV3_PLAIN_OUTPUT" EnvMaxConns = "SYNCV3_MAX_DB_CONN" EnvIdleTimeoutSecs = "SYNCV3_DB_IDLE_TIMEOUT_SECS" EnvHTTPTimeoutSecs = "SYNCV3_HTTP_TIMEOUT_SECS" @@ -74,11 +74,12 @@ Environment var %s Default: unset. The Sentry DSN to report events to e.g https://sliding-sync@sentry.example.com/123 - if unset does not send sentry events. %s Default: info. The level of verbosity for messages logged. Available values are trace, debug, info, warn, error and fatal %s Default: unset. Max database connections to use when communicating with postgres. Unset or 0 means no limit. +%s Default: unset. Disable colorized output (for cleaner text logging). If set to 1, will output plain text. %s Default: 3600. The maximum amount of time a database connection may be idle, in seconds. 0 means no limit. %s Default: 300. The timeout in seconds for normal HTTP requests. %s Default: 1800. The timeout in seconds for initial sync requests. `, EnvServer, EnvDB, EnvSecret, EnvBindAddr, EnvTLSCert, EnvTLSKey, EnvPPROF, EnvPrometheus, EnvOTLP, EnvOTLPUsername, EnvOTLPPassword, - EnvSentryDsn, EnvLogLevel, EnvMaxConns, EnvIdleTimeoutSecs, EnvHTTPTimeoutSecs, EnvHTTPInitialTimeoutSecs) + EnvSentryDsn, EnvLogLevel, EnvMaxConns, EnvPlainOutput, EnvIdleTimeoutSecs, EnvHTTPTimeoutSecs, EnvHTTPInitialTimeoutSecs) func defaulting(in, dft string) string { if in == "" { @@ -113,6 +114,7 @@ func main() { EnvSentryDsn: os.Getenv(EnvSentryDsn), EnvLogLevel: os.Getenv(EnvLogLevel), EnvMaxConns: defaulting(os.Getenv(EnvMaxConns), "0"), + EnvPlainOutput: defaulting(os.Getenv(EnvPlainOutput), "0"), EnvIdleTimeoutSecs: defaulting(os.Getenv(EnvIdleTimeoutSecs), "3600"), EnvHTTPTimeoutSecs: defaulting(os.Getenv(EnvHTTPTimeoutSecs), "300"), EnvHTTPInitialTimeoutSecs: defaulting(os.Getenv(EnvHTTPInitialTimeoutSecs), "1800"), @@ -194,6 +196,25 @@ func main() { } } + if args[EnvPlainOutput] != "1" { + log.Logger = log.Output(zerolog.ConsoleWriter{ + Out: os.Stderr, + TimeFormat: "15:04:05", + }) + } else { + output := zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339} + output.FormatTimestamp = func(i interface{}) string { + return fmt.Sprintf("%v", i) + } + output.FormatLevel = func(i interface{}) string { + return strings.ToUpper(fmt.Sprintf("%s", i)) + } + output.FormatFieldName = func(i interface{}) string { + return fmt.Sprintf("%s=", i) + } + log.Logger = zerolog.New(output).With().Timestamp().Logger() + } + maxConnsInt, err := strconv.Atoi(args[EnvMaxConns]) if err != nil { panic("invalid value for " + EnvMaxConns + ": " + args[EnvMaxConns]) @@ -285,12 +306,12 @@ func executeMigrations() { db, err := goose.OpenDBWithDriver("postgres", envArgs[EnvDB]) if err != nil { - log.Fatalf("goose: failed to open DB: %v\n", err) + log.Fatal().Err(err).Msgf("goose: failed to open DB: %v\n", err) } defer func() { if err := db.Close(); err != nil { - log.Fatalf("goose: failed to close DB: %v\n", err) + log.Fatal().Err(err).Msgf("goose: failed to close DB: %v\n", err) } }() @@ -301,7 +322,7 @@ func executeMigrations() { goose.SetBaseFS(syncv3.EmbedMigrations) if err := goose.Run(command, db, "state/migrations", arguments...); err != nil { - log.Fatalf("goose %v: %v", command, err) + log.Fatal().Err(err).Msgf("goose %v: %v", command, err) } } diff --git a/internal/errors.go b/internal/errors.go index 14225693..135881d5 100644 --- a/internal/errors.go +++ b/internal/errors.go @@ -8,15 +8,9 @@ import ( "runtime" 
"github.com/getsentry/sentry-go" - - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - type HandlerError struct { StatusCode int Err error @@ -103,7 +97,7 @@ func assert(msg string, expr bool) { if os.Getenv("SYNCV3_DEBUG") == "1" { panic(fmt.Sprintf("assert: %s", msg)) } - l := logger.Error() + l := log.Error() _, file, line, ok := runtime.Caller(1) if ok { l = l.Str("assertion", fmt.Sprintf("%s:%d", file, line)) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index f3132ae8..20dd3879 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -2,19 +2,12 @@ package pubsub import ( "fmt" - "os" "sync" "time" "github.com/prometheus/client_golang/prometheus" - "github.com/rs/zerolog" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - type Payload interface { // The type of payload; used mostly for logging and prometheus metrics Type() string diff --git a/pubsub/v2.go b/pubsub/v2.go index 317e96cd..0cbb27b3 100644 --- a/pubsub/v2.go +++ b/pubsub/v2.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/matrix-org/sliding-sync/internal" + "github.com/rs/zerolog/log" ) // The channel which has V2* payloads @@ -197,7 +198,7 @@ func (v *V2Sub) onMessage(p Payload) { case *V2StateRedaction: v.receiver.OnStateRedaction(pl) default: - logger.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type") + log.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type") } } diff --git a/pubsub/v3.go b/pubsub/v3.go index 77bb5db9..61fd2987 100644 --- a/pubsub/v3.go +++ b/pubsub/v3.go @@ -1,5 +1,9 @@ package pubsub +import ( + "github.com/rs/zerolog/log" +) + // The channel which has V3* payloads const ChanV3 = "v3ch" @@ -39,7 +43,7 @@ func (v *V3Sub) onMessage(p Payload) { case *V3EnsurePolling: v.receiver.EnsurePolling(pl) default: - logger.Warn().Str("type", p.Type()).Msg("V3Sub: unhandled payload type") + log.Warn().Str("type", p.Type()).Msg("V3Sub: unhandled payload type") } } diff --git a/sqlutil/sql.go b/sqlutil/sql.go index 5c4a6406..7b5461ee 100644 --- a/sqlutil/sql.go +++ b/sqlutil/sql.go @@ -4,18 +4,12 @@ import ( "context" "fmt" "github.com/matrix-org/sliding-sync/internal" - "github.com/rs/zerolog" - "os" "runtime/debug" "github.com/jmoiron/sqlx" + "github.com/rs/zerolog/log" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // WithTransaction runs a block of code passing in an SQL transaction // If the code returns an error or panics then the transactions is rolled back // Otherwise the transaction is committed. @@ -30,7 +24,7 @@ func WithTransaction(db *sqlx.DB, fn func(txn *sqlx.Tx) error) (err error) { if err == nil && panicErr != nil { // TODO: thread a context through to here? ctx := context.Background() - logger.Error().Msg(string(debug.Stack())) + log.Error().Msg(string(debug.Stack())) internal.GetSentryHubFromContextOrDefault(ctx).RecoverWithContext(ctx, panicErr) err = fmt.Errorf("panic: %v", panicErr) } @@ -59,7 +53,8 @@ type Chunker interface { // Inserting events using NamedExec involves 3n params (n=number of events), meaning it's easy to hit // the limit in rooms like Matrix HQ. This function breaks up the events into chunks which can be // batch inserted in multiple statements. 
Without this, you'll see errors like: -// "pq: got 95331 parameters but PostgreSQL only supports 65535 parameters" +// +// "pq: got 95331 parameters but PostgreSQL only supports 65535 parameters" func Chunkify(numParamsPerStmt, maxParamsPerCall int, entries Chunker) []Chunker { // common case, most things are small if (entries.Len() * numParamsPerStmt) <= maxParamsPerCall { diff --git a/state/accumulator.go b/state/accumulator.go index 678ef4e8..9ea6404e 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -13,6 +13,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/matrix-org/sliding-sync/sqlutil" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -77,7 +78,7 @@ func (a *Accumulator) calculateNewSnapshot(old StrippedEvents, new Event) (Strip // ruh roh. This should be impossible, but it can happen if the v2 response sends the same // event in both state and timeline. We need to alert the operator and whine badly as it means // we have lost an event by now. - logger.Warn().Str("new_event_id", new.ID).Str("old_event_id", e.ID).Str("room_id", new.RoomID).Str("type", new.Type).Str("state_key", new.StateKey).Msg( + log.Warn().Str("new_event_id", new.ID).Str("old_event_id", e.ID).Str("room_id", new.RoomID).Str("type", new.Type).Str("state_key", new.StateKey).Msg( "Detected different event IDs with the same NID when rolling forward state. This has resulted in data loss in this room (1 event). " + "This can happen when the v2 /sync response sends the same event in both state and timeline sections. " + "The event in this log line has been dropped!", @@ -227,7 +228,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia // we don't have a current snapshot for this room but yet no events are new, // no idea how this should be handled. const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug." - logger.Error().Str("room_id", roomID).Msg(errMsg) + log.Error().Str("room_id", roomID).Msg(errMsg) sentry.CaptureException(fmt.Errorf(errMsg)) } // Note: we otherwise ignore cases where the state has only changed to a @@ -398,7 +399,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s } else { // Bail out and complain loudly. const msg = "Accumulator: skipping processing of timeline, as no snapshot exists" - logger.Warn(). + log.Warn(). Str("event_id", newEvents[0].ID). Str("event_type", newEvents[0].Type). Str("event_state_key", newEvents[0].StateKey). @@ -484,7 +485,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s if roomVersion == "" { // Defaults to "1" if the key does not exist. 
roomVersion = "1" - logger.Warn().Str("room", roomID).Err(err).Msg( + log.Warn().Str("room", roomID).Err(err).Msg( "Redact: no content.room_version in create event, defaulting to v1", ) } @@ -576,13 +577,13 @@ func parseAndDeduplicateTimelineEvents(roomID string, timeline sync2.TimelineRes RoomID: roomID, } if err := e.ensureFieldsSetOnEvent(); err != nil { - logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg( + log.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg( "Accumulator.filterToNewTimelineEvents: failed to parse event, ignoring", ) continue } if _, ok := seenEvents[e.ID]; ok { - logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg( + log.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg( "Accumulator.filterToNewTimelineEvents: seen the same event ID twice, ignoring", ) continue @@ -671,7 +672,7 @@ func ensureStateHasCreateEvent(events []Event) error { }) sentry.CaptureMessage(errMsg) }) - logger.Warn(). + log.Warn(). Str("room_id", events[0].RoomID). Int("len_state", len(events)). Msg(errMsg) diff --git a/state/event_table.go b/state/event_table.go index 33c5c6c5..413112f9 100644 --- a/state/event_table.go +++ b/state/event_table.go @@ -14,6 +14,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sqlutil" + "github.com/rs/zerolog/log" ) const ( @@ -393,7 +394,7 @@ func (t *EventTable) Redact(txn *sqlx.Tx, roomVer string, redacteeEventIDToRedac if err != nil { // unknown room version... let's just default to "1" rv = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionV1) - logger.Warn().Str("version", roomVer).Err(err).Msg( + log.Warn().Str("version", roomVer).Err(err).Msg( "Redact: GetRoomVersion: unknown room version, defaulting to v1", ) } @@ -567,7 +568,7 @@ func filterAndEnsureFieldsSet(events []Event) []Event { for i := range events { ev := &events[i] if err := ev.ensureFieldsSetOnEvent(); err != nil { - logger.Warn().Str("event_id", ev.ID).Err(err).Msg( + log.Warn().Str("event_id", ev.ID).Err(err).Msg( "filterAndEnsureFieldsSet: failed to parse event, ignoring", ) continue diff --git a/state/migrations/20230822180807_bogus_snapshot_cleanup.go b/state/migrations/20230822180807_bogus_snapshot_cleanup.go index a42e950b..39330c38 100644 --- a/state/migrations/20230822180807_bogus_snapshot_cleanup.go +++ b/state/migrations/20230822180807_bogus_snapshot_cleanup.go @@ -4,17 +4,12 @@ import ( "context" "database/sql" "fmt" + "github.com/lib/pq" "github.com/pressly/goose/v3" - "github.com/rs/zerolog" - "os" + "github.com/rs/zerolog/log" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - func init() { goose.AddMigrationContext(upBogusSnapshotCleanup, downBogusSnapshotCleanup) } @@ -29,7 +24,7 @@ func upBogusSnapshotCleanup(ctx context.Context, tx *sql.Tx) error { if len(bogusRooms) == 0 { return nil } - logger.Info().Strs("room_ids", bogusRooms). + log.Info().Strs("room_ids", bogusRooms). 
Msgf("Found %d bogus rooms to cleanup", len(bogusRooms)) tables := []string{"syncv3_snapshots", "syncv3_events", "syncv3_rooms"} @@ -52,9 +47,9 @@ func deleteFromTable(ctx context.Context, tx *sql.Tx, table string, roomIDs []st } ra, err := result.RowsAffected() if err != nil { - logger.Warn().Err(err).Msgf("Couldn't get number of rows deleted from %s", table) + log.Warn().Err(err).Msgf("Couldn't get number of rows deleted from %s", table) } else { - logger.Info().Msgf("Deleted %d rows from %s", ra, table) + log.Info().Msgf("Deleted %d rows from %s", ra, table) } return nil } diff --git a/state/migrations/20231108122539_clear_stuck_invites.go b/state/migrations/20231108122539_clear_stuck_invites.go index acd4afc8..9ffc27a9 100644 --- a/state/migrations/20231108122539_clear_stuck_invites.go +++ b/state/migrations/20231108122539_clear_stuck_invites.go @@ -7,6 +7,7 @@ import ( "github.com/lib/pq" "github.com/pressly/goose/v3" + "github.com/rs/zerolog/log" ) func init() { @@ -49,9 +50,9 @@ func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error { } usersToInvalidate = append(usersToInvalidate, userID) } - logger.Info().Int("len_invalidate_users", len(usersToInvalidate)).Msg("invalidating users") + log.Info().Int("len_invalidate_users", len(usersToInvalidate)).Msg("invalidating users") if len(usersToInvalidate) < 50 { - logger.Info().Strs("invalidate_users", usersToInvalidate).Msg("invalidating users") + log.Info().Strs("invalidate_users", usersToInvalidate).Msg("invalidating users") } // for each user: @@ -64,7 +65,7 @@ func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error { return fmt.Errorf("failed to invalidate since tokens: %w", err) } ra, _ := res.RowsAffected() - logger.Info().Int64("num_devices", ra).Msg("reset since tokens") + log.Info().Int64("num_devices", ra).Msg("reset since tokens") res, err = tx.ExecContext(ctx, ` DELETE FROM syncv3_invites WHERE user_id=ANY($1) @@ -73,7 +74,7 @@ func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error { return fmt.Errorf("failed to remove outstanding invites: %w", err) } ra, _ = res.RowsAffected() - logger.Info().Int64("num_invites", ra).Msg("reset invites") + log.Info().Int64("num_invites", ra).Msg("reset invites") return nil } diff --git a/state/migrations/20240517104423_device_list_table.go b/state/migrations/20240517104423_device_list_table.go index 315b79c2..cbe23cdc 100644 --- a/state/migrations/20240517104423_device_list_table.go +++ b/state/migrations/20240517104423_device_list_table.go @@ -12,6 +12,7 @@ import ( "github.com/matrix-org/sliding-sync/sqlutil" "github.com/matrix-org/sliding-sync/state" "github.com/pressly/goose/v3" + "github.com/rs/zerolog/log" ) type OldDeviceData struct { @@ -63,7 +64,7 @@ func upDeviceListTable(ctx context.Context, tx *sql.Tx) error { if err = tx.QueryRow(`SELECT count(*) FROM syncv3_device_data`).Scan(&count); err != nil { return err } - logger.Info().Int("count", count).Msg("transferring device list data for devices") + log.Info().Int("count", count).Msg("transferring device list data for devices") // scan for existing CBOR (streaming as the CBOR with cursors as it can be large) _, err = tx.Exec(`DECLARE device_data_migration_cursor CURSOR FOR SELECT user_id, device_id, data FROM syncv3_device_data`) @@ -82,7 +83,7 @@ func upDeviceListTable(ctx context.Context, tx *sql.Tx) error { // logging i++ if time.Since(lastUpdate) > updateFrequency { - logger.Info().Msgf("%d/%d process device list data", i, count) + log.Info().Msgf("%d/%d process device list data", i, count) lastUpdate = 
time.Now() } diff --git a/state/storage.go b/state/storage.go index 0d0faa74..08ed9d8f 100644 --- a/state/storage.go +++ b/state/storage.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "os" "strings" "time" @@ -18,15 +17,10 @@ import ( "github.com/jmoiron/sqlx" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sqlutil" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // Max number of parameters in a single SQL command const MaxPostgresParameters = 65535 @@ -79,7 +73,7 @@ func NewStorage(postgresURI string) *Storage { if err != nil { sentry.CaptureException(err) // TODO: if we panic(), will sentry have a chance to flush the event? - logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") + log.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") } return NewStorageWithDB(db, false) } @@ -545,7 +539,7 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str return fmt.Errorf("failed to select latest nids in rooms %v: %s", roomIDs, err) } if len(slowRooms) > 0 { - logger.Warn().Int("slow_rooms", len(slowRooms)).Msg("RoomStateAfterEventPosition: pos value provided is far behind the database copy, performance degraded") + log.Warn().Int("slow_rooms", len(slowRooms)).Msg("RoomStateAfterEventPosition: pos value provided is far behind the database copy, performance degraded") latestSlowEvents, err := s.Accumulator.eventsTable.LatestEventInRooms(txn, slowRooms, pos) if err != nil { return err @@ -653,7 +647,7 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str WITH nids AS ( SELECT `+nidcols+` AS allNids FROM syncv3_snapshots WHERE syncv3_snapshots.snapshot_id = ANY(?) ) - SELECT syncv3_events.event_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.event + SELECT syncv3_events.event_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.event FROM syncv3_events, nids WHERE (`+strings.Join(wheres, " OR ")+`) AND syncv3_events.event_nid = ANY(nids.allNids) ORDER BY syncv3_events.event_nid ASC`, @@ -801,7 +795,7 @@ func (s *Storage) RemoveInaccessibleStateSnapshots() error { } rowsAffected, err := result.RowsAffected() if err == nil { - logger.Info().Int64("rows_affected", rowsAffected).Msg("RemoveInaccessibleStateSnapshots: deleted rows") + log.Info().Int64("rows_affected", rowsAffected).Msg("RemoveInaccessibleStateSnapshots: deleted rows") } return nil } @@ -1082,16 +1076,16 @@ Loop: if n < time.Hour { boundaryTime = now.Add(-1 * time.Hour) } - logger.Info().Time("boundaryTime", boundaryTime).Msg("Cleaner running") + log.Info().Time("boundaryTime", boundaryTime).Msg("Cleaner running") err := s.TransactionsTable.Clean(boundaryTime) if err != nil { - logger.Warn().Err(err).Msg("failed to clean txn ID table") + log.Warn().Err(err).Msg("failed to clean txn ID table") sentry.CaptureException(err) } // we also want to clean up stale state snapshots which are inaccessible, to // keep the size of the syncv3_snapshots table low. 
if err = s.RemoveInaccessibleStateSnapshots(); err != nil { - logger.Warn().Err(err).Msg("failed to remove inaccessible state snapshots") + log.Warn().Err(err).Msg("failed to remove inaccessible state snapshots") sentry.CaptureException(err) } case <-s.shutdownCh: @@ -1123,7 +1117,7 @@ func (s *Storage) LatestEventNIDInRooms(roomIDs []string, highestNID int64) (roo if len(slowRooms) == 0 { return nil // no work to do } - logger.Warn().Int("slow_rooms", len(slowRooms)).Msg("LatestEventNIDInRooms: pos value provided is far behind the database copy, performance degraded") + log.Warn().Int("slow_rooms", len(slowRooms)).Msg("LatestEventNIDInRooms: pos value provided is far behind the database copy, performance degraded") slowRoomToLatestNIDs, err := s.EventsTable.LatestEventNIDInRooms(txn, slowRooms, highestNID) if err != nil { diff --git a/state/to_device_table.go b/state/to_device_table.go index 8035dd77..fccd4e60 100644 --- a/state/to_device_table.go +++ b/state/to_device_table.go @@ -8,6 +8,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/matrix-org/sliding-sync/sqlutil" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -109,7 +110,7 @@ func (t *ToDeviceTable) Messages(userID, deviceID string, from, limit int64) (ms m := gjson.ParseBytes(msgs[i]) msgId := m.Get(`content.org\.matrix\.msgid`).Str if msgId != "" { - logger.Info().Str("msgid", msgId).Str("user", userID).Str("device", deviceID).Msg("ToDeviceTable.Messages") + log.Info().Str("msgid", msgId).Str("user", userID).Str("device", deviceID).Msg("ToDeviceTable.Messages") } } upTo = rows[len(rows)-1].Position @@ -143,7 +144,7 @@ func (t *ToDeviceTable) InsertMessages(userID, deviceID string, msgs []json.RawM } msgId := m.Get(`content.org\.matrix\.msgid`).Str if msgId != "" { - logger.Debug().Str("msgid", msgId).Str("user", userID).Str("device", deviceID).Msg("ToDeviceTable.InsertMessages") + log.Debug().Str("msgid", msgId).Str("user", userID).Str("device", deviceID).Msg("ToDeviceTable.InsertMessages") } switch rows[i].Type { case "m.room_key_request": diff --git a/sync2/handler2/handler.go b/sync2/handler2/handler.go index 7b6ecf5b..e48a14a5 100644 --- a/sync2/handler2/handler.go +++ b/sync2/handler2/handler.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "hash/fnv" - "os" "sync" "time" @@ -19,16 +18,11 @@ import ( "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync2" "github.com/prometheus/client_golang/prometheus" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // Handler is responsible for starting v2 pollers at startup; // processing v2 data (as a sync2.V2DataReceiver) and publishing updates (pubsub.Payload to V2Listeners); // and receiving and processing EnsurePolling events. 
@@ -96,7 +90,7 @@ func (h *Handler) Listen() { defer internal.ReportPanicsToSentry() err := h.v3Sub.Listen() if err != nil { - logger.Err(err).Msg("Failed to listen for v3 messages") + log.Err(err).Msg("Failed to listen for v3 messages") sentry.CaptureException(err) } }() @@ -124,7 +118,7 @@ func (h *Handler) Teardown() { func (h *Handler) StartV2Pollers() { tokens, err := h.v2Store.TokensTable.TokenForEachDevice(nil) if err != nil { - logger.Err(err).Msg("StartV2Pollers: failed to query tokens") + log.Err(err).Msg("StartV2Pollers: failed to query tokens") sentry.CaptureException(err) return } @@ -143,7 +137,7 @@ func (h *Handler) StartV2Pollers() { ch <- t } close(ch) - logger.Info().Int("num_devices", len(tokens)).Int("num_fail_decrypt", numFails).Msg("StartV2Pollers") + log.Info().Int("num_devices", len(tokens)).Int("num_fail_decrypt", numFails).Msg("StartV2Pollers") var wg sync.WaitGroup wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { @@ -155,11 +149,9 @@ func (h *Handler) StartV2Pollers() { DeviceID: t.DeviceID, } _, err = h.pMap.EnsurePolling( - pid, t.AccessToken, t.Since, true, - logger.With().Str("user_id", t.UserID).Str("device_id", t.DeviceID).Logger(), - ) + pid, t.AccessToken, t.Since, true) if err != nil { - logger.Err(err).Str("user_id", t.UserID).Str("device_id", t.DeviceID).Msg("Failed to start poller") + log.Err(err).Str("user_id", t.UserID).Str("device_id", t.DeviceID).Msg("Failed to start poller") } else { h.updateMetrics() } @@ -172,7 +164,7 @@ func (h *Handler) StartV2Pollers() { }() } wg.Wait() - logger.Info().Msg("StartV2Pollers finished") + log.Info().Msg("StartV2Pollers finished") h.startPollerExpiryTicker() } @@ -198,7 +190,7 @@ func (h *Handler) OnTerminated(ctx context.Context, pollerID sync2.PollerID) { func (h *Handler) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) { err := h.v2Store.TokensTable.Delete(accessTokenHash) if err != nil { - logger.Err(err).Str("user", userID).Str("device", deviceID).Str("access_token_hash", accessTokenHash).Msg("V2: failed to expire token") + log.Err(err).Str("user", userID).Str("device", deviceID).Str("access_token_hash", accessTokenHash).Msg("V2: failed to expire token") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } // Notify v3 side so it can remove the connection from ConnMap @@ -222,7 +214,7 @@ func (h *Handler) addPrometheusMetrics() { func (h *Handler) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) { err := h.v2Store.DevicesTable.UpdateDeviceSince(userID, deviceID, since) if err != nil { - logger.Err(err).Str("user", userID).Str("device", deviceID).Str("since", since).Msg("V2: failed to persist since token") + log.Err(err).Str("user", userID).Str("device", deviceID).Str("since", since).Msg("V2: failed to persist since token") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } } @@ -237,7 +229,7 @@ func (h *Handler) OnE2EEData(ctx context.Context, userID, deviceID string, otkCo FallbackKeyTypes: fallbackKeyTypes, }, deviceListChanges) if err != nil { - logger.Err(err).Str("user", userID).Msg("failed to upsert device data") + log.Err(err).Str("user", userID).Msg("failed to upsert device data") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) retErr = err return @@ -287,7 +279,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin // persist the txn IDs err := h.Store.TransactionsTable.Insert(userID, deviceID, eventIDToTxnID) if err != nil { - logger.Err(err).Str("user", 
userID).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user") + log.Err(err).Str("user", userID).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } } @@ -295,7 +287,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin // Insert new events accResult, err := h.Store.Accumulate(userID, roomID, timeline) if err != nil { - logger.Err(err).Int("timeline", len(timeline.Events)).Str("room", roomID).Msg("V2: failed to accumulate room") + log.Err(err).Int("timeline", len(timeline.Events)).Str("room", roomID).Msg("V2: failed to accumulate room") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -328,7 +320,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin return err }) if err != nil { - logger.Err(err). + log.Err(err). Int("timeline", len(timeline.Events)). Int("num_transaction_ids", len(eventIDsWithTxns)). Int("num_missing_transaction_ids", len(eventIDsLackingTxns)). @@ -376,7 +368,7 @@ func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.Ra } res, err := h.Store.Initialise(roomID, state) if err != nil { - logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room") + log.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -420,7 +412,7 @@ func (h *Handler) OnReceipt(ctx context.Context, userID, roomID, ephEventType st // else it returns nil newReceipts, err := h.Store.ReceiptTable.Insert(roomID, ephEvent) if err != nil { - logger.Err(err).Str("room", roomID).Msg("failed to store receipts") + log.Err(err).Str("room", roomID).Msg("failed to store receipts") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -436,7 +428,7 @@ func (h *Handler) OnReceipt(ctx context.Context, userID, roomID, ephEventType st func (h *Handler) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) error { _, err := h.Store.ToDeviceTable.InsertMessages(userID, deviceID, msgs) if err != nil { - logger.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages") + log.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -473,7 +465,7 @@ func (h *Handler) UpdateUnreadCounts(ctx context.Context, roomID, userID string, err := h.Store.UnreadTable.UpdateUnreadCounters(userID, roomID, highlightCount, notifCount) if err != nil { - logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters") + log.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2UnreadCounts{ @@ -508,7 +500,7 @@ func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, even data, err := h.Store.InsertAccountData(userID, roomID, dedupedEvents) if err != nil { - logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data") + log.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account 
data") sentry.CaptureException(err) return err } @@ -527,7 +519,7 @@ func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, even func (h *Handler) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) error { err := h.Store.InvitesTable.InsertInvite(userID, roomID, inviteState) if err != nil { - logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite") + log.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -542,7 +534,7 @@ func (h *Handler) OnLeftRoom(ctx context.Context, userID, roomID string, leaveEv // remove any invites for this user if they are rejecting an invite err := h.Store.InvitesTable.RemoveInvite(userID, roomID) if err != nil { - logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite") + log.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return err } @@ -562,7 +554,7 @@ func (h *Handler) OnLeftRoom(ctx context.Context, userID, roomID string, leaveEv } func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) { - log := logger.With().Str("user_id", p.UserID).Str("device_id", p.DeviceID).Logger() + log := log.With().Str("user_id", p.UserID).Str("device_id", p.DeviceID).Logger() log.Info().Msg("EnsurePolling: new request") defer func() { log.Info().Msg("EnsurePolling: preprocessing done") @@ -581,7 +573,7 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) { DeviceID: p.DeviceID, } _, err = h.pMap.EnsurePolling( - pid, accessToken, since, false, log, + pid, accessToken, since, false, ) if err != nil { log.Err(err).Msg("Failed to start poller") @@ -614,7 +606,7 @@ func (h *Handler) startPollerExpiryTicker() { func (h *Handler) ExpireOldPollers() { devices, err := h.v2Store.DevicesTable.FindOldDevices(30 * 24 * time.Hour) if err != nil { - logger.Err(err).Msg("Error fetching old devices") + log.Err(err).Msg("Error fetching old devices") sentry.CaptureException(err) return } @@ -625,7 +617,7 @@ func (h *Handler) ExpireOldPollers() { } numExpired := h.pMap.ExpirePollers(pids) if len(devices) > 0 { - logger.Info().Int("old", len(devices)).Int("expired", numExpired).Msg("poller cleanup old devices") + log.Info().Int("old", len(devices)).Int("expired", numExpired).Msg("poller cleanup old devices") } } diff --git a/sync2/handler2/handler_test.go b/sync2/handler2/handler_test.go index adeb5f0e..2b03658d 100644 --- a/sync2/handler2/handler_test.go +++ b/sync2/handler2/handler_test.go @@ -17,7 +17,6 @@ import ( "github.com/matrix-org/sliding-sync/sync2" "github.com/matrix-org/sliding-sync/sync2/handler2" "github.com/matrix-org/sliding-sync/testutils" - "github.com/rs/zerolog" ) var postgresURI string @@ -52,7 +51,7 @@ func (p *mockPollerMap) ExpirePollers([]sync2.PollerID) int { return 0 } -func (p *mockPollerMap) EnsurePolling(pid sync2.PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (bool, error) { +func (p *mockPollerMap) EnsurePolling(pid sync2.PollerID, accessToken, v2since string, isStartup bool) (bool, error) { p.calls = append(p.calls, pollInfo{ pid: pid, accessToken: accessToken, diff --git a/sync2/poller.go b/sync2/poller.go index e3e4839b..82e7405c 100644 --- a/sync2/poller.go +++ b/sync2/poller.go @@ -16,7 +16,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/prometheus/client_golang/prometheus" 
- "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -70,7 +70,7 @@ type V2DataReceiver interface { } type IPollerMap interface { - EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (created bool, err error) + EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool) (created bool, err error) NumPollers() int Terminate() DeviceIDs(userID string) []string @@ -253,7 +253,7 @@ func (h *PollerMap) ExpirePollers(pids []PollerID) int { // Note that we will immediately return if there is a poller for the same user but a different device. // We do this to allow for logins on clients to be snappy fast, even though they won't yet have the // to-device msgs to decrypt E2EE rooms. -func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (bool, error) { +func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool) (bool, error) { h.pollerMu.Lock() if !h.executorRunning { h.executorRunning = true @@ -263,7 +263,7 @@ func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isS // a poller exists and hasn't been terminated so we don't need to do anything if ok && !poller.terminated.Load() { if poller.accessToken != accessToken { - logger.Warn().Msg("PollerMap.EnsurePolling: poller already running with different access token") + log.Warn().Msg("PollerMap.EnsurePolling: poller already running with different access token") } h.pollerMu.Unlock() // this existing poller may not have completed the initial sync yet, so we need to make sure @@ -288,7 +288,7 @@ func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isS // replace the poller. If we don't need to wait, then we just want to nab to-device events initially. // We don't do that on startup though as we cannot be sure that other pollers will not be using expired tokens. 
- poller = newPoller(pid, accessToken, h.v2Client, h, logger, !needToWait && !isStartup) + poller = newPoller(pid, accessToken, h.v2Client, h, !needToWait && !isStartup) poller.processHistogramVec = h.processHistogramVec poller.timelineSizeVec = h.timelineSizeHistogramVec poller.gappyStateSizeVec = h.gappyStateSizeVec @@ -301,7 +301,7 @@ func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isS if needToWait { poller.WaitUntilInitialSync() } else { - logger.Info().Str("user", poller.userID).Msg("a poller exists for this user; not waiting for this device to do an initial sync") + log.Info().Str("user", poller.userID).Msg("a poller exists for this user; not waiting for this device to do an initial sync") } if poller.terminated.Load() { return false, fmt.Errorf("PollerMap.EnsurePolling: poller terminated after intial sync") @@ -429,7 +429,6 @@ type poller struct { accessToken string client Client receiver V2DataReceiver - logger zerolog.Logger initialToDeviceOnly bool @@ -461,7 +460,7 @@ type poller struct { totalNumPolls prometheus.Counter } -func newPoller(pid PollerID, accessToken string, client Client, receiver V2DataReceiver, logger zerolog.Logger, initialToDeviceOnly bool) *poller { +func newPoller(pid PollerID, accessToken string, client Client, receiver V2DataReceiver, initialToDeviceOnly bool) *poller { var wg sync.WaitGroup wg.Add(1) return &poller{ @@ -471,7 +470,6 @@ func newPoller(pid PollerID, accessToken string, client Client, receiver V2DataR client: client, receiver: receiver, terminated: &atomic.Bool{}, - logger: logger, wg: &wg, initialToDeviceOnly: initialToDeviceOnly, } @@ -507,11 +505,11 @@ func (p *poller) Poll(since string) { }) ctx := sentry.SetHubOnContext(context.Background(), hub) - p.logger.Info().Str("since", since).Msg("Poller: v2 poll loop started") + log.Info().Str("since", since).Msg("Poller: v2 poll loop started") defer func() { panicErr := recover() if panicErr != nil { - logger.Error().Str("user", p.userID).Str("device", p.deviceID).Msgf("%s. Traceback:\n%s", panicErr, debug.Stack()) + log.Error().Str("user", p.userID).Str("device", p.deviceID).Msgf("%s. 
Traceback:\n%s", panicErr, debug.Stack()) internal.GetSentryHubFromContextOrDefault(ctx).RecoverWithContext(ctx, panicErr) } p.receiver.OnTerminated(ctx, PollerID{ @@ -554,7 +552,7 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { if s.failCount > 1000 { // 3s * 1000 = 3000s = 50 minutes errMsg := "poller: access token has failed >1000 times to /sync, terminating loop" - p.logger.Warn().Msg(errMsg) + log.Warn().Msg(errMsg) p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID) p.Terminate() return fmt.Errorf(errMsg) @@ -563,7 +561,7 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { // period of time (on massive accounts on matrix.org) such that if you wait 2,4,8min between // requests it might force the server to do the work all over again :( waitTime := 3 * time.Second - p.logger.Warn().Str("duration", waitTime.String()).Int("fail-count", s.failCount).Msg("Poller: waiting before next poll") + log.Warn().Str("duration", waitTime.String()).Int("fail-count", s.failCount).Msg("Poller: waiting before next poll") timeSleep(waitTime) } if p.terminated.Load() { @@ -587,19 +585,19 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { // check if temporary isFatal := statusCode == 401 || statusCode == 403 if !isFatal { - p.logger.Warn().Int("code", statusCode).Err(err).Msg("Poller: sync v2 poll returned temporary error") + log.Warn().Int("code", statusCode).Err(err).Msg("Poller: sync v2 poll returned temporary error") s.failCount += 1 return nil } else { errMsg := "poller: access token has been invalidated, terminating loop" - p.logger.Warn().Msg(errMsg) + log.Warn().Msg(errMsg) p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID) p.Terminate() return fmt.Errorf(errMsg) } } if s.since == "" { - p.logger.Info().Msg("Poller: valid initial sync response received") + log.Info().Msg("Poller: valid initial sync response received") } p.initialToDeviceOnly = false start = time.Now() @@ -609,19 +607,19 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { // retry processing the same response after a brief period retryErr := p.parseE2EEData(ctx, resp) if shouldRetry(retryErr) { - p.logger.Err(retryErr).Msg("Poller: parseE2EEData returned an error") + log.Err(retryErr).Msg("Poller: parseE2EEData returned an error") s.failCount += 1 return nil } retryErr = p.parseGlobalAccountData(ctx, resp) if shouldRetry(retryErr) { - p.logger.Err(retryErr).Msg("Poller: parseGlobalAccountData returned an error") + log.Err(retryErr).Msg("Poller: parseGlobalAccountData returned an error") s.failCount += 1 return nil } retryErr = p.parseRoomsResponse(ctx, resp) if shouldRetry(retryErr) { - p.logger.Err(retryErr).Msg("Poller: parseRoomsResponse returned an error") + log.Err(retryErr).Msg("Poller: parseRoomsResponse returned an error") s.failCount += 1 return nil } @@ -633,7 +631,7 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error { // deduplicated. 
retryErr = p.parseToDeviceMessages(ctx, resp) if shouldRetry(retryErr) { - p.logger.Err(retryErr).Msg("Poller: parseToDeviceMessages returned an error") + log.Err(retryErr).Msg("Poller: parseToDeviceMessages returned an error") s.failCount += 1 return nil } @@ -824,7 +822,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro err = p.receiver.Initialise(ctx, roomID, roomData.State.Events) if err == nil { const warnMsg = "parseRoomsResponse: m.room.create event was found in the timeline not state, info after moving create event" - logger.Warn().Str("user_id", p.userID).Str("room_id", roomID).Int( + log.Warn().Str("user_id", p.userID).Str("room_id", roomID).Int( "timeline", len(roomData.Timeline.Events), ).Int("state", len(roomData.State.Events)).Msg(warnMsg) hub := internal.GetSentryHubFromContextOrDefault(ctx) @@ -960,7 +958,7 @@ func (p *poller) maybeLogStats(force bool) { return } p.lastLogged = time.Now() - p.logger.Info().Ints( + log.Info().Ints( "rooms [timeline,state,typing,receipts,invites]", []int{ p.totalTimelineCalls, p.totalStateCalls, p.totalTyping, p.totalReceipts, p.totalInvites, }, diff --git a/sync2/poller_test.go b/sync2/poller_test.go index 4b145aab..b2c77805 100644 --- a/sync2/poller_test.go +++ b/sync2/poller_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "net/http" - "os" "strconv" "sync" "testing" @@ -13,7 +12,6 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/testutils" - "github.com/rs/zerolog" ) const initialSinceToken = "0" @@ -107,7 +105,7 @@ func TestPollerMapEnsurePolling(t *testing.T) { pm.EnsurePolling(PollerID{ UserID: "alice:localhost", DeviceID: "FOOBAR", - }, "access_token", "", false, zerolog.New(os.Stderr)) + }, "access_token", "", false) close(ensurePollingUnblocked) }() ensureBlocking := func() { @@ -192,7 +190,7 @@ func TestPollerMapEnsurePollingIdempotent(t *testing.T) { for i := 0; i < n; i++ { go func() { t.Logf("EnsurePolling") - pm.EnsurePolling(PollerID{UserID: "@alice:localhost", DeviceID: "FOOBAR"}, "access_token", "", false, zerolog.New(os.Stderr)) + pm.EnsurePolling(PollerID{UserID: "@alice:localhost", DeviceID: "FOOBAR"}, "access_token", "", false) wg.Done() t.Logf("EnsurePolling unblocked") }() @@ -256,7 +254,7 @@ func TestPollerMapEnsurePollingFailsWithExpiredToken(t *testing.T) { pm := NewPollerMap(client, false) pm.SetCallbacks(accumulator) - created, err := pm.EnsurePolling(PollerID{}, "dummy_token", "", true, zerolog.New(os.Stderr)) + created, err := pm.EnsurePolling(PollerID{}, "dummy_token", "", true) if created { t.Errorf("Expected created=false, got created=true") @@ -292,7 +290,7 @@ func TestPollerMap_ExpirePollers(t *testing.T) { for i, spec := range pollerSpecs { created, err := pm.EnsurePolling( PollerID{UserID: spec.UserID, DeviceID: spec.DeviceID}, - spec.Token, "", true, logger, + spec.Token, "", true, ) if err != nil { t.Errorf("EnsurePolling error for poller #%d (%v): %s", i, spec, err) @@ -327,7 +325,7 @@ func TestPollerMap_ExpirePollers(t *testing.T) { for i, spec := range pollerSpecs { created, err := pm.EnsurePolling( PollerID{UserID: spec.UserID, DeviceID: spec.DeviceID}, - spec.Token, "", true, logger, + spec.Token, "", true, ) if err != nil { t.Errorf("EnsurePolling error for poller #%d (%v): %s", i, spec, err) @@ -370,7 +368,7 @@ func TestPollerPollFromNothing(t *testing.T) { }) var wg sync.WaitGroup wg.Add(1) - poller := newPoller(pid, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := 
newPoller(pid, "Authorization: hello world", client, accumulator, false) go func() { defer wg.Done() poller.Poll("") @@ -460,7 +458,7 @@ func TestPollerPollFromExisting(t *testing.T) { }) var wg sync.WaitGroup wg.Add(1) - poller := newPoller(pid, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", client, accumulator, false) go func() { defer wg.Done() poller.Poll(since) @@ -503,7 +501,7 @@ func TestPollerPollUpdateDeviceSincePeriodically(t *testing.T) { return <-syncResponses, 200, nil }) accumulator.updateSinceCalled = make(chan struct{}, 1) - poller := newPoller(pid, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", client, accumulator, false) defer poller.Terminate() go func() { poller.Poll(initialSinceToken) @@ -613,7 +611,7 @@ func TestPollerGivesUpEventually(t *testing.T) { }() var wg sync.WaitGroup wg.Add(1) - poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, false) go func() { defer wg.Done() poller.Poll("") @@ -685,7 +683,7 @@ func TestPollerBackoff(t *testing.T) { }() var wg sync.WaitGroup wg.Add(1) - poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, false) go func() { defer wg.Done() poller.Poll("some_since_value") @@ -715,7 +713,7 @@ func TestPollerUnblocksIfTerminatedInitially(t *testing.T) { pollUnblocked := make(chan struct{}) waitUntilInitialSyncUnblocked := make(chan struct{}) - poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false) + poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, false) go func() { poller.Poll("") close(pollUnblocked) @@ -1010,7 +1008,7 @@ func TestPollerResendsOnCallbackError(t *testing.T) { }, } receiver := tc.generateReceiver() - poller := newPoller(pid, "Authorization: hello world", client, receiver, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", client, receiver, false) waitForInitialSync(t, poller) select { case <-waitForStuckPolling: @@ -1082,7 +1080,7 @@ func TestPollerDoesNotResendOnDataError(t *testing.T) { }, 200, nil }, } - poller := newPoller(pid, "Authorization: hello world", client, receiver, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", client, receiver, false) waitForInitialSync(t, poller) select { case <-waitForSuccess: @@ -1112,7 +1110,7 @@ func TestPollerResendsOnDataErrorWithOtherErrors(t *testing.T) { return internal.NewDataError("onLeftRoom this is a test: %v", 42) }, } - poller := newPoller(pid, "Authorization: hello world", nil, receiver, zerolog.New(os.Stderr), false) + poller := newPoller(pid, "Authorization: hello world", nil, receiver, false) testCases := []struct { name string res SyncResponse diff --git a/sync2/storage.go b/sync2/storage.go index 2cfa4c8a..47575c3c 100644 --- a/sync2/storage.go +++ 
b/sync2/storage.go @@ -1,18 +1,11 @@ package sync2 import ( - "os" - "github.com/getsentry/sentry-go" "github.com/jmoiron/sqlx" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - type Storage struct { DevicesTable *DevicesTable TokensTable *TokensTable @@ -24,7 +17,7 @@ func NewStore(postgresURI, secret string) *Storage { if err != nil { sentry.CaptureException(err) // TODO: if we panic(), will sentry have a chance to flush the event? - logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") + log.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") } return NewStoreWithDB(db, secret) } diff --git a/sync2/tokens_table.go b/sync2/tokens_table.go index 961e7bdd..7977ec2a 100644 --- a/sync2/tokens_table.go +++ b/sync2/tokens_table.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "fmt" "github.com/jmoiron/sqlx" + "github.com/rs/zerolog/log" "io" "strings" "time" @@ -254,7 +255,7 @@ func (t *TokensTable) Delete(accessTokenHash string) error { return err } if ra != 1 { - logger.Warn().Msgf("Tokens.Delete: expected to delete one token, but actually deleted %d", ra) + log.Warn().Msgf("Tokens.Delete: expected to delete one token, but actually deleted %d", ra) } return nil } diff --git a/sync3/caches/global.go b/sync3/caches/global.go index e448c834..62625a8a 100644 --- a/sync3/caches/global.go +++ b/sync3/caches/global.go @@ -3,13 +3,12 @@ package caches import ( "context" "encoding/json" - "os" "sort" "sync" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -55,11 +54,6 @@ type EventData struct { ForceInitial bool } -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // The purpose of global cache is to store global-level information about all rooms the server is aware of. // Global-level information is represented as internal.RoomMetadata and includes things like Heroes, join/invite // counts, if the room is encrypted, etc. Basically anything that is the same for all users of the system. 
This @@ -124,7 +118,7 @@ func (c *GlobalCache) LoadRoomsFromMap(ctx context.Context, joinTimingsByRoomID func (c *GlobalCache) copyRoom(roomID string) *internal.RoomMetadata { sr := c.roomIDToMetadata[roomID] if sr == nil { - logger.Warn().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room, returning stub") + log.Warn().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room, returning stub") return internal.NewRoomMetadata(roomID) } return sr.DeepCopy() @@ -176,7 +170,7 @@ func (c *GlobalCache) LoadStateEvent(ctx context.Context, roomID string, loadPos evType: {stateKey}, }) if err != nil { - logger.Err(err).Str("room", roomID).Int64("pos", loadPosition).Msg("failed to load room state") + log.Err(err).Str("room", roomID).Int64("pos", loadPosition).Msg("failed to load room state") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } @@ -198,7 +192,7 @@ func (c *GlobalCache) LoadRoomState(ctx context.Context, roomIDs []string, loadP resultMap := make(map[string][]json.RawMessage, len(roomIDs)) roomIDToStateEvents, err := c.store.RoomStateAfterEventPosition(ctx, roomIDs, loadPosition, requiredStateMap.QueryStateMap()) if err != nil { - logger.Err(err).Strs("rooms", roomIDs).Int64("pos", loadPosition).Msg("failed to load room state") + log.Err(err).Strs("rooms", roomIDs).Int64("pos", loadPosition).Msg("failed to load room state") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } @@ -383,13 +377,13 @@ func (c *GlobalCache) OnInvalidateRoom(ctx context.Context, roomID string) { metadata, ok := c.roomIDToMetadata[roomID] if !ok { - logger.Warn().Str("room_id", roomID).Msg("OnInvalidateRoom: room not in global cache") + log.Warn().Str("room_id", roomID).Msg("OnInvalidateRoom: room not in global cache") return } err := c.store.ResetMetadataState(metadata) if err != nil { internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) - logger.Warn().Err(err).Msg("OnInvalidateRoom: failed to reset metadata") + log.Warn().Err(err).Msg("OnInvalidateRoom: failed to reset metadata") } } diff --git a/sync3/caches/user.go b/sync3/caches/user.go index 3a7ec0c1..4fd95381 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) @@ -137,7 +138,7 @@ func NewInviteData(ctx context.Context, userID, roomID string, inviteState []jso } if id.InviteEvent == nil { const errMsg = "cannot make invite, missing invite event for user" - logger.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(errMsg) + log.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(errMsg) hub := internal.GetSentryHubFromContextOrDefault(ctx) hub.WithScope(func(scope *sentry.Scope) { scope.SetContext(internal.SentryCtxKey, map[string]interface{}{ @@ -331,7 +332,7 @@ func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomID result := make(map[string]state.LatestEvents) roomIDToLatestEvents, err := c.store.LatestEventsInRooms(c.UserID, roomIDs, loadPos, maxTimelineEvents) if err != nil { - logger.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms") + log.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } @@ 
-401,7 +402,7 @@ func (c *UserCache) newRoomUpdate(ctx context.Context, roomID string) RoomUpdate if globalRooms == nil || globalRooms[roomID] == nil { // this can happen when we join a room we didn't know about because we process unread counts // before the timeline events. Warn and send a stub - logger.Warn().Str("room", roomID).Msg("UserCache update: room doesn't exist in global cache yet, generating stub") + log.Warn().Str("room", roomID).Msg("UserCache update: room doesn't exist in global cache yet, generating stub") r = internal.NewRoomMetadata(roomID) } else { r = globalRooms[roomID] @@ -482,7 +483,7 @@ func (c *UserCache) AnnotateWithTransactionIDs(ctx context.Context, userID strin event := events[data.i] newJSON, err := sjson.SetBytes(event, "unsigned.transaction_id", txnID) if err != nil { - logger.Err(err).Str("user", c.UserID).Msg("AnnotateWithTransactionIDs: sjson failed") + log.Err(err).Str("user", c.UserID).Msg("AnnotateWithTransactionIDs: sjson failed") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { events[data.i] = newJSON diff --git a/sync3/conn.go b/sync3/conn.go index 7fcb32e0..7c421f11 100644 --- a/sync3/conn.go +++ b/sync3/conn.go @@ -9,6 +9,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sync3/caches" + "github.com/rs/zerolog/log" ) // The amount of time to artificially wait if the server detects spamming clients. This time will @@ -97,7 +98,7 @@ func (c *Conn) tryRequest(ctx context.Context, req *Request, start time.Time) (r panicErr := recover() if panicErr != nil { err = fmt.Errorf("panic: %s", panicErr) - logger.Error().Msg(string(debug.Stack())) + log.Error().Msg(string(debug.Stack())) // Note: as we've captured the panicErr ourselves, there isn't much // difference between RecoverWithContext and CaptureException. But // there /is/ a small difference: @@ -160,7 +161,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T // are playing games if !isFirstRequest && !isRetransmit && !c.isOutstanding(req.pos) { // the client made up a position, reject them - logger.Trace().Int64("pos", req.pos).Msg("unknown pos") + log.Trace().Int64("pos", req.pos).Msg("unknown pos") return nil, internal.ExpiredSessionError() } @@ -181,7 +182,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T c.serverResponses = c.serverResponses[delIndex+1:] // slice out the first delIndex+1 elements defer func() { - l := logger.Trace().Int("num_res_acks", delIndex+1).Bool("is_retransmit", isRetransmit).Bool("is_first", isFirstRequest).Bool("is_same", isSameRequest).Int64("pos", req.pos).Str("user", c.UserID) + l := log.Trace().Int("num_res_acks", delIndex+1).Bool("is_retransmit", isRetransmit).Bool("is_first", isFirstRequest).Bool("is_same", isSameRequest).Int64("pos", req.pos).Str("user", c.UserID) if nextUnACKedResponse != nil { l.Int64("new_pos", nextUnACKedResponse.PosInt()) } @@ -196,13 +197,13 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T if isSameRequest { // this is the 2nd+ time we've seen this request, meaning the client likely retried this // request. Send the response we sent before. 
- logger.Trace().Int64("pos", req.pos).Msg("returning cached response for pos, with delay") + log.Trace().Int64("pos", req.pos).Msg("returning cached response for pos, with delay") // apply a small artificial wait to protect the proxy in case this is caused by a buggy // client sending the same request over and over time.Sleep(SpamProtectionInterval) return nextUnACKedResponse, nil } else { - logger.Info().Int64("pos", req.pos).Msg("client has resent this pos with different request data") + log.Info().Int64("pos", req.pos).Msg("client has resent this pos with different request data") // we need to fallthrough to process this request as the client will not resend this request data, } } diff --git a/sync3/connmap.go b/sync3/connmap.go index 287770fe..d03fc777 100644 --- a/sync3/connmap.go +++ b/sync3/connmap.go @@ -10,6 +10,7 @@ import ( "github.com/ReneKroon/ttlcache/v2" "github.com/matrix-org/sliding-sync/internal" "github.com/prometheus/client_golang/prometheus" + "github.com/rs/zerolog/log" ) // ConnMap stores a collection of Conns. @@ -126,7 +127,7 @@ func (m *ConnMap) getConn(cid ConnID) *Conn { return conn } // e.g buffer exceeded, close it and remove it from the cache - logger.Info().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)") + log.Info().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)") m.closeConn(conn) if m.expiryBufferFullCounter != nil { m.expiryBufferFullCounter.Inc() @@ -149,7 +150,7 @@ func (m *ConnMap) CreateConn(cid ConnID, cancel context.CancelFunc, newConnHandl // /sync without a `?pos=` value. time.Sleep(SpamProtectionInterval) } - logger.Trace().Str("conn", cid.String()).Bool("spamming", isSpamming).Msg("closing connection due to CreateConn called again") + log.Trace().Str("conn", cid.String()).Bool("spamming", isSpamming).Msg("closing connection due to CreateConn called again") m.closeConn(conn) } h := newConnHandler() @@ -163,13 +164,13 @@ func (m *ConnMap) CreateConn(cid ConnID, cancel context.CancelFunc, newConnHandl } func (m *ConnMap) CloseConnsForDevice(userID, deviceID string) { - logger.Trace().Str("user", userID).Str("device", deviceID).Msg("closing connections due to CloseConn()") + log.Trace().Str("user", userID).Str("device", deviceID).Msg("closing connections due to CloseConn()") // gather open connections for this user|device connIDs := m.connIDsForDevice(userID, deviceID) for _, cid := range connIDs { err := m.cache.Remove(cid.String()) // this will fire TTL callbacks which calls closeConn if err != nil { - logger.Err(err).Str("cid", cid.String()).Msg("CloseConnsForDevice: cid did not exist in ttlcache") + log.Err(err).Str("cid", cid.String()).Msg("CloseConnsForDevice: cid did not exist in ttlcache") internal.GetSentryHubFromContextOrDefault(context.Background()).CaptureException(err) } } @@ -195,12 +196,12 @@ func (m *ConnMap) CloseConnsForUsers(userIDs []string) (closed int) { defer m.mu.Unlock() for _, userID := range userIDs { conns := m.userIDToConn[userID] - logger.Trace().Str("user", userID).Int("num_conns", len(conns)).Msg("closing all device connections due to CloseConn()") + log.Trace().Str("user", userID).Int("num_conns", len(conns)).Msg("closing all device connections due to CloseConn()") for _, conn := range conns { err := m.cache.Remove(conn.String()) // this will fire TTL callbacks which calls closeConn if err != nil { - logger.Err(err).Str("cid", conn.String()).Msg("CloseConnsForDevice: cid did not exist in ttlcache") + log.Err(err).Str("cid", 
conn.String()).Msg("CloseConnsForDevice: cid did not exist in ttlcache") internal.GetSentryHubFromContextOrDefault(context.Background()).CaptureException(err) } } @@ -213,7 +214,7 @@ func (m *ConnMap) closeConnExpires(connID string, value interface{}) { m.mu.Lock() defer m.mu.Unlock() conn := value.(*Conn) - logger.Info().Str("conn", connID).Msg("closing connection due to expired TTL in cache") + log.Info().Str("conn", connID).Msg("closing connection due to expired TTL in cache") if m.expiryTimedOutCounter != nil { m.expiryTimedOutCounter.Inc() } @@ -227,7 +228,7 @@ func (m *ConnMap) closeConn(conn *Conn) { } connKey := conn.ConnID.String() - logger.Trace().Str("conn", connKey).Msg("closing connection") + log.Trace().Str("conn", connKey).Msg("closing connection") // remove conn from all the maps delete(m.connIDToConn, connKey) h := conn.handler diff --git a/sync3/dispatcher.go b/sync3/dispatcher.go index a7bb24b1..e100c4e1 100644 --- a/sync3/dispatcher.go +++ b/sync3/dispatcher.go @@ -3,20 +3,14 @@ package sync3 import ( "context" "encoding/json" - "os" "sync" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sync3/caches" - "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - const DispatcherAllUsers = "-" // Receiver represents the callbacks that a Dispatcher may fire. @@ -83,7 +77,7 @@ func (d *Dispatcher) Register(ctx context.Context, userID string, r Receiver) er d.userToReceiverMu.Lock() defer d.userToReceiverMu.Unlock() if _, ok := d.userToReceiver[userID]; ok { - logger.Warn().Str("user", userID).Msg("Dispatcher.Register: receiver already registered") + log.Warn().Str("user", userID).Msg("Dispatcher.Register: receiver already registered") } d.userToReceiver[userID] = r return r.OnRegistered(ctx) @@ -122,7 +116,7 @@ func (d *Dispatcher) newEventData(event json.RawMessage, roomID string, latestPo func (d *Dispatcher) OnNewInitialRoomState(ctx context.Context, roomID string, state []json.RawMessage) { // sanity check if _, jc := d.jrt.JoinedUsersForRoom(roomID, nil); jc > 0 { - logger.Warn().Int("join_count", jc).Str("room", roomID).Int("num_state", len(state)).Msg( + log.Warn().Int("join_count", jc).Str("room", roomID).Int("num_state", len(state)).Msg( "OnNewInitialRoomState but have entries in JoinedRoomsTracker already, this should be impossible. 
Degrading to live events", ) for _, s := range state { diff --git a/sync3/extensions/account_data.go b/sync3/extensions/account_data.go index a84d7f2a..b63d1d6b 100644 --- a/sync3/extensions/account_data.go +++ b/sync3/extensions/account_data.go @@ -7,6 +7,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync3/caches" + "github.com/rs/zerolog/log" ) // Client created request params @@ -66,7 +67,7 @@ func (r *AccountDataRequest) AppendLive(ctx context.Context, res *Response, extC } roomAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID, update.RoomID()) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Str("room", update.RoomID()).Msg("failed to fetch room account data") + log.Err(err).Str("user", extCtx.UserID).Str("room", update.RoomID()).Msg("failed to fetch room account data") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { if len(roomAccountData) > 0 { // else we can end up with `null` not `[]` @@ -109,7 +110,7 @@ func (r *AccountDataRequest) ProcessInitial(ctx context.Context, res *Response, if len(roomIDs) > 0 { roomsAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID, roomIDs...) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Strs("rooms", roomIDs).Msg("failed to fetch room account data") + log.Err(err).Str("user", extCtx.UserID).Strs("rooms", roomIDs).Msg("failed to fetch room account data") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { extRes.Rooms = make(map[string][]json.RawMessage) @@ -123,7 +124,7 @@ func (r *AccountDataRequest) ProcessInitial(ctx context.Context, res *Response, if extCtx.IsInitial { globalAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Msg("failed to fetch global account data") + log.Err(err).Str("user", extCtx.UserID).Msg("failed to fetch global account data") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { extRes.Global = accountEventsAsJSON(globalAccountData) diff --git a/sync3/extensions/extensions.go b/sync3/extensions/extensions.go index 4eebd4c9..bec15a3c 100644 --- a/sync3/extensions/extensions.go +++ b/sync3/extensions/extensions.go @@ -3,20 +3,13 @@ package extensions import ( "context" - "os" "reflect" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync3/caches" - "github.com/rs/zerolog" ) -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - type GenericRequest interface { // Name provides a name to identify the kind of request. At present, it's only // used to name opentracing spans; this isn't end-user visible. 
diff --git a/sync3/extensions/receipts.go b/sync3/extensions/receipts.go index cf5be0de..80e17781 100644 --- a/sync3/extensions/receipts.go +++ b/sync3/extensions/receipts.go @@ -7,6 +7,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync3/caches" + "github.com/rs/zerolog/log" ) // Client created request params @@ -42,7 +43,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx if res.Receipts == nil { edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt}) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into new edu") + log.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into new edu") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -55,7 +56,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx // we have receipts already, but not for this room edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt}) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") + log.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -65,7 +66,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx // aggregate receipts: we need to unpack then repack annoyingly. pub, priv, err := state.UnpackReceiptsFromEDU(update.RoomID(), res.Receipts.Rooms[update.RoomID()]) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") + log.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -74,7 +75,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx receipts = append(receipts, update.Receipt) edu, err := state.PackReceiptsIntoEDU(receipts) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") + log.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -94,7 +95,7 @@ func (r *ReceiptsRequest) ProcessInitial(ctx context.Context, res *Response, ext } receipts, err := extCtx.Store.ReceiptTable.SelectReceiptsForEvents(roomID, timeline) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Str("room", roomID).Msg("failed to SelectReceiptsForEvents") + log.Err(err).Str("user", extCtx.UserID).Str("room", roomID).Msg("failed to SelectReceiptsForEvents") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) continue } @@ -104,7 +105,7 @@ func (r *ReceiptsRequest) ProcessInitial(ctx context.Context, res *Response, ext // single shot query to pull out our own receipts for these rooms to always include our own receipts ownReceipts, err := extCtx.Store.ReceiptTable.SelectReceiptsForUser(interestedRoomIDs, extCtx.UserID) if err != nil { - logger.Err(err).Str("user", extCtx.UserID).Strs("rooms", interestedRoomIDs).Msg("failed to SelectReceiptsForUser") + 
log.Err(err).Str("user", extCtx.UserID).Strs("rooms", interestedRoomIDs).Msg("failed to SelectReceiptsForUser") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } diff --git a/sync3/extensions/todevice.go b/sync3/extensions/todevice.go index 430c3da5..a4d17339 100644 --- a/sync3/extensions/todevice.go +++ b/sync3/extensions/todevice.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sync3/caches" + "github.com/rs/zerolog/log" ) // used to remember since positions to warn when they are not incremented. This can happen @@ -64,7 +65,7 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext if r.Limit == 0 { r.Limit = 100 // default to 100 } - l := logger.With().Str("user", extCtx.UserID).Str("device", extCtx.DeviceID).Logger() + l := log.With().Str("user", extCtx.UserID).Str("device", extCtx.DeviceID).Logger() mapMu.Lock() lastSentPos, exists := deviceIDToSinceDebugOnly[extCtx.DeviceID] diff --git a/sync3/handler/connstate.go b/sync3/handler/connstate.go index 2eed4974..b424d524 100644 --- a/sync3/handler/connstate.go +++ b/sync3/handler/connstate.go @@ -10,6 +10,7 @@ import ( "github.com/matrix-org/sliding-sync/sync3/caches" "github.com/matrix-org/sliding-sync/sync3/extensions" "github.com/prometheus/client_golang/prometheus" + "github.com/rs/zerolog/log" "github.com/tidwall/gjson" ) @@ -179,7 +180,7 @@ func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req if err != nil { // in practice this means DB hit failures. If we try again later maybe it'll work, and we will because // anchorLoadPosition is unset. - logger.Err(err).Str("conn", cid.String()).Msg("failed to load initial data") + log.Err(err).Str("conn", cid.String()).Msg("failed to load initial data") } region.End() } @@ -313,7 +314,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui // the sort/filter operations have changed, invalidate everything (if there were previous syncs), re-sort and re-SYNC if prevReqList != nil { // there were previous syncs for this list, INVALIDATE the lot - logger.Trace().Interface("range", prevRange).Msg("INVALIDATEing because sort/filter ops have changed") + log.Trace().Interface("range", prevRange).Msg("INVALIDATEing because sort/filter ops have changed") allRoomIDs := roomList.RoomIDs() for _, r := range prevRange { if r[0] >= roomList.Len() { @@ -333,7 +334,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui } // resort as either we changed the sort order or we added/removed a bunch of rooms if err := roomList.Sort(nextReqList.Sort); err != nil { - logger.Err(err).Str("key", listKey).Msg("cannot sort list") + log.Err(err).Str("key", listKey).Msg("cannot sort list") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } addedRanges = nextReqList.Ranges @@ -342,7 +343,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui // send INVALIDATE for these ranges if len(removedRanges) > 0 { - logger.Trace().Interface("range", removedRanges).Msg("INVALIDATEing because ranges were removed") + log.Trace().Interface("range", removedRanges).Msg("INVALIDATEing because ranges were removed") } for i := range removedRanges { if removedRanges[i][0] >= (roomList.Len()) { @@ -431,7 +432,7 @@ func (s *ConnState) buildListSubscriptions(ctx context.Context, builder *RoomsBu for listKey, list := range listDeltas { if list.Curr == nil { // they deleted this list - 
logger.Debug().Str("key", listKey).Msg("list deleted") + log.Debug().Str("key", listKey).Msg("list deleted") s.lists.DeleteList(listKey) continue } @@ -451,7 +452,7 @@ func (s *ConnState) buildRoomSubscriptions(ctx context.Context, builder *RoomsBu sub, ok := s.muxedReq.RoomSubscriptions[roomID] if !ok { - logger.Warn().Str("room_id", roomID).Msg( + log.Warn().Str("room_id", roomID).Msg( "room listed in subscriptions but there is no subscription information in the request, ignoring room subscription.", ) continue @@ -724,7 +725,7 @@ func (s *ConnState) trackProcessDuration(ctx context.Context, dur time.Duration, // Called when the connection is torn down func (s *ConnState) Destroy() { s.userCache.Unsubscribe(s.userCacheID) - logger.Debug().Str("user_id", s.userID).Str("device_id", s.deviceID).Msg("cancelling any in-flight requests") + log.Debug().Str("user_id", s.userID).Str("device_id", s.deviceID).Msg("cancelling any in-flight requests") if s.cancelLatestReq != nil { s.cancelLatestReq() } @@ -761,7 +762,7 @@ func (s *ConnState) OnRoomUpdate(ctx context.Context, up caches.RoomUpdate) { internal.AssertWithContext(ctx, "missing global room metadata", update.GlobalRoomMetadata() != nil) s.OnUpdate(ctx, update) default: - logger.Warn().Str("room_id", up.RoomID()).Msg("OnRoomUpdate unknown update type") + log.Warn().Str("room_id", up.RoomID()).Msg("OnRoomUpdate unknown update type") } } diff --git a/sync3/handler/connstate_live.go b/sync3/handler/connstate_live.go index 70da78db..ad5f294a 100644 --- a/sync3/handler/connstate_live.go +++ b/sync3/handler/connstate_live.go @@ -9,6 +9,7 @@ import ( "github.com/matrix-org/sliding-sync/sync3" "github.com/matrix-org/sliding-sync/sync3/caches" "github.com/matrix-org/sliding-sync/sync3/extensions" + "github.com/rs/zerolog/log" ) // the amount of time to try to insert into a full buffer before giving up. @@ -37,7 +38,7 @@ func (s *connStateLive) onUpdate(up caches.Update) { select { case s.updates <- up: case <-time.After(BufferWaitTime): - logger.Warn().Interface("update", up).Str("user", s.userID).Str("device", s.deviceID).Msg( + log.Warn().Interface("update", up).Str("user", s.userID).Str("device", s.deviceID).Msg( "cannot send update to connection, buffer exceeded. Destroying connection.", ) s.bufferFull = true @@ -50,7 +51,7 @@ func (s *connStateLive) liveUpdate( ctx context.Context, req *sync3.Request, ex extensions.Request, isInitial bool, response *sync3.Response, ) { - log := logger.With().Str("user", s.userID).Str("device", s.deviceID).Logger() + log := log.With().Str("user", s.userID).Str("device", s.deviceID).Logger() // we need to ensure that we keep consuming from the updates channel, even if they want a response // immediately. If we have new list data we won't wait, but if we don't then we need to be able to // catch-up to the current head position, hence giving 100ms grace period for processing. @@ -150,7 +151,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, // Skip message events from ignored users. if roomEventUpdate.EventData.StateKey == nil && s.userCache.ShouldIgnore(roomEventUpdate.EventData.Sender) { - logger.Trace(). + log.Trace(). Str("user", s.userID). Str("type", roomEventUpdate.EventData.EventType). Str("sender", roomEventUpdate.EventData.Sender). 
@@ -386,14 +387,14 @@ func (s *connStateLive) processLiveUpdateForList( ) (hasUpdates bool) { switch update := up.(type) { case *caches.RoomEventUpdate: - logger.Trace().Str("user", s.userID).Str("type", update.EventData.EventType).Msg("received event update") + log.Trace().Str("user", s.userID).Str("type", update.EventData.EventType).Msg("received event update") if update.EventData.ForceInitial { // add room to sub: this applies for when we track all rooms too as we want joins/etc to come through with initial data subID := builder.AddSubscription(reqList.RoomSubscription) builder.AddRoomsToSubscription(ctx, subID, []string{update.RoomID()}) } case *caches.UnreadCountUpdate: - logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Bool("count_decreased", update.HasCountDecreased).Msg("received unread count update") + log.Trace().Str("user", s.userID).Str("room", update.RoomID()).Bool("count_decreased", update.HasCountDecreased).Msg("received unread count update") // normally we do not signal unread count increases to the client as we want to atomically // increase the count AND send the msg so there's no phantom msgs/notifications. However, // we must resort the list and send delta even if this is an increase else diff --git a/sync3/handler/ensure_polling.go b/sync3/handler/ensure_polling.go index 24a54cae..db63d142 100644 --- a/sync3/handler/ensure_polling.go +++ b/sync3/handler/ensure_polling.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/matrix-org/sliding-sync/pubsub" + "github.com/rs/zerolog/log" ) // pendingInfo tracks the status of a poller that we are (or previously were) waiting @@ -128,7 +129,7 @@ func (p *EnsurePoller) EnsurePolling(ctx context.Context, pid sync2.PollerID, to } func (p *EnsurePoller) OnInitialSyncComplete(payload *pubsub.V2InitialSyncComplete) { - log := logger.With().Str("user", payload.UserID).Str("device", payload.DeviceID).Logger() + log := log.With().Str("user", payload.UserID).Str("device", payload.DeviceID).Logger() log.Trace().Msg("OnInitialSyncComplete: got payload") pid := sync2.PollerID{UserID: payload.UserID, DeviceID: payload.DeviceID} p.mu.Lock() diff --git a/sync3/handler/handler.go b/sync3/handler/handler.go index 9ab0db77..fd3ba615 100644 --- a/sync3/handler/handler.go +++ b/sync3/handler/handler.go @@ -8,7 +8,6 @@ import ( "fmt" "net/http" "net/url" - "os" "reflect" "strconv" "sync" @@ -34,11 +33,6 @@ import ( const DefaultSessionID = "default" -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) - // This is a net.http Handler for sync v3. It is responsible for pairing requests to Conns and to // ensure that the sync v2 poller is running for this client. 
type SyncLiveHandler struct { @@ -75,7 +69,7 @@ func NewSync3Handler( pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool, maxPendingEventUpdates int, maxTransactionIDDelay time.Duration, ) (*SyncLiveHandler, error) { - logger.Info().Msg("creating handler") + log.Info().Msg("creating handler") sh := &SyncLiveHandler{ V2: v2Client, Storage: store, @@ -122,7 +116,7 @@ func (h *SyncLiveHandler) Listen() { defer internal.ReportPanicsToSentry() err := h.V2Sub.Listen() if err != nil { - logger.Err(err).Msg("Failed to listen for v2 messages") + log.Err(err).Msg("Failed to listen for v2 messages") sentry.CaptureException(err) } }() @@ -483,7 +477,7 @@ func (h *SyncLiveHandler) identifyUnknownAccessToken(ctx context.Context, access // Create a brand-new row for this token. token, err = h.V2Store.TokensTable.Insert(txn, accessToken, userID, deviceID, time.Now()) if err != nil { - logger.Warn().Err(err).Str("user", userID).Str("device", deviceID).Msg("failed to insert v2 token") + log.Warn().Err(err).Str("user", userID).Str("device", deviceID).Msg("failed to insert v2 token") return err } @@ -625,7 +619,7 @@ func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID strin dd, err := h.Storage.DeviceDataTable.Select(userID, deviceID, shouldSwap) if err != nil { - logger.Err(err).Str("user", userID).Msg("failed to SelectAndSwap device data") + log.Err(err).Str("user", userID).Msg("failed to SelectAndSwap device data") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } @@ -637,7 +631,7 @@ func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID strin func (h *SyncLiveHandler) TransactionIDForEvents(userID string, deviceID string, eventIDs []string) (eventIDToTxnID map[string]string) { eventIDToTxnID, err := h.Storage.TransactionsTable.Select(userID, deviceID, eventIDs) if err != nil { - logger.Warn().Str("err", err.Error()).Str("device", deviceID).Msg("failed to select txn IDs for events") + log.Warn().Str("err", err.Error()).Str("device", deviceID).Msg("failed to select txn IDs for events") } return } @@ -653,7 +647,7 @@ func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate) { // note: events is sorted in ascending NID order, event if p.EventNIDs isn't. 
events, err := h.Storage.EventNIDs(p.EventNIDs) if err != nil { - logger.Err(err).Str("room", p.RoomID).Msg("Accumulate: failed to EventNIDs") + log.Err(err).Str("room", p.RoomID).Msg("Accumulate: failed to EventNIDs") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -684,7 +678,7 @@ func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise) { defer task.End() state, err := h.Storage.StateSnapshot(p.SnapshotNID) if err != nil { - logger.Err(err).Int64("snap", p.SnapshotNID).Str("room", p.RoomID).Msg("Initialise: failed to get StateSnapshot") + log.Err(err).Int64("snap", p.SnapshotNID).Str("room", p.RoomID).Msg("Initialise: failed to get StateSnapshot") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -736,7 +730,7 @@ func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom) { } inviteState, err := h.Storage.InvitesTable.SelectInviteState(p.UserID, p.RoomID) if err != nil { - logger.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("failed to get invite state") + log.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("failed to get invite state") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -806,7 +800,7 @@ func (h *SyncLiveHandler) OnAccountData(p *pubsub.V2AccountData) { } data, err := h.Storage.AccountData(p.UserID, p.RoomID, p.Types) if err != nil { - logger.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("OnAccountData: failed to lookup") + log.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("OnAccountData: failed to lookup") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } @@ -848,7 +842,7 @@ func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) { }) hub.CaptureException(err) }) - logger.Err(err). + log.Err(err). Str("room_id", p.RoomID). Msg("Failed to fetch members after cache invalidation") return @@ -871,7 +865,7 @@ func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) { h.destroyedConns.Add(float64(destroyed)) } // invalidations are rare and dangerous if we get it wrong, so log information about it. - logger.Info(). + log.Info(). Str("room_id", p.RoomID).Int("joins", len(joins)).Int("invites", len(invites)).Int("leaves", len(leaves)). Int("del_user_caches", len(unregistered)).Int("conns_destroyed", destroyed).Msg("OnInvalidateRoom") } diff --git a/sync3/handler/rooms_builder.go b/sync3/handler/rooms_builder.go index e0e4efc8..0a9a15df 100644 --- a/sync3/handler/rooms_builder.go +++ b/sync3/handler/rooms_builder.go @@ -13,19 +13,21 @@ import ( // in the Response. It is not thread-safe and should only be called by the ConnState thread. // // The top-level `rooms` key is an amalgamation of: -// - Room subscriptions -// - Rooms within all sliding lists. +// - Room subscriptions +// - Rooms within all sliding lists. // // The purpose of this builder is to remember which rooms we will be returning data for, along with the // room subscription for that room. This then allows efficient database accesses. 
For example: -// - List A will return !a, !b, !c with Room Subscription X -// - List B will return !b, !c, !d with Room Subscription Y -// - Room sub for !a with Room Subscription Z +// - List A will return !a, !b, !c with Room Subscription X +// - List B will return !b, !c, !d with Room Subscription Y +// - Room sub for !a with Room Subscription Z +// // Rather than performing each operation in isolation and query for rooms multiple times (where the // response data will inevitably be dropped), we can instead amalgamate this into: -// - Room Subscription X+Z -> !a -// - Room Subscription X+Y -> !b, !c -// - Room Subscription Y -> !d +// - Room Subscription X+Z -> !a +// - Room Subscription X+Y -> !b, !c +// - Room Subscription Y -> !d +// // This data will not be wasted when it has been retrieved from the database. type RoomsBuilder struct { subs []sync3.RoomSubscription diff --git a/sync3/lists.go b/sync3/lists.go index 823637b4..13f46ba8 100644 --- a/sync3/lists.go +++ b/sync3/lists.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/matrix-org/sliding-sync/internal" + "github.com/rs/zerolog/log" ) type OverwriteVal bool @@ -244,7 +245,7 @@ func (s *InternalRequestLists) AssignList(ctx context.Context, listKey string, f if sort != nil { err := roomList.Sort(sort) if err != nil { - logger.Err(err).Strs("sort_by", sort).Msg("failed to sort") + log.Err(err).Strs("sort_by", sort).Msg("failed to sort") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } } diff --git a/sync3/main_test.go b/sync3/main_test.go index 9638346d..551777b6 100644 --- a/sync3/main_test.go +++ b/sync3/main_test.go @@ -6,12 +6,13 @@ import ( "github.com/matrix-org/sliding-sync/testutils" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) var postgresConnectionString = "user=xxxxx dbname=syncv3_test sslmode=disable" func TestMain(m *testing.M) { - logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ + log.Logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ Out: os.Stderr, TimeFormat: "15:04:05", NoColor: true, diff --git a/sync3/ops.go b/sync3/ops.go index 9f051191..906c6b8f 100644 --- a/sync3/ops.go +++ b/sync3/ops.go @@ -3,6 +3,7 @@ package sync3 import ( "context" "github.com/matrix-org/sliding-sync/internal" + "github.com/rs/zerolog/log" ) type List interface { @@ -49,7 +50,7 @@ func CalculateListOps(ctx context.Context, reqList *RequestList, list List, room list.Add(roomID) // this should only move exactly 1 room at most as this is called for every single update if err := list.Sort(reqList.Sort); err != nil { - logger.Err(err).Msg("cannot sort list") + log.Err(err).Msg("cannot sort list") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } // find the new position of this room @@ -67,7 +68,7 @@ func CalculateListOps(ctx context.Context, reqList *RequestList, list List, room case ListOpChange: // this should only move exactly 1 room at most as this is called for every single update if err := list.Sort(reqList.Sort); err != nil { - logger.Err(err).Msg("cannot sort list") + log.Err(err).Msg("cannot sort list") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } // find the new position of this room diff --git a/tests-e2e/gappy_state_test.go b/tests-e2e/gappy_state_test.go index 209fc9b3..77078d7f 100644 --- a/tests-e2e/gappy_state_test.go +++ b/tests-e2e/gappy_state_test.go @@ -69,11 +69,11 @@ func TestGappyState(t *testing.T) { t.Log("Alice sends lots of other state events.") const 
numOtherState = 40 for i := 0; i < numOtherState; i++ { - alice.Unsafe_SendEventUnsynced(t, roomID, b.Event{ - Type: "com.example.dummy", - StateKey: ptr(fmt.Sprintf("%d", i)), - Content: map[string]any{}, - }) + alice.Unsafe_SendEventUnsynced(t, roomID, b.Event{ + Type: "com.example.dummy", + StateKey: ptr(fmt.Sprintf("%d", i)), + Content: map[string]any{}, + }) } t.Log("Alice sends a batch of message events.") diff --git a/v3.go b/v3.go index 53e40637..031ff0b6 100644 --- a/v3.go +++ b/v3.go @@ -24,17 +24,13 @@ import ( "github.com/matrix-org/sliding-sync/sync2/handler2" "github.com/matrix-org/sliding-sync/sync3/handler" "github.com/pressly/goose/v3" - "github.com/rs/zerolog" "github.com/rs/zerolog/hlog" + "github.com/rs/zerolog/log" ) //go:embed state/migrations/* var EmbedMigrations embed.FS -var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{ - Out: os.Stderr, - TimeFormat: "15:04:05", -}) var Version string type Opts struct { @@ -94,14 +90,14 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han // Sanity check that we can contact the upstream homeserver. _, err := v2Client.Versions(context.Background()) if err != nil { - logger.Warn().Err(err).Str("dest", destHomeserver).Msg("Could not contact upstream homeserver. Is SYNCV3_SERVER set correctly?") + log.Warn().Err(err).Str("dest", destHomeserver).Msg("Could not contact upstream homeserver. Is SYNCV3_SERVER set correctly?") } db, err := sqlx.Open("postgres", postgresURI) if err != nil { sentry.CaptureException(err) // TODO: if we panic(), will sentry have a chance to flush the event? - logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") + log.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB") } if opts.DBMaxConns > 0 { @@ -121,7 +117,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han goose.SetBaseFS(EmbedMigrations) err = goose.Up(db.DB, "state/migrations", goose.WithAllowMissing()) if err != nil { - logger.Panic().Err(err).Msg("failed to execute migrations") + log.Panic().Err(err).Msg("failed to execute migrations") } bufferSize := 50 @@ -152,7 +148,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han if err != nil { panic(err) } - logger.Info().Msg("retrieved global snapshot from database") + log.Info().Msg("retrieved global snapshot from database") h3.Startup(&storeSnapshot) // begin consuming from these positions @@ -188,7 +184,7 @@ func RunSyncV3Server(h http.Handler, bindAddr, destV2Server, tlsCert, tlsKey str srv := &server{ chain: []func(next http.Handler) http.Handler{ - hlog.NewHandler(logger), + hlog.NewHandler(log.Logger), func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { r = r.WithContext(internal.RequestContext(r.Context())) @@ -220,39 +216,39 @@ func RunSyncV3Server(h http.Handler, bindAddr, destV2Server, tlsCert, tlsKey str // Block forever var err error if internal.IsUnixSocket(bindAddr) { - logger.Info().Msgf("listening on unix socket %s", bindAddr) + log.Info().Msgf("listening on unix socket %s", bindAddr) listener := unixSocketListener(bindAddr) err = http.Serve(listener, srv) } else { if tlsCert != "" && tlsKey != "" { - logger.Info().Msgf("listening TLS on %s", bindAddr) + log.Info().Msgf("listening TLS on %s", bindAddr) err = http.ListenAndServeTLS(bindAddr, tlsCert, tlsKey, srv) } else { - logger.Info().Msgf("listening on %s", bindAddr) + log.Info().Msgf("listening on 
%s", bindAddr) err = http.ListenAndServe(bindAddr, srv) } } if err != nil { sentry.CaptureException(err) // TODO: Fatal() calls os.Exit. Will that give time for sentry.Flush() to run? - logger.Fatal().Err(err).Msg("failed to listen and serve") + log.Fatal().Err(err).Msg("failed to listen and serve") } } func unixSocketListener(bindAddr string) net.Listener { err := os.Remove(bindAddr) if err != nil && !errors.Is(err, fs.ErrNotExist) { - logger.Fatal().Err(err).Msg("failed to remove existing unix socket") + log.Fatal().Err(err).Msg("failed to remove existing unix socket") } listener, err := net.Listen("unix", bindAddr) if err != nil { - logger.Fatal().Err(err).Msg("failed to serve unix socket") + log.Fatal().Err(err).Msg("failed to serve unix socket") } // least permissions and work out of box (-w--w--w-); could be extracted as // env variable if needed err = os.Chmod(bindAddr, 0222) if err != nil { - logger.Fatal().Err(err).Msg("failed to set unix socket permissions") + log.Fatal().Err(err).Msg("failed to set unix socket permissions") } return listener }