diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go
index 46c00f00d2..c8ef39d57d 100644
--- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go
+++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go
@@ -13,7 +13,9 @@ import (
 )
 
 const (
-	oneSenderAdded = 1
+	// capacity of the log stream channel, so senders only block once this many batches are waiting to be consumed
+	logLineBufferSize = 100
+	oneSenderAdded    = 1
 )
 
 // persistentVolumeLogsDatabaseClient pulls logs from a Docker volume the engine is mounted to
@@ -63,7 +65,7 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(
 	streamErrChan := make(chan error)
 
 	// this channel will return the user service log lines by service UUID
-	logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine)
+	logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine, logLineBufferSize)
 
 	wgSenders := &sync.WaitGroup{}
 	for serviceUuid := range userServiceUuids {
diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
index ba7ac5e388..c0d339dd2f 100644
--- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
+++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
@@ -185,16 +185,17 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
 	logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
 	serviceUuid service.ServiceUUID,
 	conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
 	var totalLogFileReadDuration time.Duration
 	var totalTimeToGetJsonStrings time.Duration
 	var totalTimeToSendJsonLogs time.Duration
-	var totalTimeProcessLinesInSend time.Duration
-	var totalTimeSendingAcrossChannelInSend time.Duration
+	// running totals of the per-step timings returned by sendJsonLogLineWithTimes for every log line
+	var totalSendLogLineTimes SendLogLineTimeMeasurements
+	var ltm SendLogLineTimeMeasurements
 	for {
 		select {
 		case <-ctx.Done():
 			logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid)
-			logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
+			logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalSendLogLineTimes)
 			return nil
 		default:
 			startTime := time.Now()
@@ -210,13 +211,12 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
 				}
 
 				sendJsonLogLineStartTime := time.Now()
-				err, sendDuration, processDuration := strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
+				err, ltm = strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
 				if err != nil {
 					return err
 				}
 				totalTimeToSendJsonLogs += time.Now().Sub(sendJsonLogLineStartTime)
-				totalTimeProcessLinesInSend += sendDuration
-				totalTimeProcessLinesInSend += processDuration
+				totalSendLogLineTimes.add(ltm)
 
 				endTime := time.Now()
 				totalLogFileReadDuration += endTime.Sub(startTime)
@@ -225,10 +225,10 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
 			if err != nil {
 				// if we've reached end of logs, return success, otherwise return the error
 				if errors.Is(err, io.EOF) {
-					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
+					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalSendLogLineTimes)
 					return nil
 				} else {
-					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
+					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalSendLogLineTimes)
 					return err
 				}
 			}
@@ -371,61 +371,86 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
 	return nil
 }
 
+// SendLogLineTimeMeasurements holds how long each step of sendJsonLogLineWithTimes took.
+type SendLogLineTimeMeasurements struct {
+	processDuration              time.Duration
+	sendDuration                 time.Duration
+	parseTimestampDuration       time.Duration
+	filterCheckDuration          time.Duration
+	retentionPeriodCheckDuration time.Duration
+}
+
+// add accumulates every duration in [other] into [m].
+func (m *SendLogLineTimeMeasurements) add(other SendLogLineTimeMeasurements) {
+	m.processDuration += other.processDuration
+	m.sendDuration += other.sendDuration
+	m.parseTimestampDuration += other.parseTimestampDuration
+	m.filterCheckDuration += other.filterCheckDuration
+	m.retentionPeriodCheckDuration += other.retentionPeriodCheckDuration
+}
+
 func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes(
 	jsonLog JsonLog,
 	logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
 	serviceUuid service.ServiceUUID,
-	conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, time.Duration, time.Duration) {
+	conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, SendLogLineTimeMeasurements) {
 	// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
 	// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
 	// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}
-
-	var processDuration time.Duration
-	var sendDuration time.Duration
+	var ltm SendLogLineTimeMeasurements
 
 	processStart := time.Now()
 	// Then extract the actual log message using the vectors log field
 	logMsgStr, found := jsonLog[volume_consts.LogLabel]
 	if !found {
-		return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), sendDuration, processDuration
+		return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), ltm
 	}
 
+	timestampStart := time.Now()
 	// Extract the timestamp using vectors timestamp field
 	logTimestamp, err := parseTimestampFromJsonLogLine(jsonLog)
 	if err != nil {
-		return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), sendDuration, processDuration
+		return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), ltm
 	}
 	logLine := logline.NewLogLine(logMsgStr, *logTimestamp)
+	ltm.parseTimestampDuration = time.Since(timestampStart)
 
+	filterStart := time.Now()
 	// Then filter by checking if the log message is valid based on requested filters
 	validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex)
+	// record the filter time before any early return so that lines rejected by the filters are counted too
+	ltm.filterCheckDuration = time.Since(filterStart)
 	if err != nil {
-		return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration
+		return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), ltm
 	}
 	if !validLogLine {
-		return nil, sendDuration, processDuration
+		ltm.processDuration = time.Since(processStart)
+		return nil, ltm
 	}
 
+	retentionCheckStart := time.Now()
 	// ensure this log line is within the retention period if it has a timestamp
 	withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(logLine)
+	ltm.retentionPeriodCheckDuration = time.Since(retentionCheckStart)
 	if err != nil {
-		return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration
+		return stacktrace.Propagate(err, "An error occurred determining whether log line '%+v' is within the retention period.", logLine), ltm
 	}
 	if !withinRetentionPeriod {
-		return nil, sendDuration, processDuration
+		ltm.processDuration = time.Since(processStart)
+		return nil, ltm
 	}
 
 	// send the log line
 	logLines := []logline.LogLine{*logLine}
 	userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
 		serviceUuid: logLines,
 	}
-	processDuration = time.Now().Sub(processStart)
+	ltm.processDuration = time.Since(processStart)
 
 	sendStart := time.Now()
 	logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
-	sendDuration = time.Now().Sub(sendStart)
-	return nil, sendDuration, processDuration
+	ltm.sendDuration = time.Since(sendStart)
+	return nil, ltm
 }
 
 // Returns true if [logLine] has no timestamp
@@ -509,10 +534,14 @@ func parseTimestampFromJsonLogLine(logLine JsonLog) (*time.Time, error) {
 	return &timestamp, nil
 }
 
-func logTimes(totalDuration, getLineDuration, totalSendLineDuration, actualSendLineDuration, processLineDuraction time.Duration) {
+// logTimes logs the overall stream timings followed by the per-step totals in [sendLogLineTM].
+func logTimes(totalDuration, getLineDuration, totalSendLineDuration time.Duration, sendLogLineTM SendLogLineTimeMeasurements) {
 	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalDuration)
 	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO GET JSON LINES: %v", getLineDuration)
 	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES: %v", totalSendLineDuration)
-	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", actualSendLineDuration)
-	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", processLineDuraction)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", sendLogLineTM.sendDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", sendLogLineTM.processDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PARSE TIMESTAMPS: %v", sendLogLineTM.parseTimestampDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO FILTER LINES BASED ON REGEXES: %v", sendLogLineTM.filterCheckDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO CHECK RETENTION PERIOD: %v", sendLogLineTM.retentionPeriodCheckDuration)
 }