From 2b163e122578801674b7c234e9876691edb1db02 Mon Sep 17 00:00:00 2001 From: "W. Duncan Fraser" Date: Tue, 11 May 2021 14:04:35 -0500 Subject: [PATCH] Broke up airflow health check into import and health Prepped changelog for initial release Wired up goreleaser for all 3 binaries --- .goreleaser.yml | 68 ++++++++++- CHANGELOG.md | 3 +- LICENSE | 2 +- README.md | 34 +++++- cmd/airflow-check/main.go | 114 +++--------------- cmd/airflow-dag-check/main.go | 10 +- cmd/airflow-import-check/main.go | 162 ++++++++++++++++++++++++++ cmd/airflow-import-check/main_test.go | 8 ++ 8 files changed, 288 insertions(+), 113 deletions(-) create mode 100644 cmd/airflow-import-check/main.go create mode 100644 cmd/airflow-import-check/main_test.go diff --git a/.goreleaser.yml b/.goreleaser.yml index b508573..e722fa3 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -1,12 +1,74 @@ builds: # List of builds - - # First Build + - main: ./cmd/airflow-check/main.go + id: "airflow-check" env: - CGO_ENABLED=0 - main: main.go ldflags: '-s -w -X github.com/sensu-community/sensu-plugin-sdk/version.version={{.Version}} -X github.com/sensu-community/sensu-plugin-sdk/version.commit={{.Commit}} -X github.com/sensu-community/sensu-plugin-sdk/version.date={{.Date}}' # Set the binary output location to bin/ so archive will comply with Sensu Go Asset structure - binary: bin/{{ .ProjectName }} + binary: bin/airflow-check + goos: + - darwin + - linux + - windows + goarch: + - amd64 + - 386 + - arm + - arm64 + goarm: + - 5 + - 6 + - 7 + targets: + - darwin_amd64 + - linux_386 + - linux_amd64 + - linux_arm_5 + - linux_arm_6 + - linux_arm_7 + - linux_arm64 + - windows_386 + - windows_amd64 + + - main: ./cmd/airflow-import-check/main.go + id: "airflow-import-check" + env: + - CGO_ENABLED=0 + ldflags: '-s -w -X github.com/sensu-community/sensu-plugin-sdk/version.version={{.Version}} -X github.com/sensu-community/sensu-plugin-sdk/version.commit={{.Commit}} -X github.com/sensu-community/sensu-plugin-sdk/version.date={{.Date}}' + # Set the binary output location to bin/ so archive will comply with Sensu Go Asset structure + binary: bin/airflow-import-check + goos: + - darwin + - linux + - windows + goarch: + - amd64 + - 386 + - arm + - arm64 + goarm: + - 5 + - 6 + - 7 + targets: + - darwin_amd64 + - linux_386 + - linux_amd64 + - linux_arm_5 + - linux_arm_6 + - linux_arm_7 + - linux_arm64 + - windows_386 + - windows_amd64 + + - main: ./cmd/airflow-dag-check/main.go + id: "airflow-dag-check" + env: + - CGO_ENABLED=0 + ldflags: '-s -w -X github.com/sensu-community/sensu-plugin-sdk/version.version={{.Version}} -X github.com/sensu-community/sensu-plugin-sdk/version.commit={{.Commit}} -X github.com/sensu-community/sensu-plugin-sdk/version.date={{.Date}}' + # Set the binary output location to bin/ so archive will comply with Sensu Go Asset structure + binary: bin/airflow-dag-check goos: - darwin - linux diff --git a/CHANGELOG.md b/CHANGELOG.md index 103bb40..cb76c18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,8 @@ Versioning](http://semver.org/spec/v2.0.0.html). ## Unreleased -## [0.0.1] - 2000-01-01 +## [0.1.0] - 2021-05-11 ### Added + - Initial release diff --git a/LICENSE b/LICENSE index 70f931d..fd361ba 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright 2021 Ampersand +Copyright 2021 AlasConnect LLC Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/README.md b/README.md index d5e35bf..84c98fd 100644 --- a/README.md +++ b/README.md @@ -2,15 +2,18 @@ ![Go Test](https://github.com/alasconnect/sensu-airflow-check/workflows/Go%20Test/badge.svg) ![goreleaser](https://github.com/alasconnect/sensu-airflow-check/workflows/goreleaser/badge.svg) -# sensu-airflow-check +# sensu-airflow-check -## Table of Contents +## Table of Contents - [Overview](#overview) -- [Files](#files) +- [Checks](#checks) - [Usage examples](#usage-examples) - [Configuration](#configuration) - [Asset registration](#asset-registration) - [Check definition](#check-definition) + - [airflow-check](#airflow-check) + - [airflow-import-check](#airflow-import-check) + - [airflow-dag-check](#airflow-dag-check) - [Installation from source](#installation-from-source) - [Additional notes](#additional-notes) - [Contributing](#contributing) @@ -24,12 +27,13 @@ The sensu-airflow-check is a [Sensu Check][6] that provides monitoring for airfl This collection contains the following checks: - airflow-check - for checking the health of the airflow metadatabase and scheduler. - - airflow-dag-check - for checking DAG runs for all or a specific list of DAGs + - airflow-import-check - for checking DAG import errors. + - airflow-dag-check - for checking DAG runs for all or a specific list of DAGs. ## Usage examples ``` -airflow-check --url http://localhost:8080/ --username admin --password admin +airflow-check --url http://localhost:8080/ ``` ``` @@ -74,7 +78,24 @@ metadata: name: airflow-check namespace: default spec: - command: airflow-check --url {url} --username {username} --password {password} + command: airflow-check --url {url} + subscriptions: + - system + runtime_assets: + - alasconnect/sensu-airflow-check +``` + +#### airflow-import-check + +```yml +--- +type: CheckConfig +api_version: core/v2 +metadata: + name: airflow-import-check + namespace: default +spec: + command: airflow-import-check --url {url} --username {username} --password {password} subscriptions: - system runtime_assets: @@ -108,6 +129,7 @@ From the local path of the sensu-airflow-check repository: ``` go build -o bin/airflow-check ./cmd/airflow-check +go build -o bin/airflow-import-check ./cmd/airflow-import-check go build -o bin/airflow-dag-check ./cmd/airflow-dag-check ``` diff --git a/cmd/airflow-check/main.go b/cmd/airflow-check/main.go index 8c4b73e..0cd3c57 100644 --- a/cmd/airflow-check/main.go +++ b/cmd/airflow-check/main.go @@ -15,17 +15,15 @@ import ( // Config represents the check plugin config. type Config struct { sensu.PluginConfig - AirflowApiUrl string - AirflowUsername string - AirflowPassword string - Timeout int + AirflowApiUrl string + Timeout int } var ( plugin = Config{ PluginConfig: sensu.PluginConfig{ - Name: "sensu-airflow-check", - Short: "A plugin for checking the health of airflow 2.0 in Sensu.", + Name: "airflow-check", + Short: "Check the health endpoint of Airflow 2.x. Returns critical if not healthy.", Keyspace: "sensu.io/plugins/airflow-check/config", }, } @@ -36,28 +34,10 @@ var ( Env: "", Argument: "url", Shorthand: "u", - Default: "https://127.0.0.1:8081/", + Default: "http://127.0.0.1:8080/", Usage: "The base URL of the airflow REST API.", Value: &plugin.AirflowApiUrl, }, - { - Path: "airflow-username", - Env: "", - Argument: "username", - Shorthand: "n", - Default: "", - Usage: "The username used to authenticate against the airflow API.", - Value: &plugin.AirflowUsername, - }, - { - Path: "airflow-password", - Env: "", - Argument: "password", - Shorthand: "p", - Default: "", - Usage: "The password used to authenticate against the airflow API.", - Value: &plugin.AirflowPassword, - }, { Path: "timeout", Env: "", @@ -81,14 +61,6 @@ func checkArgs(event *types.Event) (int, error) { return sensu.CheckStateWarning, fmt.Errorf("failed to parse airflow URL %s: %v", plugin.AirflowApiUrl, err) } - if plugin.AirflowUsername == "" { - return sensu.CheckStateWarning, fmt.Errorf("Airflow username is required") - } - - if plugin.AirflowPassword == "" { - return sensu.CheckStateWarning, fmt.Errorf("Airflow password is required") - } - return sensu.CheckStateOK, nil } @@ -97,45 +69,31 @@ func executeCheck(event *types.Event) (int, error) { client.Transport = http.DefaultTransport client.Timeout = time.Duration(plugin.Timeout) * time.Second - criticals := 0 - + critical := false var err error - var importErrors *ImportErrors - importErrors, err = getImportErrors(client) - - if err != nil { - fmt.Printf("Error occurred while checking airflow import errors:\n%v\n", err) - criticals++ - } else if importErrors.TotalEntries > 0 { - criticals++ - for _, ie := range importErrors.ImportErrors { - fmt.Printf("Airflow encountered an error while importing DAG: %s\n%v\n", ie.FileName, ie.StackTrace) - } - } var health *Health health, err = getHealth(client) if err != nil { fmt.Printf("Error occurred while checking airflow health:\n%v\n", err) - criticals++ + critical = true } else { if health.MetaDatabaseHealth.Status != "healthy" { fmt.Printf("Airflow metadatabase is in trouble.") - criticals++ + critical = true } if health.Scheduler.Status != "healthy" { fmt.Printf("Airflow scheduler is in trouble.") - criticals++ + critical = true } } - result := sensu.CheckStateOK - if criticals > 0 { - result = sensu.CheckStateCritical + if critical { + return sensu.CheckStateCritical, nil } - return result, nil + return sensu.CheckStateOK, nil } type MetaDatabaseHealth struct { @@ -152,6 +110,11 @@ type Health struct { Scheduler SchedulerHealth `json:"scheduler"` } +func getAirflowApiUrl() string { + // a trailing slash will cause errors + return strings.TrimSuffix(plugin.AirflowApiUrl, "/") + "/api/v1" +} + func getHealth(client *http.Client) (*Health, error) { req, err := http.NewRequest("GET", getAirflowApiUrl()+"/health", nil) if err != nil { @@ -159,7 +122,6 @@ func getHealth(client *http.Client) (*Health, error) { } req.Header.Set("Accept", "application/json") - req.SetBasicAuth(plugin.AirflowUsername, plugin.AirflowPassword) resp, err := client.Do(req) if err != nil { @@ -177,45 +139,3 @@ func getHealth(client *http.Client) (*Health, error) { return &result, nil } - -type ImportError struct { - TimeStamp string `json:"timestamp"` - FileName string `json:"filename"` - StackTrace string `json:"stack_trace"` -} - -type ImportErrors struct { - ImportErrors []ImportError `json:"import_errors"` - TotalEntries int `json:"total_entries"` -} - -func getImportErrors(client *http.Client) (*ImportErrors, error) { - req, err := http.NewRequest("GET", getAirflowApiUrl()+"/importErrors", nil) - if err != nil { - return nil, err - } - - req.Header.Set("Accept", "application/json") - req.SetBasicAuth(plugin.AirflowUsername, plugin.AirflowPassword) - - resp, err := client.Do(req) - if err != nil { - return nil, err - } else if resp.StatusCode != 200 { - return nil, fmt.Errorf("import errors request returned an invalid status code: %s", resp.Status) - } - - defer resp.Body.Close() - - var result ImportErrors - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return nil, fmt.Errorf("failed to decode health response: %v", err) - } - - return &result, nil -} - -func getAirflowApiUrl() string { - // a trailing slash will cause errors - return strings.TrimSuffix(plugin.AirflowApiUrl, "/") + "/api/v1" -} diff --git a/cmd/airflow-dag-check/main.go b/cmd/airflow-dag-check/main.go index 0a66ff7..2227e69 100644 --- a/cmd/airflow-dag-check/main.go +++ b/cmd/airflow-dag-check/main.go @@ -25,8 +25,8 @@ type Config struct { var ( plugin = Config{ PluginConfig: sensu.PluginConfig{ - Name: "sensu-airflow-check", - Short: "A plugin for checking the health of airflow 2.0 DAG runs in Sensu.", + Name: "airflow-dag-check", + Short: "Checks the health of Airflow 2.x DAG runs in Sensu.", Keyspace: "sensu.io/plugins/airflow-dag-check/config", }, } @@ -37,7 +37,7 @@ var ( Env: "", Argument: "url", Shorthand: "u", - Default: "https://127.0.0.1:8081/", + Default: "http://127.0.0.1:8080/", Usage: "The base URL of the airflow REST API.", Value: &plugin.AirflowApiUrl, }, @@ -144,9 +144,9 @@ func executeCheck(event *types.Event) (int, error) { case sensu.CheckStateCritical: criticals++ fmt.Printf("%s CRITICAL\n", h.DagId) - case sensu.CheckStateUnknown: + default: unknowns++ - fmt.Printf("%s UNKNOWN\n", h.DagId) + fmt.Printf("%s Unknown error code returned\n", h.DagId) } if h.Error != nil { diff --git a/cmd/airflow-import-check/main.go b/cmd/airflow-import-check/main.go new file mode 100644 index 0000000..ad06b54 --- /dev/null +++ b/cmd/airflow-import-check/main.go @@ -0,0 +1,162 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "github.com/sensu-community/sensu-plugin-sdk/sensu" + "github.com/sensu/sensu-go/types" +) + +// Config represents the check plugin config. +type Config struct { + sensu.PluginConfig + AirflowApiUrl string + AirflowUsername string + AirflowPassword string + Timeout int +} + +var ( + plugin = Config{ + PluginConfig: sensu.PluginConfig{ + Name: "airflow-import-check", + Short: "Check the Airflow 2.x DAG import errors endpoint. Returns critical if there are errors.", + Keyspace: "sensu.io/plugins/airflow-import-check/config", + }, + } + + options = []*sensu.PluginConfigOption{ + { + Path: "airflow-api-url", + Env: "", + Argument: "url", + Shorthand: "u", + Default: "http://127.0.0.1:8080/", + Usage: "The base URL of the airflow REST API.", + Value: &plugin.AirflowApiUrl, + }, + { + Path: "airflow-username", + Env: "", + Argument: "username", + Shorthand: "n", + Default: "", + Usage: "The username used to authenticate against the airflow API.", + Value: &plugin.AirflowUsername, + }, + { + Path: "airflow-password", + Env: "", + Argument: "password", + Shorthand: "p", + Default: "", + Usage: "The password used to authenticate against the airflow API.", + Value: &plugin.AirflowPassword, + }, + { + Path: "timeout", + Env: "", + Argument: "timeout", + Shorthand: "t", + Default: 15, + Usage: "Request timeout in seconds", + Value: &plugin.Timeout, + }, + } +) + +func main() { + check := sensu.NewGoCheck(&plugin.PluginConfig, options, checkArgs, executeCheck, false) + check.Execute() +} + +func checkArgs(event *types.Event) (int, error) { + _, err := url.Parse(plugin.AirflowApiUrl) + if err != nil { + return sensu.CheckStateWarning, fmt.Errorf("failed to parse airflow URL %s: %v", plugin.AirflowApiUrl, err) + } + + if plugin.AirflowUsername == "" { + return sensu.CheckStateWarning, fmt.Errorf("airflow username is required") + } + + if plugin.AirflowPassword == "" { + return sensu.CheckStateWarning, fmt.Errorf("airflow password is required") + } + + return sensu.CheckStateOK, nil +} + +func executeCheck(event *types.Event) (int, error) { + client := http.DefaultClient + client.Transport = http.DefaultTransport + client.Timeout = time.Duration(plugin.Timeout) * time.Second + + critical := false + var err error + + var importErrors *ImportErrors + importErrors, err = getImportErrors(client) + + if err != nil { + fmt.Printf("Error occurred while checking airflow import errors:\n%v\n", err) + critical = true + } else if importErrors.TotalEntries > 0 { + critical = true + for _, ie := range importErrors.ImportErrors { + fmt.Printf("Airflow encountered an error while importing DAG: %s\n%v\n", ie.FileName, ie.StackTrace) + } + } + + if critical { + return sensu.CheckStateCritical, nil + } + return sensu.CheckStateOK, nil +} + +type ImportError struct { + TimeStamp string `json:"timestamp"` + FileName string `json:"filename"` + StackTrace string `json:"stack_trace"` +} + +type ImportErrors struct { + ImportErrors []ImportError `json:"import_errors"` + TotalEntries int `json:"total_entries"` +} + +func getAirflowApiUrl() string { + // a trailing slash will cause errors + return strings.TrimSuffix(plugin.AirflowApiUrl, "/") + "/api/v1" +} + +func getImportErrors(client *http.Client) (*ImportErrors, error) { + req, err := http.NewRequest("GET", getAirflowApiUrl()+"/importErrors", nil) + if err != nil { + return nil, err + } + + req.Header.Set("Accept", "application/json") + req.SetBasicAuth(plugin.AirflowUsername, plugin.AirflowPassword) + + resp, err := client.Do(req) + if err != nil { + return nil, err + } else if resp.StatusCode != 200 { + return nil, fmt.Errorf("import errors request returned an invalid status code: %s", resp.Status) + } + + defer resp.Body.Close() + + var result ImportErrors + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode health response: %v", err) + } + + return &result, nil +} diff --git a/cmd/airflow-import-check/main_test.go b/cmd/airflow-import-check/main_test.go new file mode 100644 index 0000000..330f64d --- /dev/null +++ b/cmd/airflow-import-check/main_test.go @@ -0,0 +1,8 @@ +package main + +import ( + "testing" +) + +func TestMain(t *testing.T) { +}