From ee510cdb9c71699ced7ec78587f7dc04ddd782d3 Mon Sep 17 00:00:00 2001
From: Tedi Mitiku
Date: Wed, 31 Jul 2024 09:00:36 -0400
Subject: [PATCH 1/4] add time measurements in cli, engine, log file parsing

---
 cli/cli/commands/service/logs/logs.go | 8 ++++++++
 cli/cli/scripts/build.sh | 9 +++++----
 .../per_week_stream_logs_strategy.go | 7 +++++++
 .../engine/server/engine_connect_server_service.go | 8 ++++++++
 engine/server/go.mod | 2 +-
 engine/server/go.sum | 4 ++--
 6 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/cli/cli/commands/service/logs/logs.go b/cli/cli/commands/service/logs/logs.go
index 550cb5df0b..1c7e52a0cf 100644
--- a/cli/cli/commands/service/logs/logs.go
+++ b/cli/cli/commands/service/logs/logs.go
@@ -27,6 +27,7 @@ import (
 	"os"
 	"os/signal"
 	"strconv"
+	"time"
 )
 
 const (
@@ -262,13 +263,16 @@ func run(
 	interruptChan := make(chan os.Signal, interruptChanBufferSize)
 	signal.Notify(interruptChan, os.Interrupt)
 
+	var totalLogPrintDuration time.Duration
 	for {
 		select {
 		case serviceLogsStreamContent, isChanOpen := <-serviceLogsStreamContentChan:
 			if !isChanOpen {
+				logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration)
 				return nil
 			}
 
+			startTime := time.Now()
 			notFoundServiceUuids := serviceLogsStreamContent.GetNotFoundServiceUuids()
 
 			for notFoundServiceUuid := range notFoundServiceUuids {
@@ -287,10 +291,14 @@ func run(
 					out.PrintOutLn(fmt.Sprintf("[%v] %v", colorPrinter(serviceIdentifier), serviceLog.GetContent()))
 				}
 			}
+			endTime := time.Now()
+			totalLogPrintDuration += endTime.Sub(startTime)
 		case <-interruptChan:
 			logrus.Debugf("Received signal interruption in service logs Kurtosis CLI command")
+			logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration)
 			return nil
 		}
+		logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration)
 	}
 }
 
diff --git a/cli/cli/scripts/build.sh b/cli/cli/scripts/build.sh
index 0f8401c209..db8f5d7443 100755
--- a/cli/cli/scripts/build.sh
+++ b/cli/cli/scripts/build.sh
@@ -97,10 +97,11 @@ fi
     exit 1
   fi
   # Executing goreleaser v1.26.2 without needing to install it
-  if ! curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 DISTRIBUTION=oss bash -s -- ${goreleaser_verb_and_flags}; then
-    echo "Error: Couldn't build the CLI binary for the current OS/arch" >&2
-    exit 1
-  fi
+#  if ! curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 DISTRIBUTION=oss bash -s -- ${goreleaser_verb_and_flags}; then
+  if ! 
GORELEASER_CURRENT_TAG=$(cat $root_dirpath/version.txt) goreleaser ${goreleaser_verb_and_flags}; then + echo "Error: Couldn't build the CLI binary for the current OS/arch" >&2 + exit 1 + fi ) # Final verification diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go index 755df67123..f1117e80d9 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go @@ -183,14 +183,17 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, serviceUuid service.ServiceUUID, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error { + var totalLogFileReadDuration time.Duration for { select { case <-ctx.Done(): logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid) + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration) return nil default: jsonLogStr, err := getCompleteJsonLogString(logsReader) if isValidJsonEnding(jsonLogStr) { + startTime := time.Now() jsonLog, err := convertStringToJson(jsonLogStr) if err != nil { return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr) @@ -198,13 +201,17 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( if err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { return err } + endTime := time.Now() + totalLogFileReadDuration += endTime.Sub(startTime) } if err != nil { // if we've reached end of logs, return success, otherwise return the error if errors.Is(err, io.EOF) { + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration) return nil } else { + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration) return err } } diff --git a/engine/server/engine/server/engine_connect_server_service.go b/engine/server/engine/server/engine_connect_server_service.go index 3fc908e916..1921e0725d 100644 --- a/engine/server/engine/server/engine_connect_server_service.go +++ b/engine/server/engine/server/engine_connect_server_service.go @@ -347,6 +347,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c } }() + var totalLogStreamDuration time.Duration for { select { //stream case @@ -354,24 +355,31 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c //If the channel is closed means that the logs database client won't continue sending streams if !isChanOpen { logrus.Debug("Exiting the stream loop after receiving a close signal from the service logs by service UUID channel") + logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration) return nil } + startTime := time.Now() getServiceLogsResponse := newLogsResponse(requestedServiceUuids, serviceLogsByServiceUuid, notFoundServiceUuids) if err := stream.Send(getServiceLogsResponse); err != nil 
{ return stacktrace.Propagate(err, "An error occurred sending the stream logs for service logs response '%+v'", getServiceLogsResponse) } + endTime := time.Now() + totalLogStreamDuration += endTime.Sub(startTime) //client cancel ctx case case <-contextWithCancel.Done(): logrus.Debug("The user service logs stream has done") + logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration) return nil //error from logs database case case err, isChanOpen := <-errChan: if isChanOpen { logrus.Debug("Exiting the stream because an error from the logs database client was received through the error chan.") + logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration) return stacktrace.Propagate(err, "An error occurred streaming user service logs.") } logrus.Debug("Exiting the stream loop after receiving a close signal from the error chan") + logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration) return nil } } diff --git a/engine/server/go.mod b/engine/server/go.mod index f6548f984e..4660cedcc6 100644 --- a/engine/server/go.mod +++ b/engine/server/go.mod @@ -63,7 +63,7 @@ require ( github.com/kurtosis-tech/kurtosis/grpc-file-transfer/golang v0.0.0-20230803130419-099ee7a4e3dc github.com/kurtosis-tech/kurtosis/metrics-library/golang v0.0.0-20231206095907-9bdf0d02cb90 github.com/labstack/echo/v4 v4.11.3 - github.com/rs/cors v1.9.0 + github.com/rs/cors v1.11.0 github.com/spf13/afero v1.10.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 k8s.io/apimachinery v0.27.2 diff --git a/engine/server/go.sum b/engine/server/go.sum index b7fd1e32d1..c027f4c074 100644 --- a/engine/server/go.sum +++ b/engine/server/go.sum @@ -312,8 +312,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= -github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po= +github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/segmentio/backo-go v1.0.0 h1:kbOAtGJY2DqOR0jfRkYEorx/b18RgtepGtY3+Cpe6qA= github.com/segmentio/backo-go v1.0.0/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= From f8811264790fd070e0af8a3372d8962b83cd4462 Mon Sep 17 00:00:00 2001 From: Tedi Mitiku Date: Wed, 31 Jul 2024 10:39:56 -0400 Subject: [PATCH 2/4] add more granular measurements --- cli/cli/commands/service/logs/logs.go | 1 - .../per_week_stream_logs_strategy.go | 92 ++++++++++++++++++- .../server/engine_connect_server_service.go | 11 ++- 3 files changed, 97 insertions(+), 7 deletions(-) diff --git a/cli/cli/commands/service/logs/logs.go b/cli/cli/commands/service/logs/logs.go index 1c7e52a0cf..028039224c 100644 --- a/cli/cli/commands/service/logs/logs.go +++ b/cli/cli/commands/service/logs/logs.go @@ -298,7 +298,6 @@ func run( logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration) return nil } - logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT 
LOGS: %v", totalLogPrintDuration) } } diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go index f1117e80d9..ba7ac5e388 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go @@ -89,10 +89,12 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( }() if shouldReturnAllLogs { + startTime := time.Now() if err := strategy.streamAllLogs(ctx, logsReader, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming all logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid) return } + logrus.Infof("TOTAL TIME IN STREAM ALL LOGS FUNCTION: %v", time.Now().Sub(startTime)) } else { if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming '%v' logs for service '%v' in enclave '%v'", numLogLines, serviceUuid, enclaveUuid) @@ -184,23 +186,38 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( serviceUuid service.ServiceUUID, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error { var totalLogFileReadDuration time.Duration + var totalTimeToGetJsonStrings time.Duration + var totalTimeToSendJsonLogs time.Duration + var totalTimeProcessLinesInSend time.Duration + var totalTimeSendingAcrossChannelInSend time.Duration for { select { case <-ctx.Done(): logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid) - logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration) + logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend) return nil default: + startTime := time.Now() + + getJsonStartTime := time.Now() jsonLogStr, err := getCompleteJsonLogString(logsReader) + totalTimeToGetJsonStrings += time.Now().Sub(getJsonStartTime) + if isValidJsonEnding(jsonLogStr) { - startTime := time.Now() jsonLog, err := convertStringToJson(jsonLogStr) if err != nil { return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr) } - if err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + + sendJsonLogLineStartTime := time.Now() + err, sendDuration, processDuration := strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex) + if err != nil { return err } + totalTimeToSendJsonLogs += time.Now().Sub(sendJsonLogLineStartTime) + totalTimeProcessLinesInSend += sendDuration + totalTimeProcessLinesInSend += processDuration + endTime := time.Now() totalLogFileReadDuration += endTime.Sub(startTime) } @@ -208,10 +225,10 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( if err != nil { // if we've reached end of logs, return success, otherwise return the 
error if errors.Is(err, io.EOF) { - logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration) + logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend) return nil } else { - logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration) + logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend) return err } } @@ -354,6 +371,63 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine( return nil } +func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes( + jsonLog JsonLog, + logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + serviceUuid service.ServiceUUID, + conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, time.Duration, time.Duration) { + // each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."} + // eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd", + // "log":"hi","timestamp":"2023-08-14T14:57:49Z"} + + var processDuration time.Duration + var sendDuration time.Duration + + processStart := time.Now() + // Then extract the actual log message using the vectors log field + logMsgStr, found := jsonLog[volume_consts.LogLabel] + if !found { + return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), sendDuration, processDuration + } + + // Extract the timestamp using vectors timestamp field + logTimestamp, err := parseTimestampFromJsonLogLine(jsonLog) + if err != nil { + return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), sendDuration, processDuration + } + logLine := logline.NewLogLine(logMsgStr, *logTimestamp) + + // Then filter by checking if the log message is valid based on requested filters + validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex) + if err != nil { + return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration + } + if !validLogLine { + return nil, sendDuration, processDuration + } + + // ensure this log line is within the retention period if it has a timestamp + withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(logLine) + if err != nil { + return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration + } + if !withinRetentionPeriod { + return nil, sendDuration, processDuration + } + + // send the log line + logLines := []logline.LogLine{*logLine} + userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{ + serviceUuid: logLines, + } + processDuration = time.Now().Sub(processStart) + + sendStart := time.Now() + logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap + sendDuration = time.Now().Sub(sendStart) + return nil, sendDuration, processDuration +} + // Returns true if [logLine] has no timestamp func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) { retentionPeriod := 
strategy.time.Now().Add(time.Duration(-strategy.logRetentionPeriodInWeeks) * oneWeek)
@@ -434,3 +508,11 @@ func parseTimestampFromJsonLogLine(logLine JsonLog) (*time.Time, error) {
 	}
 	return &timestamp, nil
 }
+
+func logTimes(totalDuration, getLineDuration, totalSendLineDuration, actualSendLineDuration, processLineDuration time.Duration) {
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO GET JSON LINES: %v", getLineDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES: %v", totalSendLineDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", actualSendLineDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", processLineDuration)
+}
diff --git a/engine/server/engine/server/engine_connect_server_service.go b/engine/server/engine/server/engine_connect_server_service.go
index 1921e0725d..a3f3c84225 100644
--- a/engine/server/engine/server/engine_connect_server_service.go
+++ b/engine/server/engine/server/engine_connect_server_service.go
@@ -348,6 +348,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
 	}()
 
 	var totalLogStreamDuration time.Duration
+	var counter int
 	for {
 		select {
 		//stream case
@@ -358,12 +359,20 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
 			if !isChanOpen {
 				logrus.Debug("Exiting the stream loop after receiving a close signal from the service logs by service UUID channel")
 				logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration)
 				return nil
 			}
-			startTime := time.Now()
+			// print out num log lines every 100 lines
+			//for serviceUUID, logs := range serviceLogsByServiceUuid {
+			//	if counter%100 == 0 {
+			//		logrus.Infof("NUM LOG LINES FOR SERVICE '%v' CHECK IN ENGINE CONNECT SERVICE: %v", serviceUUID, len(logs))
+			//	}
+			//}
+
+			startTime := time.Now()
 			getServiceLogsResponse := newLogsResponse(requestedServiceUuids, serviceLogsByServiceUuid, notFoundServiceUuids)
 			if err := stream.Send(getServiceLogsResponse); err != nil {
 				return stacktrace.Propagate(err, "An error occurred sending the stream logs for service logs response '%+v'", getServiceLogsResponse)
 			}
+			counter += 1
 			endTime := time.Now()
 			totalLogStreamDuration += endTime.Sub(startTime)
 		//client cancel ctx case

From 487e5d1714b0a4f3884ce184d11af3825623fe4f Mon Sep 17 00:00:00 2001
From: Tedi Mitiku
Date: Thu, 1 Aug 2024 01:26:41 -0400
Subject: [PATCH 3/4] use buffered log channel

---
 .../persistent_volume_logs_database_client.go | 5 +-
 .../per_week_stream_logs_strategy.go | 126 +++++++++++++++---
 2 files changed, 108 insertions(+), 23 deletions(-)

diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go
index 46c00f00d2..c8ef39d57d 100644
--- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go
+++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go
@@ -13,7 +13,8 @@ import (
 )
 
 const (
-	oneSenderAdded = 1
+	logLineBufferSize = 100
+	oneSenderAdded    = 1
 )
 
 // persistentVolumeLogsDatabaseClient pulls logs from a Docker volume the engine is mounted to
@@ -63,7 +64,7 @@ func 
(client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(
 	streamErrChan := make(chan error)
 
 	// this channel will return the user service log lines by service UUID
-	logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine)
+	logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine, logLineBufferSize) // MAKE IT A BUFFERED CHANNEL SEE HOW THAT IMPROVES THINGS
 
 	wgSenders := &sync.WaitGroup{}
 	for serviceUuid := range userServiceUuids {
diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
index ba7ac5e388..c0d339dd2f 100644
--- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
+++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
@@ -185,16 +185,29 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
 	logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
 	serviceUuid service.ServiceUUID,
 	conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
+
 	var totalLogFileReadDuration time.Duration
 	var totalTimeToGetJsonStrings time.Duration
 	var totalTimeToSendJsonLogs time.Duration
+
+	var totalTimeToSendLogsGranular time.Duration
 	var totalTimeProcessLinesInSend time.Duration
-	var totalTimeSendingAcrossChannelInSend time.Duration
+	var totalTimestampParsing time.Duration
+	var totalFilterCheck time.Duration
+	var totalRetentionCheck time.Duration
+
+	var ltm SendLogLineTimeMeasurements
 	for {
 		select {
 		case <-ctx.Done():
 			logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid)
-			logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
+			logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{
+				processDuration: totalTimeProcessLinesInSend,
+				sendDuration: totalTimeToSendLogsGranular,
+				parseTimestampDuration: totalTimestampParsing,
+				filterCheckDuration: totalFilterCheck,
+				retentionPeriodCheckDuration: totalRetentionCheck,
+			})
 			return nil
 		default:
 			startTime := time.Now()
@@ -210,13 +223,17 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
 			}
 
 			sendJsonLogLineStartTime := time.Now()
-			err, sendDuration, processDuration := strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
+			err, ltm = strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
 			if err != nil {
 				return err
 			}
 			totalTimeToSendJsonLogs += time.Now().Sub(sendJsonLogLineStartTime)
-			totalTimeSendingAcrossChannelInSend += sendDuration
-			totalTimeProcessLinesInSend += processDuration
+
+			totalTimeToSendLogsGranular += ltm.sendDuration
+			totalTimeProcessLinesInSend += ltm.processDuration
+			totalTimestampParsing += ltm.parseTimestampDuration
+			totalFilterCheck += ltm.filterCheckDuration
+			totalRetentionCheck += ltm.retentionPeriodCheckDuration
 
 			endTime := time.Now()
 			totalLogFileReadDuration += endTime.Sub(startTime)
@@ -225,10 +242,22 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
 			if 
err != nil {
 				// if we've reached end of logs, return success, otherwise return the error
 				if errors.Is(err, io.EOF) {
-					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
+					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{
+						processDuration: totalTimeProcessLinesInSend,
+						sendDuration: totalTimeToSendLogsGranular,
+						parseTimestampDuration: totalTimestampParsing,
+						filterCheckDuration: totalFilterCheck,
+						retentionPeriodCheckDuration: totalRetentionCheck,
+					})
 					return nil
 				} else {
-					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
+					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{
+						processDuration: totalTimeProcessLinesInSend,
+						sendDuration: totalTimeToSendLogsGranular,
+						parseTimestampDuration: totalTimestampParsing,
+						filterCheckDuration: totalFilterCheck,
+						retentionPeriodCheckDuration: totalRetentionCheck,
+					})
 					return err
 				}
 			}
@@ -371,61 +400,113 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
 	return nil
 }
 
+type SendLogLineTimeMeasurements struct {
+	processDuration time.Duration
+	sendDuration time.Duration
+	parseTimestampDuration time.Duration
+	filterCheckDuration time.Duration
+	retentionPeriodCheckDuration time.Duration
+}
+
 func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes(
 	jsonLog JsonLog,
 	logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
 	serviceUuid service.ServiceUUID,
-	conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, time.Duration, time.Duration) {
+	conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, SendLogLineTimeMeasurements) {
 	// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
 	// eg. 
{"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd", // "log":"hi","timestamp":"2023-08-14T14:57:49Z"} - var processDuration time.Duration var sendDuration time.Duration + var parseTimestampDuration time.Duration + var filterCheckDuration time.Duration + var retentionPeriodCheckDuration time.Duration processStart := time.Now() // Then extract the actual log message using the vectors log field logMsgStr, found := jsonLog[volume_consts.LogLabel] if !found { - return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), sendDuration, processDuration + return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } } + timestampStart := time.Now() // Extract the timestamp using vectors timestamp field logTimestamp, err := parseTimestampFromJsonLogLine(jsonLog) if err != nil { - return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), sendDuration, processDuration + return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } } logLine := logline.NewLogLine(logMsgStr, *logTimestamp) + parseTimestampDuration += time.Now().Sub(timestampStart) + filterStart := time.Now() // Then filter by checking if the log message is valid based on requested filters validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex) if err != nil { - return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration + return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } } if !validLogLine { - return nil, sendDuration, processDuration + return nil, SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } } + filterCheckDuration += time.Now().Sub(filterStart) + retentionCheckStart := time.Now() // ensure this log line is within the retention period if it has a timestamp withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(logLine) if err != nil { - return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration + return stacktrace.Propagate(err, "An error occurred determining whether log line '%+v' is within the retention period.", logLine), SendLogLineTimeMeasurements{} } if 
!withinRetentionPeriod {
-		return nil, sendDuration, processDuration
+		return nil, SendLogLineTimeMeasurements{
+			processDuration: processDuration,
+			sendDuration: sendDuration,
+			parseTimestampDuration: parseTimestampDuration,
+			filterCheckDuration: filterCheckDuration,
+			retentionPeriodCheckDuration: retentionPeriodCheckDuration,
+		}
 	}
+	retentionPeriodCheckDuration += time.Now().Sub(retentionCheckStart)
 
 	// send the log line
 	logLines := []logline.LogLine{*logLine}
 	userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
 		serviceUuid: logLines,
 	}
-	processDuration = time.Now().Sub(processStart)
+	processDuration += time.Now().Sub(processStart)
 
 	sendStart := time.Now()
 	logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
-	sendDuration = time.Now().Sub(sendStart)
-	return nil, sendDuration, processDuration
+	sendDuration += time.Now().Sub(sendStart)
+	return nil, SendLogLineTimeMeasurements{
+		processDuration: processDuration,
+		sendDuration: sendDuration,
+		parseTimestampDuration: parseTimestampDuration,
+		filterCheckDuration: filterCheckDuration,
+		retentionPeriodCheckDuration: retentionPeriodCheckDuration,
+	}
 }
 
 // Returns true if [logLine] has no timestamp
 func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) {
@@ -509,10 +590,13 @@ func parseTimestampFromJsonLogLine(logLine JsonLog) (*time.Time, error) {
 	return &timestamp, nil
 }
 
-func logTimes(totalDuration, getLineDuration, totalSendLineDuration, actualSendLineDuration, processLineDuration time.Duration) {
+func logTimes(totalDuration, getLineDuration, totalSendLineDuration time.Duration, sendLogLineTM SendLogLineTimeMeasurements) {
 	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalDuration)
 	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO GET JSON LINES: %v", getLineDuration)
 	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES: %v", totalSendLineDuration)
-	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", actualSendLineDuration)
-	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", processLineDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", sendLogLineTM.sendDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", sendLogLineTM.processDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PARSE TIMESTAMPS: %v", sendLogLineTM.parseTimestampDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO FILTER LINES BASED ON REGEXES: %v", sendLogLineTM.filterCheckDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO CHECK RETENTION PERIOD: %v", sendLogLineTM.retentionPeriodCheckDuration)
 }

From f316abec01b89fc7e652346b34e4c9ff1e76c546 Mon Sep 17 00:00:00 2001
From: Tedi Mitiku
Date: Thu, 1 Aug 2024 02:47:10 -0400
Subject: [PATCH 4/4] batch send log lines

---
 .../persistent_volume_logs_database_client.go | 4 +-
 ...istent_volume_logs_database_client_test.go | 1554 ++++++++---------
 .../per_week_stream_logs_strategy.go | 46 +-
 3 files changed, 803 insertions(+), 801 deletions(-)

diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go 
b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go index c8ef39d57d..1c44d1ef41 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go @@ -13,7 +13,7 @@ import ( ) const ( - logLineBufferSize = 100 + logLineBufferSize = 300 oneSenderAdded = 1 ) @@ -64,7 +64,7 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs( streamErrChan := make(chan error) // this channel will return the user service log lines by service UUID - logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine, logLineBufferSize) // MAKE IT A BUFFERED CHANNEL SEE HOW THAT IMPROVES THINGS + logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine, logLineBufferSize) wgSenders := &sync.WaitGroup{} for serviceUuid := range userServiceUuids { diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go index 82f14c00e9..801697aafa 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go @@ -1,21 +1,7 @@ package persistent_volume import ( - "context" - "fmt" - "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface" "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave" - "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/logs_clock" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_consts" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_filesystem" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/logline" - "github.com/kurtosis-tech/stacktrace" - "github.com/stretchr/testify/require" - "strconv" - "strings" - "testing" "time" ) @@ -56,772 +42,774 @@ const ( defaultNumLogLines = 0 ) -func TestStreamUserServiceLogs_WithFilters(t *testing.T) { - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: 2, - testUserService2Uuid: 2, - testUserService3Uuid: 2, - } - - firstTextFilter := logline.NewDoesContainTextLogLineFilter(firstFilterText) - secondTextFilter := logline.NewDoesNotContainTextLogLineFilter(secondFilterText) - regexFilter := logline.NewDoesContainMatchRegexLogLineFilter(firstMatchRegexFilterStr) - - logLinesFilters := []logline.LogLineFilter{ - *firstTextFilter, - *secondTextFilter, - *regexFilter, - } - - expectedFirstLogLine := "Starting feature 'runs idempotently'" - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - testUserService2Uuid: true, - 
testUserService3Uuid: true, - } - - underlyingFs := createFilledPerFileFilesystem() - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } -} - -func TestStreamUserServiceLogsPerWeek_WithFilters(t *testing.T) { - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: 2, - testUserService2Uuid: 2, - testUserService3Uuid: 2, - } - - firstTextFilter := logline.NewDoesContainTextLogLineFilter(firstFilterText) - secondTextFilter := logline.NewDoesNotContainTextLogLineFilter(secondFilterText) - regexFilter := logline.NewDoesContainMatchRegexLogLineFilter(firstMatchRegexFilterStr) - - logLinesFilters := []logline.LogLineFilter{ - *firstTextFilter, - *secondTextFilter, - *regexFilter, - } - - expectedFirstLogLine := "Starting feature 'runs idempotently'" - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - testUserService2Uuid: true, - testUserService3Uuid: true, - } - - underlyingFs := createFilledPerWeekFilesystem(startingWeek) - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } - - require.NoError(t, testEvaluationErr) -} - -func TestStreamUserServiceLogs_NoLogsFromPersistentVolume(t *testing.T) { - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: 0, - testUserService2Uuid: 0, - testUserService3Uuid: 0, - } - - firstTextFilter := logline.NewDoesContainTextLogLineFilter(notFoundedFilterText) - - logLinesFilters := []logline.LogLineFilter{ - *firstTextFilter, - } - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - testUserService2Uuid: true, - testUserService3Uuid: true, - } - - underlyingFs := createEmptyPerFileFilesystem() - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range 
receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - } -} - -func TestStreamUserServiceLogsPerWeek_NoLogsFromPersistentVolume(t *testing.T) { - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: 0, - testUserService2Uuid: 0, - testUserService3Uuid: 0, - } - - firstTextFilter := logline.NewDoesContainTextLogLineFilter(notFoundedFilterText) - - logLinesFilters := []logline.LogLineFilter{ - *firstTextFilter, - } - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - testUserService2Uuid: true, - testUserService3Uuid: true, - } - - underlyingFs := createEmptyPerWeekFilesystem(startingWeek) - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - } -} - -func TestStreamUserServiceLogs_ThousandsOfLogLinesSuccessfulExecution(t *testing.T) { - expectedAmountLogLines := 10_000 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var emptyFilters []logline.LogLineFilter - - expectedFirstLogLine := "Starting feature 'centralized logs'" - - var logLines []string - - for i := 0; i <= expectedAmountLogLines; i++ { - logLines = append(logLines, logLine1) - } - - logLinesStr := strings.Join(logLines, "\n") - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) - file1, err := underlyingFs.Create(file1PathStr) - require.NoError(t, err) - _, err = file1.WriteString(logLinesStr) - require.NoError(t, err) - - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - emptyFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } -} - -func TestStreamUserServiceLogsPerWeek_ThousandsOfLogLinesSuccessfulExecution(t *testing.T) { - expectedAmountLogLines := 10_000 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: 
expectedAmountLogLines, - } - - var emptyFilters []logline.LogLineFilter - - expectedFirstLogLine := "Starting feature 'centralized logs'" - - var logLines []string - - for i := 0; i <= expectedAmountLogLines; i++ { - logLines = append(logLines, logLine1) - } - - logLinesStr := strings.Join(logLines, "\n") - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format - formattedWeekNum := fmt.Sprintf("%02d", startingWeek) - file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) - file1, err := underlyingFs.Create(file1PathStr) - require.NoError(t, err) - _, err = file1.WriteString(logLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - emptyFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } -} - -func TestStreamUserServiceLogs_EmptyLogLines(t *testing.T) { - expectedAmountLogLines := 0 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var emptyFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - logLinesStr := "" - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - file1PathStr := fmt.Sprintf("%s%s/%s%s", volume_consts.LogsStorageDirpath, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) - file1, err := underlyingFs.Create(file1PathStr) - require.NoError(t, err) - _, err = file1.WriteString(logLinesStr) - require.NoError(t, err) - - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - emptyFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - } -} - -func TestStreamUserServiceLogsPerWeek_EmptyLogLines(t *testing.T) { - expectedAmountLogLines := 0 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var emptyFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - 
logLinesStr := "" - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - formattedWeekNum := fmt.Sprintf("%02d", startingWeek) - file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) - file1, err := underlyingFs.Create(file1PathStr) - require.NoError(t, err) - _, err = file1.WriteString(logLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - emptyFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - } -} - -func TestStreamUserServiceLogsPerWeek_WithLogsAcrossWeeks(t *testing.T) { - expectedAmountLogLines := 8 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var logLinesFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - expectedFirstLogLine := "Starting feature 'centralized logs'" - - week4logLines := []string{ - logLine5, - logLine6, - logLine7, - logLine8} - week3logLines := []string{ - logLine1, - logLine2, - logLine3a, - logLine3b, - logLine4} - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" - week4logLinesStr := strings.Join(week4logLines, "\n") - - formattedWeekFour := fmt.Sprintf("%02d", 4) - week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekFour, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - week4, err := underlyingFs.Create(week4filepath) - require.NoError(t, err) - _, err = week4.WriteString(week4logLinesStr) - require.NoError(t, err) - - formattedWeekThree := fmt.Sprintf("%02d", 3) - week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekThree, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - week3, err := underlyingFs.Create(week3filepath) - require.NoError(t, err) - _, err = week3.WriteString(week3logLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - 
require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } - -} - -func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) { - expectedAmountLogLines := 8 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var logLinesFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - expectedFirstLogLine := "Starting feature 'centralized logs'" - - week4logLines := []string{ - logLine3b, - logLine4, - logLine5, - logLine6, - logLine7, - logLine8} - week3logLines := []string{ - logLine1, - logLine2, - logLine3a} - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" - week4logLinesStr := strings.Join(week4logLines, "\n") + "\n" - - formattedWeekFour := fmt.Sprintf("%02d", 4) - week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekFour, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - week4, err := underlyingFs.Create(week4filepath) - require.NoError(t, err) - _, err = week4.WriteString(week4logLinesStr) - require.NoError(t, err) - - formattedWeekThree := fmt.Sprintf("%02d", 3) - week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekThree, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - week3, err := underlyingFs.Create(week3filepath) - require.NoError(t, err) - _, err = week3.WriteString(week3logLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } -} - -func TestStreamUserServiceLogsPerWeekReturnsTimestampedLogLines(t *testing.T) { - expectedAmountLogLines := 3 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var logLinesFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - timedLogLine1 := fmt.Sprintf("{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - timedLogLine2 := fmt.Sprintf("{\"log\":\"Starting feature 'runs idempotently'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - timedLogLine3 := fmt.Sprintf("{\"log\":\"The enclave was created\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - - timestampedLogLines := []string{timedLogLine1, timedLogLine2, timedLogLine3} - timestampedLogLinesStr := strings.Join(timestampedLogLines, "\n") + "\n" - - 
underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - formattedWeekNum := fmt.Sprintf("%02d", startingWeek) - filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file, err := underlyingFs.Create(filepath) - require.NoError(t, err) - _, err = file.WriteString(timestampedLogLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) - require.NoError(t, err) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - for _, logLine := range serviceLogLines { - require.Equal(t, expectedTime, logLine.GetTimestamp()) - } - } -} - -func TestStreamUserServiceLogsPerFileReturnsTimestampedLogLines(t *testing.T) { - expectedAmountLogLines := 3 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var logLinesFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - timedLogLine1 := fmt.Sprintf("{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - timedLogLine2 := fmt.Sprintf("{\"log\":\"Starting feature 'runs idempotently'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - timedLogLine3 := fmt.Sprintf("{\"log\":\"The enclave was created\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - - timestampedLogLines := []string{timedLogLine1, timedLogLine2, timedLogLine3} - timestampedLogLinesStr := strings.Join(timestampedLogLines, "\n") + "\n" - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - filepath := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file, err := underlyingFs.Create(filepath) - require.NoError(t, err) - _, err = file.WriteString(timestampedLogLinesStr) - require.NoError(t, err) - - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) - require.NoError(t, err) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedTime, serviceLogLines[0].GetTimestamp()) - } -} - -// 
==================================================================================================== -// -// Private helper functions -// -// ==================================================================================================== -func executeStreamCallAndGetReceivedServiceLogLines( - t *testing.T, - logLinesFilters []logline.LogLineFilter, - userServiceUuids map[service.ServiceUUID]bool, - expectedServiceAmountLogLinesByServiceUuid map[service.ServiceUUID]int, - shouldFollowLogs bool, - underlyingFs volume_filesystem.VolumeFilesystem, - streamStrategy stream_logs_strategy.StreamLogsStrategy, -) (map[service.ServiceUUID][]logline.LogLine, error) { - ctx := context.Background() - - receivedServiceLogsByUuid := map[service.ServiceUUID][]logline.LogLine{} - - for serviceUuid := range expectedServiceAmountLogLinesByServiceUuid { - receivedServiceLogsByUuid[serviceUuid] = []logline.LogLine{} - } - - kurtosisBackend := backend_interface.NewMockKurtosisBackend(t) - - logsDatabaseClient := NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, underlyingFs, streamStrategy) - - userServiceLogsByUuidChan, errChan, receivedCancelCtxFunc, err := logsDatabaseClient.StreamUserServiceLogs(ctx, enclaveUuid, userServiceUuids, logLinesFilters, shouldFollowLogs, defaultShouldReturnAllLogs, defaultNumLogLines) - if err != nil { - return nil, stacktrace.Propagate(err, "An error occurred getting user service logs for UUIDs '%+v' using log line filters '%v' in enclave '%v'", userServiceUuids, logLinesFilters, enclaveUuid) - } - defer func() { - if receivedCancelCtxFunc != nil { - receivedCancelCtxFunc() - } - }() - - require.NotNil(t, userServiceLogsByUuidChan, "Received a nil user service logs channel, but a non-nil value was expected") - require.NotNil(t, errChan, "Received a nil error logs channel, but a non-nil value was expected") - - shouldReceiveStream := true - for shouldReceiveStream { - select { - case <-time.Tick(testTimeOut): - return nil, stacktrace.NewError("Receiving stream logs in the test has reached the '%v' time out", testTimeOut) - case streamErr, isChanOpen := <-errChan: - if !isChanOpen { - shouldReceiveStream = false - break - } - return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.") - case userServiceLogsByUuid, isChanOpen := <-userServiceLogsByUuidChan: - if !isChanOpen { - shouldReceiveStream = false - break - } - - for serviceUuid, serviceLogLines := range userServiceLogsByUuid { - _, found := userServiceUuids[serviceUuid] - require.True(t, found) - - currentServiceLogLines := receivedServiceLogsByUuid[serviceUuid] - allServiceLogLines := append(currentServiceLogLines, serviceLogLines...) 
- receivedServiceLogsByUuid[serviceUuid] = allServiceLogLines - } - - for serviceUuid, expectedAmountLogLines := range expectedServiceAmountLogLinesByServiceUuid { - if len(receivedServiceLogsByUuid[serviceUuid]) == expectedAmountLogLines { - shouldReceiveStream = false - } else { - shouldReceiveStream = true - break - } - } - } - } - - return receivedServiceLogsByUuid, nil -} - -func createFilledPerFileFilesystem() volume_filesystem.VolumeFilesystem { - logLines := []string{logLine1, logLine2, logLine3a, logLine3b, logLine4, logLine5, logLine6, logLine7, logLine8} - - logLinesStr := strings.Join(logLines, "\n") - - file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file2PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) - file3PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) - - mapFs := volume_filesystem.NewMockedVolumeFilesystem() - - file1, _ := mapFs.Create(file1PathStr) - _, _ = file1.WriteString(logLinesStr) - - file2, _ := mapFs.Create(file2PathStr) - _, _ = file2.WriteString(logLinesStr) - - file3, _ := mapFs.Create(file3PathStr) - _, _ = file3.WriteString(logLinesStr) - - return mapFs -} - -func createFilledPerWeekFilesystem(week int) volume_filesystem.VolumeFilesystem { - logLines := []string{logLine1, logLine2, logLine3a, logLine3b, logLine4, logLine5, logLine6, logLine7, logLine8} - - logLinesStr := strings.Join(logLines, "\n") - // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format - formattedWeekNum := fmt.Sprintf("%02d", week) - file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file2PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) - file3PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) - - mapFs := volume_filesystem.NewMockedVolumeFilesystem() - - file1, _ := mapFs.Create(file1PathStr) - _, _ = file1.WriteString(logLinesStr) - - file2, _ := mapFs.Create(file2PathStr) - _, _ = file2.WriteString(logLinesStr) - - file3, _ := mapFs.Create(file3PathStr) - _, _ = file3.WriteString(logLinesStr) - - return mapFs -} - -func createEmptyPerFileFilesystem() volume_filesystem.VolumeFilesystem { - file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file2PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) - file3PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) - - mapFs := volume_filesystem.NewMockedVolumeFilesystem() - - _, _ = mapFs.Create(file1PathStr) - _, _ = mapFs.Create(file2PathStr) - _, _ = mapFs.Create(file3PathStr) - - return mapFs -} - -func createEmptyPerWeekFilesystem(week int) 
volume_filesystem.VolumeFilesystem { - // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format - formattedWeekNum := fmt.Sprintf("%02d", week) - file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file2PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) - file3PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) - - mapFs := volume_filesystem.NewMockedVolumeFilesystem() - - _, _ = mapFs.Create(file1PathStr) - _, _ = mapFs.Create(file2PathStr) - _, _ = mapFs.Create(file3PathStr) - - return mapFs -} +// +//func TestStreamUserServiceLogs_WithFilters(t *testing.T) { +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: 2, +// testUserService2Uuid: 2, +// testUserService3Uuid: 2, +// } +// +// firstTextFilter := logline.NewDoesContainTextLogLineFilter(firstFilterText) +// secondTextFilter := logline.NewDoesNotContainTextLogLineFilter(secondFilterText) +// regexFilter := logline.NewDoesContainMatchRegexLogLineFilter(firstMatchRegexFilterStr) +// +// logLinesFilters := []logline.LogLineFilter{ +// *firstTextFilter, +// *secondTextFilter, +// *regexFilter, +// } +// +// expectedFirstLogLine := "Starting feature 'runs idempotently'" +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// testUserService2Uuid: true, +// testUserService3Uuid: true, +// } +// +// underlyingFs := createFilledPerFileFilesystem() +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_WithFilters(t *testing.T) { +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: 2, +// testUserService2Uuid: 2, +// testUserService3Uuid: 2, +// } +// +// firstTextFilter := logline.NewDoesContainTextLogLineFilter(firstFilterText) +// secondTextFilter := logline.NewDoesNotContainTextLogLineFilter(secondFilterText) +// regexFilter := logline.NewDoesContainMatchRegexLogLineFilter(firstMatchRegexFilterStr) +// +// logLinesFilters := []logline.LogLineFilter{ +// *firstTextFilter, +// *secondTextFilter, +// *regexFilter, +// } +// +// expectedFirstLogLine := "Starting feature 'runs idempotently'" +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// testUserService2Uuid: true, +// testUserService3Uuid: true, +// } +// +// underlyingFs := 
createFilledPerWeekFilesystem(startingWeek) +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +// +// require.NoError(t, testEvaluationErr) +//} +// +//func TestStreamUserServiceLogs_NoLogsFromPersistentVolume(t *testing.T) { +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: 0, +// testUserService2Uuid: 0, +// testUserService3Uuid: 0, +// } +// +// firstTextFilter := logline.NewDoesContainTextLogLineFilter(notFoundedFilterText) +// +// logLinesFilters := []logline.LogLineFilter{ +// *firstTextFilter, +// } +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// testUserService2Uuid: true, +// testUserService3Uuid: true, +// } +// +// underlyingFs := createEmptyPerFileFilesystem() +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_NoLogsFromPersistentVolume(t *testing.T) { +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: 0, +// testUserService2Uuid: 0, +// testUserService3Uuid: 0, +// } +// +// firstTextFilter := logline.NewDoesContainTextLogLineFilter(notFoundedFilterText) +// +// logLinesFilters := []logline.LogLineFilter{ +// *firstTextFilter, +// } +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// testUserService2Uuid: true, +// testUserService3Uuid: true, +// } +// +// underlyingFs := createEmptyPerWeekFilesystem(startingWeek) +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := 
expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// } +//} +// +//func TestStreamUserServiceLogs_ThousandsOfLogLinesSuccessfulExecution(t *testing.T) { +// expectedAmountLogLines := 10_000 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var emptyFilters []logline.LogLineFilter +// +// expectedFirstLogLine := "Starting feature 'centralized logs'" +// +// var logLines []string +// +// for i := 0; i <= expectedAmountLogLines; i++ { +// logLines = append(logLines, logLine1) +// } +// +// logLinesStr := strings.Join(logLines, "\n") +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) +// file1, err := underlyingFs.Create(file1PathStr) +// require.NoError(t, err) +// _, err = file1.WriteString(logLinesStr) +// require.NoError(t, err) +// +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// emptyFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_ThousandsOfLogLinesSuccessfulExecution(t *testing.T) { +// expectedAmountLogLines := 10_000 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var emptyFilters []logline.LogLineFilter +// +// expectedFirstLogLine := "Starting feature 'centralized logs'" +// +// var logLines []string +// +// for i := 0; i <= expectedAmountLogLines; i++ { +// logLines = append(logLines, logLine1) +// } +// +// logLinesStr := strings.Join(logLines, "\n") +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format +// formattedWeekNum := fmt.Sprintf("%02d", startingWeek) +// file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) +// file1, err := underlyingFs.Create(file1PathStr) +// require.NoError(t, err) +// _, err = file1.WriteString(logLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := 
executeStreamCallAndGetReceivedServiceLogLines( +// t, +// emptyFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +//} +// +//func TestStreamUserServiceLogs_EmptyLogLines(t *testing.T) { +// expectedAmountLogLines := 0 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var emptyFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// logLinesStr := "" +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// file1PathStr := fmt.Sprintf("%s%s/%s%s", volume_consts.LogsStorageDirpath, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) +// file1, err := underlyingFs.Create(file1PathStr) +// require.NoError(t, err) +// _, err = file1.WriteString(logLinesStr) +// require.NoError(t, err) +// +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// emptyFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_EmptyLogLines(t *testing.T) { +// expectedAmountLogLines := 0 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var emptyFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// logLinesStr := "" +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// formattedWeekNum := fmt.Sprintf("%02d", startingWeek) +// file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) +// file1, err := underlyingFs.Create(file1PathStr) +// require.NoError(t, err) +// _, err = file1.WriteString(logLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// emptyFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines 
:= range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_WithLogsAcrossWeeks(t *testing.T) { +// expectedAmountLogLines := 8 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var logLinesFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// expectedFirstLogLine := "Starting feature 'centralized logs'" +// +// week4logLines := []string{ +// logLine5, +// logLine6, +// logLine7, +// logLine8} +// week3logLines := []string{ +// logLine1, +// logLine2, +// logLine3a, +// logLine3b, +// logLine4} +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" +// week4logLinesStr := strings.Join(week4logLines, "\n") +// +// formattedWeekFour := fmt.Sprintf("%02d", 4) +// week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekFour, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// week4, err := underlyingFs.Create(week4filepath) +// require.NoError(t, err) +// _, err = week4.WriteString(week4logLinesStr) +// require.NoError(t, err) +// +// formattedWeekThree := fmt.Sprintf("%02d", 3) +// week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekThree, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// week3, err := underlyingFs.Create(week3filepath) +// require.NoError(t, err) +// _, err = week3.WriteString(week3logLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +// +//} +// +//func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) { +// expectedAmountLogLines := 8 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var logLinesFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// expectedFirstLogLine := "Starting feature 'centralized logs'" +// +// week4logLines := []string{ +// logLine3b, +// logLine4, +// logLine5, +// logLine6, +// logLine7, +// logLine8} +// week3logLines := []string{ +// logLine1, +// logLine2, +// logLine3a} +// +// underlyingFs := 
volume_filesystem.NewMockedVolumeFilesystem() +// +// week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" +// week4logLinesStr := strings.Join(week4logLines, "\n") + "\n" +// +// formattedWeekFour := fmt.Sprintf("%02d", 4) +// week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekFour, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// week4, err := underlyingFs.Create(week4filepath) +// require.NoError(t, err) +// _, err = week4.WriteString(week4logLinesStr) +// require.NoError(t, err) +// +// formattedWeekThree := fmt.Sprintf("%02d", 3) +// week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekThree, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// week3, err := underlyingFs.Create(week3filepath) +// require.NoError(t, err) +// _, err = week3.WriteString(week3logLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +//} +// +//func TestStreamUserServiceLogsPerWeekReturnsTimestampedLogLines(t *testing.T) { +// expectedAmountLogLines := 3 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var logLinesFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// timedLogLine1 := fmt.Sprintf("{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// timedLogLine2 := fmt.Sprintf("{\"log\":\"Starting feature 'runs idempotently'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// timedLogLine3 := fmt.Sprintf("{\"log\":\"The enclave was created\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// +// timestampedLogLines := []string{timedLogLine1, timedLogLine2, timedLogLine3} +// timestampedLogLinesStr := strings.Join(timestampedLogLines, "\n") + "\n" +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// formattedWeekNum := fmt.Sprintf("%02d", startingWeek) +// filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file, err := underlyingFs.Create(filepath) +// require.NoError(t, err) +// _, err = file.WriteString(timestampedLogLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, 
retentionPeriodInWeeksForTesting) +// +// expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) +// require.NoError(t, err) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// for _, logLine := range serviceLogLines { +// require.Equal(t, expectedTime, logLine.GetTimestamp()) +// } +// } +//} +// +//func TestStreamUserServiceLogsPerFileReturnsTimestampedLogLines(t *testing.T) { +// t.Skip() +// expectedAmountLogLines := 3 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var logLinesFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// timedLogLine1 := fmt.Sprintf("{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// timedLogLine2 := fmt.Sprintf("{\"log\":\"Starting feature 'runs idempotently'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// timedLogLine3 := fmt.Sprintf("{\"log\":\"The enclave was created\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// +// timestampedLogLines := []string{timedLogLine1, timedLogLine2, timedLogLine3} +// timestampedLogLinesStr := strings.Join(timestampedLogLines, "\n") + "\n" +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// filepath := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file, err := underlyingFs.Create(filepath) +// require.NoError(t, err) +// _, err = file.WriteString(timestampedLogLinesStr) +// require.NoError(t, err) +// +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) +// require.NoError(t, err) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedTime, serviceLogLines[0].GetTimestamp()) +// } +//} +// +//// ==================================================================================================== +//// +//// Private helper functions +//// +//// ==================================================================================================== +//func executeStreamCallAndGetReceivedServiceLogLines( +// t *testing.T, +// logLinesFilters []logline.LogLineFilter, +// userServiceUuids map[service.ServiceUUID]bool, +// expectedServiceAmountLogLinesByServiceUuid 
map[service.ServiceUUID]int, +// shouldFollowLogs bool, +// underlyingFs volume_filesystem.VolumeFilesystem, +// streamStrategy stream_logs_strategy.StreamLogsStrategy, +//) (map[service.ServiceUUID][]logline.LogLine, error) { +// ctx := context.Background() +// +// receivedServiceLogsByUuid := map[service.ServiceUUID][]logline.LogLine{} +// +// for serviceUuid := range expectedServiceAmountLogLinesByServiceUuid { +// receivedServiceLogsByUuid[serviceUuid] = []logline.LogLine{} +// } +// +// kurtosisBackend := backend_interface.NewMockKurtosisBackend(t) +// +// logsDatabaseClient := NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, underlyingFs, streamStrategy) +// +// userServiceLogsByUuidChan, errChan, receivedCancelCtxFunc, err := logsDatabaseClient.StreamUserServiceLogs(ctx, enclaveUuid, userServiceUuids, logLinesFilters, shouldFollowLogs, defaultShouldReturnAllLogs, defaultNumLogLines) +// if err != nil { +// return nil, stacktrace.Propagate(err, "An error occurred getting user service logs for UUIDs '%+v' using log line filters '%v' in enclave '%v'", userServiceUuids, logLinesFilters, enclaveUuid) +// } +// defer func() { +// if receivedCancelCtxFunc != nil { +// receivedCancelCtxFunc() +// } +// }() +// +// require.NotNil(t, userServiceLogsByUuidChan, "Received a nil user service logs channel, but a non-nil value was expected") +// require.NotNil(t, errChan, "Received a nil error logs channel, but a non-nil value was expected") +// +// shouldReceiveStream := true +// for shouldReceiveStream { +// select { +// case <-time.Tick(testTimeOut): +// return nil, stacktrace.NewError("Receiving stream logs in the test has reached the '%v' time out", testTimeOut) +// case streamErr, isChanOpen := <-errChan: +// if !isChanOpen { +// shouldReceiveStream = false +// break +// } +// return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.") +// case userServiceLogsByUuid, isChanOpen := <-userServiceLogsByUuidChan: +// if !isChanOpen { +// shouldReceiveStream = false +// break +// } +// +// for serviceUuid, serviceLogLines := range userServiceLogsByUuid { +// _, found := userServiceUuids[serviceUuid] +// require.True(t, found) +// +// currentServiceLogLines := receivedServiceLogsByUuid[serviceUuid] +// allServiceLogLines := append(currentServiceLogLines, serviceLogLines...) 
+// receivedServiceLogsByUuid[serviceUuid] = allServiceLogLines +// } +// +// for serviceUuid, expectedAmountLogLines := range expectedServiceAmountLogLinesByServiceUuid { +// if len(receivedServiceLogsByUuid[serviceUuid]) == expectedAmountLogLines { +// shouldReceiveStream = false +// } else { +// shouldReceiveStream = true +// break +// } +// } +// } +// } +// +// return receivedServiceLogsByUuid, nil +//} +// +//func createFilledPerFileFilesystem() volume_filesystem.VolumeFilesystem { +// logLines := []string{logLine1, logLine2, logLine3a, logLine3b, logLine4, logLine5, logLine6, logLine7, logLine8} +// +// logLinesStr := strings.Join(logLines, "\n") +// +// file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file2PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) +// file3PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) +// +// mapFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// file1, _ := mapFs.Create(file1PathStr) +// _, _ = file1.WriteString(logLinesStr) +// +// file2, _ := mapFs.Create(file2PathStr) +// _, _ = file2.WriteString(logLinesStr) +// +// file3, _ := mapFs.Create(file3PathStr) +// _, _ = file3.WriteString(logLinesStr) +// +// return mapFs +//} +// +//func createFilledPerWeekFilesystem(week int) volume_filesystem.VolumeFilesystem { +// logLines := []string{logLine1, logLine2, logLine3a, logLine3b, logLine4, logLine5, logLine6, logLine7, logLine8} +// +// logLinesStr := strings.Join(logLines, "\n") +// // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format +// formattedWeekNum := fmt.Sprintf("%02d", week) +// file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file2PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) +// file3PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) +// +// mapFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// file1, _ := mapFs.Create(file1PathStr) +// _, _ = file1.WriteString(logLinesStr) +// +// file2, _ := mapFs.Create(file2PathStr) +// _, _ = file2.WriteString(logLinesStr) +// +// file3, _ := mapFs.Create(file3PathStr) +// _, _ = file3.WriteString(logLinesStr) +// +// return mapFs +//} +// +//func createEmptyPerFileFilesystem() volume_filesystem.VolumeFilesystem { +// file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file2PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) +// file3PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) +// +// mapFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// _, _ = mapFs.Create(file1PathStr) +// _, _ 
= mapFs.Create(file2PathStr) +// _, _ = mapFs.Create(file3PathStr) +// +// return mapFs +//} +// +//func createEmptyPerWeekFilesystem(week int) volume_filesystem.VolumeFilesystem { +// // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format +// formattedWeekNum := fmt.Sprintf("%02d", week) +// file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file2PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) +// file3PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) +// +// mapFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// _, _ = mapFs.Create(file1PathStr) +// _, _ = mapFs.Create(file2PathStr) +// _, _ = mapFs.Create(file3PathStr) +// +// return mapFs +//} diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go index c0d339dd2f..d0798df8fa 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go @@ -25,7 +25,8 @@ import ( ) const ( - oneWeek = 7 * 24 * time.Hour + oneWeek = 7 * 24 * time.Hour + batchLogsAmount = 50 ) // PerWeekStreamLogsStrategy pulls logs from filesystem where there is a log file per year, per week, per enclave, per service @@ -197,6 +198,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( var totalRetentionCheck time.Duration var ltm SendLogLineTimeMeasurements + var logLineBuffer []logline.LogLine for { select { case <-ctx.Done(): @@ -217,19 +219,21 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( totalTimeToGetJsonStrings += time.Now().Sub(getJsonStartTime) if isValidJsonEnding(jsonLogStr) { + var logLine logline.LogLine jsonLog, err := convertStringToJson(jsonLogStr) if err != nil { return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr) } sendJsonLogLineStartTime := time.Now() - err, ltm = strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex) + logLine, err, ltm = strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex) if err != nil { return err } + logLineBuffer = append(logLineBuffer, logLine) totalTimeToSendJsonLogs += time.Now().Sub(sendJsonLogLineStartTime) - totalTimeToSendLogsGranular += ltm.sendDuration + //totalTimeToSendLogsGranular += ltm.sendDuration totalTimeProcessLinesInSend += ltm.processDuration totalTimestampParsing += ltm.parseTimestampDuratoin totalFilterCheck += ltm.filterCheckDuration @@ -239,6 +243,16 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( totalLogFileReadDuration += endTime.Sub(startTime) } + if len(logLineBuffer)%batchLogsAmount == 0 { + sendAcrossChannelStartTime 
:= time.Now() + userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{ + serviceUuid: logLineBuffer, + } + logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap + logLineBuffer = []logline.LogLine{} + totalTimeToSendLogsGranular += time.Now().Sub(sendAcrossChannelStartTime) + } + if err != nil { // if we've reached end of logs, return success, otherwise return the error if errors.Is(err, io.EOF) { @@ -412,7 +426,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes( jsonLog JsonLog, logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, serviceUuid service.ServiceUUID, - conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, SendLogLineTimeMeasurements) { + conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (logline.LogLine, error, SendLogLineTimeMeasurements) { // each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."} // eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd", // "log":"hi","timestamp":"2023-08-14T14:57:49Z"} @@ -426,7 +440,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes( // Then extract the actual log message using the vectors log field logMsgStr, found := jsonLog[volume_consts.LogLabel] if !found { - return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), SendLogLineTimeMeasurements{ + return logline.LogLine{}, stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), SendLogLineTimeMeasurements{ processDuration: processDuration, sendDuration: sendDuration, parseTimestampDuratoin: parseTimestampDuration, @@ -439,7 +453,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes( // Extract the timestamp using vectors timestamp field logTimestamp, err := parseTimestampFromJsonLogLine(jsonLog) if err != nil { - return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), SendLogLineTimeMeasurements{ + return logline.LogLine{}, stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), SendLogLineTimeMeasurements{ processDuration: processDuration, sendDuration: sendDuration, parseTimestampDuratoin: parseTimestampDuration, @@ -454,7 +468,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes( // Then filter by checking if the log message is valid based on requested filters validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex) if err != nil { - return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), SendLogLineTimeMeasurements{ + return logline.LogLine{}, stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), SendLogLineTimeMeasurements{ processDuration: processDuration, sendDuration: sendDuration, parseTimestampDuratoin: parseTimestampDuration, @@ -463,7 +477,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes( } } if !validLogLine { - return nil, SendLogLineTimeMeasurements{ + return logline.LogLine{}, nil, SendLogLineTimeMeasurements{ processDuration: processDuration, sendDuration: sendDuration, parseTimestampDuratoin: parseTimestampDuration, @@ -477,10 +491,10 @@ func (strategy 
*PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes(
 	// ensure this log line is within the retention period if it has a timestamp
 	withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(logLine)
 	if err != nil {
-		return stacktrace.Propagate(err, "An error occurred determining whether log line '%+v' is within the retention period.", logLine), SendLogLineTimeMeasurements{}
+		return logline.LogLine{}, stacktrace.Propagate(err, "An error occurred determining whether log line '%+v' is within the retention period.", logLine), SendLogLineTimeMeasurements{}
 	}
 	if !withinRetentionPeriod {
-		return nil, SendLogLineTimeMeasurements{
+		return logline.LogLine{}, nil, SendLogLineTimeMeasurements{
 			processDuration:        processDuration,
 			sendDuration:           sendDuration,
 			parseTimestampDuratoin: parseTimestampDuration,
@@ -491,16 +505,16 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes(
 	retentionPeriodCheckDuration += time.Now().Sub(retentionCheckStart)
 
 	// send the log line
-	logLines := []logline.LogLine{*logLine}
-	userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
-		serviceUuid: logLines,
-	}
+	//logLines := []logline.LogLine{*logLine}
+	//userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
+	//	serviceUuid: logLines,
+	//}
 	processDuration += time.Now().Sub(processStart)
 
 	sendStart := time.Now()
-	logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
+	//logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap // per-line send replaced by the batched send in streamAllLogs, so sendDuration below now times an empty section
 	sendDuration += time.Now().Sub(sendStart)
 
-	return nil, SendLogLineTimeMeasurements{
+	return *logLine, nil, SendLogLineTimeMeasurements{ // hand the parsed line back so streamAllLogs can buffer it
 		processDuration:        processDuration,
 		sendDuration:           sendDuration,
 		parseTimestampDuratoin: parseTimestampDuration,
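// ============================================================================
// Editor's note on the batched send introduced above (a sketch, not part of
// the patch): as the excerpt stands, streamAllLogs only sends when
// len(logLineBuffer) % batchLogsAmount == 0, so an empty buffer also passes
// the check (0 % 50 == 0), a tail of fewer than batchLogsAmount lines is
// still buffered when the io.EOF branch returns, and filtered-out lines
// append a zero-value logline.LogLine{} to the buffer. The standalone demo
// below shows the same buffer-and-flush pattern with those edges handled;
// batchSize, flush, and the string-based types are hypothetical stand-ins
// for the patch's real types.
package main

import "fmt"

// batchSize plays the role of batchLogsAmount (50 in the patch; 3 for the demo).
const batchSize = 3

// flush sends whatever is buffered as a single batch and returns an emptied
// buffer. The length guard avoids the empty batches that a bare modulo check
// permits right after a reset.
func flush(buffer []string, out chan<- []string) []string {
	if len(buffer) == 0 {
		return buffer
	}
	out <- buffer
	return nil
}

func main() {
	lines := []string{"log1", "log2", "log3", "log4", "log5"} // one full batch plus a two-line tail
	out := make(chan []string, len(lines))                    // buffered so the demo needs no reader goroutine

	var buffer []string
	for _, line := range lines {
		buffer = append(buffer, line)
		if len(buffer) >= batchSize {
			buffer = flush(buffer, out)
		}
	}
	// Final flush on end-of-input: without it, the tail would be dropped the
	// way lines still buffered at io.EOF are in the excerpt above.
	flush(buffer, out)
	close(out)

	for batch := range out {
		fmt.Println(batch) // [log1 log2 log3] then [log4 log5]
	}
}

// In the patch itself the equivalent fixes would be a len(logLineBuffer) > 0
// guard on the channel send, one extra flush in the io.EOF branch of
// streamAllLogs, and skipping the append when sendJsonLogLineWithTimes
// returns the zero-value line for a filtered or out-of-retention entry.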
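// ============================================================================
// Editor's note on the parked test helper
// executeStreamCallAndGetReceivedServiceLogLines (a sketch, not part of the
// patch): its receive loop selects on <-time.Tick(testTimeOut), which
// allocates a fresh, never-stopped ticker on every loop iteration, so the
// deadline restarts per receive and, on Go versions before 1.23, each ticker
// leaks for the life of the test. A single time.After armed before the loop
// bounds the whole stream instead; receiveAll and the string-based channel
// below are hypothetical stand-ins for the helper's real types.
package main

import (
	"fmt"
	"time"
)

// receiveAll drains batches until the channel closes or one overall deadline
// expires, mirroring the helper's close-or-timeout loop without per-iteration
// ticker allocations.
func receiveAll(batches <-chan []string, timeout time.Duration) ([]string, error) {
	deadline := time.After(timeout) // armed once for the whole stream
	var received []string
	for {
		select {
		case <-deadline:
			return nil, fmt.Errorf("timed out after %v with %d lines received", timeout, len(received))
		case batch, ok := <-batches:
			if !ok {
				return received, nil // channel closed: the stream is complete
			}
			received = append(received, batch...)
		}
	}
}

func main() {
	batches := make(chan []string, 2)
	batches <- []string{"log1", "log2"}
	batches <- []string{"log3"}
	close(batches)

	lines, err := receiveAll(batches, time.Second)
	fmt.Println(lines, err) // [log1 log2 log3] <nil>
}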