Skip to content

Commit

Permalink
use buffered log channel
Browse files Browse the repository at this point in the history
  • Loading branch information
tedim52 committed Aug 1, 2024
1 parent f881126 commit 487e5d1
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

const (
oneSenderAdded = 1
logLineBufferSize = 100
oneSenderAdded = 1
)

// persistentVolumeLogsDatabaseClient pulls logs from a Docker volume the engine is mounted to
Expand Down Expand Up @@ -63,7 +64,7 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(
streamErrChan := make(chan error)

// this channel will return the user service log lines by service UUID
logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine)
logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine, logLineBufferSize) // MAKE IT A BUFFERED CHANNEL SEE HOW THAT IMPROVES THINGS

wgSenders := &sync.WaitGroup{}
for serviceUuid := range userServiceUuids {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,29 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {

var totalLogFileReadDuration time.Duration
var totalTimeToGetJsonStrings time.Duration
var totalTimeToSendJsonLogs time.Duration

var totalTimeToSendLogsGranular time.Duration
var totalTimeProcessLinesInSend time.Duration
var totalTimeSendingAcrossChannelInSend time.Duration
var totalTimestampParsing time.Duration
var totalFilterCheck time.Duration
var totalRetentionCheck time.Duration

var ltm SendLogLineTimeMeasurements
for {
select {
case <-ctx.Done():
logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid)
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{
processDuration: totalTimeProcessLinesInSend,
sendDuration: totalTimeToSendLogsGranular,
parseTimestampDuratoin: totalTimestampParsing,
filterCheckDuration: totalFilterCheck,
retentionPeriodCheckDuration: totalRetentionCheck,
})
return nil
default:
startTime := time.Now()
Expand All @@ -210,13 +223,17 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
}

sendJsonLogLineStartTime := time.Now()
err, sendDuration, processDuration := strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
err, ltm = strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
if err != nil {
return err
}
totalTimeToSendJsonLogs += time.Now().Sub(sendJsonLogLineStartTime)

Check failure on line 230 in engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go

View workflow job for this annotation

GitHub Actions / golang-lint (engine/server)

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)
totalTimeProcessLinesInSend += sendDuration
totalTimeProcessLinesInSend += processDuration

totalTimeToSendLogsGranular += ltm.sendDuration
totalTimeProcessLinesInSend += ltm.processDuration
totalTimestampParsing += ltm.parseTimestampDuratoin
totalFilterCheck += ltm.filterCheckDuration
totalRetentionCheck += ltm.retentionPeriodCheckDuration

endTime := time.Now()
totalLogFileReadDuration += endTime.Sub(startTime)
Expand All @@ -225,10 +242,22 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
if err != nil {
// if we've reached end of logs, return success, otherwise return the error
if errors.Is(err, io.EOF) {
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{
processDuration: totalTimeProcessLinesInSend,
sendDuration: totalTimeToSendLogsGranular,
parseTimestampDuratoin: totalTimestampParsing,
filterCheckDuration: totalFilterCheck,
retentionPeriodCheckDuration: totalRetentionCheck,
})
return nil
} else {
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, totalTimeSendingAcrossChannelInSend, totalTimeProcessLinesInSend)
logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{
processDuration: totalTimeProcessLinesInSend,
sendDuration: totalTimeToSendLogsGranular,
parseTimestampDuratoin: totalTimestampParsing,
filterCheckDuration: totalFilterCheck,
retentionPeriodCheckDuration: totalRetentionCheck,
})
return err
}
}
Expand Down Expand Up @@ -371,61 +400,113 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
return nil
}

type SendLogLineTimeMeasurements struct {
processDuration time.Duration
sendDuration time.Duration
parseTimestampDuratoin time.Duration
filterCheckDuration time.Duration
retentionPeriodCheckDuration time.Duration
}

func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes(
jsonLog JsonLog,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, time.Duration, time.Duration) {
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (error, SendLogLineTimeMeasurements) {
// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}

var processDuration time.Duration
var sendDuration time.Duration
var parseTimestampDuration time.Duration
var filterCheckDuration time.Duration
var retentionPeriodCheckDuration time.Duration

processStart := time.Now()
// Then extract the actual log message using the vectors log field
logMsgStr, found := jsonLog[volume_consts.LogLabel]
if !found {
return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), sendDuration, processDuration
return stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), SendLogLineTimeMeasurements{
processDuration: processDuration,
sendDuration: sendDuration,
parseTimestampDuratoin: parseTimestampDuration,
filterCheckDuration: filterCheckDuration,
retentionPeriodCheckDuration: retentionPeriodCheckDuration,
}
}

timestampStart := time.Now()
// Extract the timestamp using vectors timestamp field
logTimestamp, err := parseTimestampFromJsonLogLine(jsonLog)
if err != nil {
return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), sendDuration, processDuration
return stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), SendLogLineTimeMeasurements{
processDuration: processDuration,
sendDuration: sendDuration,
parseTimestampDuratoin: parseTimestampDuration,
filterCheckDuration: filterCheckDuration,
retentionPeriodCheckDuration: retentionPeriodCheckDuration,
}
}
logLine := logline.NewLogLine(logMsgStr, *logTimestamp)
parseTimestampDuration += time.Now().Sub(timestampStart)

filterStart := time.Now()
// Then filter by checking if the log message is valid based on requested filters
validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex)
if err != nil {
return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration
return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), SendLogLineTimeMeasurements{
processDuration: processDuration,
sendDuration: sendDuration,
parseTimestampDuratoin: parseTimestampDuration,
filterCheckDuration: filterCheckDuration,
retentionPeriodCheckDuration: retentionPeriodCheckDuration,
}
}
if !validLogLine {
return nil, sendDuration, processDuration
return nil, SendLogLineTimeMeasurements{
processDuration: processDuration,
sendDuration: sendDuration,
parseTimestampDuratoin: parseTimestampDuration,
filterCheckDuration: filterCheckDuration,
retentionPeriodCheckDuration: retentionPeriodCheckDuration,
}
}
filterCheckDuration += time.Now().Sub(filterStart)

retentionCheckStart := time.Now()
// ensure this log line is within the retention period if it has a timestamp
withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(logLine)
if err != nil {
return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), sendDuration, processDuration
return stacktrace.Propagate(err, "An error occurred determining whether log line '%+v' is within the retention period.", logLine), SendLogLineTimeMeasurements{}
}
if !withinRetentionPeriod {
return nil, sendDuration, processDuration
return nil, SendLogLineTimeMeasurements{
processDuration: processDuration,
sendDuration: sendDuration,
parseTimestampDuratoin: parseTimestampDuration,
filterCheckDuration: filterCheckDuration,
retentionPeriodCheckDuration: retentionPeriodCheckDuration,
}
}
retentionPeriodCheckDuration += time.Now().Sub(retentionCheckStart)

// send the log line
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
serviceUuid: logLines,
}
processDuration = time.Now().Sub(processStart)
processDuration += time.Now().Sub(processStart)

sendStart := time.Now()
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
sendDuration = time.Now().Sub(sendStart)
return nil, sendDuration, processDuration
sendDuration += time.Now().Sub(sendStart)
return nil, SendLogLineTimeMeasurements{
processDuration: processDuration,
sendDuration: sendDuration,
parseTimestampDuratoin: parseTimestampDuration,
filterCheckDuration: filterCheckDuration,
retentionPeriodCheckDuration: retentionPeriodCheckDuration,
}
}

// Returns true if [logLine] has no timestamp
Expand Down Expand Up @@ -509,10 +590,13 @@ func parseTimestampFromJsonLogLine(logLine JsonLog) (*time.Time, error) {
return &timestamp, nil
}

func logTimes(totalDuration, getLineDuration, totalSendLineDuration, actualSendLineDuration, processLineDuraction time.Duration) {
func logTimes(totalDuration, getLineDuration, totalSendLineDuration time.Duration, sendLogLineTM SendLogLineTimeMeasurements) {
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO GET JSON LINES: %v", getLineDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES: %v", totalSendLineDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", actualSendLineDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", processLineDuraction)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", sendLogLineTM.sendDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", sendLogLineTM.processDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PARSE TIMESTAMPS: %v", sendLogLineTM.parseTimestampDuratoin)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO FILTER LINES BASED ON REGEXES: %v", sendLogLineTM.filterCheckDuration)
logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO CHECK RETENTION PERIOD: %v", sendLogLineTM.retentionPeriodCheckDuration)
}

0 comments on commit 487e5d1

Please sign in to comment.