Skip to content

Commit

Permalink
add more granular measurements
Browse files Browse the repository at this point in the history
  • Loading branch information
tedim52 committed Jul 31, 2024
1 parent ee510cd commit f881126
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 7 deletions.
1 change: 0 additions & 1 deletion cli/cli/commands/service/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ func run(
logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration)
return nil
}
logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
}()

if shouldReturnAllLogs {
startTime := time.Now()
if err := strategy.streamAllLogs(ctx, logsReader, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming all logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid)
return
}
logrus.Infof("TOTAL TIME IN STREAM ALL LOGS FUNCTION: %v", time.Now().Sub(startTime))
} else {
if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming '%v' logs for service '%v' in enclave '%v'", numLogLines, serviceUuid, enclaveUuid)
Expand Down Expand Up @@ -184,34 +186,49 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
var totalLogFileReadDuration time.Duration
var totalTimeToGetJsonStrings time.Duration
var totalTimeToSendJsonLogs time.Duration
var totalTimeProcessLinesInSend time.Duration
var totalTimeSendingAcrossChannelInSend time.Duration
for {
select {
case <-ctx.Done():
logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration)
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
return nil
default:
startTime := time.Now()

getJsonStartTime := time.Now()
jsonLogStr, err := getCompleteJsonLogString(logsReader)
totalTimeToGetJsonStrings += time.Now().Sub(getJsonStartTime)

if isValidJsonEnding(jsonLogStr) {
startTime := time.Now()
jsonLog, err := convertStringToJson(jsonLogStr)
if err != nil {
return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr)
}
if err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {

sendJsonLogLineStartTime := time.Now()
err, sendDuration, processDuration := strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
if err != nil {
return err
}
totalTimeToSendJsonLogs += time.Now().Sub(sendJsonLogLineStartTime)
totalTimeProcessLinesInSend += sendDuration
totalTimeProcessLinesInSend += processDuration

endTime := time.Now()
totalLogFileReadDuration += endTime.Sub(startTime)
}

if err != nil {
// if we've reached end of logs, return success, otherwise return the error
if errors.Is(err, io.EOF) {
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration)
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
return nil
} else {
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalLogFileReadDuration)
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
return err
}
}
Expand Down Expand Up @@ -354,6 +371,63 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
return nil
}

func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes(
jsonLog JsonLog,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, time.Duration, time.Duration) {
// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}

var processDuration time.Duration
var sendDuration time.Duration

processStart := time.Now()
// Then extract the actual log message using the vectors log field
logMsgStr, found := jsonLog[volume_consts.LogLabel]
if !found {
return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), sendDuration, processDuration
}

// Extract the timestamp using vectors timestamp field
logTimestamp, err := parseTimestampFromJsonLogLine(jsonLog)
if err != nil {
return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), sendDuration, processDuration
}
logLine := logline.NewLogLine(logMsgStr, *logTimestamp)

// Then filter by checking if the log message is valid based on requested filters
validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex)
if err != nil {
return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration
}
if !validLogLine {
return nil, sendDuration, processDuration
}

// ensure this log line is within the retention period if it has a timestamp
withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(logLine)
if err != nil {
return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration
}
if !withinRetentionPeriod {
return nil, sendDuration, processDuration
}

// send the log line
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
serviceUuid: logLines,
}
processDuration = time.Now().Sub(processStart)

sendStart := time.Now()
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
sendDuration = time.Now().Sub(sendStart)
return nil, sendDuration, processDuration
}

// Returns true if [logLine] has no timestamp
func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) {
retentionPeriod := strategy.time.Now().Add(time.Duration(-strategy.logRetentionPeriodInWeeks) * oneWeek)
Expand Down Expand Up @@ -434,3 +508,11 @@ func parseTimestampFromJsonLogLine(logLine JsonLog) (*time.Time, error) {
}
return &timestamp, nil
}

func logTimes(totalDuration, getLineDuration, totalSendLineDuration, actualSendLineDuration, processLineDuraction time.Duration) {
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO GET JSON LINES: %v", getLineDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES: %v", totalSendLineDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", actualSendLineDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", processLineDuraction)
}
11 changes: 10 additions & 1 deletion engine/server/engine/server/engine_connect_server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
}()

var totalLogStreamDuration time.Duration
var counter int
for {
select {
//stream case
Expand All @@ -358,12 +359,20 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration)
return nil
}
startTime := time.Now()

// print out num log lines every 100 lines times
//for serviceUUID, logs := range serviceLogsByServiceUuid {
// if counter%100 == 0 {
// logrus.Infof("NUM LOG LINES FOR SERVICE '%v' CHECK IN ENGINE CONNECT SERVICE: %v", serviceUUID, len(logs))
// }
//}

startTime := time.Now()
getServiceLogsResponse := newLogsResponse(requestedServiceUuids, serviceLogsByServiceUuid, notFoundServiceUuids)
if err := stream.Send(getServiceLogsResponse); err != nil {
return stacktrace.Propagate(err, "An error occurred sending the stream logs for service logs response '%+v'", getServiceLogsResponse)
}
counter += 1
endTime := time.Now()
totalLogStreamDuration += endTime.Sub(startTime)
//client cancel ctx case
Expand Down

0 comments on commit f881126

Please sign in to comment.