Skip to content

Commit

Permalink
fix: re-price nodes after every pricing update
Browse files Browse the repository at this point in the history
Previously there was a race between pricing updates and node creation.
If the pricing updated first, you would get new pricing (OD & Spot) for
nodes.  If the nodes were created first, you would use stale OD pricing
and not have spot pricing.  This change forces all nodes to have their
prices re-calculated after every pricing update.
  • Loading branch information
tzneal committed Mar 22, 2023
1 parent ef83274 commit b715d6c
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 31 deletions.
39 changes: 11 additions & 28 deletions cmd/eks-node-viewer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"flag"
"fmt"
"log"
"math"
"os"
"strings"
"time"
Expand Down Expand Up @@ -74,13 +73,18 @@ func main() {

defaults.SharedCredentialsFilename()
pprov := pricing.NewStaticProvider()
m := model.NewUIModel(strings.Split(flags.ExtraLabels, ","))
m.SetResources(strings.FieldsFunc(flags.Resources, func(r rune) bool { return r == ',' }))

if !flags.DisablePricing {
sess := session.Must(session.NewSession(nil))
pprov = pricing.NewProvider(ctx, sess)
updateAllPrices := func() {
m.Cluster().ForEachNode(func(n *model.Node) {
n.UpdatePrice(pprov)
})
}
pprov = pricing.NewProvider(ctx, sess, updateAllPrices)
}
m := model.NewUIModel(strings.Split(flags.ExtraLabels, ","))

m.SetResources(strings.FieldsFunc(flags.Resources, func(r rune) bool { return r == ',' }))

var nodeSelector labels.Selector
if ns, err := labels.Parse(flags.NodeSelector); err != nil {
Expand Down Expand Up @@ -126,7 +130,7 @@ func startMonitor(ctx context.Context, settings *monitorSettings) {
node, ok := cluster.GetNode(p.Spec.NodeName)
// need to potentially update node price as we need the fargate pod in order to figure out the cost
if ok && node.IsFargate() && !node.HasPrice() {
updatePrice(settings, node)
node.UpdatePrice(settings.pricing)
}
}
},
Expand Down Expand Up @@ -163,7 +167,7 @@ func startMonitor(ctx context.Context, settings *monitorSettings) {
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := model.NewNode(obj.(*v1.Node))
updatePrice(settings, node)
node.UpdatePrice(settings.pricing)
n := cluster.AddNode(node)
n.Show()
},
Expand All @@ -190,27 +194,6 @@ func startMonitor(ctx context.Context, settings *monitorSettings) {

}

func updatePrice(settings *monitorSettings, node *model.Node) {
// lookup our node price
node.Price = math.NaN()
if node.IsOnDemand() {
if price, ok := settings.pricing.OnDemandPrice(node.InstanceType()); ok {
node.Price = price
}
} else if node.IsSpot() {
if price, ok := settings.pricing.SpotPrice(node.InstanceType(), node.Zone()); ok {
node.Price = price
}
} else if node.IsFargate() && len(node.Pods()) == 1 {
cpu, mem, ok := node.Pods()[0].FargateCapacityProvisioned()
if ok {
if price, ok := settings.pricing.FargatePrice(cpu, mem); ok {
node.Price = price
}
}
}
}

// isTerminalPod returns true if the pod is deleting or in a terminal state
func isTerminalPod(p *v1.Pod) bool {
if !p.DeletionTimestamp.IsZero() {
Expand Down
10 changes: 9 additions & 1 deletion pkg/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func (c *Cluster) DeleteNode(name string) {
}
}

func (c *Cluster) ForEachNode(f func(n *Node)) {
c.mu.RLock()
defer c.mu.RUnlock()
for _, n := range c.nodes {
f(n)
}
}

func (c *Cluster) GetNode(name string) (*Node, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down Expand Up @@ -143,7 +151,7 @@ func (c *Cluster) Stats() Stats {
}
// only add the price if it's not NaN which is used to indicate an unknown
// price
if n.Price == n.Price {
if n.HasPrice() {
st.TotalPrice += n.Price
}
st.NumNodes++
Expand Down
24 changes: 24 additions & 0 deletions pkg/model/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ package model

import (
"fmt"
"math"
"sync"
"time"

v1 "k8s.io/api/core/v1"

"github.com/awslabs/eks-node-viewer/pkg/pricing"
)

type objectKey struct {
Expand Down Expand Up @@ -207,3 +210,24 @@ func (n *Node) HasPrice() bool {
// we use NaN for an unknown price, so if this is true the price is known
return n.Price == n.Price
}

func (n *Node) UpdatePrice(pricing *pricing.Provider) {
// lookup our n price
n.Price = math.NaN()
if n.IsOnDemand() {
if price, ok := pricing.OnDemandPrice(n.InstanceType()); ok {
n.Price = price
}
} else if n.IsSpot() {
if price, ok := pricing.SpotPrice(n.InstanceType(), n.Zone()); ok {
n.Price = price
}
} else if n.IsFargate() && len(n.Pods()) == 1 {
cpu, mem, ok := n.Pods()[0].FargateCapacityProvisioned()
if ok {
if price, ok := pricing.FargatePrice(cpu, mem); ok {
n.Price = price
}
}
}
}
6 changes: 4 additions & 2 deletions pkg/pricing/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Provider struct {
spotPrices map[string]zonalPricing
fargateVCPUPricePerHour float64
fargateGBPricePerHour float64
notify func()
}

// zonalPricing is used to capture the per-zone price
Expand Down Expand Up @@ -97,7 +98,7 @@ func NewStaticProvider() *Provider {
spotUpdateTime: initialPriceUpdate,
}
}
func NewProvider(ctx context.Context, sess *session.Session) *Provider {
func NewProvider(ctx context.Context, sess *session.Session, notify func()) *Provider {
region := "us-west-2"
if aws.StringValue(sess.Config.Region) != "" {
region = aws.StringValue(sess.Config.Region)
Expand All @@ -110,6 +111,7 @@ func NewProvider(ctx context.Context, sess *session.Session) *Provider {
spotUpdateTime: initialPriceUpdate,
ec2: ec2.New(sess),
pricing: NewPricingAPI(sess, region),
notify: notify,
}

go func() {
Expand Down Expand Up @@ -212,8 +214,8 @@ func (p *Provider) updatePricing(ctx context.Context) {
log.Printf("updating fargate pricing, %s", err)
}
}()

wg.Wait()
p.notify()
}

func (p *Provider) updateOnDemandPricing(ctx context.Context) error {
Expand Down

0 comments on commit b715d6c

Please sign in to comment.