From 2ed823f8e36e1fac07629e8668db3d793cce2dff Mon Sep 17 00:00:00 2001 From: Anup Cowkur Date: Wed, 5 Mar 2025 13:43:32 +0530 Subject: [PATCH] chore(backend): refactor event, attachment & span cleanup closes #1840 --- backend/cleanup/cleanup/cleanup.go | 216 ++++++++++++----------------- 1 file changed, 89 insertions(+), 127 deletions(-) diff --git a/backend/cleanup/cleanup/cleanup.go b/backend/cleanup/cleanup/cleanup.go index 91eaacb42..569b21585 100644 --- a/backend/cleanup/cleanup/cleanup.go +++ b/backend/cleanup/cleanup/cleanup.go @@ -24,14 +24,6 @@ type Attachment struct { Location string `json:"location"` } -type StaleData struct { - AppID string `json:"app_id"` - RetentionDate time.Time `json:"retention_date"` - EventIDs []string `json:"event_ids"` - SpanIDs []string `json:"span_ids"` - Attachments []Attachment `json:"attachments"` -} - type AppRetention struct { AppID string `json:"app_id"` Threshold time.Time `json:"threshold"` @@ -72,61 +64,17 @@ func DeleteStaleData(ctx context.Context) { // delete sessions deleteSessions(ctx, appRetentions) - // delete events, spans and attachments - staleData := fetchStaleData(ctx, appRetentions) - - for _, st := range staleData { - // Delete attachments from object storage - if len(st.Attachments) > 0 { - fmt.Printf("Deleting %v attachments for app_id: %v\n", len(st.Attachments), st.AppID) - - err := deleteAttachments(ctx, st) - - if err != nil { - fmt.Printf("Failed to delete %v attachments for app_id: %v, err: %v\n", len(st.Attachments), st.AppID, err) - return - } - - fmt.Printf("Deleted %v attachments for app_id: %v\n", len(st.Attachments), st.AppID) - } - - // Delete events from clickhouse - if len(st.EventIDs) > 0 { - fmt.Printf("Deleting %v events from clickhouse for app_id: %v\n", len(st.EventIDs), st.AppID) - - deleteStmt := sqlf.DeleteFrom("default.events"). - Where("app_id = ?", st.AppID). - Where("timestamp < ?", st.RetentionDate) - - if err := server.Server.ChPool.Exec(ctx, deleteStmt.String(), deleteStmt.Args()...); err != nil { - fmt.Printf("Failed to delete %v events from clickhouse for app_id: %v, err: %v\n", len(st.EventIDs), st.AppID, err) - return - } - - fmt.Printf("Deleted %v events from clickhouse for app_id: %v\n", len(st.EventIDs), st.AppID) - } - - // Delete spans from clickhouse - if len(st.SpanIDs) > 0 { - fmt.Printf("Deleting %v spans from clickhouse for app_id: %v\n", len(st.SpanIDs), st.AppID) - - deleteStmt := sqlf.DeleteFrom("spans"). - Where("app_id = ?", st.AppID). - Where("start_time < ?", st.RetentionDate) - - if err := server.Server.ChPool.Exec(ctx, deleteStmt.String(), deleteStmt.Args()...); err != nil { - fmt.Printf("Failed to delete %v spans from clickhouse for app_id: %v, err: %v\n", len(st.SpanIDs), st.AppID, err) - return - } + // delete spans + deleteSpans(ctx, appRetentions) - fmt.Printf("Deleted %v spans from clickhouse for app_id: %v\n", len(st.SpanIDs), st.AppID) - } - } + // delete events and attachments + deleteEventsAndAttachments(ctx, appRetentions) - staleDataJson, _ := json.MarshalIndent(staleData, "", " ") - fmt.Printf("Succesfully deleted stale data %v\n", string(staleDataJson)) + fmt.Println("Finished cleaning up stale data") } +// deleteStaleShortenedFilters deletes stale shortened filters that +// have passed the expiry threshold func deleteStaleShortenedFilters(ctx context.Context) { threshold := time.Now().Add(-60 * time.Minute) // 1 hour expiry stmt := sqlf.PostgreSQL.DeleteFrom("public.short_filters"). @@ -341,63 +289,56 @@ func deleteSessions(ctx context.Context, retentions []AppRetention) { } } -// fetchAppRetentions fetches retention period -// for each app. -func fetchAppRetentions(ctx context.Context) (retentions []AppRetention, err error) { - // Fetch retention periods for each app - stmt := sqlf.PostgreSQL. - From("app_settings"). - Select("app_id"). - Select("retention_period") - - defer stmt.Close() - - rows, err := server.Server.PgPool.Query(ctx, stmt.String(), stmt.Args()...) - if err != nil { - return - } - - for rows.Next() { - var retention AppRetention - var period int +// deleteSpans deletes stale spans for each +// app's retention threshold. +func deleteSpans(ctx context.Context, retentions []AppRetention) { + errCount := 0 + for _, retention := range retentions { + stmt := sqlf. + DeleteFrom("spans"). + Where("app_id = toUUID(?)", retention.AppID). + Where("start_time < ?", retention.Threshold) - if err := rows.Scan(&retention.AppID, &period); err != nil { - fmt.Printf("Failed to scan row: %v\n", err) + if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil { + errCount += 1 + fmt.Printf("Failed to delete stale spans for app id %q: %v\n", retention.AppID, err) + stmt.Close() continue } - retention.Threshold = time.Now().UTC().AddDate(0, 0, -period) - retentions = append(retentions, retention) + stmt.Close() } - err = rows.Err() - - return + if errCount < 1 { + fmt.Println("Successfully deleted stale spans") + } } -func fetchStaleData(ctx context.Context, retentions []AppRetention) (staleData []StaleData) { +// deleteEventsAndAttachments deletes stale events and their attachments for each +// app's retention threshold. +func deleteEventsAndAttachments(ctx context.Context, retentions []AppRetention) { + errCount := 0 for _, retention := range retentions { - // Fetch stale events from ClickHouse - fetchEventsStmt := sqlf.Select("id"). + + // Fetch attachments for current app's stale events + fetchAttachmentsStmt := sqlf. Select("attachments"). - From("default.events"). + From("events"). Where("app_id = toUUID(?)", retention.AppID). Where("timestamp < ?", retention.Threshold) - eventRows, err := server.Server.ChPool.Query(ctx, fetchEventsStmt.String(), fetchEventsStmt.Args()...) + attachmentRows, err := server.Server.ChPool.Query(ctx, fetchAttachmentsStmt.String(), fetchAttachmentsStmt.Args()...) if err != nil { fmt.Printf("Failed to fetch stale events from ClickHouse: %v\n", err) continue } - var staleEventIDs []string - var staleAttachments []Attachment - var eventID string var attachmentsJSON string var attachments []Attachment - for eventRows.Next() { - if err := eventRows.Scan(&eventID, &attachmentsJSON); err != nil { - fmt.Printf("Failed to scan event ID: %v\n", err) + var staleAttachments []Attachment + for attachmentRows.Next() { + if err := attachmentRows.Scan(&attachmentsJSON); err != nil { + fmt.Printf("Failed to scan attachment: %v\n", err) continue } @@ -405,55 +346,42 @@ func fetchStaleData(ctx context.Context, retentions []AppRetention) (staleData [ // attachments that fail unmarshalling will not be included in stale list. if attachmentsJSON != "[]" { if err := json.Unmarshal([]byte(attachmentsJSON), &attachments); err != nil { - fmt.Printf("Failed to unmarshal attachment JSON for event ID: %v, attachmentJSON: %v, error: %v\n", eventID, attachmentsJSON, err) + fmt.Printf("Failed to unmarshal attachment JSON attachmentJSON: %v, error: %v\n", attachmentsJSON, err) continue } staleAttachments = append(staleAttachments, attachments...) } - - staleEventIDs = append(staleEventIDs, eventID) } - // Fetch stale spans from ClickHouse - fetchSpansStmt := sqlf.Select("span_id"). - From("spans"). + // Delete attachments from object storage + deleteAttachments(ctx, staleAttachments) + + // Delete stale events + stmt := sqlf. + DeleteFrom("events"). Where("app_id = toUUID(?)", retention.AppID). - Where("start_time < ?", retention.Threshold) + Where("timestamp < ?", retention.Threshold) - spanRows, err := server.Server.ChPool.Query(ctx, fetchSpansStmt.String(), fetchSpansStmt.Args()...) - if err != nil { - fmt.Printf("Failed to fetch stale spans from ClickHouse: %v\n", err) + if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil { + errCount += 1 + fmt.Printf("Failed to delete stale events for app id %q: %v\n", retention.AppID, err) + stmt.Close() continue } - var staleSpanIDs []string - var spanID string - for spanRows.Next() { - if err := spanRows.Scan(&spanID); err != nil { - fmt.Printf("Failed to scan span ID: %v\n", err) - continue - } - - staleSpanIDs = append(staleSpanIDs, spanID) - } - - staleData = append(staleData, StaleData{ - AppID: retention.AppID, - RetentionDate: retention.Threshold, - EventIDs: staleEventIDs, - SpanIDs: staleSpanIDs, - Attachments: staleAttachments, - }) + stmt.Close() } - return + if errCount < 1 { + fmt.Println("Successfully deleted stale events and their attachments") + } } -func deleteAttachments(ctx context.Context, staleData StaleData) (err error) { +func deleteAttachments(ctx context.Context, attachments []Attachment) (err error) { objectIds := []types.ObjectIdentifier{} - for _, at := range staleData.Attachments { + for _, at := range attachments { objectIds = append(objectIds, types.ObjectIdentifier{ Key: aws.String(at.Key), }) @@ -492,3 +420,37 @@ func deleteAttachments(ctx context.Context, staleData StaleData) (err error) { return } + +// fetchAppRetentions fetches retention period +// for each app. +func fetchAppRetentions(ctx context.Context) (retentions []AppRetention, err error) { + // Fetch retention periods for each app + stmt := sqlf.PostgreSQL. + From("app_settings"). + Select("app_id"). + Select("retention_period") + + defer stmt.Close() + + rows, err := server.Server.PgPool.Query(ctx, stmt.String(), stmt.Args()...) + if err != nil { + return + } + + for rows.Next() { + var retention AppRetention + var period int + + if err := rows.Scan(&retention.AppID, &period); err != nil { + fmt.Printf("Failed to scan row: %v\n", err) + continue + } + + retention.Threshold = time.Now().UTC().AddDate(0, 0, -period) + retentions = append(retentions, retention) + } + + err = rows.Err() + + return +}