Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(backend): refactor event, attachment & span cleanup #1861

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 89 additions & 127 deletions backend/cleanup/cleanup/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ type Attachment struct {
Location string `json:"location"`
}

// StaleData groups, for a single app, the data that has fallen outside
// the app's retention window and is therefore eligible for cleanup.
type StaleData struct {
// AppID identifies the app the stale data belongs to.
AppID string `json:"app_id"`
// RetentionDate is the cutoff instant; it is populated from the app's
// retention threshold and used as the upper bound when deleting rows.
RetentionDate time.Time `json:"retention_date"`
// EventIDs holds the IDs of the app's stale events.
EventIDs []string `json:"event_ids"`
// SpanIDs holds the IDs of the app's stale spans.
SpanIDs []string `json:"span_ids"`
// Attachments lists the attachments collected for the app that are to
// be removed from object storage.
Attachments []Attachment `json:"attachments"`
}

type AppRetention struct {
AppID string `json:"app_id"`
Threshold time.Time `json:"threshold"`
Expand Down Expand Up @@ -72,61 +64,17 @@ func DeleteStaleData(ctx context.Context) {
// delete sessions
deleteSessions(ctx, appRetentions)

// delete events, spans and attachments
staleData := fetchStaleData(ctx, appRetentions)

for _, st := range staleData {
// Delete attachments from object storage
if len(st.Attachments) > 0 {
fmt.Printf("Deleting %v attachments for app_id: %v\n", len(st.Attachments), st.AppID)

err := deleteAttachments(ctx, st)

if err != nil {
fmt.Printf("Failed to delete %v attachments for app_id: %v, err: %v\n", len(st.Attachments), st.AppID, err)
return
}

fmt.Printf("Deleted %v attachments for app_id: %v\n", len(st.Attachments), st.AppID)
}

// Delete events from clickhouse
if len(st.EventIDs) > 0 {
fmt.Printf("Deleting %v events from clickhouse for app_id: %v\n", len(st.EventIDs), st.AppID)

deleteStmt := sqlf.DeleteFrom("default.events").
Where("app_id = ?", st.AppID).
Where("timestamp < ?", st.RetentionDate)

if err := server.Server.ChPool.Exec(ctx, deleteStmt.String(), deleteStmt.Args()...); err != nil {
fmt.Printf("Failed to delete %v events from clickhouse for app_id: %v, err: %v\n", len(st.EventIDs), st.AppID, err)
return
}

fmt.Printf("Deleted %v events from clickhouse for app_id: %v\n", len(st.EventIDs), st.AppID)
}

// Delete spans from clickhouse
if len(st.SpanIDs) > 0 {
fmt.Printf("Deleting %v spans from clickhouse for app_id: %v\n", len(st.SpanIDs), st.AppID)

deleteStmt := sqlf.DeleteFrom("spans").
Where("app_id = ?", st.AppID).
Where("start_time < ?", st.RetentionDate)

if err := server.Server.ChPool.Exec(ctx, deleteStmt.String(), deleteStmt.Args()...); err != nil {
fmt.Printf("Failed to delete %v spans from clickhouse for app_id: %v, err: %v\n", len(st.SpanIDs), st.AppID, err)
return
}
// delete spans
deleteSpans(ctx, appRetentions)

fmt.Printf("Deleted %v spans from clickhouse for app_id: %v\n", len(st.SpanIDs), st.AppID)
}
}
// delete events and attachments
deleteEventsAndAttachments(ctx, appRetentions)

staleDataJson, _ := json.MarshalIndent(staleData, "", " ")
fmt.Printf("Succesfully deleted stale data %v\n", string(staleDataJson))
fmt.Println("Finished cleaning up stale data")
}

// deleteStaleShortenedFilters deletes stale shortened filters that
// have passed the expiry threshold
func deleteStaleShortenedFilters(ctx context.Context) {
threshold := time.Now().Add(-60 * time.Minute) // 1 hour expiry
stmt := sqlf.PostgreSQL.DeleteFrom("public.short_filters").
Expand Down Expand Up @@ -341,119 +289,99 @@ func deleteSessions(ctx context.Context, retentions []AppRetention) {
}
}

// fetchAppRetentions fetches retention period
// for each app.
func fetchAppRetentions(ctx context.Context) (retentions []AppRetention, err error) {
// Fetch retention periods for each app
stmt := sqlf.PostgreSQL.
From("app_settings").
Select("app_id").
Select("retention_period")

defer stmt.Close()

rows, err := server.Server.PgPool.Query(ctx, stmt.String(), stmt.Args()...)
if err != nil {
return
}

for rows.Next() {
var retention AppRetention
var period int
// deleteSpans deletes stale spans for each
// app's retention threshold.
func deleteSpans(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
stmt := sqlf.
DeleteFrom("spans").
Where("app_id = toUUID(?)", retention.AppID).
Where("start_time < ?", retention.Threshold)

if err := rows.Scan(&retention.AppID, &period); err != nil {
fmt.Printf("Failed to scan row: %v\n", err)
if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale spans for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

retention.Threshold = time.Now().UTC().AddDate(0, 0, -period)
retentions = append(retentions, retention)
stmt.Close()
}

err = rows.Err()

return
if errCount < 1 {
fmt.Println("Successfully deleted stale spans")
}
}

func fetchStaleData(ctx context.Context, retentions []AppRetention) (staleData []StaleData) {
// deleteEventsAndAttachments deletes stale events and their attachments for each
// app's retention threshold.
func deleteEventsAndAttachments(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
// Fetch stale events from ClickHouse
fetchEventsStmt := sqlf.Select("id").

// Fetch attachments for current app's stale events
fetchAttachmentsStmt := sqlf.
Select("attachments").
From("default.events").
From("events").
Where("app_id = toUUID(?)", retention.AppID).
Where("timestamp < ?", retention.Threshold)

eventRows, err := server.Server.ChPool.Query(ctx, fetchEventsStmt.String(), fetchEventsStmt.Args()...)
attachmentRows, err := server.Server.ChPool.Query(ctx, fetchAttachmentsStmt.String(), fetchAttachmentsStmt.Args()...)
if err != nil {
fmt.Printf("Failed to fetch stale events from ClickHouse: %v\n", err)
continue
}

var staleEventIDs []string
var staleAttachments []Attachment
var eventID string
var attachmentsJSON string
var attachments []Attachment
for eventRows.Next() {
if err := eventRows.Scan(&eventID, &attachmentsJSON); err != nil {
fmt.Printf("Failed to scan event ID: %v\n", err)
var staleAttachments []Attachment
for attachmentRows.Next() {
if err := attachmentRows.Scan(&attachmentsJSON); err != nil {
fmt.Printf("Failed to scan attachment: %v\n", err)
continue
}

// If event has attachments, unmarshall it. If it can't be unmarshalled, continue. Events with
// attachments that fail unmarshalling will not be included in stale list.
if attachmentsJSON != "[]" {
if err := json.Unmarshal([]byte(attachmentsJSON), &attachments); err != nil {
fmt.Printf("Failed to unmarshal attachment JSON for event ID: %v, attachmentJSON: %v, error: %v\n", eventID, attachmentsJSON, err)
fmt.Printf("Failed to unmarshal attachment JSON attachmentJSON: %v, error: %v\n", attachmentsJSON, err)
continue
}

staleAttachments = append(staleAttachments, attachments...)
}

staleEventIDs = append(staleEventIDs, eventID)
}

// Fetch stale spans from ClickHouse
fetchSpansStmt := sqlf.Select("span_id").
From("spans").
// Delete attachments from object storage
deleteAttachments(ctx, staleAttachments)

// Delete stale events
stmt := sqlf.
DeleteFrom("events").
Where("app_id = toUUID(?)", retention.AppID).
Where("start_time < ?", retention.Threshold)
Where("timestamp < ?", retention.Threshold)

spanRows, err := server.Server.ChPool.Query(ctx, fetchSpansStmt.String(), fetchSpansStmt.Args()...)
if err != nil {
fmt.Printf("Failed to fetch stale spans from ClickHouse: %v\n", err)
if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale events for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

var staleSpanIDs []string
var spanID string
for spanRows.Next() {
if err := spanRows.Scan(&spanID); err != nil {
fmt.Printf("Failed to scan span ID: %v\n", err)
continue
}

staleSpanIDs = append(staleSpanIDs, spanID)
}

staleData = append(staleData, StaleData{
AppID: retention.AppID,
RetentionDate: retention.Threshold,
EventIDs: staleEventIDs,
SpanIDs: staleSpanIDs,
Attachments: staleAttachments,
})
stmt.Close()
}

return
if errCount < 1 {
fmt.Println("Successfully deleted stale events and their attachments")
}
}

func deleteAttachments(ctx context.Context, staleData StaleData) (err error) {
func deleteAttachments(ctx context.Context, attachments []Attachment) (err error) {
objectIds := []types.ObjectIdentifier{}

for _, at := range staleData.Attachments {
for _, at := range attachments {
objectIds = append(objectIds, types.ObjectIdentifier{
Key: aws.String(at.Key),
})
Expand Down Expand Up @@ -492,3 +420,37 @@ func deleteAttachments(ctx context.Context, staleData StaleData) (err error) {

return
}

// fetchAppRetentions fetches the retention period of every app and
// converts it into an absolute cutoff time.
//
// It selects app_id and retention_period from app_settings and, for each
// row, sets Threshold to the current UTC time minus retention_period days.
// Rows that fail to scan are logged and skipped. An error from running the
// query, or one reported by the result set after iteration, is returned in
// err; retentions gathered before such an error are still returned.
func fetchAppRetentions(ctx context.Context) (retentions []AppRetention, err error) {
	stmt := sqlf.PostgreSQL.
		From("app_settings").
		Select("app_id").
		Select("retention_period")

	defer stmt.Close()

	rows, err := server.Server.PgPool.Query(ctx, stmt.String(), stmt.Args()...)
	if err != nil {
		return
	}
	// Release the result set on every exit path, including a panic or an
	// early return added later, instead of relying on Next() running to
	// completion.
	defer rows.Close()

	for rows.Next() {
		var retention AppRetention
		var period int

		// Use a distinct name so the named result err is not shadowed.
		if scanErr := rows.Scan(&retention.AppID, &period); scanErr != nil {
			fmt.Printf("Failed to scan row: %v\n", scanErr)
			continue
		}

		retention.Threshold = time.Now().UTC().AddDate(0, 0, -period)
		retentions = append(retentions, retention)
	}

	// Surface any error that terminated iteration early.
	err = rows.Err()

	return
}
Loading