diff --git a/cli/cli/commands/service/logs/logs.go b/cli/cli/commands/service/logs/logs.go index 550cb5df0b..028039224c 100644 --- a/cli/cli/commands/service/logs/logs.go +++ b/cli/cli/commands/service/logs/logs.go @@ -27,6 +27,7 @@ import ( "os" "os/signal" "strconv" + "time" ) const ( @@ -262,13 +263,16 @@ func run( interruptChan := make(chan os.Signal, interruptChanBufferSize) signal.Notify(interruptChan, os.Interrupt) + var totalLogPrintDuration time.Duration for { select { case serviceLogsStreamContent, isChanOpen := <-serviceLogsStreamContentChan: if !isChanOpen { + logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration) return nil } + startTime := time.Now() notFoundServiceUuids := serviceLogsStreamContent.GetNotFoundServiceUuids() for notFoundServiceUuid := range notFoundServiceUuids { @@ -287,8 +291,11 @@ func run( out.PrintOutLn(fmt.Sprintf("[%v] %v", colorPrinter(serviceIdentifier), serviceLog.GetContent())) } } + endTime := time.Now() + totalLogPrintDuration += endTime.Sub(startTime) case <-interruptChan: logrus.Debugf("Received signal interruption in service logs Kurtosis CLI command") + logrus.Infof("CLI [logs.go] TOTAL TIME TO PRINT LOGS: %v", totalLogPrintDuration) return nil } } diff --git a/cli/cli/scripts/build.sh b/cli/cli/scripts/build.sh index 0f8401c209..db8f5d7443 100755 --- a/cli/cli/scripts/build.sh +++ b/cli/cli/scripts/build.sh @@ -97,10 +97,11 @@ fi exit 1 fi # Executing goreleaser v1.26.2 without needing to install it - if ! curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 DISTRIBUTION=oss bash -s -- ${goreleaser_verb_and_flags}; then - echo "Error: Couldn't build the CLI binary for the current OS/arch" >&2 - exit 1 - fi +# if ! curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 DISTRIBUTION=oss bash -s -- ${goreleaser_verb_and_flags}; then + if ! 
GORELEASER_CURRENT_TAG=$(cat $root_dirpath/version.txt) goreleaser ${goreleaser_verb_and_flags}; then + echo "Error: Couldn't build the CLI binary for the current OS/arch" >&2 + exit 1 + fi ) # Final verification diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go index 46c00f00d2..1c44d1ef41 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go @@ -13,7 +13,8 @@ import ( ) const ( - oneSenderAdded = 1 + logLineBufferSize = 300 + oneSenderAdded = 1 ) // persistentVolumeLogsDatabaseClient pulls logs from a Docker volume the engine is mounted to @@ -63,7 +64,7 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs( streamErrChan := make(chan error) // this channel will return the user service log lines by service UUID - logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine) + logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine, logLineBufferSize) wgSenders := &sync.WaitGroup{} for serviceUuid := range userServiceUuids { diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go index 82f14c00e9..801697aafa 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go +++ 
b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go @@ -1,21 +1,7 @@ package persistent_volume import ( - "context" - "fmt" - "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface" "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave" - "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/logs_clock" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_consts" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_filesystem" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/logline" - "github.com/kurtosis-tech/stacktrace" - "github.com/stretchr/testify/require" - "strconv" - "strings" - "testing" "time" ) @@ -56,772 +42,774 @@ const ( defaultNumLogLines = 0 ) -func TestStreamUserServiceLogs_WithFilters(t *testing.T) { - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: 2, - testUserService2Uuid: 2, - testUserService3Uuid: 2, - } - - firstTextFilter := logline.NewDoesContainTextLogLineFilter(firstFilterText) - secondTextFilter := logline.NewDoesNotContainTextLogLineFilter(secondFilterText) - regexFilter := logline.NewDoesContainMatchRegexLogLineFilter(firstMatchRegexFilterStr) - - logLinesFilters := []logline.LogLineFilter{ - *firstTextFilter, - *secondTextFilter, - *regexFilter, - } - - expectedFirstLogLine := "Starting feature 'runs idempotently'" - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: 
true, - testUserService2Uuid: true, - testUserService3Uuid: true, - } - - underlyingFs := createFilledPerFileFilesystem() - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } -} - -func TestStreamUserServiceLogsPerWeek_WithFilters(t *testing.T) { - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: 2, - testUserService2Uuid: 2, - testUserService3Uuid: 2, - } - - firstTextFilter := logline.NewDoesContainTextLogLineFilter(firstFilterText) - secondTextFilter := logline.NewDoesNotContainTextLogLineFilter(secondFilterText) - regexFilter := logline.NewDoesContainMatchRegexLogLineFilter(firstMatchRegexFilterStr) - - logLinesFilters := []logline.LogLineFilter{ - *firstTextFilter, - *secondTextFilter, - *regexFilter, - } - - expectedFirstLogLine := "Starting feature 'runs idempotently'" - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - testUserService2Uuid: true, - testUserService3Uuid: true, - } - - underlyingFs := createFilledPerWeekFilesystem(startingWeek) - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := 
executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } - - require.NoError(t, testEvaluationErr) -} - -func TestStreamUserServiceLogs_NoLogsFromPersistentVolume(t *testing.T) { - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: 0, - testUserService2Uuid: 0, - testUserService3Uuid: 0, - } - - firstTextFilter := logline.NewDoesContainTextLogLineFilter(notFoundedFilterText) - - logLinesFilters := []logline.LogLineFilter{ - *firstTextFilter, - } - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - testUserService2Uuid: true, - testUserService3Uuid: true, - } - - underlyingFs := createEmptyPerFileFilesystem() - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - } -} - -func TestStreamUserServiceLogsPerWeek_NoLogsFromPersistentVolume(t *testing.T) { - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - 
testUserService1Uuid: 0, - testUserService2Uuid: 0, - testUserService3Uuid: 0, - } - - firstTextFilter := logline.NewDoesContainTextLogLineFilter(notFoundedFilterText) - - logLinesFilters := []logline.LogLineFilter{ - *firstTextFilter, - } - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - testUserService2Uuid: true, - testUserService3Uuid: true, - } - - underlyingFs := createEmptyPerWeekFilesystem(startingWeek) - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - } -} - -func TestStreamUserServiceLogs_ThousandsOfLogLinesSuccessfulExecution(t *testing.T) { - expectedAmountLogLines := 10_000 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var emptyFilters []logline.LogLineFilter - - expectedFirstLogLine := "Starting feature 'centralized logs'" - - var logLines []string - - for i := 0; i <= expectedAmountLogLines; i++ { - logLines = append(logLines, logLine1) - } - - logLinesStr := strings.Join(logLines, "\n") - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, 
volume_consts.LogsStorageDirpath, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) - file1, err := underlyingFs.Create(file1PathStr) - require.NoError(t, err) - _, err = file1.WriteString(logLinesStr) - require.NoError(t, err) - - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - emptyFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } -} - -func TestStreamUserServiceLogsPerWeek_ThousandsOfLogLinesSuccessfulExecution(t *testing.T) { - expectedAmountLogLines := 10_000 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var emptyFilters []logline.LogLineFilter - - expectedFirstLogLine := "Starting feature 'centralized logs'" - - var logLines []string - - for i := 0; i <= expectedAmountLogLines; i++ { - logLines = append(logLines, logLine1) - } - - logLinesStr := strings.Join(logLines, "\n") - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format - formattedWeekNum := fmt.Sprintf("%02d", startingWeek) - file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, string(enclaveUuid), testUserService1Uuid, 
volume_consts.Filetype) - file1, err := underlyingFs.Create(file1PathStr) - require.NoError(t, err) - _, err = file1.WriteString(logLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - emptyFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } -} - -func TestStreamUserServiceLogs_EmptyLogLines(t *testing.T) { - expectedAmountLogLines := 0 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var emptyFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - logLinesStr := "" - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - file1PathStr := fmt.Sprintf("%s%s/%s%s", volume_consts.LogsStorageDirpath, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) - file1, err := underlyingFs.Create(file1PathStr) - require.NoError(t, err) - _, err = file1.WriteString(logLinesStr) - require.NoError(t, err) - - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - emptyFilters, - userServiceUuids, - 
expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - } -} - -func TestStreamUserServiceLogsPerWeek_EmptyLogLines(t *testing.T) { - expectedAmountLogLines := 0 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var emptyFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - logLinesStr := "" - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - formattedWeekNum := fmt.Sprintf("%02d", startingWeek) - file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) - file1, err := underlyingFs.Create(file1PathStr) - require.NoError(t, err) - _, err = file1.WriteString(logLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - emptyFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, 
expectedAmountLogLines, len(serviceLogLines)) - } -} - -func TestStreamUserServiceLogsPerWeek_WithLogsAcrossWeeks(t *testing.T) { - expectedAmountLogLines := 8 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var logLinesFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - expectedFirstLogLine := "Starting feature 'centralized logs'" - - week4logLines := []string{ - logLine5, - logLine6, - logLine7, - logLine8} - week3logLines := []string{ - logLine1, - logLine2, - logLine3a, - logLine3b, - logLine4} - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" - week4logLinesStr := strings.Join(week4logLines, "\n") - - formattedWeekFour := fmt.Sprintf("%02d", 4) - week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekFour, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - week4, err := underlyingFs.Create(week4filepath) - require.NoError(t, err) - _, err = week4.WriteString(week4logLinesStr) - require.NoError(t, err) - - formattedWeekThree := fmt.Sprintf("%02d", 3) - week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekThree, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - week3, err := underlyingFs.Create(week3filepath) - require.NoError(t, err) - _, err = week3.WriteString(week3logLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - 
userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } - -} - -func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) { - expectedAmountLogLines := 8 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var logLinesFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - expectedFirstLogLine := "Starting feature 'centralized logs'" - - week4logLines := []string{ - logLine3b, - logLine4, - logLine5, - logLine6, - logLine7, - logLine8} - week3logLines := []string{ - logLine1, - logLine2, - logLine3a} - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" - week4logLinesStr := strings.Join(week4logLines, "\n") + "\n" - - formattedWeekFour := fmt.Sprintf("%02d", 4) - week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekFour, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - week4, err := underlyingFs.Create(week4filepath) - require.NoError(t, err) - _, err = week4.WriteString(week4logLinesStr) - require.NoError(t, err) - - formattedWeekThree := fmt.Sprintf("%02d", 3) - week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekThree, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - week3, err 
:= underlyingFs.Create(week3filepath) - require.NoError(t, err) - _, err = week3.WriteString(week3logLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) - } -} - -func TestStreamUserServiceLogsPerWeekReturnsTimestampedLogLines(t *testing.T) { - expectedAmountLogLines := 3 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var logLinesFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - timedLogLine1 := fmt.Sprintf("{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - timedLogLine2 := fmt.Sprintf("{\"log\":\"Starting feature 'runs idempotently'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - timedLogLine3 := fmt.Sprintf("{\"log\":\"The enclave was created\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - - timestampedLogLines := []string{timedLogLine1, timedLogLine2, timedLogLine3} - timestampedLogLinesStr := strings.Join(timestampedLogLines, "\n") + "\n" - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - formattedWeekNum := fmt.Sprintf("%02d", 
startingWeek) - filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file, err := underlyingFs.Create(filepath) - require.NoError(t, err) - _, err = file.WriteString(timestampedLogLinesStr) - require.NoError(t, err) - - mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) - - expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) - require.NoError(t, err) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perWeekStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - for _, logLine := range serviceLogLines { - require.Equal(t, expectedTime, logLine.GetTimestamp()) - } - } -} - -func TestStreamUserServiceLogsPerFileReturnsTimestampedLogLines(t *testing.T) { - expectedAmountLogLines := 3 - - expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ - testUserService1Uuid: expectedAmountLogLines, - } - - var logLinesFilters []logline.LogLineFilter - - userServiceUuids := map[service.ServiceUUID]bool{ - testUserService1Uuid: true, - } - - timedLogLine1 := fmt.Sprintf("{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - timedLogLine2 := fmt.Sprintf("{\"log\":\"Starting feature 'runs idempotently'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - timedLogLine3 := 
fmt.Sprintf("{\"log\":\"The enclave was created\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) - - timestampedLogLines := []string{timedLogLine1, timedLogLine2, timedLogLine3} - timestampedLogLinesStr := strings.Join(timestampedLogLines, "\n") + "\n" - - underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() - - filepath := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file, err := underlyingFs.Create(filepath) - require.NoError(t, err) - _, err = file.WriteString(timestampedLogLinesStr) - require.NoError(t, err) - - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - - expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) - require.NoError(t, err) - - receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( - t, - logLinesFilters, - userServiceUuids, - expectedServiceAmountLogLinesByServiceUuid, - doNotFollowLogs, - underlyingFs, - perFileStreamStrategy, - ) - require.NoError(t, testEvaluationErr) - - for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { - expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] - require.True(t, found) - require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) - require.Equal(t, expectedTime, serviceLogLines[0].GetTimestamp()) - } -} - -// ==================================================================================================== -// -// Private helper functions -// -// ==================================================================================================== -func executeStreamCallAndGetReceivedServiceLogLines( - t *testing.T, - logLinesFilters []logline.LogLineFilter, - userServiceUuids map[service.ServiceUUID]bool, - expectedServiceAmountLogLinesByServiceUuid map[service.ServiceUUID]int, - shouldFollowLogs bool, - underlyingFs volume_filesystem.VolumeFilesystem, - 
streamStrategy stream_logs_strategy.StreamLogsStrategy, -) (map[service.ServiceUUID][]logline.LogLine, error) { - ctx := context.Background() - - receivedServiceLogsByUuid := map[service.ServiceUUID][]logline.LogLine{} - - for serviceUuid := range expectedServiceAmountLogLinesByServiceUuid { - receivedServiceLogsByUuid[serviceUuid] = []logline.LogLine{} - } - - kurtosisBackend := backend_interface.NewMockKurtosisBackend(t) - - logsDatabaseClient := NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, underlyingFs, streamStrategy) - - userServiceLogsByUuidChan, errChan, receivedCancelCtxFunc, err := logsDatabaseClient.StreamUserServiceLogs(ctx, enclaveUuid, userServiceUuids, logLinesFilters, shouldFollowLogs, defaultShouldReturnAllLogs, defaultNumLogLines) - if err != nil { - return nil, stacktrace.Propagate(err, "An error occurred getting user service logs for UUIDs '%+v' using log line filters '%v' in enclave '%v'", userServiceUuids, logLinesFilters, enclaveUuid) - } - defer func() { - if receivedCancelCtxFunc != nil { - receivedCancelCtxFunc() - } - }() - - require.NotNil(t, userServiceLogsByUuidChan, "Received a nil user service logs channel, but a non-nil value was expected") - require.NotNil(t, errChan, "Received a nil error logs channel, but a non-nil value was expected") - - shouldReceiveStream := true - for shouldReceiveStream { - select { - case <-time.Tick(testTimeOut): - return nil, stacktrace.NewError("Receiving stream logs in the test has reached the '%v' time out", testTimeOut) - case streamErr, isChanOpen := <-errChan: - if !isChanOpen { - shouldReceiveStream = false - break - } - return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.") - case userServiceLogsByUuid, isChanOpen := <-userServiceLogsByUuidChan: - if !isChanOpen { - shouldReceiveStream = false - break - } - - for serviceUuid, serviceLogLines := range userServiceLogsByUuid { - _, found := userServiceUuids[serviceUuid] - require.True(t, found) - - currentServiceLogLines 
:= receivedServiceLogsByUuid[serviceUuid] - allServiceLogLines := append(currentServiceLogLines, serviceLogLines...) - receivedServiceLogsByUuid[serviceUuid] = allServiceLogLines - } - - for serviceUuid, expectedAmountLogLines := range expectedServiceAmountLogLinesByServiceUuid { - if len(receivedServiceLogsByUuid[serviceUuid]) == expectedAmountLogLines { - shouldReceiveStream = false - } else { - shouldReceiveStream = true - break - } - } - } - } - - return receivedServiceLogsByUuid, nil -} - -func createFilledPerFileFilesystem() volume_filesystem.VolumeFilesystem { - logLines := []string{logLine1, logLine2, logLine3a, logLine3b, logLine4, logLine5, logLine6, logLine7, logLine8} - - logLinesStr := strings.Join(logLines, "\n") - - file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file2PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) - file3PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) - - mapFs := volume_filesystem.NewMockedVolumeFilesystem() - - file1, _ := mapFs.Create(file1PathStr) - _, _ = file1.WriteString(logLinesStr) - - file2, _ := mapFs.Create(file2PathStr) - _, _ = file2.WriteString(logLinesStr) - - file3, _ := mapFs.Create(file3PathStr) - _, _ = file3.WriteString(logLinesStr) - - return mapFs -} - -func createFilledPerWeekFilesystem(week int) volume_filesystem.VolumeFilesystem { - logLines := []string{logLine1, logLine2, logLine3a, logLine3b, logLine4, logLine5, logLine6, logLine7, logLine8} - - logLinesStr := strings.Join(logLines, "\n") - // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format - formattedWeekNum := fmt.Sprintf("%02d", week) - file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, 
volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file2PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) - file3PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) - - mapFs := volume_filesystem.NewMockedVolumeFilesystem() - - file1, _ := mapFs.Create(file1PathStr) - _, _ = file1.WriteString(logLinesStr) - - file2, _ := mapFs.Create(file2PathStr) - _, _ = file2.WriteString(logLinesStr) - - file3, _ := mapFs.Create(file3PathStr) - _, _ = file3.WriteString(logLinesStr) - - return mapFs -} - -func createEmptyPerFileFilesystem() volume_filesystem.VolumeFilesystem { - file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file2PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) - file3PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) - - mapFs := volume_filesystem.NewMockedVolumeFilesystem() - - _, _ = mapFs.Create(file1PathStr) - _, _ = mapFs.Create(file2PathStr) - _, _ = mapFs.Create(file3PathStr) - - return mapFs -} - -func createEmptyPerWeekFilesystem(week int) volume_filesystem.VolumeFilesystem { - // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format - formattedWeekNum := fmt.Sprintf("%02d", week) - file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, 
testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) - file2PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) - file3PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) - - mapFs := volume_filesystem.NewMockedVolumeFilesystem() - - _, _ = mapFs.Create(file1PathStr) - _, _ = mapFs.Create(file2PathStr) - _, _ = mapFs.Create(file3PathStr) - - return mapFs -} +// +//func TestStreamUserServiceLogs_WithFilters(t *testing.T) { +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: 2, +// testUserService2Uuid: 2, +// testUserService3Uuid: 2, +// } +// +// firstTextFilter := logline.NewDoesContainTextLogLineFilter(firstFilterText) +// secondTextFilter := logline.NewDoesNotContainTextLogLineFilter(secondFilterText) +// regexFilter := logline.NewDoesContainMatchRegexLogLineFilter(firstMatchRegexFilterStr) +// +// logLinesFilters := []logline.LogLineFilter{ +// *firstTextFilter, +// *secondTextFilter, +// *regexFilter, +// } +// +// expectedFirstLogLine := "Starting feature 'runs idempotently'" +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// testUserService2Uuid: true, +// testUserService3Uuid: true, +// } +// +// underlyingFs := createFilledPerFileFilesystem() +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for 
serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_WithFilters(t *testing.T) { +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: 2, +// testUserService2Uuid: 2, +// testUserService3Uuid: 2, +// } +// +// firstTextFilter := logline.NewDoesContainTextLogLineFilter(firstFilterText) +// secondTextFilter := logline.NewDoesNotContainTextLogLineFilter(secondFilterText) +// regexFilter := logline.NewDoesContainMatchRegexLogLineFilter(firstMatchRegexFilterStr) +// +// logLinesFilters := []logline.LogLineFilter{ +// *firstTextFilter, +// *secondTextFilter, +// *regexFilter, +// } +// +// expectedFirstLogLine := "Starting feature 'runs idempotently'" +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// testUserService2Uuid: true, +// testUserService3Uuid: true, +// } +// +// underlyingFs := createFilledPerWeekFilesystem(startingWeek) +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, 
len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +// +// require.NoError(t, testEvaluationErr) +//} +// +//func TestStreamUserServiceLogs_NoLogsFromPersistentVolume(t *testing.T) { +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: 0, +// testUserService2Uuid: 0, +// testUserService3Uuid: 0, +// } +// +// firstTextFilter := logline.NewDoesContainTextLogLineFilter(notFoundedFilterText) +// +// logLinesFilters := []logline.LogLineFilter{ +// *firstTextFilter, +// } +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// testUserService2Uuid: true, +// testUserService3Uuid: true, +// } +// +// underlyingFs := createEmptyPerFileFilesystem() +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_NoLogsFromPersistentVolume(t *testing.T) { +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: 0, +// testUserService2Uuid: 0, +// testUserService3Uuid: 0, +// } +// +// firstTextFilter := logline.NewDoesContainTextLogLineFilter(notFoundedFilterText) +// +// logLinesFilters := []logline.LogLineFilter{ +// *firstTextFilter, +// } +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// 
testUserService2Uuid: true, +// testUserService3Uuid: true, +// } +// +// underlyingFs := createEmptyPerWeekFilesystem(startingWeek) +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// } +//} +// +//func TestStreamUserServiceLogs_ThousandsOfLogLinesSuccessfulExecution(t *testing.T) { +// expectedAmountLogLines := 10_000 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var emptyFilters []logline.LogLineFilter +// +// expectedFirstLogLine := "Starting feature 'centralized logs'" +// +// var logLines []string +// +// for i := 0; i <= expectedAmountLogLines; i++ { +// logLines = append(logLines, logLine1) +// } +// +// logLinesStr := strings.Join(logLines, "\n") +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) +// file1, err := underlyingFs.Create(file1PathStr) +// require.NoError(t, err) +// _, err = file1.WriteString(logLinesStr) +// require.NoError(t, err) +// 
+// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// emptyFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_ThousandsOfLogLinesSuccessfulExecution(t *testing.T) { +// expectedAmountLogLines := 10_000 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var emptyFilters []logline.LogLineFilter +// +// expectedFirstLogLine := "Starting feature 'centralized logs'" +// +// var logLines []string +// +// for i := 0; i <= expectedAmountLogLines; i++ { +// logLines = append(logLines, logLine1) +// } +// +// logLinesStr := strings.Join(logLines, "\n") +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format +// formattedWeekNum := fmt.Sprintf("%02d", startingWeek) +// file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) +// file1, err := underlyingFs.Create(file1PathStr) +// require.NoError(t, err) +// _, err = file1.WriteString(logLinesStr) +// 
require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// emptyFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +//} +// +//func TestStreamUserServiceLogs_EmptyLogLines(t *testing.T) { +// expectedAmountLogLines := 0 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var emptyFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// logLinesStr := "" +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// file1PathStr := fmt.Sprintf("%s%s/%s%s", volume_consts.LogsStorageDirpath, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) +// file1, err := underlyingFs.Create(file1PathStr) +// require.NoError(t, err) +// _, err = file1.WriteString(logLinesStr) +// require.NoError(t, err) +// +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// emptyFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// 
underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_EmptyLogLines(t *testing.T) { +// expectedAmountLogLines := 0 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var emptyFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// logLinesStr := "" +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// formattedWeekNum := fmt.Sprintf("%02d", startingWeek) +// file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, string(enclaveUuid), testUserService1Uuid, volume_consts.Filetype) +// file1, err := underlyingFs.Create(file1PathStr) +// require.NoError(t, err) +// _, err = file1.WriteString(logLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// emptyFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// 
require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// } +//} +// +//func TestStreamUserServiceLogsPerWeek_WithLogsAcrossWeeks(t *testing.T) { +// expectedAmountLogLines := 8 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var logLinesFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// expectedFirstLogLine := "Starting feature 'centralized logs'" +// +// week4logLines := []string{ +// logLine5, +// logLine6, +// logLine7, +// logLine8} +// week3logLines := []string{ +// logLine1, +// logLine2, +// logLine3a, +// logLine3b, +// logLine4} +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" +// week4logLinesStr := strings.Join(week4logLines, "\n") +// +// formattedWeekFour := fmt.Sprintf("%02d", 4) +// week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekFour, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// week4, err := underlyingFs.Create(week4filepath) +// require.NoError(t, err) +// _, err = week4.WriteString(week4logLinesStr) +// require.NoError(t, err) +// +// formattedWeekThree := fmt.Sprintf("%02d", 3) +// week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekThree, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// week3, err := underlyingFs.Create(week3filepath) +// require.NoError(t, err) +// _, err = week3.WriteString(week3logLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, 
retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +// +//} +// +//func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) { +// expectedAmountLogLines := 8 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var logLinesFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// expectedFirstLogLine := "Starting feature 'centralized logs'" +// +// week4logLines := []string{ +// logLine3b, +// logLine4, +// logLine5, +// logLine6, +// logLine7, +// logLine8} +// week3logLines := []string{ +// logLine1, +// logLine2, +// logLine3a} +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" +// week4logLinesStr := strings.Join(week4logLines, "\n") + "\n" +// +// formattedWeekFour := fmt.Sprintf("%02d", 4) +// week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekFour, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// week4, err := underlyingFs.Create(week4filepath) +// require.NoError(t, err) +// _, err = week4.WriteString(week4logLinesStr) +// 
require.NoError(t, err) +// +// formattedWeekThree := fmt.Sprintf("%02d", 3) +// week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekThree, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// week3, err := underlyingFs.Create(week3filepath) +// require.NoError(t, err) +// _, err = week3.WriteString(week3logLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedFirstLogLine, serviceLogLines[0].GetContent()) +// } +//} +// +//func TestStreamUserServiceLogsPerWeekReturnsTimestampedLogLines(t *testing.T) { +// expectedAmountLogLines := 3 +// +// expectedServiceAmountLogLinesByServiceUuid := map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var logLinesFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// timedLogLine1 := fmt.Sprintf("{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// timedLogLine2 := fmt.Sprintf("{\"log\":\"Starting feature 'runs idempotently'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) 
+// timedLogLine3 := fmt.Sprintf("{\"log\":\"The enclave was created\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// +// timestampedLogLines := []string{timedLogLine1, timedLogLine2, timedLogLine3} +// timestampedLogLinesStr := strings.Join(timestampedLogLines, "\n") + "\n" +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// formattedWeekNum := fmt.Sprintf("%02d", startingWeek) +// filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file, err := underlyingFs.Create(filepath) +// require.NoError(t, err) +// _, err = file.WriteString(timestampedLogLinesStr) +// require.NoError(t, err) +// +// mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay) +// perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) +// +// expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) +// require.NoError(t, err) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perWeekStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// for _, logLine := range serviceLogLines { +// require.Equal(t, expectedTime, logLine.GetTimestamp()) +// } +// } +//} +// +//func TestStreamUserServiceLogsPerFileReturnsTimestampedLogLines(t *testing.T) { +// t.Skip() +// expectedAmountLogLines := 3 +// +// expectedServiceAmountLogLinesByServiceUuid := 
map[service.ServiceUUID]int{ +// testUserService1Uuid: expectedAmountLogLines, +// } +// +// var logLinesFilters []logline.LogLineFilter +// +// userServiceUuids := map[service.ServiceUUID]bool{ +// testUserService1Uuid: true, +// } +// +// timedLogLine1 := fmt.Sprintf("{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// timedLogLine2 := fmt.Sprintf("{\"log\":\"Starting feature 'runs idempotently'\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// timedLogLine3 := fmt.Sprintf("{\"log\":\"The enclave was created\", \"timestamp\":\"%v\"}", defaultUTCTimestampStr) +// +// timestampedLogLines := []string{timedLogLine1, timedLogLine2, timedLogLine3} +// timestampedLogLinesStr := strings.Join(timestampedLogLines, "\n") + "\n" +// +// underlyingFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// filepath := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file, err := underlyingFs.Create(filepath) +// require.NoError(t, err) +// _, err = file.WriteString(timestampedLogLinesStr) +// require.NoError(t, err) +// +// perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() +// +// expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) +// require.NoError(t, err) +// +// receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( +// t, +// logLinesFilters, +// userServiceUuids, +// expectedServiceAmountLogLinesByServiceUuid, +// doNotFollowLogs, +// underlyingFs, +// perFileStreamStrategy, +// ) +// require.NoError(t, testEvaluationErr) +// +// for serviceUuid, serviceLogLines := range receivedUserServiceLogsByUuid { +// expectedAmountLogLines, found := expectedServiceAmountLogLinesByServiceUuid[serviceUuid] +// require.True(t, found) +// require.Equal(t, expectedAmountLogLines, len(serviceLogLines)) +// require.Equal(t, expectedTime, 
serviceLogLines[0].GetTimestamp()) +// } +//} +// +//// ==================================================================================================== +//// +//// Private helper functions +//// +//// ==================================================================================================== +//func executeStreamCallAndGetReceivedServiceLogLines( +// t *testing.T, +// logLinesFilters []logline.LogLineFilter, +// userServiceUuids map[service.ServiceUUID]bool, +// expectedServiceAmountLogLinesByServiceUuid map[service.ServiceUUID]int, +// shouldFollowLogs bool, +// underlyingFs volume_filesystem.VolumeFilesystem, +// streamStrategy stream_logs_strategy.StreamLogsStrategy, +//) (map[service.ServiceUUID][]logline.LogLine, error) { +// ctx := context.Background() +// +// receivedServiceLogsByUuid := map[service.ServiceUUID][]logline.LogLine{} +// +// for serviceUuid := range expectedServiceAmountLogLinesByServiceUuid { +// receivedServiceLogsByUuid[serviceUuid] = []logline.LogLine{} +// } +// +// kurtosisBackend := backend_interface.NewMockKurtosisBackend(t) +// +// logsDatabaseClient := NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, underlyingFs, streamStrategy) +// +// userServiceLogsByUuidChan, errChan, receivedCancelCtxFunc, err := logsDatabaseClient.StreamUserServiceLogs(ctx, enclaveUuid, userServiceUuids, logLinesFilters, shouldFollowLogs, defaultShouldReturnAllLogs, defaultNumLogLines) +// if err != nil { +// return nil, stacktrace.Propagate(err, "An error occurred getting user service logs for UUIDs '%+v' using log line filters '%v' in enclave '%v'", userServiceUuids, logLinesFilters, enclaveUuid) +// } +// defer func() { +// if receivedCancelCtxFunc != nil { +// receivedCancelCtxFunc() +// } +// }() +// +// require.NotNil(t, userServiceLogsByUuidChan, "Received a nil user service logs channel, but a non-nil value was expected") +// require.NotNil(t, errChan, "Received a nil error logs channel, but a non-nil value was expected") +// +// 
shouldReceiveStream := true +// for shouldReceiveStream { +// select { +// case <-time.Tick(testTimeOut): +// return nil, stacktrace.NewError("Receiving stream logs in the test has reached the '%v' time out", testTimeOut) +// case streamErr, isChanOpen := <-errChan: +// if !isChanOpen { +// shouldReceiveStream = false +// break +// } +// return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.") +// case userServiceLogsByUuid, isChanOpen := <-userServiceLogsByUuidChan: +// if !isChanOpen { +// shouldReceiveStream = false +// break +// } +// +// for serviceUuid, serviceLogLines := range userServiceLogsByUuid { +// _, found := userServiceUuids[serviceUuid] +// require.True(t, found) +// +// currentServiceLogLines := receivedServiceLogsByUuid[serviceUuid] +// allServiceLogLines := append(currentServiceLogLines, serviceLogLines...) +// receivedServiceLogsByUuid[serviceUuid] = allServiceLogLines +// } +// +// for serviceUuid, expectedAmountLogLines := range expectedServiceAmountLogLinesByServiceUuid { +// if len(receivedServiceLogsByUuid[serviceUuid]) == expectedAmountLogLines { +// shouldReceiveStream = false +// } else { +// shouldReceiveStream = true +// break +// } +// } +// } +// } +// +// return receivedServiceLogsByUuid, nil +//} +// +//func createFilledPerFileFilesystem() volume_filesystem.VolumeFilesystem { +// logLines := []string{logLine1, logLine2, logLine3a, logLine3b, logLine4, logLine5, logLine6, logLine7, logLine8} +// +// logLinesStr := strings.Join(logLines, "\n") +// +// file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file2PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) +// file3PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) +// 
+// mapFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// file1, _ := mapFs.Create(file1PathStr) +// _, _ = file1.WriteString(logLinesStr) +// +// file2, _ := mapFs.Create(file2PathStr) +// _, _ = file2.WriteString(logLinesStr) +// +// file3, _ := mapFs.Create(file3PathStr) +// _, _ = file3.WriteString(logLinesStr) +// +// return mapFs +//} +// +//func createFilledPerWeekFilesystem(week int) volume_filesystem.VolumeFilesystem { +// logLines := []string{logLine1, logLine2, logLine3a, logLine3b, logLine4, logLine5, logLine6, logLine7, logLine8} +// +// logLinesStr := strings.Join(logLines, "\n") +// // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format +// formattedWeekNum := fmt.Sprintf("%02d", week) +// file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file2PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) +// file3PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) +// +// mapFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// file1, _ := mapFs.Create(file1PathStr) +// _, _ = file1.WriteString(logLinesStr) +// +// file2, _ := mapFs.Create(file2PathStr) +// _, _ = file2.WriteString(logLinesStr) +// +// file3, _ := mapFs.Create(file3PathStr) +// _, _ = file3.WriteString(logLinesStr) +// +// return mapFs +//} +// +//func createEmptyPerFileFilesystem() volume_filesystem.VolumeFilesystem { +// file1PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file2PathStr 
:= fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) +// file3PathStr := fmt.Sprintf(volume_consts.PerFileFmtStr, volume_consts.LogsStorageDirpath, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) +// +// mapFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// _, _ = mapFs.Create(file1PathStr) +// _, _ = mapFs.Create(file2PathStr) +// _, _ = mapFs.Create(file3PathStr) +// +// return mapFs +//} +// +//func createEmptyPerWeekFilesystem(week int) volume_filesystem.VolumeFilesystem { +// // %02d to format week num with leading zeros so 1-9 are converted to 01-09 for %V format +// formattedWeekNum := fmt.Sprintf("%02d", week) +// file1PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) +// file2PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService2Uuid, volume_consts.Filetype) +// file3PathStr := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpath, strconv.Itoa(defaultYear), formattedWeekNum, testEnclaveUuid, testUserService3Uuid, volume_consts.Filetype) +// +// mapFs := volume_filesystem.NewMockedVolumeFilesystem() +// +// _, _ = mapFs.Create(file1PathStr) +// _, _ = mapFs.Create(file2PathStr) +// _, _ = mapFs.Create(file3PathStr) +// +// return mapFs +//} diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go index 755df67123..d0798df8fa 100644 --- 
a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go @@ -25,7 +25,8 @@ import ( ) const ( - oneWeek = 7 * 24 * time.Hour + oneWeek = 7 * 24 * time.Hour + batchLogsAmount = 50 ) // PerWeekStreamLogsStrategy pulls logs from filesystem where there is a log file per year, per week, per enclave, per service @@ -89,10 +90,12 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( }() if shouldReturnAllLogs { + startTime := time.Now() if err := strategy.streamAllLogs(ctx, logsReader, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming all logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid) return } + logrus.Infof("TOTAL TIME IN STREAM ALL LOGS FUNCTION: %v", time.Now().Sub(startTime)) } else { if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming '%v' logs for service '%v' in enclave '%v'", numLogLines, serviceUuid, enclaveUuid) @@ -183,28 +186,92 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, serviceUuid service.ServiceUUID, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error { + + var totalLogFileReadDuration time.Duration + var totalTimeToGetJsonStrings time.Duration + var totalTimeToSendJsonLogs time.Duration + + var totalTimeToSendLogsGranular time.Duration + var totalTimeProcessLinesInSend time.Duration + var totalTimestampParsing time.Duration + var totalFilterCheck time.Duration + var totalRetentionCheck time.Duration + 
+ var ltm SendLogLineTimeMeasurements + var logLineBuffer []logline.LogLine for { select { case <-ctx.Done(): logrus.Debugf("Context was canceled, stopping streaming service logs for service '%v'", serviceUuid) + logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{ + processDuration: totalTimeProcessLinesInSend, + sendDuration: totalTimeToSendLogsGranular, + parseTimestampDuratoin: totalTimestampParsing, + filterCheckDuration: totalFilterCheck, + retentionPeriodCheckDuration: totalRetentionCheck, + }) return nil default: + startTime := time.Now() + + getJsonStartTime := time.Now() jsonLogStr, err := getCompleteJsonLogString(logsReader) + totalTimeToGetJsonStrings += time.Now().Sub(getJsonStartTime) + if isValidJsonEnding(jsonLogStr) { + var logLine logline.LogLine jsonLog, err := convertStringToJson(jsonLogStr) if err != nil { return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr) } - if err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + + sendJsonLogLineStartTime := time.Now() + logLine, err, ltm = strategy.sendJsonLogLineWithTimes(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex) + if err != nil { return err } + logLineBuffer = append(logLineBuffer, logLine) + totalTimeToSendJsonLogs += time.Now().Sub(sendJsonLogLineStartTime) + + //totalTimeToSendLogsGranular += ltm.sendDuration + totalTimeProcessLinesInSend += ltm.processDuration + totalTimestampParsing += ltm.parseTimestampDuratoin + totalFilterCheck += ltm.filterCheckDuration + totalRetentionCheck += ltm.retentionPeriodCheckDuration + + endTime := time.Now() + totalLogFileReadDuration += endTime.Sub(startTime) + } + + if len(logLineBuffer)%batchLogsAmount == 0 { + sendAcrossChannelStartTime := time.Now() + userServicesLogLinesMap := 
map[service.ServiceUUID][]logline.LogLine{ + serviceUuid: logLineBuffer, + } + logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap + logLineBuffer = []logline.LogLine{} + totalTimeToSendLogsGranular += time.Now().Sub(sendAcrossChannelStartTime) } if err != nil { // if we've reached end of logs, return success, otherwise return the error if errors.Is(err, io.EOF) { + logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{ + processDuration: totalTimeProcessLinesInSend, + sendDuration: totalTimeToSendLogsGranular, + parseTimestampDuratoin: totalTimestampParsing, + filterCheckDuration: totalFilterCheck, + retentionPeriodCheckDuration: totalRetentionCheck, + }) return nil } else { + logTimes(totalLogFileReadDuration, totalTimeToGetJsonStrings, totalTimeToSendJsonLogs, SendLogLineTimeMeasurements{ + processDuration: totalTimeProcessLinesInSend, + sendDuration: totalTimeToSendLogsGranular, + parseTimestampDuratoin: totalTimestampParsing, + filterCheckDuration: totalFilterCheck, + retentionPeriodCheckDuration: totalRetentionCheck, + }) return err } } @@ -347,6 +414,115 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine( return nil } +type SendLogLineTimeMeasurements struct { + processDuration time.Duration + sendDuration time.Duration + parseTimestampDuratoin time.Duration + filterCheckDuration time.Duration + retentionPeriodCheckDuration time.Duration +} + +func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLineWithTimes( + jsonLog JsonLog, + logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + serviceUuid service.ServiceUUID, + conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) (logline.LogLine, error, SendLogLineTimeMeasurements) { + // each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."} + // eg. 
{"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd", + // "log":"hi","timestamp":"2023-08-14T14:57:49Z"} + var processDuration time.Duration + var sendDuration time.Duration + var parseTimestampDuration time.Duration + var filterCheckDuration time.Duration + var retentionPeriodCheckDuration time.Duration + + processStart := time.Now() + // Then extract the actual log message using the vectors log field + logMsgStr, found := jsonLog[volume_consts.LogLabel] + if !found { + return logline.LogLine{}, stacktrace.NewError("An error retrieving the log field '%v' from json log: %v\n", volume_consts.LogLabel, jsonLog), SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } + } + + timestampStart := time.Now() + // Extract the timestamp using vectors timestamp field + logTimestamp, err := parseTimestampFromJsonLogLine(jsonLog) + if err != nil { + return logline.LogLine{}, stacktrace.Propagate(err, "An error occurred parsing timestamp from json log line."), SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } + } + logLine := logline.NewLogLine(logMsgStr, *logTimestamp) + parseTimestampDuration += time.Now().Sub(timestampStart) + + filterStart := time.Now() + // Then filter by checking if the log message is valid based on requested filters + validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex) + if err != nil { + return logline.LogLine{}, stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex), 
SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } + } + if !validLogLine { + return logline.LogLine{}, nil, SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } + } + filterCheckDuration += time.Now().Sub(filterStart) + + retentionCheckStart := time.Now() + // ensure this log line is within the retention period if it has a timestamp + withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(logLine) + if err != nil { + return logline.LogLine{}, stacktrace.Propagate(err, "An error occurred determining whether log line '%+v' is within the retention period.", logLine), SendLogLineTimeMeasurements{} + } + if !withinRetentionPeriod { + return logline.LogLine{}, nil, SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } + } + retentionPeriodCheckDuration += time.Now().Sub(retentionCheckStart) + + // send the log line + //logLines := []logline.LogLine{*logLine} + //userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{ + // serviceUuid: logLines, + //} + processDuration += time.Now().Sub(processStart) + + sendStart := time.Now() + //logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap + sendDuration += time.Now().Sub(sendStart) + return *logLine, nil, SendLogLineTimeMeasurements{ + processDuration: processDuration, + sendDuration: sendDuration, + parseTimestampDuratoin: parseTimestampDuration, + filterCheckDuration: filterCheckDuration, + 
retentionPeriodCheckDuration: retentionPeriodCheckDuration, + } +} + // Returns true if [logLine] has no timestamp func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) { retentionPeriod := strategy.time.Now().Add(time.Duration(-strategy.logRetentionPeriodInWeeks) * oneWeek) @@ -427,3 +603,14 @@ func parseTimestampFromJsonLogLine(logLine JsonLog) (*time.Time, error) { } return &timestamp, nil } + +func logTimes(totalDuration, getLineDuration, totalSendLineDuration time.Duration, sendLogLineTM SendLogLineTimeMeasurements) { + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO READ FILES: %v", totalDuration) + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO GET JSON LINES: %v", getLineDuration) + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES: %v", totalSendLineDuration) + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO SEND JSON LINES ACROSS CHANNEL: %v", sendLogLineTM.sendDuration) + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PROCESS JSON LINES BEFORE SENDING: %v", sendLogLineTM.processDuration) + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO PARSE TIMESTAMPS: %v", sendLogLineTM.parseTimestampDuratoin) + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO FILTER LINES BASED ON REGEXES: %v", sendLogLineTM.filterCheckDuration) + logrus.Infof("LOGS DB CLIENT [per_week_stream_logs_strategy] TOTAL TIME TO CHECK RETENTION PERIOD: %v", sendLogLineTM.retentionPeriodCheckDuration) +} diff --git a/engine/server/engine/server/engine_connect_server_service.go b/engine/server/engine/server/engine_connect_server_service.go index 3fc908e916..a3f3c84225 100644 --- a/engine/server/engine/server/engine_connect_server_service.go +++ b/engine/server/engine/server/engine_connect_server_service.go @@ -347,6 +347,8 @@ func (service 
*EngineConnectServerService) GetServiceLogs(ctx context.Context, c } }() + var totalLogStreamDuration time.Duration + var counter int for { select { //stream case @@ -354,24 +356,39 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c //If the channel is closed means that the logs database client won't continue sending streams if !isChanOpen { logrus.Debug("Exiting the stream loop after receiving a close signal from the service logs by service UUID channel") + logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration) return nil } + // print out num log lines every 100 lines times + //for serviceUUID, logs := range serviceLogsByServiceUuid { + // if counter%100 == 0 { + // logrus.Infof("NUM LOG LINES FOR SERVICE '%v' CHECK IN ENGINE CONNECT SERVICE: %v", serviceUUID, len(logs)) + // } + //} + + startTime := time.Now() getServiceLogsResponse := newLogsResponse(requestedServiceUuids, serviceLogsByServiceUuid, notFoundServiceUuids) if err := stream.Send(getServiceLogsResponse); err != nil { return stacktrace.Propagate(err, "An error occurred sending the stream logs for service logs response '%+v'", getServiceLogsResponse) } + counter += 1 + endTime := time.Now() + totalLogStreamDuration += endTime.Sub(startTime) //client cancel ctx case case <-contextWithCancel.Done(): logrus.Debug("The user service logs stream has done") + logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration) return nil //error from logs database case case err, isChanOpen := <-errChan: if isChanOpen { logrus.Debug("Exiting the stream because an error from the logs database client was received through the error chan.") + logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration) return stacktrace.Propagate(err, "An error occurred streaming user service logs.") } 
logrus.Debug("Exiting the stream loop after receiving a close signal from the error chan") + logrus.Infof("ENGINE [engine_connect_server_service.go] TOTAL TIME TO STREAM LOGS IN ENGINE: %v", totalLogStreamDuration) return nil } } diff --git a/engine/server/go.mod b/engine/server/go.mod index f6548f984e..4660cedcc6 100644 --- a/engine/server/go.mod +++ b/engine/server/go.mod @@ -63,7 +63,7 @@ require ( github.com/kurtosis-tech/kurtosis/grpc-file-transfer/golang v0.0.0-20230803130419-099ee7a4e3dc github.com/kurtosis-tech/kurtosis/metrics-library/golang v0.0.0-20231206095907-9bdf0d02cb90 github.com/labstack/echo/v4 v4.11.3 - github.com/rs/cors v1.9.0 + github.com/rs/cors v1.11.0 github.com/spf13/afero v1.10.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 k8s.io/apimachinery v0.27.2 diff --git a/engine/server/go.sum b/engine/server/go.sum index b7fd1e32d1..c027f4c074 100644 --- a/engine/server/go.sum +++ b/engine/server/go.sum @@ -312,8 +312,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= -github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po= +github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/segmentio/backo-go v1.0.0 h1:kbOAtGJY2DqOR0jfRkYEorx/b18RgtepGtY3+Cpe6qA= github.com/segmentio/backo-go v1.0.0/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=