diff --git a/.gitignore b/.gitignore
index df6cbf6f9..7fde39a2c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,4 +6,5 @@ go.work.sum
 cli/odigos
 .venv
 **/__pycache__/
-**/*.pyc
\ No newline at end of file
+**/*.pyc
+serving-certs/
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
index e51bde1ec..aec4ea6f8 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -17,7 +17,10 @@
             "request": "launch",
             "mode": "debug",
             "program": "${workspaceFolder}/instrumentor",
-            "cwd": "${workspaceFolder}/instrumentor"
+            "cwd": "${workspaceFolder}/instrumentor",
+            "env": {
+                "LOCAL_MUTATING_WEBHOOK_CERT_DIR": "${workspaceFolder}/serving-certs"
+            }
         },
         {
             "name": "frontend",
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 28224c7c9..c2327488a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -167,3 +167,36 @@ make debug-odiglet
 Then, you can attach a debugger to the Odiglet pod. For example, if you are using Goland, you can follow the instructions [here](https://www.jetbrains.com/help/go/attach-to-running-go-processes-with-debugger.html#step-3-create-the-remote-run-debug-configuration-on-the-client-computer) to attach to a remote process.
 
 For Visual Studio Code, you can use the `.vscode/launch.json` file in this repo to attach to the Odiglet pod.
+
+
+## Instrumentor
+
+### Debugging
+
+If the Mutating Webhook is enabled, follow these steps:
+
+1. Copy the TLS certificate and key:
+Create a local directory and extract the certificate and key by running the following command:
+```
+mkdir -p serving-certs && kubectl get secret instrumentor-webhook-cert -n odigos-system -o jsonpath='{.data.tls\.crt}' | base64 -d > serving-certs/tls.crt && kubectl get secret instrumentor-webhook-cert -n odigos-system -o jsonpath='{.data.tls\.key}' | base64 -d > serving-certs/tls.key
+```
+
+2. Apply this service to the cluster; it will replace the existing `odigos-instrumentor` service:
+
+```
+apiVersion: v1
+kind: Service
+metadata:
+  name: odigos-instrumentor
+  namespace: odigos-system
+spec:
+  type: ExternalName
+  externalName: host.docker.internal
+  ports:
+    - name: webhook-server
+      port: 9443
+      protocol: TCP
+```
+
+Once this is done, you can use the `.vscode/launch.json` configuration and run the instrumentor locally for debugging.
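+
+For example, assuming the service manifest above is saved locally as `webhook-debug-service.yaml` (an illustrative file name), one way to swap the services is:
+
+```
+kubectl delete service odigos-instrumentor -n odigos-system
+kubectl apply -f webhook-debug-service.yaml
+```
+
+Deleting the original service first avoids a conflict when changing the service type to `ExternalName`.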
\ No newline at end of file
diff --git a/instrumentor/controllers/instrumentationdevice/pods_webhook.go b/instrumentor/controllers/instrumentationdevice/pods_webhook.go
index 45fb0594c..cbf4f231c 100644
--- a/instrumentor/controllers/instrumentationdevice/pods_webhook.go
+++ b/instrumentor/controllers/instrumentationdevice/pods_webhook.go
@@ -3,22 +3,26 @@ package instrumentationdevice
 import (
 	"context"
 	"fmt"
+	"strings"
 
+	common "github.com/odigos-io/odigos/common"
 	"sigs.k8s.io/controller-runtime/pkg/webhook"
 
 	corev1 "k8s.io/api/core/v1"
-	logf "sigs.k8s.io/controller-runtime/pkg/log"
-
 	"k8s.io/apimachinery/pkg/runtime"
 )
 
+const (
+	EnvVarNamespace     = "ODIGOS_WORKLOAD_NAMESPACE"
+	EnvVarContainerName = "ODIGOS_CONTAINER_NAME"
+	EnvVarPodName       = "ODIGOS_POD_NAME"
+)
+
 type PodsWebhook struct{}
 
 var _ webhook.CustomDefaulter = &PodsWebhook{}
 
 func (p *PodsWebhook) Default(ctx context.Context, obj runtime.Object) error {
-	// TODO(edenfed): add object selector to mutatingwebhookconfiguration
-	log := logf.FromContext(ctx)
 	pod, ok := obj.(*corev1.Pod)
 	if !ok {
 		return fmt.Errorf("expected a Pod but got a %T", obj)
@@ -28,7 +32,71 @@ func (p *PodsWebhook) Default(ctx context.Context, obj runtime.Object) error {
 		pod.Annotations = map[string]string{}
 	}
 
-	//pod.Annotations["odigos.io/instrumented-webhook"] = "true"
-	log.V(0).Info("Defaulted Pod", "name", pod.Name)
+	// Inject ODIGOS environment variables into all containers
+	injectOdigosEnvVars(pod)
+
 	return nil
 }
+
+func injectOdigosEnvVars(pod *corev1.Pod) {
+	namespace := pod.Namespace
+
+	// Common environment variables that do not change across containers
+	commonEnvVars := []corev1.EnvVar{
+		{
+			Name:  EnvVarNamespace,
+			Value: namespace,
+		},
+		{
+			Name: EnvVarPodName,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					FieldPath: "metadata.name",
+				},
+			},
+		},
+	}
+
+	for i := range pod.Spec.Containers {
+		container := &pod.Spec.Containers[i]
+
+		// Skip the injection if the container does not have an Odigos instrumentation device in its container limits.
+		if !hasOdigosInstrumentationInLimits(container.Resources) {
+			continue
+		}
+
+		// Skip the injection if the environment variables are already present.
+		if envVarsExist(container.Env, commonEnvVars) {
+			continue
+		}
+
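+		// Append the shared namespace/pod-name variables together with the
+		// container-specific ODIGOS_CONTAINER_NAME in a single operation, so each
+		// container receives either the complete set or nothing.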
+		container.Env = append(container.Env, append(commonEnvVars, corev1.EnvVar{
+			Name:  EnvVarContainerName,
+			Value: container.Name,
+		})...)
+	}
+}
+
+func envVarsExist(containerEnv []corev1.EnvVar, commonEnvVars []corev1.EnvVar) bool {
+	envMap := make(map[string]struct{})
+	for _, envVar := range containerEnv {
+		envMap[envVar.Name] = struct{}{} // Insert an empty struct as the value; only the key matters
+	}
+
+	for _, commonEnvVar := range commonEnvVars {
+		if _, exists := envMap[commonEnvVar.Name]; exists { // Check if the key exists
+			return true
+		}
+	}
+	return false
+}
+
+// hasOdigosInstrumentationInLimits checks whether a container's resource limits contain a key
+// starting with the Odigos resource namespace.
+func hasOdigosInstrumentationInLimits(resources corev1.ResourceRequirements) bool {
+	for resourceName := range resources.Limits {
+		if strings.HasPrefix(string(resourceName), common.OdigosResourceNamespace) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/instrumentor/main.go b/instrumentor/main.go
index 0dc820793..67a9494b9 100644
--- a/instrumentor/main.go
+++ b/instrumentor/main.go
@@ -35,6 +35,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/webhook"
 
 	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -100,7 +101,7 @@ func main() {
 	logger := zapr.NewLogger(zapLogger)
 	ctrl.SetLogger(logger)
 
-	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
+	mgrOptions := ctrl.Options{
 		Scheme: scheme,
 		Metrics: metricsserver.Options{
 			BindAddress: metricsAddr,
@@ -178,7 +179,20 @@ func main() {
 				},
 			},
 		},
-	})
+	}
+
+	// Check if the environment variable `LOCAL_MUTATING_WEBHOOK_CERT_DIR` is set.
+	// If defined, add WebhookServer options with the specified certificate directory.
+	// This is used primarily in local development environments to provide a custom path for serving TLS certificates.
+	localCertDir := os.Getenv("LOCAL_MUTATING_WEBHOOK_CERT_DIR")
+	if localCertDir != "" {
+		mgrOptions.WebhookServer = webhook.NewServer(webhook.Options{
+			CertDir: localCertDir,
+		})
+	}
+
+	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOptions)
+
 	if err != nil {
 		setupLog.Error(err, "unable to start manager")
 		os.Exit(1)
diff --git a/k8sutils/pkg/workload/ownerreference.go b/k8sutils/pkg/workload/ownerreference.go
index f5bacf863..fbb061ee9 100644
--- a/k8sutils/pkg/workload/ownerreference.go
+++ b/k8sutils/pkg/workload/ownerreference.go
@@ -1,36 +1,43 @@
 package workload
 
 import (
-	"errors"
+	"fmt"
 	"strings"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+// GetWorkloadFromOwnerReference retrieves both the workload name and workload kind
+// from the provided owner reference.
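+// For example, an owner reference to a ReplicaSet named "myapp-6d4cf56db6"
+// (a hypothetical name) resolves to workload name "myapp" with kind Deployment.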
 func GetWorkloadFromOwnerReference(ownerReference metav1.OwnerReference) (workloadName string, workloadKind WorkloadKind, err error) {
-	ownerName := ownerReference.Name
-	ownerKind := ownerReference.Kind
+	return GetWorkloadNameAndKind(ownerReference.Name, ownerReference.Kind)
+}
+
+func GetWorkloadNameAndKind(ownerName, ownerKind string) (string, WorkloadKind, error) {
 	if ownerKind == "ReplicaSet" {
-		// ReplicaSet name is in the format <deployment-name>-<random-suffix>
-		hyphenIndex := strings.LastIndex(ownerName, "-")
-		if hyphenIndex == -1 {
-			// It is possible for a user to define a bare ReplicaSet without a deployment, currently not supporting this
-			err = errors.New("replicaset name does not contain a hyphen")
-			return
-		}
-		// Extract deployment name from ReplicaSet name
-		workloadName = ownerName[:hyphenIndex]
-		workloadKind = WorkloadKindDeployment
-		return
+		return extractDeploymentInfo(ownerName)
+	}
+	return handleNonReplicaSet(ownerName, ownerKind)
+}
+
+// extractDeploymentInfo extracts the owning Deployment's name from a ReplicaSet name
+func extractDeploymentInfo(replicaSetName string) (string, WorkloadKind, error) {
+	hyphenIndex := strings.LastIndex(replicaSetName, "-")
+	if hyphenIndex == -1 {
+		return "", "", fmt.Errorf("replicaset name '%s' does not contain a hyphen", replicaSetName)
 	}
-	workloadKind = WorkloadKindFromString(ownerKind)
+	deploymentName := replicaSetName[:hyphenIndex]
+	return deploymentName, WorkloadKindDeployment, nil
+}
+
+// handleNonReplicaSet processes non-ReplicaSet workload kinds
+func handleNonReplicaSet(ownerName, ownerKind string) (string, WorkloadKind, error) {
+	workloadKind := WorkloadKindFromString(ownerKind)
 	if workloadKind == "" {
-		err = ErrKindNotSupported
-		return
+		return "", "", ErrKindNotSupported
 	}
-	workloadName = ownerName
-	err = nil
-	return
+	return ownerName, workloadKind, nil
 }
diff --git a/opampserver/pkg/server/handlers.go b/opampserver/pkg/server/handlers.go
index ca4af1045..b42d49eb3 100644
--- a/opampserver/pkg/server/handlers.go
+++ b/opampserver/pkg/server/handlers.go
@@ -11,24 +11,35 @@ import (
 	"github.com/odigos-io/odigos/k8sutils/pkg/instrumentation_instance"
 	"github.com/odigos-io/odigos/k8sutils/pkg/workload"
 	"github.com/odigos-io/odigos/opampserver/pkg/connection"
-	"github.com/odigos-io/odigos/opampserver/pkg/deviceid"
+	di "github.com/odigos-io/odigos/opampserver/pkg/deviceid"
 	"github.com/odigos-io/odigos/opampserver/pkg/sdkconfig"
 	"github.com/odigos-io/odigos/opampserver/pkg/sdkconfig/configresolvers"
 	"github.com/odigos-io/odigos/opampserver/protobufs"
 	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 type ConnectionHandlers struct {
-	deviceIdCache *deviceid.DeviceIdCache
+	deviceIdCache *di.DeviceIdCache
 	sdkConfig     *sdkconfig.SdkConfigManager
 	logger        logr.Logger
 	kubeclient    client.Client
+	kubeClientSet *kubernetes.Clientset
 	scheme        *runtime.Scheme // TODO: revisit this, we should not depend on controller runtime
 	nodeName      string
 }
 
+type opampAgentAttributesKeys struct {
+	ProgrammingLanguage string
+	ContainerName       string
+	PodName             string
+	Namespace           string
+}
+
 func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId string, firstMessage *protobufs.AgentToServer) (*connection.ConnectionInfo, *protobufs.ServerToAgent, error) {
 
 	if firstMessage.AgentDescription == nil {
@@ -53,25 +64,19 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
 		return nil, nil, fmt.Errorf("missing pid in agent description")
 	}
 
-	var programmingLanguage string
-	for _, attr := range firstMessage.AgentDescription.IdentifyingAttributes {
-		if attr.Key == string(semconv.TelemetrySDKLanguageKey) {
-			programmingLanguage = attr.Value.GetStringValue()
-			break
-		}
-	}
-	if programmingLanguage == "" {
+	attrs := extractOpampAgentAttributes(firstMessage.AgentDescription)
+
+	if attrs.ProgrammingLanguage == "" {
 		return nil, nil, fmt.Errorf("missing programming language in agent description")
 	}
 
-	k8sAttributes, pod, err := c.deviceIdCache.GetAttributesFromDevice(ctx, deviceId)
+	k8sAttributes, pod, err := c.resolveK8sAttributes(ctx, attrs, deviceId, c.logger)
 	if err != nil {
-		c.logger.Error(err, "failed to get attributes from device", "deviceId", deviceId)
-		return nil, nil, err
+		return nil, nil, fmt.Errorf("failed to process k8s attributes: %w", err)
 	}
 
 	podWorkload := workload.PodWorkload{
-		Namespace: pod.GetNamespace(),
+		Namespace: k8sAttributes.Namespace,
 		Kind:      workload.WorkloadKind(k8sAttributes.WorkloadKind),
 		Name:      k8sAttributes.WorkloadName,
 	}
@@ -83,7 +88,7 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
 		return nil, nil, err
 	}
 
-	fullRemoteConfig, err := c.sdkConfig.GetFullConfig(ctx, remoteResourceAttributes, &podWorkload, instrumentedAppName, programmingLanguage)
+	fullRemoteConfig, err := c.sdkConfig.GetFullConfig(ctx, remoteResourceAttributes, &podWorkload, instrumentedAppName, attrs.ProgrammingLanguage)
 	if err != nil {
 		c.logger.Error(err, "failed to get full config", "k8sAttributes", k8sAttributes)
 		return nil, nil, err
@@ -96,7 +101,7 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
 		Pod:                      pod,
 		ContainerName:            k8sAttributes.ContainerName,
 		Pid:                      pid,
-		ProgrammingLanguage:      programmingLanguage,
+		ProgrammingLanguage:      attrs.ProgrammingLanguage,
 		InstrumentedAppName:      instrumentedAppName,
 		AgentRemoteConfig:        fullRemoteConfig,
 		RemoteResourceAttributes: remoteResourceAttributes,
@@ -191,3 +196,72 @@ func (c *ConnectionHandlers) UpdateInstrumentationInstanceStatus(ctx context.Con
 
 	return nil
 }
+
+// resolveK8sAttributes resolves K8s resource attributes, using either the direct attributes
+// reported by the opamp agent or, as a fallback, the device cache
+func (c *ConnectionHandlers) resolveK8sAttributes(ctx context.Context, attrs opampAgentAttributesKeys,
+	deviceId string, logger logr.Logger) (*di.K8sResourceAttributes, *corev1.Pod, error) {
+
+	if attrs.hasRequiredAttributes() {
+		podInfoResolver := di.NewK8sPodInfoResolver(logger, c.kubeClientSet)
+		return resolveFromDirectAttributes(ctx, attrs, podInfoResolver, c.kubeClientSet)
+	}
+	return c.deviceIdCache.GetAttributesFromDevice(ctx, deviceId)
+}
+
+func extractOpampAgentAttributes(agentDescription *protobufs.AgentDescription) opampAgentAttributesKeys {
+	result := opampAgentAttributesKeys{}
+
+	for _, attr := range agentDescription.IdentifyingAttributes {
+		switch attr.Key {
+		case string(semconv.TelemetrySDKLanguageKey):
+			result.ProgrammingLanguage = attr.Value.GetStringValue()
+		case string(semconv.ContainerNameKey):
+			result.ContainerName = attr.Value.GetStringValue()
+		case string(semconv.K8SPodNameKey):
+			result.PodName = attr.Value.GetStringValue()
+		case string(semconv.K8SNamespaceNameKey):
+			result.Namespace = attr.Value.GetStringValue()
+		}
+	}
+
+	return result
+}
+
+func (k opampAgentAttributesKeys) hasRequiredAttributes() bool {
+	return k.ContainerName != "" && k.PodName != "" && k.Namespace != ""
+}
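+
+// resolveFromDirectAttributes fetches the pod named by the agent-reported attributes
+// and derives the workload name, kind, and OTel service name from its owner references.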
+func resolveFromDirectAttributes(ctx context.Context, attrs opampAgentAttributesKeys,
+	podInfoResolver *di.K8sPodInfoResolver, kubeClient *kubernetes.Clientset) (*di.K8sResourceAttributes, *corev1.Pod, error) {
+
+	pod, err := kubeClient.CoreV1().Pods(attrs.Namespace).Get(ctx, attrs.PodName, metav1.GetOptions{})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	var workloadName string
+	var workloadKind workload.WorkloadKind
+
+	ownerRefs := pod.GetOwnerReferences()
+	for _, ownerRef := range ownerRefs {
+		workloadName, workloadKind, err = workload.GetWorkloadFromOwnerReference(ownerRef)
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to get workload from owner reference: %w", err)
+		}
+	}
+
+	serviceName := podInfoResolver.ResolveServiceName(ctx, attrs.PodName, workloadName, &di.ContainerDetails{
+		PodName: attrs.PodName,
+	})
+
+	k8sAttributes := &di.K8sResourceAttributes{
+		Namespace:       attrs.Namespace,
+		PodName:         attrs.PodName,
+		ContainerName:   attrs.ContainerName,
+		WorkloadKind:    string(workloadKind),
+		WorkloadName:    workloadName,
+		OtelServiceName: serviceName,
+	}
+
+	return k8sAttributes, pod, nil
+}
diff --git a/opampserver/pkg/server/server.go b/opampserver/pkg/server/server.go
index 4d8f0fcfb..04401dee6 100644
--- a/opampserver/pkg/server/server.go
+++ b/opampserver/pkg/server/server.go
@@ -17,12 +17,12 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 )
 
-func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClient *kubernetes.Clientset, nodeName string, odigosNs string) error {
+func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClientSet *kubernetes.Clientset, nodeName string, odigosNs string) error {
 	listenEndpoint := fmt.Sprintf("0.0.0.0:%d", OpAmpServerDefaultPort)
 	logger.Info("Starting opamp server", "listenEndpoint", listenEndpoint)
 
-	deviceidCache, err := deviceid.NewDeviceIdCache(logger, kubeClient)
+	deviceidCache, err := deviceid.NewDeviceIdCache(logger, kubeClientSet)
 	if err != nil {
 		return err
 	}
@@ -36,6 +36,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
 		deviceIdCache: deviceidCache,
 		sdkConfig:     sdkConfig,
 		kubeclient:    mgr.GetClient(),
+		kubeClientSet: kubeClientSet,
 		scheme:        mgr.GetScheme(),
 		nodeName:      nodeName,
 	}