Skip to content

Commit

Permalink
fix: expand to sts and ds
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirdavid1 committed Dec 22, 2024
1 parent 212962e commit 41a0d4e
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 48 deletions.
6 changes: 3 additions & 3 deletions api/odigos/v1alpha1/instrumentedapplication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ type OtherAgent struct {
type ProcessingState string

const (
ProcessingStateFailed ProcessingState = "Failed"
ProcessingStateSucceeded ProcessingState = "Succeeded"
ProcessingStateSkipped ProcessingState = "Skipped"
ProcessingStateFailed ProcessingState = "Failed" // Used when CRI fails to detect the runtime
ProcessingStateSucceeded ProcessingState = "Succeeded" // Used when originally come from CRI
ProcessingStateSkipped ProcessingState = "Skipped" // Used when env originally come from manifest
)

// +kubebuilder:object:generate=true
Expand Down
5 changes: 1 addition & 4 deletions cli/cmd/migrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ func (m *MigrationManager) Run(fromVersion, toVersion string) error {
}

// Check if migration should be executed
if true { //semver.Compare(fromVersion, cutVersion) < 0 && semver.Compare(toVersion, cutVersion) >= 0
if semver.Compare(fromVersion, cutVersion) < 0 && semver.Compare(toVersion, cutVersion) >= 0 {
fmt.Printf("Executing migration: %s - %s\n", migration.Name(), migration.Description())
if err := migration.Execute(); err != nil {
return fmt.Errorf("migration %s failed: %w", migration.Name(), err)
}
fmt.Printf("Migration %s completed successfully.\n", migration.Name())
} else {
fmt.Printf("Skipping migration: %s (cut version: %s, from: %s, to: %s)\n",
migration.Name(), cutVersion, fromVersion, toVersion)
}
}
return nil
Expand Down
219 changes: 178 additions & 41 deletions cli/cmd/migrations/runtime_details_migration/migrate_runtime_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (

"github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/cli/pkg/kube"
"github.com/odigos-io/odigos/common/envOverwrite"
k8scontainer "github.com/odigos-io/odigos/k8sutils/pkg/container"
"github.com/odigos-io/odigos/k8sutils/pkg/envoverwrite"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type MigrateRuntimeDetails struct {
Expand All @@ -29,33 +31,40 @@ func (m *MigrateRuntimeDetails) TriggerVersion() string {
}

func (m *MigrateRuntimeDetails) Execute() error {
fmt.Println("Migrating RuntimeDetailsByContainer....")

gvr := schema.GroupVersionResource{Group: "odigos.io", Version: "v1alpha1", Resource: "instrumentationconfigs"}

ctx := context.TODO()
instrumentationConfigs, err := m.Client.Dynamic.Resource(gvr).List(ctx, metav1.ListOptions{})
instrumentationConfigs, err := m.Client.OdigosClient.InstrumentationConfigs("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(fmt.Errorf("failed to list InstrumentationConfigs: %v", err))
return err
}

workloadNamespaces := make(map[string]map[string][]string) // workloadType -> namespace -> []workloadNames
workloadNamespaces := make(map[string]map[string]map[string]*v1alpha1.InstrumentationConfig)
// Example structure:
// {
// "deployment": {
// "default": {
// "frontend": *<InstrumentationConfig>,
// },
// },

for _, item := range instrumentationConfigs.Items {
fmt.Printf("Found InstrumentationConfig: %s in namespace: %s\n", item.GetName(), item.GetNamespace())
IcName := item.GetName()
IcNamespace := item.GetNamespace()
parts := strings.Split(IcName, "-")
if len(parts) < 2 {
fmt.Printf("Skipping invalid InstrumentationConfig name: %s\n", IcName)
continue
}

workloadType := parts[0] // deployment/statefulset/aemonset
workloadType := parts[0] // deployment/statefulset/daemonset
workloadName := strings.Join(parts[1:], "-")
if _, exists := workloadNamespaces[workloadType]; !exists {
workloadNamespaces[workloadType] = make(map[string][]string)
workloadNamespaces[workloadType] = make(map[string]map[string]*v1alpha1.InstrumentationConfig)
}
if _, exists := workloadNamespaces[workloadType][IcNamespace]; !exists {
workloadNamespaces[workloadType][IcNamespace] = make(map[string]*v1alpha1.InstrumentationConfig)
}
workloadNamespaces[workloadType][IcNamespace] = append(workloadNamespaces[workloadType][IcNamespace], workloadName)

// Save workloadName and the corresponding InstrumentationConfig reference
workloadNamespaces[workloadType][IcNamespace][workloadName] = &item
}
for workloadType, namespaces := range workloadNamespaces {
switch workloadType {
Expand All @@ -79,79 +88,207 @@ func (m *MigrateRuntimeDetails) Execute() error {
return nil
}

func fetchAndProcessDeployments(clientset *kube.Client, namespaces map[string][]string) error {
func fetchAndProcessDeployments(clientset *kube.Client, namespaces map[string]map[string]*v1alpha1.InstrumentationConfig) error {
for namespace, workloadNames := range namespaces {
fmt.Printf("Processing Deployments in namespace: %s\n", namespace)
deployments, err := clientset.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list deployments in namespace %s: %v", namespace, err)
}

for _, dep := range deployments.Items {

// Checking if the deployment is in the list of deployments that need to be processed
if contains(workloadNames, dep.Name) {
fmt.Printf("Processing Deployment: %s in namespace: %s\n", dep.Name, dep.Namespace)
originalEnvVar, _ := envoverwrite.NewOrigWorkloadEnvValues(dep.Annotations)
allNil := originalEnvVar.AreAllEnvValuesNil()
if allNil {
// update instrumentationConfig object

originalWorkloadEnvVar, _ := envoverwrite.NewOrigWorkloadEnvValues(dep.Annotations)
workloadInstrumentationConfigReference := workloadNames[dep.Name]
runtimeDetailsByContainer := workloadInstrumentationConfigReference.Status.RuntimeDetailsByContainer

for _, containerObject := range dep.Spec.Template.Spec.Containers {

err := handleContainerRuntimeDetailsUpdate(
containerObject,
*originalWorkloadEnvVar,
&runtimeDetailsByContainer,
)
if err != nil {
return fmt.Errorf("failed to process container %s in deployment %s: %v", containerObject.Name, dep.Name, err)
}
}
instrumentationConfig, err := clientset.OdigosClient.InstrumentationConfigs(dep.Namespace).Get(context.TODO(), dep.Name, metav1.GetOptions{})
_, err = clientset.OdigosClient.InstrumentationConfigs(dep.Namespace).UpdateStatus(
context.TODO(),
workloadInstrumentationConfigReference,
metav1.UpdateOptions{})
if err != nil {
fmt.Printf("Failed to get InstrumentationConfig: %v\n", err)
}
// TODO: Need to move the general operations to top level object and not per iteration.
// updating runtimeDetailsByContainer as Skipped - executed but nothing happen
for _, runtimeDetails := range instrumentationConfig.Status.RuntimeDetailsByContainer {
value := v1alpha1.ProcessingStateSkipped
runtimeDetails.RuntimeUpdateState = &value
}
for _, container := range dep.Spec.Template.Spec.Containers {
fmt.Printf("Processing container: %s\n", container.Name)
// Add your deployment-specific logic here
return err
}
}
}
}
return nil
}

func fetchAndProcessStatefulSets(clientset *kube.Client, namespaces map[string][]string) error {
func fetchAndProcessStatefulSets(clientset *kube.Client, namespaces map[string]map[string]*v1alpha1.InstrumentationConfig) error {
for namespace, workloadNames := range namespaces {
statefulSets, err := clientset.AppsV1().StatefulSets(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list statefulsets in namespace %s: %v", namespace, err)
}

for _, sts := range statefulSets.Items {
// Checking if the statefulset is in the list of statefulsets that need to be processed
if contains(workloadNames, sts.Name) {
fmt.Printf("Processing StatefulSet: %s in namespace: %s\n", sts.Name, sts.Namespace)
// Add your statefulset-specific logic here

originalWorkloadEnvVar, _ := envoverwrite.NewOrigWorkloadEnvValues(sts.Annotations)
workloadInstrumentationConfigReference := workloadNames[sts.Name]
runtimeDetailsByContainer := workloadInstrumentationConfigReference.Status.RuntimeDetailsByContainer

for _, containerObject := range sts.Spec.Template.Spec.Containers {
err := handleContainerRuntimeDetailsUpdate(
containerObject,
*originalWorkloadEnvVar,
&runtimeDetailsByContainer,
)
if err != nil {
return fmt.Errorf("failed to process container %s in statefulset %s: %v", containerObject.Name, sts.Name, err)
}
}

// Update runtimeDetailsByContainer in workloadInstrumentationConfigReference
workloadInstrumentationConfigReference.Status.RuntimeDetailsByContainer = runtimeDetailsByContainer

// Update the InstrumentationConfig status
_, err = clientset.OdigosClient.InstrumentationConfigs(sts.Namespace).UpdateStatus(
context.TODO(),
workloadInstrumentationConfigReference,
metav1.UpdateOptions{},
)
if err != nil {
return fmt.Errorf("failed to update status for statefulset %s in namespace %s: %v", sts.Name, sts.Namespace, err)
}
}
}
}
return nil
}

func fetchAndProcessDaemonSets(clientset *kube.Client, namespaces map[string][]string) error {
func fetchAndProcessDaemonSets(clientset *kube.Client, namespaces map[string]map[string]*v1alpha1.InstrumentationConfig) error {
for namespace, workloadNames := range namespaces {
daemonSets, err := clientset.AppsV1().DaemonSets(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list daemonsets in namespace %s: %v", namespace, err)
}

for _, ds := range daemonSets.Items {
// Checking if the daemonset is in the list of daemonsets that need to be processed
if contains(workloadNames, ds.Name) {
fmt.Printf("Processing DaemonSet: %s in namespace: %s\n", ds.Name, ds.Namespace)
// Add your daemonset-specific logic here

originalWorkloadEnvVar, _ := envoverwrite.NewOrigWorkloadEnvValues(ds.Annotations)
workloadInstrumentationConfigReference := workloadNames[ds.Name]
runtimeDetailsByContainer := workloadInstrumentationConfigReference.Status.RuntimeDetailsByContainer

for _, containerObject := range ds.Spec.Template.Spec.Containers {
err := handleContainerRuntimeDetailsUpdate(
containerObject,
*originalWorkloadEnvVar,
&runtimeDetailsByContainer)
if err != nil {
return fmt.Errorf("failed to process container %s in daemonset %s: %v", containerObject.Name, ds.Name, err)
}
}

// Update runtimeDetailsByContainer in workloadInstrumentationConfigReference
workloadInstrumentationConfigReference.Status.RuntimeDetailsByContainer = runtimeDetailsByContainer

// Update the InstrumentationConfig status
_, err = clientset.OdigosClient.InstrumentationConfigs(ds.Namespace).UpdateStatus(
context.TODO(),
workloadInstrumentationConfigReference,
metav1.UpdateOptions{},
)
if err != nil {
return fmt.Errorf("failed to update status for daemonset %s in namespace %s: %v", ds.Name, ds.Namespace, err)
}
}
}
}
return nil
}
func handleContainerRuntimeDetailsUpdate(
containerObject v1.Container,
originalWorkloadEnvVar envoverwrite.OrigWorkloadEnvValues,
runtimeDetailsByContainer *[]v1alpha1.RuntimeDetailsByContainer,
) error {
for i := range *runtimeDetailsByContainer {
containerRuntimeDetails := &(*runtimeDetailsByContainer)[i]

// Find the relevant container in runtimeDetailsByContainer
if containerRuntimeDetails.ContainerName != containerObject.Name {
continue
}

// Process environment variables for the container
annotationEnvVarsForContainer := originalWorkloadEnvVar.GetContainerStoredEnvs(containerObject.Name)
for envKey, envValue := range annotationEnvVarsForContainer {
isEnvVarAlreadyExists := isEnvVarPresent(containerRuntimeDetails.EnvVarsFromDockerFile, envKey)
if isEnvVarAlreadyExists {
continue
}

// Handle runtime-originated environment variables
if envValue == nil {
containerEnvFromManifestValue := k8scontainer.GetContainerEnvVarValue(&containerObject, envKey)
if containerEnvFromManifestValue != nil {
workloadEnvVarWithoutOdigosAdditions := cleanUpManifestValueFromOdigosAdditions(envKey, *containerEnvFromManifestValue)
envVarWithoutOdigosAddition := v1alpha1.EnvVar{Name: envKey, Value: workloadEnvVarWithoutOdigosAdditions}
containerRuntimeDetails.EnvVarsFromDockerFile = append(containerRuntimeDetails.EnvVarsFromDockerFile, envVarWithoutOdigosAddition)
state := v1alpha1.ProcessingStateSucceeded
containerRuntimeDetails.RuntimeUpdateState = &state
}
}
}

// Mark container as skipped if no runtime environment variables exist
if len(containerRuntimeDetails.EnvVarsFromDockerFile) == 0 {
state := v1alpha1.ProcessingStateSkipped
containerRuntimeDetails.RuntimeUpdateState = &state
}
}
return nil
}

func contains(workloadNames map[string]*v1alpha1.InstrumentationConfig, workloadName string) bool {
_, exists := workloadNames[workloadName]
return exists
}

func cleanUpManifestValueFromOdigosAdditions(manifestEnvVarKey string, manifestEnvVarValue string) string {
_, exists := envOverwrite.EnvValuesMap[manifestEnvVarKey]
if exists {
// clean up the value from all possible odigos additions
for _, value := range envOverwrite.GetPossibleValuesPerEnv(manifestEnvVarKey) {
manifestEnvVarValue = strings.ReplaceAll(manifestEnvVarValue, value, "")
}
withoutTrailingColon := cleanTrailingChar(manifestEnvVarValue, ":")
withoutTrailingSpace := cleanTrailingChar(withoutTrailingColon, " ")
return withoutTrailingSpace
} else {
// manifestEnvVarKey does not exist in the EnvValuesMap
return ""
}
}

// In case we remove OdigosAdditions to PythonPath we need to remove this also.
func cleanTrailingChar(input string, char string) string {
if len(input) > 0 && input[len(input)-1:] == char {
return input[:len(input)-1]
}
return input
}

func contains(slice []string, item string) bool {
for _, v := range slice {
if v == item {
func isEnvVarPresent(envVars []v1alpha1.EnvVar, envVarName string) bool {
for _, envVar := range envVars {
if envVar.Name == envVarName {
return true
}
}
Expand Down
4 changes: 4 additions & 0 deletions common/envOverwrite/overwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,7 @@ func ValToAppend(envName string, sdk common.OtelSdk) (string, bool) {

return valToAppend, true
}

func GetPossibleValuesPerEnv(env string) map[common.OtelSdk]string {
return EnvValuesMap[env].values
}
9 changes: 9 additions & 0 deletions k8sutils/pkg/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,12 @@ func AllContainersReady(pod *v1.Pod) bool {
}
return true
}

func GetContainerEnvVarValue(container *v1.Container, envVarName string) *string {
for _, env := range container.Env {
if env.Name == envVarName {
return &env.Value
}
}
return nil
}

0 comments on commit 41a0d4e

Please sign in to comment.