Skip to content

Commit

Permalink
improve test + logging
Browse files Browse the repository at this point in the history
  • Loading branch information
phuhung273 committed Jan 26, 2025
1 parent 316906e commit a95474e
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 32 deletions.
4 changes: 2 additions & 2 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func main() {
log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,")
}

metrics, err := observability.InitMetrics(nthConfig)
metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,")
Expand Down Expand Up @@ -218,7 +218,7 @@ func main() {
ec2Client := ec2.New(sess)

if nthConfig.EnablePrometheus {
go metrics.InitNodeMetrics(node, ec2Client)
go metrics.InitNodeMetrics(nthConfig, node, ec2Client)
}

completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second
Expand Down
4 changes: 2 additions & 2 deletions pkg/ec2helper/ec2helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (h EC2Helper) GetInstanceIdsByTagKey(tag string) ([]string, error) {
}

if result == nil || result.Reservations == nil {
return nil, fmt.Errorf("failed to describe instances")
return nil, fmt.Errorf("describe instances success but return empty response for tag key: %s", tag)
}

for _, reservation := range result.Reservations {
Expand Down Expand Up @@ -87,7 +87,7 @@ func (h EC2Helper) GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error
}

if ids == nil {
return nil, fmt.Errorf("failed to describe instances")
return nil, fmt.Errorf("get instance ids success but return empty response for tag key: %s", tag)
}

for _, id := range ids {
Expand Down
80 changes: 80 additions & 0 deletions pkg/ec2helper/ec2helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/aws/aws-node-termination-handler/pkg/ec2helper"
h "github.com/aws/aws-node-termination-handler/pkg/test"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
)

Expand All @@ -40,6 +41,85 @@ func TestGetInstanceIdsByTagKey(t *testing.T) {
h.Equals(t, instanceId2, instanceIds[1])
}

func TestGetInstanceIdsByTagKeyAPIError(t *testing.T) {
ec2Mock := h.MockedEC2{
DescribeInstancesResp: getDescribeInstancesResp(),
DescribeInstancesErr: awserr.New("ThrottlingException", "Rate exceeded", nil),
}
ec2Helper := ec2helper.New(ec2Mock)
_, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
h.Nok(t, err)
}

// TestGetInstanceIdsByTagKeyNilResponse expects an error when the mocked EC2
// client is left at its zero value, i.e. neither a DescribeInstances response
// nor an error has been configured.
func TestGetInstanceIdsByTagKeyNilResponse(t *testing.T) {
	var unconfigured h.MockedEC2
	helper := ec2helper.New(unconfigured)

	_, err := helper.GetInstanceIdsByTagKey("myNTHManagedTag")
	h.Nok(t, err)
}

func TestGetInstanceIdsByTagKeyNilReservations(t *testing.T) {
ec2Mock := h.MockedEC2{
DescribeInstancesResp: ec2.DescribeInstancesOutput{
Reservations: nil,
},
}
ec2Helper := ec2helper.New(ec2Mock)
_, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
h.Nok(t, err)
}

func TestGetInstanceIdsByTagKeyEmptyReservation(t *testing.T) {
ec2Mock := h.MockedEC2{
DescribeInstancesResp: ec2.DescribeInstancesOutput{
Reservations: []*ec2.Reservation{},
},
}
ec2Helper := ec2helper.New(ec2Mock)
instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
h.Ok(t, err)
h.Equals(t, 0, len(instanceIds))
}

func TestGetInstanceIdsByTagKeyEmptyInstances(t *testing.T) {
ec2Mock := h.MockedEC2{
DescribeInstancesResp: ec2.DescribeInstancesOutput{
Reservations: []*ec2.Reservation{
{
Instances: []*ec2.Instance{},
},
},
},
}
ec2Helper := ec2helper.New(ec2Mock)
instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
h.Ok(t, err)
h.Equals(t, 0, len(instanceIds))
}

// TestGetInstanceIdsByTagKeyNilInstancesId feeds the helper a reservation that
// holds one instance with a nil InstanceId and one with instanceId1. It expects
// success with exactly one id returned, and that id must be instanceId1.
func TestGetInstanceIdsByTagKeyNilInstancesId(t *testing.T) {
	ec2Mock := h.MockedEC2{
		DescribeInstancesResp: ec2.DescribeInstancesOutput{
			Reservations: []*ec2.Reservation{
				{
					Instances: []*ec2.Instance{
						{
							InstanceId: nil,
						},
						{
							InstanceId: aws.String(instanceId1),
						},
					},
				},
			},
		},
	}
	ec2Helper := ec2helper.New(ec2Mock)
	instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
	h.Ok(t, err)
	h.Equals(t, 1, len(instanceIds))
	// Checking only the count would still pass if the wrong entry survived;
	// pin the value as well.
	h.Equals(t, instanceId1, instanceIds[0])
}

func TestGetInstanceIdsMapByTagKey(t *testing.T) {
ec2Mock := h.MockedEC2{
DescribeInstancesResp: getDescribeInstancesResp(),
Expand Down
8 changes: 5 additions & 3 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,20 +652,22 @@ func (n Node) FetchKubernetesNodeInstanceIds() ([]string, error) {
}

if matchingNodes == nil || matchingNodes.Items == nil {
return nil, fmt.Errorf("failed to list nodes")
return nil, fmt.Errorf("list nodes success but return empty response")
}

for _, node := range matchingNodes.Items {
// sample providerID: aws:///us-west-2a/i-0abcd1234efgh5678
parts := strings.Split(node.Spec.ProviderID, "/")
if len(parts) < 2 {
log.Warn().Msgf("Found invalid providerID: %s", node.Spec.ProviderID)
if len(parts) != 5 {
log.Warn().Msgf("Invalid providerID format found for node %s: %s (expected format: aws:///region/instance-id)", node.Name, node.Spec.ProviderID)
continue
}

instanceId := parts[len(parts)-1]
if instanceIDRegex.MatchString(instanceId) {
ids = append(ids, parts[len(parts)-1])
} else {
log.Warn().Msgf("Invalid instance id format found for node %s: %s (expected format: ^i-.*)", node.Name, instanceId)
}
}

Expand Down
54 changes: 54 additions & 0 deletions pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,60 @@ func TestFetchKubernetesNodeInstanceIds(t *testing.T) {
h.Equals(t, instanceId2, instanceIds[1])
}

// TestFetchKubernetesNodeInstanceIdsEmptyResponse expects
// FetchKubernetesNodeInstanceIds to return an error when the fake clientset
// was created without any node objects.
func TestFetchKubernetesNodeInstanceIdsEmptyResponse(t *testing.T) {
	emptyCluster := fake.NewSimpleClientset()

	// Sanity check: listing nodes directly on the fake clientset must succeed.
	_, listErr := emptyCluster.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
	h.Ok(t, listErr)

	tNode, newErr := newNode(config.Config{}, emptyCluster)
	h.Ok(t, newErr)

	_, fetchErr := tNode.FetchKubernetesNodeInstanceIds()
	h.Nok(t, fetchErr)
}

func TestFetchKubernetesNodeInstanceIdsInvalidProviderID(t *testing.T) {
client := fake.NewSimpleClientset(
&v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-1"},
Spec: v1.NodeSpec{ProviderID: "dummyProviderId"},
},
&v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-2"},
Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:/%s", instanceId2)},
},
&v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-3"},
Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("us-west-2a/%s", instanceId2)},
},
&v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-4"},
Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s/dummyPart", instanceId2)},
},
&v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "valid-providerId-2"},
Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)},
},
&v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "valid-providerId-1"},
Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)},
},
)

_, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
h.Ok(t, err)

node, err := newNode(config.Config{}, client)
h.Ok(t, err)

instanceIds, err := node.FetchKubernetesNodeInstanceIds()
h.Ok(t, err)
h.Equals(t, 2, len(instanceIds))
h.Equals(t, instanceId1, instanceIds[0])
h.Equals(t, instanceId2, instanceIds[1])
}

func TestFilterOutDaemonSetPods(t *testing.T) {
tNode, err := newNode(config.Config{IgnoreDaemonSets: true}, fake.NewSimpleClientset())
h.Ok(t, err)
Expand Down
16 changes: 8 additions & 8 deletions pkg/observability/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Metrics struct {
}

// InitMetrics will initialize, register and expose, via http server, the metrics with Opentelemetry.
func InitMetrics(nthConfig config.Config) (Metrics, error) {
func InitMetrics(enabled bool, port int) (Metrics, error) {
exporter, err := prometheus.New()
if err != nil {
return Metrics{}, fmt.Errorf("failed to create Prometheus exporter: %w", err)
Expand All @@ -70,8 +70,7 @@ func InitMetrics(nthConfig config.Config) (Metrics, error) {
if err != nil {
return Metrics{}, fmt.Errorf("failed to register metrics with Prometheus provider: %w", err)
}
metrics.enabled = nthConfig.EnablePrometheus
metrics.nthConfig = nthConfig
metrics.enabled = enabled

// Starts an async process to collect golang runtime stats
// go.opentelemetry.io/contrib/instrumentation/runtime
Expand All @@ -80,14 +79,15 @@ func InitMetrics(nthConfig config.Config) (Metrics, error) {
return Metrics{}, fmt.Errorf("failed to start Go runtime metrics collection: %w", err)
}

if metrics.enabled {
serveMetrics(nthConfig.PrometheusPort)
if enabled {
serveMetrics(port)
}

return metrics, nil
}

func (m Metrics) InitNodeMetrics(node *node.Node, ec2 ec2iface.EC2API) {
func (m Metrics) InitNodeMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) {
m.nthConfig = nthConfig
m.ec2Helper = ec2helper.New(ec2)
m.node = node

Expand All @@ -105,10 +105,10 @@ func (m Metrics) serveNodeMetrics() {
if err != nil || instanceIdsMap == nil {
log.Err(err).Msg("Failed to get AWS instance ids")
return
} else {
m.InstancesRecord(int64(len(instanceIdsMap)))
}

m.InstancesRecord(int64(len(instanceIdsMap)))

nodeInstanceIds, err := m.node.FetchKubernetesNodeInstanceIds()
if err != nil || nodeInstanceIds == nil {
log.Err(err).Msg("Failed to get node instance ids")
Expand Down
36 changes: 19 additions & 17 deletions test/e2e/prometheus-metrics-sqs-test
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,20 @@ set +x

sleep 10

RUN_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=blah}]'"
MANAGED_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=blah}]'"
MANAGED_INSTANCE_WITHOUT_TAG_VALUE_CMD="awslocal ec2 run-instances --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=\"\"}]'"
UNMANAGED_INSTANCE_CMD="awslocal ec2 run-instances --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test}]'"
set -x
localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \
-o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \
| awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }')
echo "🥑 Using localstack pod $localstack_pod"
run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "$RUN_INSTANCE_CMD")
instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId')
echo "🥑 Started mock EC2 instance ($instance_id)"

# Run each mock-instance command inside the localstack pod, one after another,
# and echo the InstanceId(s) extracted from every run-instances response.
for instance_cmd in "$MANAGED_INSTANCE_WITHOUT_TAG_VALUE_CMD" "$UNMANAGED_INSTANCE_CMD" "$MANAGED_INSTANCE_CMD"; do
run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "$instance_cmd")
instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId')
echo "🥑 Started mock EC2 instance ($instance_id)"
done

CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}"
queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "$CREATE_SQS_CMD" | jq -r .QueueUrl)
Expand Down Expand Up @@ -168,6 +173,7 @@ for i in $(seq 1 $TAINT_CHECK_CYCLES); do
if [[ ${evicted} -eq 1 && $(kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then
kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD"
echo "✅ Verified the message was deleted from the queue after processing!"
break
fi

echo "Assertion Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds"
Expand Down Expand Up @@ -211,7 +217,7 @@ for i in $(seq 1 $TAINT_CHECK_CYCLES); do
sleep $TAINT_CHECK_SLEEP
done

if [[ -n $failed ]];then
if [[ -n $failed ]]; then
exit 4
fi

Expand All @@ -227,15 +233,11 @@ for action in cordon-and-drain post-drain; do
echo "✅ Fetched counter:$counter_value for metric with action:$action"
done

for gauge in nth_tagged_instances; do
query=''$gauge'{otel_scope_name="aws.node.termination.handler",otel_scope_version=""}'
counter_value=$(echo "$METRICS_RESPONSE" | grep -E "${query}[[:space:]]+[0-9]+" | awk '{print $NF}')
if (($counter_value < 1)); then
echo "❌ Failed gauge count for metric:$gauge"
exit 5
fi
echo "✅ Fetched gauge:$counter_value for metric:$gauge"
done


exit 0
# Verify that the nth_tagged_instances gauge is present in the scraped metrics
# and reports at least 2; exit with status 5 otherwise.
gauge="nth_tagged_instances"
query=''$gauge'{otel_scope_name="aws.node.termination.handler",otel_scope_version=""}'
counter_value=$(echo "$METRICS_RESPONSE" | grep -E "${query}[[:space:]]+[0-9]+" | awk '{print $NF}')
# An empty value means no matching metric line was found. Without this guard
# the arithmetic test expands to (( < 2)), a syntax error that `if` evaluates
# as false, so a missing gauge would be reported as a success.
if [[ -z $counter_value ]] || ((counter_value < 2)); then
    echo "❌ Failed gauge count for metric:$gauge"
    exit 5
fi
echo "✅ Fetched gauge:$counter_value for metric:$gauge"

0 comments on commit a95474e

Please sign in to comment.