Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support logs db for k8s #1864

Merged
merged 3 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions engine/server/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
em_api "github.com/kurtosis-tech/kurtosis/enclave-manager/server"
"github.com/kurtosis-tech/kurtosis/engine/launcher/args"
"github.com/kurtosis-tech/kurtosis/engine/launcher/args/kurtosis_backend_config"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/kurtosis_backend"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/logs_clock"
Expand Down Expand Up @@ -140,19 +142,11 @@ func runMain() error {
return stacktrace.Propagate(err, "An error occurred getting the Kurtosis backend for backend type '%v' and config '%+v'", serverArgs.KurtosisBackendType, backendConfig)
}

logsDatabaseClient := getLogsDatabaseClient(serverArgs.KurtosisBackendType, kurtosisBackend)

// TODO: Move log file management into LogsDatabaseClient
osFs := volume_filesystem.NewOsVolumeFilesystem()
realTime := logs_clock.NewRealClock()

// TODO: remove once users are fully migrated to log retention/new log schema
// pulls logs per enclave/per service id
perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy()
perFileLogsDatabaseClient := persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perFileStreamStrategy)

// pulls logs /per week/per enclave/per service
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime)
perWeekLogsDatabaseClient := persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perWeekStreamStrategy)

// TODO: Move logFileManager into LogsDatabaseClient
logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, realTime)
logFileManager.StartLogFileManagement(ctx)

Expand Down Expand Up @@ -232,8 +226,7 @@ func runMain() error {
enclaveManager,
serverArgs.MetricsUserID,
serverArgs.DidUserAcceptSendingMetrics,
perWeekLogsDatabaseClient,
perFileLogsDatabaseClient,
logsDatabaseClient,
logFileManager,
metricsClient)
apiPath, handler := kurtosis_engine_rpc_api_bindingsconnect.NewEngineServiceHandler(engineConnectServer)
Expand Down Expand Up @@ -329,6 +322,21 @@ func getKurtosisBackend(ctx context.Context, kurtosisBackendType args.KurtosisBa
return kurtosisBackend, nil
}

// if cluster is docker, return logs client for centralized logging, otherwise use logs db of kurtosis backend which uses k8s logs under the hood
func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosisBackend backend_interface.KurtosisBackend) centralized_logs.LogsDatabaseClient {
var logsDatabaseClient centralized_logs.LogsDatabaseClient
switch kurtosisBackendType {
case args.KurtosisBackendType_Docker:
osFs := volume_filesystem.NewOsVolumeFilesystem()
realTime := logs_clock.NewRealClock()
perWeekStreamLogsStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime)
logsDatabaseClient = persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perWeekStreamLogsStrategy)
case args.KurtosisBackendType_Kubernetes:
logsDatabaseClient = kurtosis_backend.NewKurtosisBackendLogsDatabaseClient(kurtosisBackend)
}
return logsDatabaseClient
}

func formatFilenameFunctionForLogs(filename string, functionName string) string {
var output strings.Builder
output.WriteString("[")
Expand Down
62 changes: 7 additions & 55 deletions engine/server/engine/server/engine_connect_server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@ import (
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"time"
)

const (
subnetworkDisableBecauseItIsDeprecated = false
)

var (
logRetentionFeatureReleaseTime = time.Date(2023, 9, 7, 13, 0, 0, 0, time.UTC)
)

type EngineConnectServerService struct {
// The version tag of the engine server image, so it can report its own version
imageVersionTag string
Expand All @@ -38,14 +33,8 @@ type EngineConnectServerService struct {
// User consent to send metrics
didUserAcceptSendingMetrics bool

// The clients for consuming container logs from the logs' database server

// per week pulls logs from enclaves created post log retention feature
perWeekLogsDatabaseClient centralized_logs.LogsDatabaseClient

// per file pulls logs from enclaves created pre log retention feature
// TODO: remove once users are fully migrated to log retention/new log schema
perFileLogsDatabaseClient centralized_logs.LogsDatabaseClient
// The client for consuming container logs from the logs database
logsDatabaseClient centralized_logs.LogsDatabaseClient

logFileManager *log_file_manager.LogFileManager

Expand All @@ -57,8 +46,7 @@ func NewEngineConnectServerService(
enclaveManager *enclave_manager.EnclaveManager,
metricsUserId string,
didUserAcceptSendingMetrics bool,
perWeekLogsDatabaseClient centralized_logs.LogsDatabaseClient,
perFileLogsDatabaseClient centralized_logs.LogsDatabaseClient,
logsDatabaseClient centralized_logs.LogsDatabaseClient,
logFileManager *log_file_manager.LogFileManager,
metricsClient metrics_client.MetricsClient,
) *EngineConnectServerService {
Expand All @@ -67,8 +55,7 @@ func NewEngineConnectServerService(
enclaveManager: enclaveManager,
metricsUserID: metricsUserId,
didUserAcceptSendingMetrics: didUserAcceptSendingMetrics,
perWeekLogsDatabaseClient: perWeekLogsDatabaseClient,
perFileLogsDatabaseClient: perFileLogsDatabaseClient,
logsDatabaseClient: logsDatabaseClient,
logFileManager: logFileManager,
metricsClient: metricsClient,
}
Expand Down Expand Up @@ -205,7 +192,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
requestedServiceUuids[serviceUuid] = true
}

if service.perWeekLogsDatabaseClient == nil || service.perFileLogsDatabaseClient == nil {
if service.logsDatabaseClient == nil {
return stacktrace.NewError("It's not possible to return service logs because there is no logs database client; this is bug in Kurtosis")
}

Expand All @@ -225,14 +212,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
return stacktrace.Propagate(err, "An error occurred creating the conjunctive log line filters from the GRPC's conjunctive log line filters '%+v'", args.GetConjunctiveFilters())
}

// get enclave creation time to determine strategy to pull logs
enclaveCreationTime, err := service.getEnclaveCreationTime(ctx, enclaveUuid)
if err != nil {
return stacktrace.Propagate(err, "An error occurred while trying to get the enclave creation time to determine how to pull logs.")
}
logsDatabaseClient := service.getLogsDatabaseClient(enclaveCreationTime)

serviceLogsByServiceUuidChan, errChan, cancelCtxFunc, err = logsDatabaseClient.StreamUserServiceLogs(
serviceLogsByServiceUuidChan, errChan, cancelCtxFunc, err = service.logsDatabaseClient.StreamUserServiceLogs(
contextWithCancel,
enclaveUuid,
requestedServiceUuids,
Expand Down Expand Up @@ -300,8 +280,7 @@ func (service *EngineConnectServerService) reportAnyMissingUuidsAndGetNotFoundUu
requestedServiceUuids map[user_service.ServiceUUID]bool,
stream *connect.ServerStream[kurtosis_engine_rpc_api_bindings.GetServiceLogsResponse],
) (map[string]bool, error) {
// doesn't matter which logs client is used here
existingServiceUuids, err := service.perWeekLogsDatabaseClient.FilterExistingServiceUuids(ctx, enclaveUuid, requestedServiceUuids)
existingServiceUuids, err := service.logsDatabaseClient.FilterExistingServiceUuids(ctx, enclaveUuid, requestedServiceUuids)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred retrieving the exhaustive list of service UUIDs from the log client for enclave '%v' and for the requested UUIDs '%+v'", enclaveUuid, requestedServiceUuids)
}
Expand Down Expand Up @@ -422,30 +401,3 @@ func newConjunctiveLogLineFiltersFromGRPCLogLineFilters(

return conjunctiveLogLineFilters, nil
}

// If the enclave was created prior to log retention, return the per file logs client
func (service *EngineConnectServerService) getLogsDatabaseClient(enclaveCreationTime time.Time) centralized_logs.LogsDatabaseClient {
if enclaveCreationTime.After(logRetentionFeatureReleaseTime) {
return service.perWeekLogsDatabaseClient
} else {
return service.perFileLogsDatabaseClient
}
}

func (service *EngineConnectServerService) getEnclaveCreationTime(ctx context.Context, enclaveUuid enclave.EnclaveUUID) (time.Time, error) {
enclaves, err := service.enclaveManager.GetEnclaves(ctx)
if err != nil {
return time.Time{}, err
}

enclaveObj, found := enclaves[string(enclaveUuid)]
if !found {
return time.Time{}, stacktrace.NewError("Engine could not find enclave '%v'", enclaveUuid)
}

timestamp := enclaveObj.GetCreationTime()
if timestamp == nil {
return time.Time{}, stacktrace.NewError("An error occurred getting the creation time for enclave '%v'. This is a bug in Kurtosis", enclaveUuid)
}
return timestamp.AsTime(), nil
}
Loading