Skip to content

Commit

Permalink
Merge pull request #1 from alasconnect/breakup-airflow-check
Browse files Browse the repository at this point in the history
Broke up airflow health check into import and health
  • Loading branch information
W. Duncan Fraser authored May 11, 2021
2 parents a5ee980 + 2b163e1 commit 20f49c1
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 113 deletions.
68 changes: 65 additions & 3 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,74 @@
builds:
# List of builds
- # First Build
- main: ./cmd/airflow-check/main.go
id: "airflow-check"
env:
- CGO_ENABLED=0
main: main.go
ldflags: '-s -w -X github.com/sensu-community/sensu-plugin-sdk/version.version={{.Version}} -X github.com/sensu-community/sensu-plugin-sdk/version.commit={{.Commit}} -X github.com/sensu-community/sensu-plugin-sdk/version.date={{.Date}}'
# Set the binary output location to bin/ so archive will comply with Sensu Go Asset structure
binary: bin/{{ .ProjectName }}
binary: bin/airflow-check
goos:
- darwin
- linux
- windows
goarch:
- amd64
- 386
- arm
- arm64
goarm:
- 5
- 6
- 7
targets:
- darwin_amd64
- linux_386
- linux_amd64
- linux_arm_5
- linux_arm_6
- linux_arm_7
- linux_arm64
- windows_386
- windows_amd64

- main: ./cmd/airflow-import-check/main.go
id: "airflow-import-check"
env:
- CGO_ENABLED=0
ldflags: '-s -w -X github.com/sensu-community/sensu-plugin-sdk/version.version={{.Version}} -X github.com/sensu-community/sensu-plugin-sdk/version.commit={{.Commit}} -X github.com/sensu-community/sensu-plugin-sdk/version.date={{.Date}}'
# Set the binary output location to bin/ so archive will comply with Sensu Go Asset structure
binary: bin/airflow-import-check
goos:
- darwin
- linux
- windows
goarch:
- amd64
- 386
- arm
- arm64
goarm:
- 5
- 6
- 7
targets:
- darwin_amd64
- linux_386
- linux_amd64
- linux_arm_5
- linux_arm_6
- linux_arm_7
- linux_arm64
- windows_386
- windows_amd64

- main: ./cmd/airflow-dag-check/main.go
id: "airflow-dag-check"
env:
- CGO_ENABLED=0
ldflags: '-s -w -X github.com/sensu-community/sensu-plugin-sdk/version.version={{.Version}} -X github.com/sensu-community/sensu-plugin-sdk/version.commit={{.Commit}} -X github.com/sensu-community/sensu-plugin-sdk/version.date={{.Date}}'
# Set the binary output location to bin/ so archive will comply with Sensu Go Asset structure
binary: bin/airflow-dag-check
goos:
- darwin
- linux
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Versioning](http://semver.org/spec/v2.0.0.html).

## Unreleased

## [0.0.1] - 2000-01-01
## [0.1.0] - 2021-05-11

### Added

- Initial release
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright 2021 Ampersand
Copyright 2021 AlasConnect LLC

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

Expand Down
34 changes: 28 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
![Go Test](https://github.com/alasconnect/sensu-airflow-check/workflows/Go%20Test/badge.svg)
![goreleaser](https://github.com/alasconnect/sensu-airflow-check/workflows/goreleaser/badge.svg)

# sensu-airflow-check
# sensu-airflow-check <!-- omit in toc -->

## Table of Contents
## Table of Contents <!-- omit in toc -->
- [Overview](#overview)
- [Files](#files)
- [Checks](#checks)
- [Usage examples](#usage-examples)
- [Configuration](#configuration)
- [Asset registration](#asset-registration)
- [Check definition](#check-definition)
- [airflow-check](#airflow-check)
- [airflow-import-check](#airflow-import-check)
- [airflow-dag-check](#airflow-dag-check)
- [Installation from source](#installation-from-source)
- [Additional notes](#additional-notes)
- [Contributing](#contributing)
Expand All @@ -24,12 +27,13 @@ The sensu-airflow-check is a [Sensu Check][6] that provides monitoring for airfl
This collection contains the following checks:

- airflow-check - for checking the health of the airflow metadatabase and scheduler.
- airflow-dag-check - for checking DAG runs for all or a specific list of DAGs
- airflow-import-check - for checking DAG import errors.
- airflow-dag-check - for checking DAG runs for all or a specific list of DAGs.

## Usage examples

```
airflow-check --url http://localhost:8080/ --username admin --password admin
airflow-check --url http://localhost:8080/
```

```
Expand Down Expand Up @@ -74,7 +78,24 @@ metadata:
name: airflow-check
namespace: default
spec:
command: airflow-check --url {url} --username {username} --password {password}
command: airflow-check --url {url}
subscriptions:
- system
runtime_assets:
- alasconnect/sensu-airflow-check
```
#### airflow-import-check
```yml
---
type: CheckConfig
api_version: core/v2
metadata:
name: airflow-import-check
namespace: default
spec:
command: airflow-import-check --url {url} --username {username} --password {password}
subscriptions:
- system
runtime_assets:
Expand Down Expand Up @@ -108,6 +129,7 @@ From the local path of the sensu-airflow-check repository:
```
go build -o bin/airflow-check ./cmd/airflow-check
go build -o bin/airflow-import-check ./cmd/airflow-import-check
go build -o bin/airflow-dag-check ./cmd/airflow-dag-check
```

Expand Down
114 changes: 17 additions & 97 deletions cmd/airflow-check/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@ import (
// Config represents the check plugin config.
type Config struct {
sensu.PluginConfig
AirflowApiUrl string
AirflowUsername string
AirflowPassword string
Timeout int
AirflowApiUrl string
Timeout int
}

var (
plugin = Config{
PluginConfig: sensu.PluginConfig{
Name: "sensu-airflow-check",
Short: "A plugin for checking the health of airflow 2.0 in Sensu.",
Name: "airflow-check",
Short: "Check the health endpoint of Airflow 2.x. Returns critical if not healthy.",
Keyspace: "sensu.io/plugins/airflow-check/config",
},
}
Expand All @@ -36,28 +34,10 @@ var (
Env: "",
Argument: "url",
Shorthand: "u",
Default: "https://127.0.0.1:8081/",
Default: "http://127.0.0.1:8080/",
Usage: "The base URL of the airflow REST API.",
Value: &plugin.AirflowApiUrl,
},
{
Path: "airflow-username",
Env: "",
Argument: "username",
Shorthand: "n",
Default: "",
Usage: "The username used to authenticate against the airflow API.",
Value: &plugin.AirflowUsername,
},
{
Path: "airflow-password",
Env: "",
Argument: "password",
Shorthand: "p",
Default: "",
Usage: "The password used to authenticate against the airflow API.",
Value: &plugin.AirflowPassword,
},
{
Path: "timeout",
Env: "",
Expand All @@ -81,14 +61,6 @@ func checkArgs(event *types.Event) (int, error) {
return sensu.CheckStateWarning, fmt.Errorf("failed to parse airflow URL %s: %v", plugin.AirflowApiUrl, err)
}

if plugin.AirflowUsername == "" {
return sensu.CheckStateWarning, fmt.Errorf("Airflow username is required")
}

if plugin.AirflowPassword == "" {
return sensu.CheckStateWarning, fmt.Errorf("Airflow password is required")
}

return sensu.CheckStateOK, nil
}

Expand All @@ -97,45 +69,31 @@ func executeCheck(event *types.Event) (int, error) {
client.Transport = http.DefaultTransport
client.Timeout = time.Duration(plugin.Timeout) * time.Second

criticals := 0

critical := false
var err error
var importErrors *ImportErrors
importErrors, err = getImportErrors(client)

if err != nil {
fmt.Printf("Error occurred while checking airflow import errors:\n%v\n", err)
criticals++
} else if importErrors.TotalEntries > 0 {
criticals++
for _, ie := range importErrors.ImportErrors {
fmt.Printf("Airflow encountered an error while importing DAG: %s\n%v\n", ie.FileName, ie.StackTrace)
}
}

var health *Health
health, err = getHealth(client)

if err != nil {
fmt.Printf("Error occurred while checking airflow health:\n%v\n", err)
criticals++
critical = true
} else {
if health.MetaDatabaseHealth.Status != "healthy" {
fmt.Printf("Airflow metadatabase is in trouble.")
criticals++
critical = true
}

if health.Scheduler.Status != "healthy" {
fmt.Printf("Airflow scheduler is in trouble.")
criticals++
critical = true
}
}

result := sensu.CheckStateOK
if criticals > 0 {
result = sensu.CheckStateCritical
if critical {
return sensu.CheckStateCritical, nil
}
return result, nil
return sensu.CheckStateOK, nil
}

type MetaDatabaseHealth struct {
Expand All @@ -152,14 +110,18 @@ type Health struct {
Scheduler SchedulerHealth `json:"scheduler"`
}

func getAirflowApiUrl() string {
// a trailing slash will cause errors
return strings.TrimSuffix(plugin.AirflowApiUrl, "/") + "/api/v1"
}

func getHealth(client *http.Client) (*Health, error) {
req, err := http.NewRequest("GET", getAirflowApiUrl()+"/health", nil)
if err != nil {
return nil, err
}

req.Header.Set("Accept", "application/json")
req.SetBasicAuth(plugin.AirflowUsername, plugin.AirflowPassword)

resp, err := client.Do(req)
if err != nil {
Expand All @@ -177,45 +139,3 @@ func getHealth(client *http.Client) (*Health, error) {

return &result, nil
}

type ImportError struct {
TimeStamp string `json:"timestamp"`
FileName string `json:"filename"`
StackTrace string `json:"stack_trace"`
}

type ImportErrors struct {
ImportErrors []ImportError `json:"import_errors"`
TotalEntries int `json:"total_entries"`
}

func getImportErrors(client *http.Client) (*ImportErrors, error) {
req, err := http.NewRequest("GET", getAirflowApiUrl()+"/importErrors", nil)
if err != nil {
return nil, err
}

req.Header.Set("Accept", "application/json")
req.SetBasicAuth(plugin.AirflowUsername, plugin.AirflowPassword)

resp, err := client.Do(req)
if err != nil {
return nil, err
} else if resp.StatusCode != 200 {
return nil, fmt.Errorf("import errors request returned an invalid status code: %s", resp.Status)
}

defer resp.Body.Close()

var result ImportErrors
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode health response: %v", err)
}

return &result, nil
}

func getAirflowApiUrl() string {
// a trailing slash will cause errors
return strings.TrimSuffix(plugin.AirflowApiUrl, "/") + "/api/v1"
}
10 changes: 5 additions & 5 deletions cmd/airflow-dag-check/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type Config struct {
var (
plugin = Config{
PluginConfig: sensu.PluginConfig{
Name: "sensu-airflow-check",
Short: "A plugin for checking the health of airflow 2.0 DAG runs in Sensu.",
Name: "airflow-dag-check",
Short: "Checks the health of Airflow 2.x DAG runs in Sensu.",
Keyspace: "sensu.io/plugins/airflow-dag-check/config",
},
}
Expand All @@ -37,7 +37,7 @@ var (
Env: "",
Argument: "url",
Shorthand: "u",
Default: "https://127.0.0.1:8081/",
Default: "http://127.0.0.1:8080/",
Usage: "The base URL of the airflow REST API.",
Value: &plugin.AirflowApiUrl,
},
Expand Down Expand Up @@ -144,9 +144,9 @@ func executeCheck(event *types.Event) (int, error) {
case sensu.CheckStateCritical:
criticals++
fmt.Printf("%s CRITICAL\n", h.DagId)
case sensu.CheckStateUnknown:
default:
unknowns++
fmt.Printf("%s UNKNOWN\n", h.DagId)
fmt.Printf("%s Unknown error code returned\n", h.DagId)
}

if h.Error != nil {
Expand Down
Loading

0 comments on commit 20f49c1

Please sign in to comment.