Add LogStats to snapshot status component #38

Merged: 1 commit, Dec 18, 2024
43 changes: 28 additions & 15 deletions cmd/mongo/oplog_push.go
@@ -27,9 +27,11 @@ import (
 )
 
 var (
-	snapshotName      string
-	snapshotNamespace string
-	kubeconfig        string
+	kubeconfig                        string
+	snapshotName                      string
+	snapshotNamespace                 string
+	snapshotSuccessfulLogHistoryLimit string
+	snapshotFailedLogHistoryLimit     string
 )
 
 // oplogPushCmd represents the continuous oplog archiving procedure
@@ -60,13 +62,19 @@ var oplogPushCmd = &cobra.Command{
 }
 
 func init() {
+	oplogPushCmd.PersistentFlags().StringVarP(
+		&kubeconfig, "kubeconfig", "", "", "Path of the kubeconfig")
 	cmd.AddCommand(oplogPushCmd)
 	oplogPushCmd.PersistentFlags().StringVarP(
 		&snapshotName, "snapshot-name", "", "", "Name of the snapshot")
 	oplogPushCmd.PersistentFlags().StringVarP(
 		&snapshotNamespace, "snapshot-namespace", "n", "", "Namespace of the snapshot")
 	oplogPushCmd.PersistentFlags().StringVarP(
-		&kubeconfig, "kubeconfig", "", "", "Path of the kubeconfig")
+		&snapshotSuccessfulLogHistoryLimit, "snapshot-successful-log-history-limit", "", "",
+		"Maximum number of successful log history in snapshot")
+	oplogPushCmd.PersistentFlags().StringVarP(
+		&snapshotFailedLogHistoryLimit, "snapshot-failed-log-history-limit", "", "",
+		"Maximum number of failed log history in snapshot")
 }
 
 func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplogPushStatsArgs) error {
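Reader's note: the two new limits are registered as plain string flags here and only converted to integers later, in SetupSnapshot. A minimal, self-contained sketch of that wiring (hypothetical command name and default value, not code from this PR):

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/spf13/cobra"
)

var successfulLimit string

func main() {
	cmd := &cobra.Command{
		Use: "oplog-push",
		RunE: func(cmd *cobra.Command, args []string) error {
			// Mirrors the later validation: the string flag must parse as an integer.
			n, err := strconv.Atoi(successfulLimit)
			if err != nil {
				return fmt.Errorf("snapshot-successful-log-history-limit must be an integer: %w", err)
			}
			fmt.Println("history limit:", n)
			return nil
		},
	}
	cmd.PersistentFlags().StringVarP(
		&successfulLimit, "snapshot-successful-log-history-limit", "", "5",
		"Maximum number of successful log history in snapshot")
	cmd.SetArgs([]string{"--snapshot-successful-log-history-limit", "3"})
	_ = cmd.Execute()
}
```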
@@ -83,8 +91,11 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplogPushStatsArgs) error {
 	uplProvider.ChangeDirectory(subDir)
 	uploader := archive.NewStorageUploader(uplProvider)
 	uploader.SetKubeClient(pushArgs.kubeClient)
-	uploader.SetSnapshot(snapshotName, snapshotNamespace)
 	uploader.SetDBNode(pushArgs.dbNode)
+	err = uploader.SetupSnapshot(snapshotName, snapshotNamespace, snapshotSuccessfulLogHistoryLimit, snapshotFailedLogHistoryLimit)
+	if err != nil {
+		return err
+	}
 
 	// set up mongodb client and oplog fetcher
 	mongoClient, err := client.NewMongoClient(ctx, pushArgs.mongodbURL)
@@ -149,16 +160,18 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplogPushStatsArgs) error {
 }
 
 type oplogPushRunArgs struct {
-	archiveAfterSize   int
-	archiveTimeout     time.Duration
-	mongodbURL         string
-	dbNode             string
-	dbProvider         string
-	dbPath             string
-	primaryWait        bool
-	primaryWaitTimeout time.Duration
-	lwUpdate           time.Duration
-	kubeClient         controllerclient.Client
+	archiveAfterSize     int
+	archiveTimeout       time.Duration
+	mongodbURL           string
+	dbNode               string
+	dbProvider           string
+	dbPath               string
+	successfulLogHistory string
+	failedLogHistory     string
+	primaryWait          bool
+	primaryWaitTimeout   time.Duration
+	lwUpdate             time.Duration
+	kubeClient           controllerclient.Client
 }
 
 func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
86 changes: 70 additions & 16 deletions internal/databases/mongo/archive/loader.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
+	"strconv"
 	"strings"
 	"time"

@@ -222,10 +223,13 @@ type StorageUploader struct {
 	crypter crypto.Crypter // usages only in UploadOplogArchive
 	buf     *bytes.Buffer
 
-	kubeClient        controllerruntime.Client
-	snapshotName      string
-	snapshotNamespace string
-	dbNode            string
+	kubeClient controllerruntime.Client
+	dbNode     string
+
+	snapshotName                      string
+	snapshotNamespace                 string
+	snapshotSuccessfulLogHistoryLimit int
+	snapshotFailedLogHistoryLimit     int
 }
 
 // NewStorageUploader builds mongodb uploader.
@@ -238,9 +242,16 @@ func (su *StorageUploader) SetKubeClient(client controllerruntime.Client) {
 	su.kubeClient = client
 }
 
-func (su *StorageUploader) SetSnapshot(name, namespace string) {
+func (su *StorageUploader) SetupSnapshot(name, namespace, successfulLog, failedLog string) error {
+	var err error
 	su.snapshotName = name
 	su.snapshotNamespace = namespace
+	su.snapshotSuccessfulLogHistoryLimit, err = strconv.Atoi(successfulLog)
+	if err != nil {
+		return err
+	}
+	su.snapshotFailedLogHistoryLimit, err = strconv.Atoi(failedLog)
+	return err
}
 
 func (su *StorageUploader) SetDBNode(node string) {
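The renamed SetupSnapshot now returns an error, so a malformed limit fails the command before any oplog is fetched. A reduced, hypothetical mirror of that validation contract (simplified signature, not the PR's actual method):

```go
package main

import (
	"fmt"
	"strconv"
)

// setupSnapshot mirrors the contract of StorageUploader.SetupSnapshot:
// both history limits arrive as strings and must parse as integers.
func setupSnapshot(name, namespace, successfulLog, failedLog string) error {
	if _, err := strconv.Atoi(successfulLog); err != nil {
		return err
	}
	_, err := strconv.Atoi(failedLog)
	return err
}

func main() {
	// Hypothetical values; "three" fails fast, before any upload work starts.
	fmt.Println(setupSnapshot("snap", "demo", "5", "3"))     // <nil>
	fmt.Println(setupSnapshot("snap", "demo", "5", "three")) // strconv error
}
```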
@@ -250,8 +261,9 @@ func (su *StorageUploader) GetDBNode() string {
 	return su.dbNode
 }
 
-func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
+func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp, uploadErr error, arch models.Archive) error {
 	var snapshot storageapi.Snapshot
+	archFileName := arch.DBNodeSpecificFileName(su.dbNode)
 	err := su.kubeClient.Get(context.TODO(), controllerruntime.ObjectKey{
 		Namespace: su.snapshotNamespace,
 		Name:      su.snapshotName,
@@ -272,14 +284,29 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
 		in.Status.Components = make(map[string]storageapi.Component)
 	}
 	if _, ok := in.Status.Components[compName]; !ok {
-		walSegments := make([]storageapi.WalSegment, 1)
-		walSegments[0].Start = &metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}
+		logStats := new(storageapi.LogStats)
+
+		if uploadErr != nil {
+			su.updateLogStatsLog(logStats, fmt.Errorf("failed to push archiver %s. error: %w", archFileName, uploadErr).Error(), arch)
+		} else {
+			su.updateLogStatsLog(logStats, "", arch)
+			startTime := (&metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}).String()
+			logStats.Start = &startTime
+		}
 		in.Status.Components[compName] = storageapi.Component{
-			WalSegments: walSegments,
+			LogStats: logStats,
 		}
 	}
 	component := in.Status.Components[compName]
-	component.WalSegments[0].End = &metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}
+
+	if uploadErr != nil {
+		su.updateLogStatsLog(component.LogStats, fmt.Errorf("failed to push archiver %s. error: %w", archFileName, uploadErr).Error(), arch)
+	} else {
+		su.updateLogStatsLog(component.LogStats, "", arch)
+		endTime := (&metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}).String()
+		component.LogStats.End = &endTime
+	}
+	in.Status.Components[compName] = component
 
 	return in
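Both branches above follow the same upsert shape: create the component's stats on first sight (recording Start on success), then mutate a copy and write it back, since Go map values of struct type are not addressable in place. A self-contained sketch of that pattern, with a simplified stand-in for storageapi.Component:

```go
package main

import "fmt"

// component is a reduced stand-in for storageapi.Component.
type component struct{ Start, End string }

// upsert mirrors updateSnapshot's pattern: create-on-first-sight, then
// copy out, mutate, and write back (components[name].End = ts would not compile).
func upsert(components map[string]component, name, ts string) {
	if _, ok := components[name]; !ok {
		components[name] = component{Start: ts}
	}
	c := components[name] // copy out
	c.End = ts
	components[name] = c // write back
}

func main() {
	comps := map[string]component{}
	upsert(comps, "mongodb-0", "t1")
	upsert(comps, "mongodb-0", "t2")
	fmt.Printf("%+v\n", comps["mongodb-0"]) // {Start:t1 End:t2}
}
```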
@@ -288,13 +315,35 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
 	return err
 }
 
-// UploadOplogArchive compresses a stream and uploads it with given archive name.
-func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Reader, firstTS, lastTS models.Timestamp) error {
-	err := su.updateSnapshot(firstTS, lastTS)
-	if err != nil {
-		return fmt.Errorf("failed to update snapshot: %w", err)
+func (su *StorageUploader) updateLogStatsLog(logStats *storageapi.LogStats, errMsg string, arch models.Archive) {
+	// No error found while uploading arch
+	if errMsg == "" {
+		logStats.TotalSucceededCount++
+		logStats.LastSucceededStats = append(logStats.LastSucceededStats, getLog(errMsg, arch))
+		if len(logStats.LastSucceededStats) > su.snapshotSuccessfulLogHistoryLimit {
+			logStats.LastSucceededStats = logStats.LastSucceededStats[1:]
+		}
+	} else {
+		logStats.TotalFailedCount++
+		logStats.LastFailedStats = append(logStats.LastFailedStats, getLog(errMsg, arch))
+		if len(logStats.LastFailedStats) > su.snapshotFailedLogHistoryLimit {
+			logStats.LastFailedStats = logStats.LastFailedStats[1:]
+		}
 	}
+}
 
+func getLog(msg string, arch models.Archive) storageapi.Log {
+	startTime := (&metav1.Time{Time: time.Unix(int64(arch.Start.ToBsonTS().T), 0)}).String()
+	endTime := (&metav1.Time{Time: time.Unix(int64(arch.End.ToBsonTS().T), 0)}).String()
+	return storageapi.Log{
+		Start: &startTime,
+		End:   &endTime,
+		Error: msg,
+	}
+}
+
+// UploadOplogArchive compresses a stream and uploads it with given archive name.
+func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Reader, firstTS, lastTS models.Timestamp) error {
 	arch, err := models.NewArchive(firstTS, lastTS, su.Compression().FileExtension(), models.ArchiveTypeOplog)
 	if err != nil {
 		return fmt.Errorf("can not build archive: %w", err)
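The trimming in updateLogStatsLog keeps the counters unbounded while capping the stored history at the configured limit, dropping the oldest entry first. A self-contained sketch of that bounded-history behavior, using simplified stand-ins for storageapi.LogStats and storageapi.Log:

```go
package main

import "fmt"

// Simplified stand-ins, used only to illustrate the sliding-window logic.
type Log struct{ Error string }

type LogStats struct {
	TotalSucceededCount int
	LastSucceededStats  []Log
}

// appendBounded mirrors the PR's logic: always count, but keep at most
// `limit` recent entries by dropping the oldest.
func appendBounded(stats *LogStats, entry Log, limit int) {
	stats.TotalSucceededCount++
	stats.LastSucceededStats = append(stats.LastSucceededStats, entry)
	if len(stats.LastSucceededStats) > limit {
		stats.LastSucceededStats = stats.LastSucceededStats[1:]
	}
}

func main() {
	var stats LogStats
	for i := 0; i < 5; i++ {
		appendBounded(&stats, Log{}, 3)
	}
	// The total keeps growing while the slice stays capped at the limit.
	fmt.Println(stats.TotalSucceededCount, len(stats.LastSucceededStats)) // 5 3
}
```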
@@ -308,7 +357,12 @@ func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Reader, firstTS, lastTS models.Timestamp) error {
 	}
 	fileName := arch.DBNodeSpecificFileName(su.dbNode)
 	// providing io.ReaderAt+io.ReadSeeker to s3 upload enables buffer pool usage
-	return su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
+	uploadErr := su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
+	err = su.updateSnapshot(firstTS, lastTS, uploadErr, arch)
+	if err != nil {
+		return fmt.Errorf("failed to update snapshot: %w\nerror from uploading archiver: %w", err, uploadErr)
+	}
+	return uploadErr
 }
 
 // UploadGap uploads mark indicating archiving gap.
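Design note on the hunk above: updateSnapshot now runs after the upload rather than before it, so a failed push is recorded in LogStats before the error is returned. The combined error uses two %w verbs, which relies on Go 1.20+ multi-error wrapping; a small sketch with placeholder errors shows both remain matchable:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// Placeholder errors standing in for the upload and status-update failures.
	uploadErr := errors.New("upload failed")
	statusErr := errors.New("snapshot update failed")

	// Since Go 1.20, fmt.Errorf accepts multiple %w verbs; both wrapped
	// errors stay in the chain.
	combined := fmt.Errorf("failed to update snapshot: %w\nerror from uploading archiver: %w", statusErr, uploadErr)

	fmt.Println(errors.Is(combined, uploadErr)) // true
	fmt.Println(errors.Is(combined, statusErr)) // true
}
```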