From 88a805351e4e3dbe8ba51792b66f36e8c2d0331f Mon Sep 17 00:00:00 2001 From: Ben Schumacher Date: Sat, 5 Oct 2024 11:39:43 +0200 Subject: [PATCH] [MM-57991] Add log message about deleted rows from Data Retention job (#26847) --- .../opentracinglayer/opentracinglayer.go | 6 +++--- .../channels/store/retrylayer/retrylayer.go | 10 +++++----- .../channels/store/sqlstore/reaction_store.go | 20 ++++++++++++------- .../store/sqlstore/retention_policy_store.go | 6 ++++-- server/channels/store/store.go | 2 +- .../store/storetest/mocks/ReactionStore.go | 20 ++++++++++++++----- server/channels/store/storetest/post_store.go | 15 +++++++++----- .../store/storetest/preference_store.go | 3 ++- .../store/storetest/reaction_store.go | 4 +++- .../channels/store/timerlayer/timerlayer.go | 6 +++--- 10 files changed, 59 insertions(+), 33 deletions(-) diff --git a/server/channels/store/opentracinglayer/opentracinglayer.go b/server/channels/store/opentracinglayer/opentracinglayer.go index e05d3ac0cbb8..9b1f31dec0de 100644 --- a/server/channels/store/opentracinglayer/opentracinglayer.go +++ b/server/channels/store/opentracinglayer/opentracinglayer.go @@ -7648,7 +7648,7 @@ func (s *OpenTracingLayerReactionStore) DeleteAllWithEmojiName(emojiName string) return err } -func (s *OpenTracingLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error { +func (s *OpenTracingLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) (int64, error) { origCtx := s.Root.Store.Context() span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "ReactionStore.DeleteOrphanedRowsByIds") s.Root.Store.SetContext(newCtx) @@ -7657,13 +7657,13 @@ func (s *OpenTracingLayerReactionStore) DeleteOrphanedRowsByIds(r *model.Retenti }() defer span.Finish() - err := s.ReactionStore.DeleteOrphanedRowsByIds(r) + result, err := s.ReactionStore.DeleteOrphanedRowsByIds(r) if err != nil { span.LogFields(spanlog.Error(err)) ext.Error.Set(span, true) } - return err + return result, err } func (s *OpenTracingLayerReactionStore) ExistsOnPost(postId string, emojiName string) (bool, error) { diff --git a/server/channels/store/retrylayer/retrylayer.go b/server/channels/store/retrylayer/retrylayer.go index 3db4b2d39ef4..470bb123ac67 100644 --- a/server/channels/store/retrylayer/retrylayer.go +++ b/server/channels/store/retrylayer/retrylayer.go @@ -8699,21 +8699,21 @@ func (s *RetryLayerReactionStore) DeleteAllWithEmojiName(emojiName string) error } -func (s *RetryLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error { +func (s *RetryLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) (int64, error) { tries := 0 for { - err := s.ReactionStore.DeleteOrphanedRowsByIds(r) + result, err := s.ReactionStore.DeleteOrphanedRowsByIds(r) if err == nil { - return nil + return result, nil } if !isRepeatableError(err) { - return err + return result, err } tries++ if tries >= 3 { err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") - return err + return result, err } timepkg.Sleep(100 * timepkg.Millisecond) } diff --git a/server/channels/store/sqlstore/reaction_store.go b/server/channels/store/sqlstore/reaction_store.go index 8d5cac17beaa..f3437c704b9d 100644 --- a/server/channels/store/sqlstore/reaction_store.go +++ b/server/channels/store/sqlstore/reaction_store.go @@ -330,10 +330,10 @@ func (s SqlReactionStore) PermanentDeleteByUser(userId string) error { return nil } -func (s *SqlReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error { +func (s *SqlReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) (int64, error) { txn, err := s.GetMasterX().Beginx() if err != nil { - return err + return 0, err } defer finalizeTransactionX(txn, &err) @@ -343,18 +343,24 @@ func (s *SqlReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDelet sq.Eq{"PostId": r.Ids}, ) - _, err = txn.ExecBuilder(query) + sqlResult, err := txn.ExecBuilder(query) if err != nil { - return errors.Wrapf(err, "failed to delete orphaned reactions with RetentionIdsForDeletion Id=%s", r.Id) + return 0, errors.Wrapf(err, "failed to delete orphaned reactions with RetentionIdsForDeletion Id=%s", r.Id) } err = deleteFromRetentionIdsTx(txn, r.Id) if err != nil { - return err + return 0, err } if err = txn.Commit(); err != nil { - return err + return 0, err } - return nil + + rowsAffected, err := sqlResult.RowsAffected() + if err != nil { + return 0, errors.Wrap(err, "unable to retrieve rows affected") + } + + return rowsAffected, nil } func (s *SqlReactionStore) PermanentDeleteBatch(endTime int64, limit int64) (int64, error) { diff --git a/server/channels/store/sqlstore/retention_policy_store.go b/server/channels/store/sqlstore/retention_policy_store.go index 7b643bd4daad..8f2535a3b448 100644 --- a/server/channels/store/sqlstore/retention_policy_store.go +++ b/server/channels/store/sqlstore/retention_policy_store.go @@ -1152,9 +1152,11 @@ func getDeleteQueriesForMySQL(r RetentionPolicyBatchDeletionInfo, query string) return fmt.Sprintf("DELETE %s FROM %s INNER JOIN (%s) AS A ON %s", r.Table, r.Table, query, joinClause) } -func deleteFromRetentionIdsTx(txn *sqlxTxWrapper, id string) (err error) { - if _, err := txn.Exec("DELETE FROM RetentionIdsForDeletion WHERE Id = ?", id); err != nil { +func deleteFromRetentionIdsTx(txn *sqlxTxWrapper, id string) error { + _, err := txn.Exec("DELETE FROM RetentionIdsForDeletion WHERE Id = ?", id) + if err != nil { return errors.Wrap(err, "Failed to delete from RetentionIdsForDeletion") } + return nil } diff --git a/server/channels/store/store.go b/server/channels/store/store.go index 0463e500fda1..11a281876be6 100644 --- a/server/channels/store/store.go +++ b/server/channels/store/store.go @@ -751,7 +751,7 @@ type ReactionStore interface { DeleteAllWithEmojiName(emojiName string) error BulkGetForPosts(postIds []string) ([]*model.Reaction, error) GetSingle(userID, postID, remoteID, emojiName string) (*model.Reaction, error) - DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error + DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) (int64, error) PermanentDeleteBatch(endTime int64, limit int64) (int64, error) PermanentDeleteByUser(userID string) error } diff --git a/server/channels/store/storetest/mocks/ReactionStore.go b/server/channels/store/storetest/mocks/ReactionStore.go index a5136207f8d8..5f8c2e8fa840 100644 --- a/server/channels/store/storetest/mocks/ReactionStore.go +++ b/server/channels/store/storetest/mocks/ReactionStore.go @@ -93,21 +93,31 @@ func (_m *ReactionStore) DeleteAllWithEmojiName(emojiName string) error { } // DeleteOrphanedRowsByIds provides a mock function with given fields: r -func (_m *ReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error { +func (_m *ReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) (int64, error) { ret := _m.Called(r) if len(ret) == 0 { panic("no return value specified for DeleteOrphanedRowsByIds") } - var r0 error - if rf, ok := ret.Get(0).(func(*model.RetentionIdsForDeletion) error); ok { + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(*model.RetentionIdsForDeletion) (int64, error)); ok { + return rf(r) + } + if rf, ok := ret.Get(0).(func(*model.RetentionIdsForDeletion) int64); ok { r0 = rf(r) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(int64) } - return r0 + if rf, ok := ret.Get(1).(func(*model.RetentionIdsForDeletion) error); ok { + r1 = rf(r) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // ExistsOnPost provides a mock function with given fields: postId, emojiName diff --git a/server/channels/store/storetest/post_store.go b/server/channels/store/storetest/post_store.go index e53e0401b00c..0b87917f3845 100644 --- a/server/channels/store/storetest/post_store.go +++ b/server/channels/store/storetest/post_store.go @@ -4015,8 +4015,9 @@ func testPostStorePermanentDeleteBatch(t *testing.T, rctx request.CTX, ss store. require.Equal(t, 1, len(rows)) require.Equal(t, 2, len(rows[0].Ids)) // Clean up retention ids table - err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0]) + deleted, err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0]) require.NoError(t, err) + require.Equal(t, int64(0), deleted) t.Run("with pagination", func(t *testing.T) { for i := 0; i < 3; i++ { @@ -4040,8 +4041,9 @@ func testPostStorePermanentDeleteBatch(t *testing.T, rctx request.CTX, ss store. require.Equal(t, 2, len(rows[0].Ids)) // Clean up retention ids table - err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0]) + deleted, err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0]) require.NoError(t, err) + require.Equal(t, int64(0), deleted) deleted, _, err = ss.Post().PermanentDeleteBatchForRetentionPolicies(0, 2, 2, cursor) require.NoError(t, err) @@ -4053,8 +4055,9 @@ func testPostStorePermanentDeleteBatch(t *testing.T, rctx request.CTX, ss store. require.Equal(t, 1, len(rows[0].Ids)) // Clean up retention ids table - err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0]) + deleted, err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0]) require.NoError(t, err) + require.Equal(t, int64(0), deleted) }) t.Run("with data retention policies", func(t *testing.T) { @@ -4127,8 +4130,9 @@ func testPostStorePermanentDeleteBatch(t *testing.T, rctx request.CTX, ss store. rows, err = ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000) require.NoError(t, err) for _, row := range rows { - err = ss.Reaction().DeleteOrphanedRowsByIds(row) + deleted, err = ss.Reaction().DeleteOrphanedRowsByIds(row) require.NoError(t, err) + require.Equal(t, int64(0), deleted) } }) @@ -4203,8 +4207,9 @@ func testPostStorePermanentDeleteBatch(t *testing.T, rctx request.CTX, ss store. // Clean up retention ids table for _, row := range rows { - err = ss.Reaction().DeleteOrphanedRowsByIds(row) + deleted, err = ss.Reaction().DeleteOrphanedRowsByIds(row) require.NoError(t, err) + require.Equal(t, int64(0), deleted) } }) } diff --git a/server/channels/store/storetest/preference_store.go b/server/channels/store/storetest/preference_store.go index 0a89b5bc1b47..501305d7ecde 100644 --- a/server/channels/store/storetest/preference_store.go +++ b/server/channels/store/storetest/preference_store.go @@ -390,8 +390,9 @@ func testPreferenceDeleteOrphanedRows(t *testing.T, rctx request.CTX, ss store.S require.Equal(t, 1, len(rows)) // Clean up retention ids table - err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0]) + deleted, err := ss.Reaction().DeleteOrphanedRowsByIds(rows[0]) require.NoError(t, err) + require.Equal(t, int64(0), deleted) _, nErr = ss.Preference().DeleteOrphanedRows(limit) assert.NoError(t, nErr) diff --git a/server/channels/store/storetest/reaction_store.go b/server/channels/store/storetest/reaction_store.go index 6b1ba3bf987c..e146aaaeeb16 100644 --- a/server/channels/store/storetest/reaction_store.go +++ b/server/channels/store/storetest/reaction_store.go @@ -714,8 +714,10 @@ func testReactionStorePermanentDeleteBatch(t *testing.T, rctx request.CTX, ss st require.Contains(t, rows[0].Ids, olderPost.Id) for _, row := range rows { - err = ss.Reaction().DeleteOrphanedRowsByIds(row) + var deleted int64 + deleted, err = ss.Reaction().DeleteOrphanedRowsByIds(row) require.NoError(t, err) + require.Equal(t, int64(2), deleted) } rows, err = ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000) diff --git a/server/channels/store/timerlayer/timerlayer.go b/server/channels/store/timerlayer/timerlayer.go index 61dde64b5bc6..8ab2c0f69677 100644 --- a/server/channels/store/timerlayer/timerlayer.go +++ b/server/channels/store/timerlayer/timerlayer.go @@ -6905,10 +6905,10 @@ func (s *TimerLayerReactionStore) DeleteAllWithEmojiName(emojiName string) error return err } -func (s *TimerLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error { +func (s *TimerLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) (int64, error) { start := time.Now() - err := s.ReactionStore.DeleteOrphanedRowsByIds(r) + result, err := s.ReactionStore.DeleteOrphanedRowsByIds(r) elapsed := float64(time.Since(start)) / float64(time.Second) if s.Root.Metrics != nil { @@ -6918,7 +6918,7 @@ func (s *TimerLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsF } s.Root.Metrics.ObserveStoreMethodDuration("ReactionStore.DeleteOrphanedRowsByIds", success, elapsed) } - return err + return result, err } func (s *TimerLayerReactionStore) ExistsOnPost(postId string, emojiName string) (bool, error) {