Commit
Add LogStats to snapshot status component (#38)
Signed-off-by: sayedppqq <[email protected]>
sayedppqq authored Dec 18, 2024
1 parent afb75ce commit 60c3f1d
Showing 2 changed files with 98 additions and 31 deletions.
43 changes: 28 additions & 15 deletions cmd/mongo/oplog_push.go
@@ -27,9 +27,11 @@ import (
 )
 
 var (
-	snapshotName      string
-	snapshotNamespace string
-	kubeconfig        string
+	kubeconfig                        string
+	snapshotName                      string
+	snapshotNamespace                 string
+	snapshotSuccessfulLogHistoryLimit string
+	snapshotFailedLogHistoryLimit     string
 )
 
 // oplogPushCmd represents the continuous oplog archiving procedure
@@ -60,13 +62,19 @@ var oplogPushCmd = &cobra.Command{
 }
 
 func init() {
+	oplogPushCmd.PersistentFlags().StringVarP(
+		&kubeconfig, "kubeconfig", "", "", "Path of the kubeconfig")
 	cmd.AddCommand(oplogPushCmd)
 	oplogPushCmd.PersistentFlags().StringVarP(
 		&snapshotName, "snapshot-name", "", "", "Name of the snapshot")
 	oplogPushCmd.PersistentFlags().StringVarP(
 		&snapshotNamespace, "snapshot-namespace", "n", "", "Namespace of the snapshot")
 	oplogPushCmd.PersistentFlags().StringVarP(
-		&kubeconfig, "kubeconfig", "", "", "Path of the kubeconfig")
+		&snapshotSuccessfulLogHistoryLimit, "snapshot-successful-log-history-limit", "", "",
+		"Maximum number of successful log history in snapshot")
+	oplogPushCmd.PersistentFlags().StringVarP(
+		&snapshotFailedLogHistoryLimit, "snapshot-failed-log-history-limit", "", "",
+		"Maximum number of failed log history in snapshot")
 }
 
 func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplogPushStatsArgs) error {
@@ -83,8 +91,11 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplogPushStatsArgs) error {
 	uplProvider.ChangeDirectory(subDir)
 	uploader := archive.NewStorageUploader(uplProvider)
 	uploader.SetKubeClient(pushArgs.kubeClient)
-	uploader.SetSnapshot(snapshotName, snapshotNamespace)
 	uploader.SetDBNode(pushArgs.dbNode)
+	err = uploader.SetupSnapshot(snapshotName, snapshotNamespace, snapshotSuccessfulLogHistoryLimit, snapshotFailedLogHistoryLimit)
+	if err != nil {
+		return err
+	}
 
 	// set up mongodb client and oplog fetcher
 	mongoClient, err := client.NewMongoClient(ctx, pushArgs.mongodbURL)
@@ -149,16 +160,18 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplogPushStatsArgs) error {
 }
 
 type oplogPushRunArgs struct {
-	archiveAfterSize   int
-	archiveTimeout     time.Duration
-	mongodbURL         string
-	dbNode             string
-	dbProvider         string
-	dbPath             string
-	primaryWait        bool
-	primaryWaitTimeout time.Duration
-	lwUpdate           time.Duration
-	kubeClient         controllerclient.Client
+	archiveAfterSize     int
+	archiveTimeout       time.Duration
+	mongodbURL           string
+	dbNode               string
+	dbProvider           string
+	dbPath               string
+	successfulLogHistory string
+	failedLogHistory     string
+	primaryWait          bool
+	primaryWaitTimeout   time.Duration
+	lwUpdate             time.Duration
+	kubeClient           controllerclient.Client
 }
 
 func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
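Note (not part of the commit): both new history limits are plain string flags that are only converted to integers later, by SetupSnapshot in loader.go below, so an empty or non-numeric value makes oplog-push fail at startup rather than being silently defaulted. A minimal sketch of that parsing behavior, assuming only the standard library:

	package main

	import (
		"fmt"
		"strconv"
	)

	func main() {
		// The flags default to "", and strconv.Atoi("") returns an error,
		// so in practice both limits must be passed as numeric values.
		for _, v := range []string{"5", ""} {
			if n, err := strconv.Atoi(v); err != nil {
				fmt.Printf("limit %q: %v\n", v, err) // limit "": ... invalid syntax
			} else {
				fmt.Printf("limit %q parsed as %d\n", v, n)
			}
		}
	}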
86 changes: 70 additions & 16 deletions internal/databases/mongo/archive/loader.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
@@ -222,10 +223,13 @@ type StorageUploader struct {
 	crypter crypto.Crypter // usages only in UploadOplogArchive
 	buf     *bytes.Buffer
 
-	kubeClient        controllerruntime.Client
-	snapshotName      string
-	snapshotNamespace string
-	dbNode            string
+	kubeClient controllerruntime.Client
+	dbNode     string
+
+	snapshotName                      string
+	snapshotNamespace                 string
+	snapshotSuccessfulLogHistoryLimit int
+	snapshotFailedLogHistoryLimit     int
 }
 
 // NewStorageUploader builds mongodb uploader.
@@ -238,9 +242,16 @@ func (su *StorageUploader) SetKubeClient(client controllerruntime.Client) {
 	su.kubeClient = client
 }
 
-func (su *StorageUploader) SetSnapshot(name, namespace string) {
+func (su *StorageUploader) SetupSnapshot(name, namespace, successfulLog, failedLog string) error {
+	var err error
 	su.snapshotName = name
 	su.snapshotNamespace = namespace
+	su.snapshotSuccessfulLogHistoryLimit, err = strconv.Atoi(successfulLog)
+	if err != nil {
+		return err
+	}
+	su.snapshotFailedLogHistoryLimit, err = strconv.Atoi(failedLog)
+	return err
 }
 
 func (su *StorageUploader) SetDBNode(node string) {
@@ -250,8 +261,9 @@ func (su *StorageUploader) GetDBNode() string {
 	return su.dbNode
 }
 
-func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
+func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp, uploadErr error, arch models.Archive) error {
 	var snapshot storageapi.Snapshot
+	archFileName := arch.DBNodeSpecificFileName(su.dbNode)
 	err := su.kubeClient.Get(context.TODO(), controllerruntime.ObjectKey{
 		Namespace: su.snapshotNamespace,
 		Name:      su.snapshotName,
@@ -272,14 +284,29 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
 		in.Status.Components = make(map[string]storageapi.Component)
 	}
 	if _, ok := in.Status.Components[compName]; !ok {
-		walSegments := make([]storageapi.WalSegment, 1)
-		walSegments[0].Start = &metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}
+		logStats := new(storageapi.LogStats)
+
+		if uploadErr != nil {
+			su.updateLogStatsLog(logStats, fmt.Errorf("failed to push archiver %s. error: %w", archFileName, uploadErr).Error(), arch)
+		} else {
+			su.updateLogStatsLog(logStats, "", arch)
+			startTime := (&metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}).String()
+			logStats.Start = &startTime
+		}
 		in.Status.Components[compName] = storageapi.Component{
-			WalSegments: walSegments,
+			LogStats: logStats,
 		}
 	}
 	component := in.Status.Components[compName]
-	component.WalSegments[0].End = &metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}
+
+	if uploadErr != nil {
+		su.updateLogStatsLog(component.LogStats, fmt.Errorf("failed to push archiver %s. error: %w", archFileName, uploadErr).Error(), arch)
+	} else {
+		su.updateLogStatsLog(component.LogStats, "", arch)
+		endTime := (&metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}).String()
+		component.LogStats.End = &endTime
+	}
+	in.Status.Components[compName] = component
 
 	return in
@@ -288,13 +315,35 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
 	return err
 }
 
-// UploadOplogArchive compresses a stream and uploads it with given archive name.
-func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Reader, firstTS, lastTS models.Timestamp) error {
-	err := su.updateSnapshot(firstTS, lastTS)
-	if err != nil {
-		return fmt.Errorf("failed to update snapshot: %w", err)
+func (su *StorageUploader) updateLogStatsLog(logStats *storageapi.LogStats, errMsg string, arch models.Archive) {
+	// No error found while uploading arch
+	if errMsg == "" {
+		logStats.TotalSucceededCount++
+		logStats.LastSucceededStats = append(logStats.LastSucceededStats, getLog(errMsg, arch))
+		if len(logStats.LastSucceededStats) > su.snapshotSuccessfulLogHistoryLimit {
+			logStats.LastSucceededStats = logStats.LastSucceededStats[1:]
+		}
+	} else {
+		logStats.TotalFailedCount++
+		logStats.LastFailedStats = append(logStats.LastFailedStats, getLog(errMsg, arch))
+		if len(logStats.LastFailedStats) > su.snapshotFailedLogHistoryLimit {
+			logStats.LastFailedStats = logStats.LastFailedStats[1:]
+		}
 	}
+}
+
+func getLog(msg string, arch models.Archive) storageapi.Log {
+	startTime := (&metav1.Time{Time: time.Unix(int64(arch.Start.ToBsonTS().T), 0)}).String()
+	endTime := (&metav1.Time{Time: time.Unix(int64(arch.End.ToBsonTS().T), 0)}).String()
+	return storageapi.Log{
+		Start: &startTime,
+		End:   &endTime,
+		Error: msg,
+	}
+}
+
+// UploadOplogArchive compresses a stream and uploads it with given archive name.
+func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Reader, firstTS, lastTS models.Timestamp) error {
 	arch, err := models.NewArchive(firstTS, lastTS, su.Compression().FileExtension(), models.ArchiveTypeOplog)
 	if err != nil {
 		return fmt.Errorf("can not build archive: %w", err)
@@ -308,7 +357,12 @@ func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Reader, firstTS, lastTS models.Timestamp) error {
 	}
 	fileName := arch.DBNodeSpecificFileName(su.dbNode)
 	// providing io.ReaderAt+io.ReadSeeker to s3 upload enables buffer pool usage
-	return su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
+	uploadErr := su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
+	err = su.updateSnapshot(firstTS, lastTS, uploadErr, arch)
+	if err != nil {
+		return fmt.Errorf("failed to update snapshot: %w\nerror from uploading archiver: %w", err, uploadErr)
+	}
+	return uploadErr
 }
 
 // UploadGap uploads mark indicating archiving gap.
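Note: the storageapi.LogStats and storageapi.Log types used above are defined outside this diff. Inferred purely from the field accesses here, they would look roughly like the sketch below (field names taken from the diff, types assumed; the real definitions in the storageapi package may differ). updateLogStatsLog maintains each history slice as a sliding window: it appends the newest entry, then drops the oldest once the slice grows past the configured limit.

	// Approximate shape of the storageapi types referenced by loader.go,
	// inferred from this diff alone (hypothetical, not the real package).

	// Log records a single archive push attempt.
	type Log struct {
		Start *string // archive start timestamp, from metav1.Time.String()
		End   *string // archive end timestamp
		Error string  // empty when the upload succeeded
	}

	// LogStats aggregates push history for one snapshot component.
	type LogStats struct {
		Start               *string // set when the component is first created
		End                 *string // advanced on each successful upload
		TotalSucceededCount int     // counter type assumed; could be int32
		TotalFailedCount    int
		LastSucceededStats  []Log // capped at snapshot-successful-log-history-limit
		LastFailedStats     []Log // capped at snapshot-failed-log-history-limit
	}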
