diff --git a/cli/cli/commands/service/logs/logs.go b/cli/cli/commands/service/logs/logs.go
index 1c7e52a0cf..028039224c 100644
--- a/cli/cli/commands/service/logs/logs.go
+++ b/cli/cli/commands/service/logs/logs.go
@@ -298,7 +298,6 @@ func run(
 				logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration)
 				return nil
 			}
-			logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration)
 		}
 	}
 
diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
index f1117e80d9..ba7ac5e388 100644
--- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
+++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go
@@ -89,10 +89,12 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
 	}()
 
 	if shouldReturnAllLogs {
+		startTime := time.Now()
 		if err := strategy.streamAllLogs(ctx, logsReader, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
 			streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming all logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid)
 			return
 		}
+		logrus.Infof("TOTAL TIME IN STREAM ALL LOGS FUNCTION: %v", time.Now().Sub(startTime))
 	} else {
 		if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
 			streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming '%v' logs for service '%v' in enclave '%v'", numLogLines, serviceUuid, enclaveUuid)
@@ -184,23 +186,38 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
 	serviceUuid service.ServiceUUID,
 	conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
 	var totalLogFileReadDuration time.Duration
+	var totalTimeToGetJsonStrings time.Duration
+	var totalTimeToSendJsonLogs time.Duration
+	var totalTimeProcessLinesInSend time.Duration
+	var totalTimeSendingAcrossChannelInSend time.Duration
 	for {
 		select {
 		case <-ctx.Done():
 			logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid)
-			logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration)
+			logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
 			return nil
 		default:
+			startTime := time.Now()
+
+			getJsonStartTime := time.Now()
 			jsonLogStr, err := getCompleteJsonLogString(logsReader)
+			totalTimeToGetJsonStrings += time.Now().Sub(getJsonStartTime)
+
 			if isValidJsonEnding(jsonLogStr) {
-				startTime := time.Now()
 				jsonLog, err := convertStringToJson(jsonLogStr)
 				if err != nil {
 					return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr)
 				}
-				if err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
+
+				sendJsonLogLineStartTime := time.Now()
+				err, sendDuration, processDuration := strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
+				if err != nil {
 					return err
 				}
+				totalTimeToSendJsonLogs += time.Now().Sub(sendJsonLogLineStartTime)
+				totalTimeSendingAcrossChannelInSend += sendDuration
+				totalTimeProcessLinesInSend += processDuration
+
 				endTime := time.Now()
 				totalLogFileReadDuration += endTime.Sub(startTime)
 			}
@@ -208,10 +225,10 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
 			if err != nil {
 				// if we've reached end of logs, return success, otherwise return the error
 				if errors.Is(err, io.EOF) {
-					logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration)
+					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
 					return nil
 				} else {
-					logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration)
+					logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
 					return err
 				}
 			}
@@ -354,6 +371,63 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
 	return nil
 }
 
+func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes(
+	jsonLog JsonLog,
+	logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
+	serviceUuid service.ServiceUUID,
+	conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, time.Duration, time.Duration) {
+	// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
+	// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
+	// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}
+
+	var processDuration time.Duration
+	var sendDuration time.Duration
+
+	processStart := time.Now()
+	// Then extract the actual log message using the vectors log field
+	logMsgStr, found := jsonLog[volume_consts.LogLabel]
+	if !found {
+		return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), sendDuration, processDuration
+	}
+
+	// Extract the timestamp using vectors timestamp field
+	logTimestamp, err := parseTimestampFromJsonLogLine(jsonLog)
+	if err != nil {
+		return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), sendDuration, processDuration
+	}
+	logLine := logline.NewLogLine(logMsgStr, *logTimestamp)
+
+	// Then filter by checking if the log message is valid based on requested filters
+	validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex)
+	if err != nil {
+		return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration
+	}
+	if !validLogLine {
+		return nil, sendDuration, processDuration
+	}
+
+	// ensure this log line is within the retention period if it has a timestamp
+	withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(logLine)
+	if err != nil {
+		return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration
+	}
+	if !withinRetentionPeriod {
+		return nil, sendDuration, processDuration
+	}
+
+	// send the log line
+	logLines := []logline.LogLine{*logLine}
+	userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
+		serviceUuid: logLines,
+	}
+	processDuration = time.Now().Sub(processStart)
+
+	sendStart := time.Now()
+	logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
+	sendDuration = time.Now().Sub(sendStart)
+	return nil, sendDuration, processDuration
+}
+
 // Returns true if [logLine] has no timestamp
 func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) {
 	retentionPeriod := strategy.time.Now().Add(time.Duration(-strategy.logRetentionPeriodInWeeks) * oneWeek)
@@ -434,3 +508,11 @@ func parseTimestampFromJsonLogLine(logLine JsonLog) (*time.Time, error) {
 	}
 	return &timestamp, nil
 }
+
+func logTimes(totalDuration, getLineDuration, totalSendLineDuration, actualSendLineDuration, processLineDuration time.Duration) {
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO GET JSON LINES: %v", getLineDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES: %v", totalSendLineDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", actualSendLineDuration)
+	logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", processLineDuration)
+}
diff --git a/engine/server/engine/server/engine_connect_server_service.go b/engine/server/engine/server/engine_connect_server_service.go
index 1921e0725d..a3f3c84225 100644
--- a/engine/server/engine/server/engine_connect_server_service.go
+++ b/engine/server/engine/server/engine_connect_server_service.go
@@ -348,6 +348,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
 	}()
 
 	var totalLogStreamDuration time.Duration
+	var counter int
 	for {
 		select {
 		//stream case
@@ -358,12 +359,20 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
 				logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration)
 				return nil
 			}
-			startTime := time.Now()
 
+			// print out num log lines every 100 lines
+			//for serviceUUID, logs := range serviceLogsByServiceUuid {
+			//	if counter%100 == 0 {
+			//		logrus.Infof("NUM LOG LINES FOR SERVICE '%v' CHECK IN ENGINE CONNECT SERVICE: %v", serviceUUID, len(logs))
+			//	}
+			//}
+
+			startTime := time.Now()
 			getServiceLogsResponse := newLogsResponse(requestedServiceUuids, serviceLogsByServiceUuid, notFoundServiceUuids)
 			if err := stream.Send(getServiceLogsResponse); err != nil {
 				return stacktrace.Propagate(err, "An error occurred sending the stream logs for service logs response '%+v'", getServiceLogsResponse)
 			}
+			counter += 1
 			endTime := time.Now()
 			totalLogStreamDuration += endTime.Sub(startTime)
 		//client cancel ctx case