Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make kurtosis service logs faster #2525

Merged
merged 25 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cli/cli/scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ fi
fi
# Executing goreleaser v1.26.2 without needing to install it
if ! curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 DISTRIBUTION=oss bash -s -- ${goreleaser_verb_and_flags}; then
echo "Error: Couldn't build the CLI binary for the current OS/arch" >&2
exit 1
fi
echo "Error: Couldn't build the CLI binary for the current OS/arch" >&2
exit 1
fi
)

# Final verification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,16 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(
streamErrChan := make(chan error)

// this channel will return the user service log lines by service UUID
logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine)
logLineSender := logline.NewLogLineSender()
logsByKurtosisUserServiceUuidChan := logLineSender.GetLogsChannel()

wgSenders := &sync.WaitGroup{}
for serviceUuid := range userServiceUuids {
wgSenders.Add(oneSenderAdded)
go client.streamServiceLogLines(
ctx,
wgSenders,
logsByKurtosisUserServiceUuidChan,
logLineSender,
streamErrChan,
enclaveUuid,
serviceUuid,
Expand All @@ -87,7 +88,11 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(
//wait for stream go routine to end
wgSenders.Wait()

close(logsByKurtosisUserServiceUuidChan)
// send all buffered log lines
logLineSender.Flush()

// wait until the channel has been fully read/empty before closing it
closeChannelWhenEmpty(logsByKurtosisUserServiceUuidChan)
close(streamErrChan)

//then cancel the context
Expand Down Expand Up @@ -130,7 +135,7 @@ func (client *persistentVolumeLogsDatabaseClient) FilterExistingServiceUuids(
func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines(
ctx context.Context,
wgSenders *sync.WaitGroup,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
streamErrChan chan error,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
Expand All @@ -143,7 +148,7 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines(
client.streamStrategy.StreamLogs(
ctx,
client.filesystem,
logsByKurtosisUserServiceUuidChan,
logLineSender,
streamErrChan,
enclaveUuid,
serviceUuid,
Expand All @@ -152,3 +157,12 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines(
shouldReturnAllLogs,
numLogLines)
}

func closeChannelWhenEmpty(logsChan chan map[service.ServiceUUID][]logline.LogLine) {
for {
if len(logsChan) == 0 {
close(logsChan)
return
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type JsonLog map[string]string
func (strategy *PerFileStreamLogsStrategy) StreamLogs(
ctx context.Context,
fs volume_filesystem.VolumeFilesystem,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
streamErrChan chan error,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
Expand Down Expand Up @@ -122,12 +122,7 @@ func (strategy *PerFileStreamLogsStrategy) StreamLogs(
break
}

// send the log line
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
serviceUuid: logLines,
}
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
logLineSender.SendLogLine(serviceUuid, *logLine)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock, logRetentionPeriodI
func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
ctx context.Context,
fs volume_filesystem.VolumeFilesystem,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
streamErrChan chan error,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
Expand Down Expand Up @@ -89,20 +89,20 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
}()

if shouldReturnAllLogs {
if err := strategy.streamAllLogs(ctx, logsReader, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
if err := strategy.streamAllLogs(ctx, logsReader, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming all logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid)
return
}
} else {
if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming '%v' logs for service '%v' in enclave '%v'", numLogLines, serviceUuid, enclaveUuid)
return
}
}

if shouldFollowLogs {
latestLogFile := paths[len(paths)-1]
if err := strategy.followLogs(ctx, latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
if err := strategy.followLogs(ctx, latestLogFile, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred creating following logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid)
return
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func getLogsReader(filesystem volume_filesystem.VolumeFilesystem, logFilePaths [
func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
ctx context.Context,
logsReader *bufio.Reader,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
for {
Expand All @@ -190,12 +190,14 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
return nil
default:
jsonLogStr, err := getCompleteJsonLogString(logsReader)

if isValidJsonEnding(jsonLogStr) {
jsonLog, err := convertStringToJson(jsonLogStr)
if err != nil {
return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr)
}
if err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {

if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil {
return err
}
}
Expand All @@ -217,7 +219,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamTailLogs(
ctx context.Context,
logsReader *bufio.Reader,
numLogLines uint32,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
tailLogLines := make([]string, 0, numLogLines)
Expand Down Expand Up @@ -255,7 +257,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamTailLogs(
if err != nil {
return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr)
}
if err := strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil {
return err
}
}
Expand Down Expand Up @@ -298,11 +300,7 @@ func isValidJsonEnding(line string) bool {
return endOfLine == volume_consts.EndOfJsonLine
}

func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
jsonLog JsonLog,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(jsonLog JsonLog, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID) error {
// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}
Expand Down Expand Up @@ -338,12 +336,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
return nil
}

// send the log line
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
serviceUuid: logLines,
}
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
logLineSender.SendLogLine(serviceUuid, *logLine)
return nil
}

Expand All @@ -358,7 +351,7 @@ func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logl
func (strategy *PerWeekStreamLogsStrategy) followLogs(
ctx context.Context,
filepath string,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex,
) error {
Expand Down Expand Up @@ -399,8 +392,7 @@ func (strategy *PerWeekStreamLogsStrategy) followLogs(
// if tail package fails to parse a valid new line, fail fast
return stacktrace.NewError("hpcloud/tail returned the following line: '%v' that was not valid json.\nThis is potentially a bug in tailing package.", logLine.Text)
}
err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
if err != nil {
if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil {
return stacktrace.Propagate(err, "An error occurred sending json log line '%v'.", logLine.Text)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type StreamLogsStrategy interface {
StreamLogs(
ctx context.Context,
fs volume_filesystem.VolumeFilesystem,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
streamErrChan chan error,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
Expand Down
62 changes: 62 additions & 0 deletions engine/server/engine/centralized_logs/logline/logline_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package logline

import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"sync"
)

const (
batchLogsAmount = 500
logsChanBufferSize = 300
tedim52 marked this conversation as resolved.
Show resolved Hide resolved
)

type LogLineSender struct {
logsChan chan map[service.ServiceUUID][]LogLine

logLineBuffer map[service.ServiceUUID][]LogLine

sync.Mutex
}

func NewLogLineSender() *LogLineSender {
return &LogLineSender{

Check failure on line 22 in engine/server/engine/centralized_logs/logline/logline_sender.go

View workflow job for this annotation

GitHub Actions / golang-lint (engine/server)

Mutex is missing in LogLineSender (exhaustruct)
logsChan: make(chan map[service.ServiceUUID][]LogLine, logsChanBufferSize),
logLineBuffer: map[service.ServiceUUID][]LogLine{},
}
}

func (sender *LogLineSender) SendLogLine(serviceUuid service.ServiceUUID, logLine LogLine) {
sender.Mutex.Lock()
defer sender.Mutex.Unlock()

sender.logLineBuffer[serviceUuid] = append(sender.logLineBuffer[serviceUuid], logLine)

if len(sender.logLineBuffer[serviceUuid])%batchLogsAmount == 0 {
userServicesLogLinesMap := map[service.ServiceUUID][]LogLine{
serviceUuid: sender.logLineBuffer[serviceUuid],
}
sender.logsChan <- userServicesLogLinesMap

// clear buffer after flushing it through the channel
sender.logLineBuffer[serviceUuid] = []LogLine{}
}
}

func (sender *LogLineSender) GetLogsChannel() chan map[service.ServiceUUID][]LogLine {
return sender.logsChan
}

// sends all logs remaining in the buffers through the channel
// this should be called at the end of processing to send the remainder of logs
func (sender *LogLineSender) Flush() {
sender.Mutex.Lock()
defer sender.Mutex.Unlock()

for uuid, logLines := range sender.logLineBuffer {
serviceUuid := uuid
userServiceLogLinesMap := map[service.ServiceUUID][]LogLine{
serviceUuid: logLines,
}
sender.logsChan <- userServiceLogLinesMap
}
}
2 changes: 1 addition & 1 deletion engine/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
github.com/kurtosis-tech/kurtosis/grpc-file-transfer/golang v0.0.0-20230803130419-099ee7a4e3dc
github.com/kurtosis-tech/kurtosis/metrics-library/golang v0.0.0-20231206095907-9bdf0d02cb90
github.com/labstack/echo/v4 v4.11.3
github.com/rs/cors v1.9.0
github.com/rs/cors v1.11.0
github.com/spf13/afero v1.10.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
k8s.io/apimachinery v0.27.2
Expand Down
4 changes: 2 additions & 2 deletions engine/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE=
github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po=
github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/segmentio/backo-go v1.0.0 h1:kbOAtGJY2DqOR0jfRkYEorx/b18RgtepGtY3+Cpe6qA=
github.com/segmentio/backo-go v1.0.0/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
Loading