-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add health checks #259
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,24 +10,37 @@ | |
package main | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"log" | ||
"log/slog" | ||
"os" | ||
"os/signal" | ||
"strings" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/GeoNet/kit/aws/s3" | ||
"github.com/GeoNet/kit/aws/sqs" | ||
"github.com/GeoNet/kit/cfg" | ||
"github.com/GeoNet/kit/health" | ||
"github.com/GeoNet/kit/metrics" | ||
) | ||
|
||
const ( | ||
healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time | ||
healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting | ||
healthCheckTimeout = 30 * time.Second //health check timeout | ||
healthCheckService = ":7777" //end point to listen to for SOH checks | ||
healthCheckPath = "/soh" | ||
) | ||
|
||
var ( | ||
db *sql.DB | ||
queueURL = os.Getenv("SQS_QUEUE_URL") | ||
queueURL string | ||
sqsClient sqs.SQS | ||
s3Client *s3.S3 | ||
saveHoldings *sql.Stmt | ||
|
@@ -37,7 +50,38 @@ type event struct { | |
s3.Event | ||
} | ||
|
||
// init and check aws variables | ||
func initAwsClient() { | ||
queueURL = os.Getenv("SQS_QUEUE_URL") | ||
if queueURL == "" { | ||
log.Fatal("SQS_QUEUE_URL is not set") | ||
} | ||
|
||
var err error | ||
sqsClient, err = sqs.NewWithMaxRetries(100) | ||
if err != nil { | ||
log.Fatalf("error creating SQS client: %s", err) | ||
} | ||
if err = sqsClient.CheckQueue(queueURL); err != nil { | ||
log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) | ||
} | ||
|
||
s3c, err := s3.NewWithMaxRetries(3) | ||
if err != nil { | ||
log.Fatalf("error creating S3 client: %s", err) | ||
} | ||
s3Client = &s3c | ||
|
||
} | ||
|
||
func main() { | ||
//check health | ||
if health.RunningHealthCheck() { | ||
healthCheck() | ||
} | ||
|
||
//run as normal service | ||
initAwsClient() | ||
p, err := cfg.PostgresEnv() | ||
if err != nil { | ||
log.Fatalf("error reading DB config from the environment vars: %s", err) | ||
|
@@ -80,38 +124,44 @@ func main() { | |
db.SetMaxIdleConns(p.MaxIdle) | ||
db.SetMaxOpenConns(p.MaxOpen) | ||
|
||
// provide a soh heartbeat | ||
health := health.New(healthCheckService, healthCheckAged, healthCheckStartup) | ||
|
||
ping: | ||
for { | ||
err = db.Ping() | ||
if err != nil { | ||
log.Println("problem pinging DB sleeping and retrying") | ||
health.Ok() //send heartbeat | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessary. If the service stuck during initial DB connection then it's unhealthy |
||
time.Sleep(time.Second * 30) | ||
continue ping | ||
} | ||
break ping | ||
} | ||
|
||
sqsClient, err = sqs.NewWithMaxRetries(100) | ||
if err != nil { | ||
log.Fatalf("error creating SQS client: %s", err) | ||
} | ||
|
||
s3c, err := s3.NewWithMaxRetries(3) | ||
if err != nil { | ||
log.Fatalf("error creating S3 client: %s", err) | ||
} | ||
s3Client = &s3c | ||
|
||
log.Println("listening for messages") | ||
|
||
var r sqs.Raw | ||
var e event | ||
|
||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) | ||
defer stop() | ||
|
||
loop1: | ||
for { | ||
r, err = sqsClient.Receive(queueURL, 600) | ||
health.Ok() // update soh | ||
r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) | ||
if err != nil { | ||
log.Printf("problem receiving message, backing off: %s", err) | ||
time.Sleep(time.Second * 20) | ||
switch { | ||
case sqs.IsNoMessagesError(err): | ||
continue | ||
case sqs.Cancelled(err): //stoped | ||
log.Println("##1 system stop... ") | ||
break loop1 | ||
default: | ||
slog.Warn("problem receiving message, backing off", "err", err) | ||
time.Sleep(time.Second * 20) | ||
} | ||
continue | ||
} | ||
|
||
|
@@ -120,14 +170,28 @@ ping: | |
log.Printf("problem processing message, skipping deletion for redelivery: %s", err) | ||
continue | ||
} | ||
|
||
err = sqsClient.Delete(queueURL, r.ReceiptHandle) | ||
if err != nil { | ||
log.Printf("problem deleting message, continuing: %s", err) | ||
} | ||
} | ||
} | ||
|
||
// check health by calling the http soh endpoint | ||
// cmd: ./fdsn-holdings-consumer -check | ||
func healthCheck() { | ||
ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) | ||
defer cancel() | ||
|
||
msg, err := health.Check(ctx, healthCheckService+healthCheckPath, healthCheckTimeout) | ||
if err != nil { | ||
log.Printf("status: %v", err) | ||
os.Exit(1) | ||
} | ||
log.Printf("status: %s", string(msg)) | ||
os.Exit(0) | ||
} | ||
|
||
// Process implements msg.Processor for event. | ||
func (e *event) Process(msg []byte) error { | ||
err := json.Unmarshal(msg, e) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,20 +5,33 @@ package main | |
|
||
import ( | ||
"bytes" | ||
"context" | ||
"database/sql" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"log" | ||
"log/slog" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/GeoNet/kit/aws/s3" | ||
"github.com/GeoNet/kit/aws/sqs" | ||
"github.com/GeoNet/kit/cfg" | ||
"github.com/GeoNet/kit/health" | ||
"github.com/GeoNet/kit/metrics" | ||
) | ||
|
||
const ( | ||
healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time | ||
healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting | ||
healthCheckTimeout = 30 * time.Second //health check timeout | ||
healthCheckService = ":7777" //end point to listen to for SOH checks | ||
healthCheckPath = "/soh" | ||
) | ||
|
||
var ( | ||
queueURL = os.Getenv("SQS_QUEUE_URL") | ||
s3Client s3.S3 | ||
|
@@ -30,7 +43,37 @@ type notification struct { | |
s3.Event | ||
} | ||
|
||
// init and check aws clients | ||
func initAwsClients() { | ||
queueURL = os.Getenv("SQS_QUEUE_URL") | ||
if queueURL == "" { | ||
log.Fatal("SQS_QUEUE_URL is not set") | ||
} | ||
|
||
var err error | ||
s3Client, err = s3.NewWithMaxRetries(100) | ||
if err != nil { | ||
log.Fatalf("creating S3 client: %s", err) | ||
} | ||
|
||
sqsClient, err = sqs.NewWithMaxRetries(100) | ||
if err != nil { | ||
log.Fatalf("creating SQS client: %s", err) | ||
} | ||
if err = sqsClient.CheckQueue(queueURL); err != nil { | ||
log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) | ||
} | ||
} | ||
|
||
func main() { | ||
//check health | ||
if health.RunningHealthCheck() { | ||
healthCheck() | ||
} | ||
|
||
//run as normal service | ||
initAwsClients() | ||
|
||
p, err := cfg.PostgresEnv() | ||
if err != nil { | ||
log.Fatalf("error reading DB config from the environment vars: %s", err) | ||
|
@@ -45,37 +88,46 @@ func main() { | |
db.SetMaxIdleConns(p.MaxIdle) | ||
db.SetMaxOpenConns(p.MaxOpen) | ||
|
||
// provide a soh heartbeat | ||
health := health.New(healthCheckService, healthCheckAged, healthCheckStartup) | ||
|
||
ping: | ||
for { | ||
err = db.Ping() | ||
if err != nil { | ||
log.Println("problem pinging DB sleeping and retrying") | ||
health.Ok() //send heartbeat | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as fdsn-holdings-consumer |
||
|
||
time.Sleep(time.Second * 30) | ||
continue ping | ||
} | ||
break ping | ||
} | ||
|
||
s3Client, err = s3.NewWithMaxRetries(100) | ||
if err != nil { | ||
log.Fatalf("creating S3 client: %s", err) | ||
} | ||
|
||
sqsClient, err = sqs.NewWithMaxRetries(100) | ||
if err != nil { | ||
log.Fatalf("creating SQS client: %s", err) | ||
} | ||
|
||
log.Println("listening for messages") | ||
|
||
var r sqs.Raw | ||
var n notification | ||
|
||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) | ||
defer stop() | ||
|
||
loop1: | ||
for { | ||
r, err = sqsClient.Receive(queueURL, 600) | ||
health.Ok() // update soh | ||
|
||
r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) | ||
if err != nil { | ||
log.Printf("problem receiving message, backing off: %s", err) | ||
time.Sleep(time.Second * 20) | ||
switch { | ||
case sqs.IsNoMessagesError(err): | ||
continue | ||
case sqs.Cancelled(err): //stoped | ||
log.Println("##1 system stop... ") | ||
break loop1 | ||
default: | ||
slog.Warn("problem receiving message, backing off", "err", err) | ||
time.Sleep(time.Second * 20) | ||
} | ||
continue | ||
} | ||
|
||
|
@@ -92,6 +144,21 @@ ping: | |
} | ||
} | ||
|
||
// check health by calling the http soh endpoint | ||
// cmd: ./fdsn-quake-consumer -check | ||
func healthCheck() { | ||
ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) | ||
defer cancel() | ||
|
||
msg, err := health.Check(ctx, healthCheckService+healthCheckPath, healthCheckTimeout) | ||
if err != nil { | ||
log.Printf("status: %v", err) | ||
os.Exit(1) | ||
} | ||
log.Printf("status: %s", string(msg)) | ||
os.Exit(0) | ||
} | ||
|
||
// Process implements msg.Processor for event. | ||
func (n *notification) Process(msg []byte) error { | ||
err := json.Unmarshal(msg, n) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"log" | ||
"net/http" | ||
|
@@ -10,6 +11,7 @@ import ( | |
"time" | ||
|
||
"github.com/GeoNet/kit/cfg" | ||
"github.com/GeoNet/kit/health" | ||
"github.com/gorilla/schema" | ||
_ "github.com/lib/pq" | ||
) | ||
|
@@ -36,6 +38,12 @@ func newDecoder() *schema.Decoder { | |
} | ||
|
||
func main() { | ||
//check health if flagged in cmd | ||
if health.RunningHealthCheck() { | ||
healthCheck() | ||
} | ||
|
||
//run as normal service | ||
var err error | ||
if S3_BUCKET = os.Getenv("S3_BUCKET"); S3_BUCKET == "" { | ||
log.Fatal("ERROR: S3_BUCKET environment variable is not set") | ||
|
@@ -84,3 +92,19 @@ func main() { | |
} | ||
log.Fatal(server.ListenAndServe()) | ||
} | ||
|
||
// check health by calling the http soh endpoint | ||
// cmd: ./fdsn-ws -check | ||
func healthCheck() { | ||
timeout := 30 * time.Second | ||
ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||
defer cancel() | ||
|
||
msg, err := health.Check(ctx, ":8080/soh", timeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we define ":8080" as a const, then both here and L88 uses that const? |
||
if err != nil { | ||
log.Printf("status: %v", err) | ||
os.Exit(1) | ||
} | ||
log.Printf("status: %s", string(msg)) | ||
os.Exit(0) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove the pointer at L45, and getting rid of the
s3c
here? Thanks