Skip to content

Commit

Permalink
Revert "(TESTING) use node names instead of NodeList"
Browse files Browse the repository at this point in the history
  • Loading branch information
ashnamehrotra authored Oct 17, 2023
1 parent 654e01b commit df9af21
Showing 1 changed file with 32 additions and 65 deletions.
97 changes: 32 additions & 65 deletions controllers/imagejob/imagejob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,6 @@ func (r *Reconciler) handleNewJob(ctx context.Context, imageJob *eraserv1.ImageJ
return err
}

var nodeNames []string
for _, node := range nodes.Items {
nodeNames = append(nodeNames, node.Name)
}
// free up space
nodes = nil

template := corev1.PodTemplate{}
err = r.Get(ctx,
types.NamespacedName{
Expand All @@ -343,14 +336,16 @@ func (r *Reconciler) handleNewJob(ctx context.Context, imageJob *eraserv1.ImageJ
}

imageJob.Status = eraserv1.ImageJobStatus{
Desired: len(nodeNames),
Desired: len(nodes.Items),
Succeeded: 0,
Skipped: 0, // placeholder, updated below
Failed: 0,
Phase: eraserv1.PhaseRunning,
}

skipped := 0
var nodeList []corev1.Node

log := log.WithValues("job", imageJob.Name)

env := []corev1.EnvVar{
Expand All @@ -370,12 +365,12 @@ func (r *Reconciler) handleNewJob(ctx context.Context, imageJob *eraserv1.ImageJ

switch filterOpts.Type {
case "exclude":
nodeNames, skipped, err = r.filterOutSkippedNodes(ctx, nodeNames, filterOpts.Selectors)
nodeList, skipped, err = filterOutSkippedNodes(nodes, filterOpts.Selectors)
if err != nil {
return err
}
case "include":
nodeNames, skipped, err = r.selectIncludedNodes(ctx, nodeNames, filterOpts.Selectors)
nodeList, skipped, err = selectIncludedNodes(nodes, filterOpts.Selectors)
if err != nil {
return err
}
Expand All @@ -390,27 +385,22 @@ func (r *Reconciler) handleNewJob(ctx context.Context, imageJob *eraserv1.ImageJ

var namespacedNames []types.NamespacedName
podSpecTemplate := template.Template.Spec
for i := range nodeNames {
currNode, err := r.getNodebyName(ctx, nodeNames[i])
if err != nil {
log.Info("Node not found, skipping node", "Node name", nodeNames[i])
continue
}

log := log.WithValues("node", nodeNames[i])
podSpec, err := copyAndFillTemplateSpec(&podSpecTemplate, env, currNode)
for i := range nodeList {
log := log.WithValues("node", nodeList[i].Name)
podSpec, err := copyAndFillTemplateSpec(&podSpecTemplate, env, &nodeList[i])
if err != nil {
return err
}

containerName := podSpec.Containers[0].Name
nodeName := nodeList[i].Name

pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
Spec: *podSpec,
ObjectMeta: metav1.ObjectMeta{
Namespace: eraserUtils.GetNamespace(),
GenerateName: "eraser-" + nodeNames[i] + "-",
GenerateName: "eraser-" + nodeName + "-",
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(&template, template.GroupVersionKind()),
},
Expand All @@ -423,7 +413,7 @@ func (r *Reconciler) handleNewJob(ctx context.Context, imageJob *eraserv1.ImageJ
pod.Labels = map[string]string{imageJobTypeLabelKey: collectorJobType}
}

fitness := checkNodeFitness(pod, currNode)
fitness := checkNodeFitness(pod, &nodeList[i])
if !fitness {
log.Info(containerName + " pod does not fit on node, skipping")
continue
Expand All @@ -434,7 +424,7 @@ func (r *Reconciler) handleNewJob(ctx context.Context, imageJob *eraserv1.ImageJ
return err
}

log.Info("Started "+containerName+" pod on node", "nodeName", nodeNames[i])
log.Info("Started "+containerName+" pod on node", "nodeName", nodeName)
namespacedNames = append(namespacedNames, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})
}

Expand All @@ -447,16 +437,6 @@ func (r *Reconciler) handleNewJob(ctx context.Context, imageJob *eraserv1.ImageJ
return nil
}

func (r *Reconciler) getNodebyName(ctx context.Context, nodeName string) (*corev1.Node, error) {
node := &corev1.Node{}
err := r.Get(ctx, types.NamespacedName{Name: nodeName}, node)
if err != nil {
log.Error(err, "Unable to find node", "Node name", nodeName)
return nil, err
}
return node, nil
}

func (r *Reconciler) isPodReady(ctx context.Context, namespacedName types.NamespacedName) wait.ConditionFunc {
return func() (bool, error) {
currentPod := &corev1.Pod{}
Expand Down Expand Up @@ -505,74 +485,61 @@ func (r *Reconciler) updateJobStatus(ctx context.Context, imageJob *eraserv1.Ima
return nil
}

func (r *Reconciler) selectIncludedNodes(ctx context.Context, nodeNames []string, includeNodesSelectors []string) ([]string, int, error) {
func selectIncludedNodes(nodes *corev1.NodeList, includeNodesSelectors []string) ([]corev1.Node, int, error) {
skipped := 0
newNodeNames := make([]string, 0, len(nodeNames))
nodeList := make([]corev1.Node, 0, len(nodes.Items))

nodes:
for i := range nodeNames {
nodeName := nodeNames[i]
log := log.WithValues("node", nodeName)
for i := range nodes.Items {
log := log.WithValues("node", nodes.Items[i].Name)
skipped++

currNode, err := r.getNodebyName(ctx, nodeName)
if err != nil {
log.Error(err, "Could not find node", "Node name", nodeName)
continue nodes
}

nodeName := nodes.Items[i].Name
for _, includeNodesSelectors := range includeNodesSelectors {
includedLabels, err := labels.Parse(includeNodesSelectors)
if err != nil {
return nil, -1, err
}

log.V(1).Info("includedLabels", "includedLabels", includedLabels)
log.V(1).Info("nodeLabels", "nodeLabels", currNode.ObjectMeta.Labels)
if includedLabels.Matches(labels.Set(currNode.ObjectMeta.Labels)) {
log.V(1).Info("nodeLabels", "nodeLabels", nodes.Items[i].ObjectMeta.Labels)
if includedLabels.Matches(labels.Set(nodes.Items[i].ObjectMeta.Labels)) {
log.Info("node is included because it matched the specified labels",
"nodeName", nodeName,
"labels", currNode.ObjectMeta.Labels,
"labels", nodes.Items[i].ObjectMeta.Labels,
"specifiedSelectors", includeNodesSelectors,
)

newNodeNames = append(newNodeNames, nodeName)
nodeList = append(nodeList, nodes.Items[i])
skipped--
continue nodes
}
}
}

return newNodeNames, skipped, nil
return nodeList, skipped, nil
}

func (r *Reconciler) filterOutSkippedNodes(ctx context.Context, nodeNames []string, skipNodesSelectors []string) ([]string, int, error) {
func filterOutSkippedNodes(nodes *corev1.NodeList, skipNodesSelectors []string) ([]corev1.Node, int, error) {
skipped := 0
newNodeNames := make([]string, 0, len(nodeNames))
nodeList := make([]corev1.Node, 0, len(nodes.Items))

nodes:
for i := range nodeNames {
nodeName := nodeNames[i]
log := log.WithValues("node", nodeNames[i])

currNode, err := r.getNodebyName(ctx, nodeName)
if err != nil {
log.Error(err, "Could not find node", "N ode name", nodeName)
continue nodes
}
for i := range nodes.Items {
log := log.WithValues("node", nodes.Items[i].Name)

nodeName := nodes.Items[i].Name
for _, skipNodesSelector := range skipNodesSelectors {
skipLabels, err := labels.Parse(skipNodesSelector)
if err != nil {
return nil, -1, err
}

log.V(1).Info("skipLabels", "skipLabels", skipLabels)
log.V(1).Info("nodeLabels", "nodeLabels", currNode.ObjectMeta.Labels)
if skipLabels.Matches(labels.Set(currNode.ObjectMeta.Labels)) {
log.V(1).Info("nodeLabels", "nodeLabels", nodes.Items[i].ObjectMeta.Labels)
if skipLabels.Matches(labels.Set(nodes.Items[i].ObjectMeta.Labels)) {
log.Info("node will be skipped because it matched the specified labels",
"nodeName", nodeName,
"labels", currNode.ObjectMeta.Labels,
"labels", nodes.Items[i].ObjectMeta.Labels,
"specifiedSelectors", skipNodesSelectors,
)

Expand All @@ -581,10 +548,10 @@ nodes:
}
}

newNodeNames = append(newNodeNames, nodeName)
nodeList = append(nodeList, nodes.Items[i])
}

return newNodeNames, skipped, nil
return nodeList, skipped, nil
}

func copyAndFillTemplateSpec(templateSpecTemplate *corev1.PodSpec, env []corev1.EnvVar, node *corev1.Node) (*corev1.PodSpec, error) {
Expand Down

0 comments on commit df9af21

Please sign in to comment.