From 2bc89bc5c74ea995733a953d1ccba1011de415ff Mon Sep 17 00:00:00 2001 From: Tedi Mitiku Date: Tue, 28 Nov 2023 08:14:52 -0500 Subject: [PATCH 1/3] support logs db for k8s --- engine/server/engine/main.go | 33 ++++++++------ .../server/engine_connect_server_service.go | 43 +++---------------- 2 files changed, 27 insertions(+), 49 deletions(-) diff --git a/engine/server/engine/main.go b/engine/server/engine/main.go index b5be184041..40d6c2c74e 100644 --- a/engine/server/engine/main.go +++ b/engine/server/engine/main.go @@ -19,6 +19,8 @@ import ( em_api "github.com/kurtosis-tech/kurtosis/enclave-manager/server" "github.com/kurtosis-tech/kurtosis/engine/launcher/args" "github.com/kurtosis-tech/kurtosis/engine/launcher/args/kurtosis_backend_config" + "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs" + "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/kurtosis_backend" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/logs_clock" @@ -140,19 +142,11 @@ func runMain() error { return stacktrace.Propagate(err, "An error occurred getting the Kurtosis backend for backend type '%v' and config '%+v'", serverArgs.KurtosisBackendType, backendConfig) } + logsDatabaseClient := getLogsDatabaseClient(serverArgs.KurtosisBackendType, kurtosisBackend) + + // TODO: Move log file management into LogsDatabaseClient osFs := volume_filesystem.NewOsVolumeFilesystem() realTime := logs_clock.NewRealClock() - - // TODO: remove once users are fully migrated to log retention/new log schema - // pulls logs per enclave/per service id - perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy() - perFileLogsDatabaseClient := persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perFileStreamStrategy) - - // pulls logs /per week/per enclave/per service - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime) - perWeekLogsDatabaseClient := persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perWeekStreamStrategy) - - // TODO: Move logFileManager into LogsDatabaseClient logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, realTime) logFileManager.StartLogFileManagement(ctx) @@ -232,8 +226,7 @@ func runMain() error { enclaveManager, serverArgs.MetricsUserID, serverArgs.DidUserAcceptSendingMetrics, - perWeekLogsDatabaseClient, - perFileLogsDatabaseClient, + logsDatabaseClient, logFileManager, metricsClient) apiPath, handler := kurtosis_engine_rpc_api_bindingsconnect.NewEngineServiceHandler(engineConnectServer) @@ -329,6 +322,20 @@ func getKurtosisBackend(ctx context.Context, kurtosisBackendType args.KurtosisBa return kurtosisBackend, nil } +func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosisBackend backend_interface.KurtosisBackend) centralized_logs.LogsDatabaseClient { + var logsDatabaseClient centralized_logs.LogsDatabaseClient + switch kurtosisBackendType { + case args.KurtosisBackendType_Docker: + osFs := volume_filesystem.NewOsVolumeFilesystem() + realTime := logs_clock.NewRealClock() + perWeekStreamLogsStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime) + logsDatabaseClient = persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perWeekStreamLogsStrategy) + case args.KurtosisBackendType_Kubernetes: + logsDatabaseClient = kurtosis_backend.NewKurtosisBackendLogsDatabaseClient(kurtosisBackend) + } + return logsDatabaseClient +} + func formatFilenameFunctionForLogs(filename string, functionName string) string { var output strings.Builder output.WriteString("[") diff --git a/engine/server/engine/server/engine_connect_server_service.go b/engine/server/engine/server/engine_connect_server_service.go index 015bb17c28..db53c976a7 100644 --- a/engine/server/engine/server/engine_connect_server_service.go +++ b/engine/server/engine/server/engine_connect_server_service.go @@ -22,10 +22,6 @@ const ( subnetworkDisableBecauseItIsDeprecated = false ) -var ( - logRetentionFeatureReleaseTime = time.Date(2023, 9, 7, 13, 0, 0, 0, time.UTC) -) - type EngineConnectServerService struct { // The version tag of the engine server image, so it can report its own version imageVersionTag string @@ -38,14 +34,8 @@ type EngineConnectServerService struct { // User consent to send metrics didUserAcceptSendingMetrics bool - // The clients for consuming container logs from the logs' database server - - // per week pulls logs from enclaves created post log retention feature - perWeekLogsDatabaseClient centralized_logs.LogsDatabaseClient - - // per file pulls logs from enclaves created pre log retention feature - // TODO: remove once users are fully migrated to log retention/new log schema - perFileLogsDatabaseClient centralized_logs.LogsDatabaseClient + // The client for consuming container logs from the logs database + logsDatabaseClient centralized_logs.LogsDatabaseClient logFileManager *log_file_manager.LogFileManager @@ -57,8 +47,7 @@ func NewEngineConnectServerService( enclaveManager *enclave_manager.EnclaveManager, metricsUserId string, didUserAcceptSendingMetrics bool, - perWeekLogsDatabaseClient centralized_logs.LogsDatabaseClient, - perFileLogsDatabaseClient centralized_logs.LogsDatabaseClient, + logsDatabaseClient centralized_logs.LogsDatabaseClient, logFileManager *log_file_manager.LogFileManager, metricsClient metrics_client.MetricsClient, ) *EngineConnectServerService { @@ -67,8 +56,7 @@ func NewEngineConnectServerService( enclaveManager: enclaveManager, metricsUserID: metricsUserId, didUserAcceptSendingMetrics: didUserAcceptSendingMetrics, - perWeekLogsDatabaseClient: perWeekLogsDatabaseClient, - perFileLogsDatabaseClient: perFileLogsDatabaseClient, + logsDatabaseClient: logsDatabaseClient, logFileManager: logFileManager, metricsClient: metricsClient, } @@ -205,7 +193,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c requestedServiceUuids[serviceUuid] = true } - if service.perWeekLogsDatabaseClient == nil || service.perFileLogsDatabaseClient == nil { + if service.logsDatabaseClient == nil { return stacktrace.NewError("It's not possible to return service logs because there is no logs database client; this is bug in Kurtosis") } @@ -225,14 +213,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c return stacktrace.Propagate(err, "An error occurred creating the conjunctive log line filters from the GRPC's conjunctive log line filters '%+v'", args.GetConjunctiveFilters()) } - // get enclave creation time to determine strategy to pull logs - enclaveCreationTime, err := service.getEnclaveCreationTime(ctx, enclaveUuid) - if err != nil { - return stacktrace.Propagate(err, "An error occurred while trying to get the enclave creation time to determine how to pull logs.") - } - logsDatabaseClient := service.getLogsDatabaseClient(enclaveCreationTime) - - serviceLogsByServiceUuidChan, errChan, cancelCtxFunc, err = logsDatabaseClient.StreamUserServiceLogs( + serviceLogsByServiceUuidChan, errChan, cancelCtxFunc, err = service.logsDatabaseClient.StreamUserServiceLogs( contextWithCancel, enclaveUuid, requestedServiceUuids, @@ -300,8 +281,7 @@ func (service *EngineConnectServerService) reportAnyMissingUuidsAndGetNotFoundUu requestedServiceUuids map[user_service.ServiceUUID]bool, stream *connect.ServerStream[kurtosis_engine_rpc_api_bindings.GetServiceLogsResponse], ) (map[string]bool, error) { - // doesn't matter which logs client is used here - existingServiceUuids, err := service.perWeekLogsDatabaseClient.FilterExistingServiceUuids(ctx, enclaveUuid, requestedServiceUuids) + existingServiceUuids, err := service.logsDatabaseClient.FilterExistingServiceUuids(ctx, enclaveUuid, requestedServiceUuids) if err != nil { return nil, stacktrace.Propagate(err, "An error occurred retrieving the exhaustive list of service UUIDs from the log client for enclave '%v' and for the requested UUIDs '%+v'", enclaveUuid, requestedServiceUuids) } @@ -423,15 +403,6 @@ func newConjunctiveLogLineFiltersFromGRPCLogLineFilters( return conjunctiveLogLineFilters, nil } -// If the enclave was created prior to log retention, return the per file logs client -func (service *EngineConnectServerService) getLogsDatabaseClient(enclaveCreationTime time.Time) centralized_logs.LogsDatabaseClient { - if enclaveCreationTime.After(logRetentionFeatureReleaseTime) { - return service.perWeekLogsDatabaseClient - } else { - return service.perFileLogsDatabaseClient - } -} - func (service *EngineConnectServerService) getEnclaveCreationTime(ctx context.Context, enclaveUuid enclave.EnclaveUUID) (time.Time, error) { enclaves, err := service.enclaveManager.GetEnclaves(ctx) if err != nil { From e52fdcef9ea09395abbf1f2f763b6771acd820f2 Mon Sep 17 00:00:00 2001 From: Tedi Mitiku Date: Tue, 28 Nov 2023 09:01:21 -0500 Subject: [PATCH 2/3] remove enclave creation time --- .../server/engine_connect_server_service.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/engine/server/engine/server/engine_connect_server_service.go b/engine/server/engine/server/engine_connect_server_service.go index db53c976a7..5fd20a33b5 100644 --- a/engine/server/engine/server/engine_connect_server_service.go +++ b/engine/server/engine/server/engine_connect_server_service.go @@ -15,7 +15,6 @@ import ( "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" - "time" ) const ( @@ -402,21 +401,3 @@ func newConjunctiveLogLineFiltersFromGRPCLogLineFilters( return conjunctiveLogLineFilters, nil } - -func (service *EngineConnectServerService) getEnclaveCreationTime(ctx context.Context, enclaveUuid enclave.EnclaveUUID) (time.Time, error) { - enclaves, err := service.enclaveManager.GetEnclaves(ctx) - if err != nil { - return time.Time{}, err - } - - enclaveObj, found := enclaves[string(enclaveUuid)] - if !found { - return time.Time{}, stacktrace.NewError("Engine could not find enclave '%v'", enclaveUuid) - } - - timestamp := enclaveObj.GetCreationTime() - if timestamp == nil { - return time.Time{}, stacktrace.NewError("An error occurred getting the creation time for enclave '%v'. This is a bug in Kurtosis", enclaveUuid) - } - return timestamp.AsTime(), nil -} From e5d845493837decfb7ba05f4a89f2e69cf099a37 Mon Sep 17 00:00:00 2001 From: Tedi Mitiku Date: Wed, 29 Nov 2023 09:37:59 -0500 Subject: [PATCH 3/3] add comment --- engine/server/engine/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/server/engine/main.go b/engine/server/engine/main.go index 40d6c2c74e..a2661a2fee 100644 --- a/engine/server/engine/main.go +++ b/engine/server/engine/main.go @@ -322,6 +322,7 @@ func getKurtosisBackend(ctx context.Context, kurtosisBackendType args.KurtosisBa return kurtosisBackend, nil } +// if cluster is docker, return logs client for centralized logging, otherwise use logs db of kurtosis backend which uses k8s logs under the hood func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosisBackend backend_interface.KurtosisBackend) centralized_logs.LogsDatabaseClient { var logsDatabaseClient centralized_logs.LogsDatabaseClient switch kurtosisBackendType {