From d39c4e4b605d61cad294915a515a205c44f18028 Mon Sep 17 00:00:00 2001 From: bpeng Date: Fri, 22 Nov 2024 11:36:31 +1300 Subject: [PATCH 1/4] update to add health checks --- cmd/fdsn-holdings-consumer/main.go | 68 +++++++- cmd/fdsn-quake-consumer/main.go | 69 +++++++- cmd/fdsn-ws/server.go | 24 +++ go.mod | 2 +- go.sum | 4 +- vendor/github.com/GeoNet/kit/aws/s3/s3.go | 2 +- vendor/github.com/GeoNet/kit/aws/sqs/sqs.go | 95 ++++++++--- vendor/github.com/GeoNet/kit/health/check.go | 80 +++++++++ .../github.com/GeoNet/kit/health/service.go | 159 ++++++++++++++++++ vendor/github.com/GeoNet/kit/seis/ms/doc.go | 1 - .../github.com/GeoNet/kit/seis/ms/header.go | 1 + .../github.com/GeoNet/kit/seis/ms/record.go | 2 +- vendor/github.com/GeoNet/kit/seis/ms/steim.go | 4 +- .../github.com/GeoNet/kit/slogger/logger.go | 82 +++++++++ vendor/github.com/GeoNet/kit/weft/weft.go | 6 +- vendor/modules.txt | 4 +- 16 files changed, 564 insertions(+), 39 deletions(-) create mode 100644 vendor/github.com/GeoNet/kit/health/check.go create mode 100644 vendor/github.com/GeoNet/kit/health/service.go create mode 100644 vendor/github.com/GeoNet/kit/slogger/logger.go diff --git a/cmd/fdsn-holdings-consumer/main.go b/cmd/fdsn-holdings-consumer/main.go index 20a2005d..fc36d5dc 100644 --- a/cmd/fdsn-holdings-consumer/main.go +++ b/cmd/fdsn-holdings-consumer/main.go @@ -10,19 +10,33 @@ package main import ( + "context" "database/sql" "encoding/json" "errors" "fmt" "log" + "log/slog" "os" + "os/signal" "strings" + "syscall" "time" "github.com/GeoNet/kit/aws/s3" "github.com/GeoNet/kit/aws/sqs" "github.com/GeoNet/kit/cfg" + "github.com/GeoNet/kit/health" "github.com/GeoNet/kit/metrics" + "github.com/GeoNet/kit/slogger" +) + +const ( + healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle) + healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting + healthCheckTimeout = 30 * time.Second //health check timeout + healthCheckService = ":7777" //end point to listen to for SOH checks + healthCheckPath = "/soh" ) var ( @@ -31,6 +45,8 @@ var ( sqsClient sqs.SQS s3Client *s3.S3 saveHoldings *sql.Stmt + + sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages ) type event struct { @@ -38,6 +54,12 @@ type event struct { } func main() { + //check health + if health.RunningHealthCheck() { + healthCheck() + } + + //run as normal service p, err := cfg.PostgresEnv() if err != nil { log.Fatalf("error reading DB config from the environment vars: %s", err) @@ -80,11 +102,15 @@ func main() { db.SetMaxIdleConns(p.MaxIdle) db.SetMaxOpenConns(p.MaxOpen) + // provide a soh heartbeat + health := health.New(healthCheckService, healthCheckAged, healthCheckStartup) + ping: for { err = db.Ping() if err != nil { log.Println("problem pinging DB sleeping and retrying") + health.Ok() //send heartbeat time.Sleep(time.Second * 30) continue ping } @@ -107,17 +133,36 @@ ping: var r sqs.Raw var e event + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + +loop1: for { - r, err = sqsClient.Receive(queueURL, 600) + r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) if err != nil { - log.Printf("problem receiving message, backing off: %s", err) - time.Sleep(time.Second * 20) + switch { + case sqs.Cancelled(err): //stoped + log.Println("##1 system stop... ") + break loop1 + case sqs.IsNoMessagesError(err): + n := sLogger.Log(err) + if n%100 == 0 { //don't log all repeated error messages + log.Printf("no message received for %d times ", n) + } + default: + slog.Warn("problem receiving message, backing off", "err", err) + time.Sleep(time.Second * 20) + } + // update soh + health.Ok() continue } err = metrics.DoProcess(&e, []byte(r.Body)) if err != nil { log.Printf("problem processing message, skipping deletion for redelivery: %s", err) + // update soh + health.Ok() continue } @@ -125,7 +170,24 @@ ping: if err != nil { log.Printf("problem deleting message, continuing: %s", err) } + // update soh + health.Ok() + } +} + +// check health by calling the http soh endpoint +// cmd: ./fdsn-holdings-consumer -check +func healthCheck() { + ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) + defer cancel() + + msg, err := health.Check(ctx, healthCheckService+healthCheckPath, healthCheckTimeout) + if err != nil { + log.Printf("status: %v", err) + os.Exit(1) } + log.Printf("status: %s", string(msg)) + os.Exit(0) } // Process implements msg.Processor for event. diff --git a/cmd/fdsn-quake-consumer/main.go b/cmd/fdsn-quake-consumer/main.go index be3c19ad..9f1ce3ae 100644 --- a/cmd/fdsn-quake-consumer/main.go +++ b/cmd/fdsn-quake-consumer/main.go @@ -5,18 +5,32 @@ package main import ( "bytes" + "context" "database/sql" "encoding/json" "errors" "fmt" "log" + "log/slog" "os" + "os/signal" + "syscall" "time" "github.com/GeoNet/kit/aws/s3" "github.com/GeoNet/kit/aws/sqs" "github.com/GeoNet/kit/cfg" + "github.com/GeoNet/kit/health" "github.com/GeoNet/kit/metrics" + "github.com/GeoNet/kit/slogger" +) + +const ( + healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle) + healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting + healthCheckTimeout = 30 * time.Second //health check timeout + healthCheckService = ":7777" //end point to listen to for SOH checks + healthCheckPath = "/soh" ) var ( @@ -24,6 +38,8 @@ var ( s3Client s3.S3 sqsClient sqs.SQS db *sql.DB + + sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages ) type notification struct { @@ -31,6 +47,12 @@ type notification struct { } func main() { + //check health + if health.RunningHealthCheck() { + healthCheck() + } + + //run as normal service p, err := cfg.PostgresEnv() if err != nil { log.Fatalf("error reading DB config from the environment vars: %s", err) @@ -45,11 +67,16 @@ func main() { db.SetMaxIdleConns(p.MaxIdle) db.SetMaxOpenConns(p.MaxOpen) + // provide a soh heartbeat + health := health.New(healthCheckService, healthCheckAged, healthCheckStartup) + ping: for { err = db.Ping() if err != nil { log.Println("problem pinging DB sleeping and retrying") + health.Ok() //send heartbeat + time.Sleep(time.Second * 30) continue ping } @@ -71,17 +98,36 @@ ping: var r sqs.Raw var n notification + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + +loop1: for { - r, err = sqsClient.Receive(queueURL, 600) + r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) if err != nil { - log.Printf("problem receiving message, backing off: %s", err) - time.Sleep(time.Second * 20) + switch { + case sqs.Cancelled(err): //stoped + log.Println("##1 system stop... ") + break loop1 + case sqs.IsNoMessagesError(err): + n := sLogger.Log(err) + if n%100 == 0 { //don't log all repeated error messages + log.Printf("no message received for %d times ", n) + } + default: + slog.Warn("problem receiving message, backing off", "err", err) + time.Sleep(time.Second * 20) + } + // update soh + health.Ok() continue } err = metrics.DoProcess(&n, []byte(r.Body)) if err != nil { log.Printf("problem processing message, skipping deletion for redelivery: %s", err) + // update soh + health.Ok() continue } @@ -89,7 +135,24 @@ ping: if err != nil { log.Printf("problem deleting message, continuing: %s", err) } + // update soh + health.Ok() + } +} + +// check health by calling the http soh endpoint +// cmd: ./fdsn-quake-consumer -check +func healthCheck() { + ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) + defer cancel() + + msg, err := health.Check(ctx, healthCheckService+healthCheckPath, healthCheckTimeout) + if err != nil { + log.Printf("status: %v", err) + os.Exit(1) } + log.Printf("status: %s", string(msg)) + os.Exit(0) } // Process implements msg.Processor for event. diff --git a/cmd/fdsn-ws/server.go b/cmd/fdsn-ws/server.go index f3311e36..a05ac9c0 100644 --- a/cmd/fdsn-ws/server.go +++ b/cmd/fdsn-ws/server.go @@ -1,6 +1,7 @@ package main import ( + "context" "database/sql" "log" "net/http" @@ -10,6 +11,7 @@ import ( "time" "github.com/GeoNet/kit/cfg" + "github.com/GeoNet/kit/health" "github.com/gorilla/schema" _ "github.com/lib/pq" ) @@ -36,6 +38,12 @@ func newDecoder() *schema.Decoder { } func main() { + //check health if flagged in cmd + if health.RunningHealthCheck() { + healthCheck() + } + + //run as normal service var err error if S3_BUCKET = os.Getenv("S3_BUCKET"); S3_BUCKET == "" { log.Fatal("ERROR: S3_BUCKET environment variable is not set") @@ -84,3 +92,19 @@ func main() { } log.Fatal(server.ListenAndServe()) } + +// check health by calling the http soh endpoint +// cmd: ./fdsn-ws -check +func healthCheck() { + timeout := 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + msg, err := health.Check(ctx, ":8080/soh", timeout) + if err != nil { + log.Printf("status: %v", err) + os.Exit(1) + } + log.Printf("status: %s", string(msg)) + os.Exit(0) +} diff --git a/go.mod b/go.mod index 3d3895fa..87342e01 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/GeoNet/fdsn go 1.21 require ( - github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60 + github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 github.com/gorilla/schema v1.4.1 github.com/joho/godotenv v1.5.1 github.com/lib/pq v1.10.3 diff --git a/go.sum b/go.sum index 8912fc22..a5c407fb 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60 h1:BgAWCVg+WxU28mXiy/3le7H9nZUo37QS/+GfXSFWYgo= -github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= +github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 h1:SeKMshwK+xOgKLKrMSPhYTQImmLop5tXXei/wOmgO80= +github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= github.com/aws/aws-sdk-go-v2 v1.25.3 h1:xYiLpZTQs1mzvz5PaI6uR0Wh57ippuEthxS4iK5v0n0= github.com/aws/aws-sdk-go-v2 v1.25.3/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU= diff --git a/vendor/github.com/GeoNet/kit/aws/s3/s3.go b/vendor/github.com/GeoNet/kit/aws/s3/s3.go index 4432d3b8..1cd6bafe 100644 --- a/vendor/github.com/GeoNet/kit/aws/s3/s3.go +++ b/vendor/github.com/GeoNet/kit/aws/s3/s3.go @@ -94,7 +94,7 @@ func getConfig() (aws.Config, error) { var cfg aws.Config var err error - if awsEndpoint := os.Getenv("CUSTOM_AWS_ENDPOINT_URL"); awsEndpoint != "" { + if awsEndpoint := os.Getenv("AWS_ENDPOINT_URL"); awsEndpoint != "" { customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ PartitionID: "aws", diff --git a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go index 8b9778a3..bd7f21c1 100644 --- a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go +++ b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go @@ -27,6 +27,9 @@ type SQS struct { client *sqs.Client } +// specific error to return when no messages are received from the queue +var ErrNoMessages = errors.New("no messages received from queue") + // New returns an SQS struct which wraps an SQS client using the default AWS credentials chain. // This consults (in order) environment vars, config files, EC2 and ECS roles. // It is an error if the AWS_REGION environment variable is not set. @@ -62,7 +65,7 @@ func getConfig() (aws.Config, error) { var cfg aws.Config var err error - if awsEndpoint := os.Getenv("CUSTOM_AWS_ENDPOINT_URL"); awsEndpoint != "" { + if awsEndpoint := os.Getenv("AWS_ENDPOINT_URL"); awsEndpoint != "" { customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ PartitionID: "aws", @@ -125,29 +128,28 @@ func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, // receiveMessage is the common code used internally to receive an SQS message based // on the provided input. func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput) (Raw, error) { + r, err := s.client.ReceiveMessage(ctx, input) + if err != nil { + return Raw{}, err + } - for { - r, err := s.client.ReceiveMessage(ctx, input) - if err != nil { - return Raw{}, err - } + switch { + case r == nil || len(r.Messages) == 0: + // no message received + return Raw{}, ErrNoMessages + + case len(r.Messages) == 1: + raw := r.Messages[0] - switch { - case r == nil || len(r.Messages) == 0: - // no message received - continue - case len(r.Messages) == 1: - raw := r.Messages[0] - - m := Raw{ - Body: aws.ToString(raw.Body), - ReceiptHandle: aws.ToString(raw.ReceiptHandle), - Attributes: raw.Attributes, - } - return m, nil - case len(r.Messages) > 1: - return Raw{}, fmt.Errorf("received more than 1 message: %d", len(r.Messages)) + m := Raw{ + Body: aws.ToString(raw.Body), + ReceiptHandle: aws.ToString(raw.ReceiptHandle), + Attributes: raw.Attributes, } + return m, nil + + default: + return Raw{}, fmt.Errorf("received unexpected messages: %d", len(r.Messages)) } } @@ -279,6 +281,53 @@ func (s *SQS) GetQueueUrl(name string) (string, error) { return "", nil } +func (s *SQS) GetQueueARN(url string) (string, error) { + + params := sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(url), + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameQueueArn, + }, + } + + output, err := s.client.GetQueueAttributes(context.TODO(), ¶ms) + if err != nil { + return "", err + } + arn := output.Attributes[string(types.QueueAttributeNameQueueArn)] + if arn == "" { + return "", errors.New("ARN attribute not found") + } + return arn, nil +} + +// CreateQueue creates an Amazon SQS queue with the specified name. You can specify +// whether the queue is created as a FIFO queue. Returns the queue URL. +func (s *SQS) CreateQueue(queueName string, isFifoQueue bool) (string, error) { + + queueAttributes := map[string]string{} + if isFifoQueue { + queueAttributes["FifoQueue"] = "true" + } + queue, err := s.client.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: aws.String(queueName), + Attributes: queueAttributes, + }) + if err != nil { + return "", err + } + + return aws.ToString(queue.QueueUrl), err +} + +// DeleteQueue deletes an Amazon SQS queue. +func (s *SQS) DeleteQueue(queueUrl string) error { + _, err := s.client.DeleteQueue(context.TODO(), &sqs.DeleteQueueInput{ + QueueUrl: aws.String(queueUrl)}) + + return err +} + func Cancelled(err error) bool { var opErr *smithy.OperationError if errors.As(err, &opErr) { @@ -286,3 +335,7 @@ func Cancelled(err error) bool { } return false } + +func IsNoMessagesError(err error) bool { + return errors.Is(err, ErrNoMessages) +} diff --git a/vendor/github.com/GeoNet/kit/health/check.go b/vendor/github.com/GeoNet/kit/health/check.go new file mode 100644 index 00000000..a770ae60 --- /dev/null +++ b/vendor/github.com/GeoNet/kit/health/check.go @@ -0,0 +1,80 @@ +package health + +import ( + "context" + "flag" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +// Check calls the given service endpoint with a given context and timeout. +// An error will be returned if the connection fails, or the response status +// is not 200 (i.e. StatusOK). A successful check will return only the check message reply. +func Check(ctx context.Context, servicePath string, timeout time.Duration) ([]byte, error) { + checkUrl := servicePath + if !strings.HasPrefix(checkUrl, "http") { + checkUrl = "http://" + servicePath + } + req, err := url.Parse(checkUrl) + if err != nil { + return nil, err + } + + client := &http.Client{ + Timeout: timeout, + } + + request, err := http.NewRequestWithContext(ctx, http.MethodGet, req.String(), nil) + if err != nil { + return nil, err + } + + resp, err := client.Do(request) + if resp == nil || err != nil { + return nil, err + } + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%s (%s)", string(body), http.StatusText(resp.StatusCode)) + } + + return body, nil +} + +// CheckStatus runs a Check on the given service and returns zero for a healthy service, and one otherwise. +// +// @param {string} servicePat: service address and path to check e.g. 8080/soh +func CheckStatus(servicePath string, timeout time.Duration) int { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + if _, err := Check(ctx, servicePath, timeout); err != nil { + return 1 + } + + return 0 +} + +// RunningHealthCheck returns whether the -check flag was used when starting the program. +// This flag indicates that the program is being used to run a health check on another program. +func RunningHealthCheck() bool { + + var isHealthCheck bool + flag.BoolVar(&isHealthCheck, "check", false, "Whether the program is being used to run a health check") + flag.Parse() + + return isHealthCheck +} diff --git a/vendor/github.com/GeoNet/kit/health/service.go b/vendor/github.com/GeoNet/kit/health/service.go new file mode 100644 index 00000000..d808c5f2 --- /dev/null +++ b/vendor/github.com/GeoNet/kit/health/service.go @@ -0,0 +1,159 @@ +package health + +import ( + "context" + "log" + "net/http" + "sync" + "time" +) + +// CheckPath is the baked in SOH endpoint path. +const CheckPath = "/soh" + +// Service provides a mechanism to update a service SOH status. +type Service struct { + mu sync.Mutex + + // status is used to indicate whether the service is running + status bool + // last stores the time of the last update. + last time.Time + + // start stores when the service was started. + start time.Time + // aged is the time if no updates have happened indicates the service is no longer running. + // Default zero value means no age check required. + aged time.Duration + // startup is the time after the start which the check is assumed to be successful. + startup time.Duration +} + +// New returns a health Service which provides running SOH capabilities. +func New(endpoint string, aged, startup time.Duration) *Service { + service := &Service{ + aged: aged, + last: time.Now(), + start: time.Now(), + startup: startup, + } + + router := http.NewServeMux() + router.HandleFunc(CheckPath, service.handler) + + srv := &http.Server{ + Addr: endpoint, + Handler: router, + ReadHeaderTimeout: 2 * time.Second, + } + + go func() { + if err := srv.ListenAndServe(); err != nil { + log.Println("error starting health check service", err) + } + }() + + return service +} + +// state returns the current application state, this is likely to +// be expanded as new checks are added. +func (s *Service) state() bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.status +} + +func (s *Service) handler(w http.ResponseWriter, r *http.Request) { + ok := s.state() + switch { + case time.Since(s.start) < s.startup: + // Avoid terminating before initial check period + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("warn")); err != nil { + log.Println("error writing response", err) + } + case ok && (s.aged == 0 || time.Since(s.last) < s.aged): + // Service is OK and actively updating + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("ok")); err != nil { + log.Println("error writing response", err) + } + default: + // Service is not OK or has stopped updating + w.WriteHeader(http.StatusInternalServerError) + if _, err := w.Write([]byte("fail")); err != nil { + log.Println("error writing response", err) + } + } +} + +// Ok updates the Service to indicate the service is running as expected. +func (s *Service) Ok() { + s.Update(true) +} + +// Fail updates the Service to indicate the service is not running as expected. +func (s *Service) Fail() { + s.Update(false) +} + +// Update sets the Service to the given state, and stores the time since the last update. +func (s *Service) Update(status bool) { + s.mu.Lock() + defer s.mu.Unlock() + + s.status = status + s.last = time.Now() +} + +// Alive allows an application to perform a complex task while still sending hearbeats. +func (s *Service) Alive(ctx context.Context, heartbeat time.Duration) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + + go func() { + defer cancel() + + ticker := time.NewTicker(heartbeat) + defer ticker.Stop() + + s.Ok() + + for { + select { + case <-ticker.C: + s.Ok() + case <-ctx.Done(): + return + } + } + }() + + return cancel +} + +// Pause allows an application to stall for a set period of time while still sending hearbeats. +func (s *Service) Pause(ctx context.Context, deadline, heartbeat time.Duration) context.CancelFunc { + ctx, cancel := context.WithTimeout(ctx, deadline) + + go func() { + defer cancel() + + ticker := time.NewTicker(heartbeat) + defer ticker.Stop() + + s.Ok() + + for { + select { + case <-ticker.C: + s.Ok() + case <-ctx.Done(): + return + } + } + }() + + return cancel +} diff --git a/vendor/github.com/GeoNet/kit/seis/ms/doc.go b/vendor/github.com/GeoNet/kit/seis/ms/doc.go index f58ca47b..24f283b5 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/doc.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/doc.go @@ -1,3 +1,2 @@ // The ms module has been writen as a lightweight replacement for some parts of the libmseed C library. -// package ms diff --git a/vendor/github.com/GeoNet/kit/seis/ms/header.go b/vendor/github.com/GeoNet/kit/seis/ms/header.go index d22163eb..55c3b193 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/header.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/header.go @@ -3,6 +3,7 @@ package ms import ( "encoding/binary" "fmt" + //TODO: needs v 1.15 "hash/maphash" "strconv" "strings" diff --git a/vendor/github.com/GeoNet/kit/seis/ms/record.go b/vendor/github.com/GeoNet/kit/seis/ms/record.go index c4e8e5e7..e4fe13c4 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/record.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/record.go @@ -44,7 +44,7 @@ type Record struct { } // NewMSRecord decodes and unpacks the record samples from a byte slice and returns a Record pointer, -//or an empty pointer and an error if it could not be decoded. +// or an empty pointer and an error if it could not be decoded. func NewRecord(buf []byte) (*Record, error) { var r Record diff --git a/vendor/github.com/GeoNet/kit/seis/ms/steim.go b/vendor/github.com/GeoNet/kit/seis/ms/steim.go index f0ee66a1..6abafc11 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/steim.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/steim.go @@ -14,7 +14,7 @@ func getNibble(word []byte, index int) uint8 { return res } -//value must be 0, 1, 2 or 3, the nibble must not have been previously set +// value must be 0, 1, 2 or 3, the nibble must not have been previously set func writeNibble(word []byte, index int, value uint8) { b := word[index/4] i := index % 4 @@ -23,7 +23,7 @@ func writeNibble(word []byte, index int, value uint8) { } /* - Takes v: an integer where only the first numbits bits are used to represent the number and returns an int32 +Takes v: an integer where only the first numbits bits are used to represent the number and returns an int32 */ func uintVarToInt32(v uint32, numbits uint8) int32 { neg := (v & (0x1 << (numbits - 1))) != 0 //check positive/negative diff --git a/vendor/github.com/GeoNet/kit/slogger/logger.go b/vendor/github.com/GeoNet/kit/slogger/logger.go new file mode 100644 index 00000000..5f45a8f3 --- /dev/null +++ b/vendor/github.com/GeoNet/kit/slogger/logger.go @@ -0,0 +1,82 @@ +package slogger + +import ( + "fmt" + "log" + "strings" + "sync" + "time" +) + +/* +SmartLogger wraps around the standard logger and adds functionality to avoid repeated logs. +usage: +logger := NewSmartLogger(2*time.Second, "error connecting to:") +logger.Log("error connecting to AVLN") +logger.Log("error connecting to AUCK") +*/ +type SmartLogger struct { + window time.Duration //time window to calculate repeated messages + repeatedPrefix string //message prefix to evaluate, compare whole message if not specified + + mu sync.Mutex + lastMessage string + lastLogTime time.Time + repeatCount int +} + +// NewSmartLogger creates a new SmartLogger with the given time window for detecting repeated messages. +// and an optional predefined message prefix +func NewSmartLogger(window time.Duration, repeatedPrefix string) *SmartLogger { + sl := &SmartLogger{ + window: window, + repeatedPrefix: repeatedPrefix, + } + return sl +} + +// Log logs a message, checking if it is repeated within the time window +// return the repeatCount +func (sl *SmartLogger) Log(message ...any) int { + sl.mu.Lock() + defer sl.mu.Unlock() + now := time.Now() + msgString := fmt.Sprintln(message...) + repeated := sl.checkRepeated(msgString) + if repeated && now.Sub(sl.lastLogTime) <= sl.window { + sl.repeatCount++ + } else { + if sl.repeatedPrefix != "" && !repeated { //this is a random message + log.Println(msgString) + sl.lastMessage = "" // Reset lastMessage to avoid tracking it as a repeated message + sl.lastLogTime = time.Time{} // Reset the time + return 0 + } + sl.flush() + sl.lastMessage = msgString + sl.lastLogTime = now + sl.repeatCount = 1 + } + return sl.repeatCount +} + +// flush writes out the summary of repeated messages +func (sl *SmartLogger) flush() { + if sl.repeatCount > 1 { + if sl.repeatedPrefix != "" { + log.Printf("message with prefix \"%s\" repeated %d times", sl.repeatedPrefix, sl.repeatCount) + } else { + log.Printf("message \"%s\" repeated %d times", sl.lastMessage, sl.repeatCount) + } + sl.repeatCount = 0 + } +} + +// checks if message is repeated +func (sl *SmartLogger) checkRepeated(message string) bool { + if sl.repeatedPrefix != "" { + return strings.HasPrefix(message, sl.repeatedPrefix) + } else { + return message == sl.lastMessage + } +} diff --git a/vendor/github.com/GeoNet/kit/weft/weft.go b/vendor/github.com/GeoNet/kit/weft/weft.go index 4f27089c..71dd8195 100644 --- a/vendor/github.com/GeoNet/kit/weft/weft.go +++ b/vendor/github.com/GeoNet/kit/weft/weft.go @@ -73,9 +73,9 @@ func (s StatusError) Status() int { // Status returns the HTTP status code appropriate for err. // It returns: -// * http.StatusOk if err is nil -// * err.Code if err is a StatusErr and Code is set -// * otherwise http.StatusServiceUnavailable +// - http.StatusOk if err is nil +// - err.Code if err is a StatusErr and Code is set +// - otherwise http.StatusServiceUnavailable func Status(err error) int { if err == nil { return http.StatusOK diff --git a/vendor/modules.txt b/vendor/modules.txt index fd891b87..5dbd5322 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,11 +1,13 @@ -# github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60 +# github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 ## explicit; go 1.21 github.com/GeoNet/kit/aws/s3 github.com/GeoNet/kit/aws/sqs github.com/GeoNet/kit/cfg +github.com/GeoNet/kit/health github.com/GeoNet/kit/metrics github.com/GeoNet/kit/sc3ml github.com/GeoNet/kit/seis/ms +github.com/GeoNet/kit/slogger github.com/GeoNet/kit/weft github.com/GeoNet/kit/weft/wefttest github.com/GeoNet/kit/wgs84 From 363a81b5f5cef648b3023f7a9382f48bdef3b290 Mon Sep 17 00:00:00 2001 From: bpeng Date: Thu, 5 Dec 2024 11:13:15 +1300 Subject: [PATCH 2/4] add start checks --- cmd/fdsn-holdings-consumer/main.go | 58 +++++++++++---------- cmd/fdsn-quake-consumer/main.go | 54 ++++++++++--------- cmd/fdsn-ws/fdsn_dataselect.go | 4 ++ go.mod | 2 +- go.sum | 4 +- vendor/github.com/GeoNet/kit/aws/s3/s3.go | 9 ++++ vendor/github.com/GeoNet/kit/aws/sqs/sqs.go | 12 +++++ vendor/modules.txt | 2 +- 8 files changed, 88 insertions(+), 57 deletions(-) diff --git a/cmd/fdsn-holdings-consumer/main.go b/cmd/fdsn-holdings-consumer/main.go index fc36d5dc..2fb4a0f8 100644 --- a/cmd/fdsn-holdings-consumer/main.go +++ b/cmd/fdsn-holdings-consumer/main.go @@ -28,11 +28,10 @@ import ( "github.com/GeoNet/kit/cfg" "github.com/GeoNet/kit/health" "github.com/GeoNet/kit/metrics" - "github.com/GeoNet/kit/slogger" ) const ( - healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle) + healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting healthCheckTimeout = 30 * time.Second //health check timeout healthCheckService = ":7777" //end point to listen to for SOH checks @@ -41,18 +40,40 @@ const ( var ( db *sql.DB - queueURL = os.Getenv("SQS_QUEUE_URL") + queueURL string sqsClient sqs.SQS s3Client *s3.S3 saveHoldings *sql.Stmt - - sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages ) type event struct { s3.Event } +// init and check aws variables +func initAwsClient() { + queueURL = os.Getenv("SQS_QUEUE_URL") + if queueURL == "" { + log.Fatal("SQS_QUEUE_URL is not set") + } + + var err error + sqsClient, err = sqs.NewWithMaxRetries(100) + if err != nil { + log.Fatalf("error creating SQS client: %s", err) + } + if err = sqsClient.CheckQueue(queueURL); err != nil { + log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) + } + + s3c, err := s3.NewWithMaxRetries(3) + if err != nil { + log.Fatalf("error creating S3 client: %s", err) + } + s3Client = &s3c + +} + func main() { //check health if health.RunningHealthCheck() { @@ -60,6 +81,7 @@ func main() { } //run as normal service + initAwsClient() p, err := cfg.PostgresEnv() if err != nil { log.Fatalf("error reading DB config from the environment vars: %s", err) @@ -117,17 +139,6 @@ ping: break ping } - sqsClient, err = sqs.NewWithMaxRetries(100) - if err != nil { - log.Fatalf("error creating SQS client: %s", err) - } - - s3c, err := s3.NewWithMaxRetries(3) - if err != nil { - log.Fatalf("error creating S3 client: %s", err) - } - s3Client = &s3c - log.Println("listening for messages") var r sqs.Raw @@ -138,40 +149,31 @@ ping: loop1: for { + health.Ok() // update soh r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) if err != nil { switch { + case sqs.IsNoMessagesError(err): + continue case sqs.Cancelled(err): //stoped log.Println("##1 system stop... ") break loop1 - case sqs.IsNoMessagesError(err): - n := sLogger.Log(err) - if n%100 == 0 { //don't log all repeated error messages - log.Printf("no message received for %d times ", n) - } default: slog.Warn("problem receiving message, backing off", "err", err) time.Sleep(time.Second * 20) } - // update soh - health.Ok() continue } err = metrics.DoProcess(&e, []byte(r.Body)) if err != nil { log.Printf("problem processing message, skipping deletion for redelivery: %s", err) - // update soh - health.Ok() continue } - err = sqsClient.Delete(queueURL, r.ReceiptHandle) if err != nil { log.Printf("problem deleting message, continuing: %s", err) } - // update soh - health.Ok() } } diff --git a/cmd/fdsn-quake-consumer/main.go b/cmd/fdsn-quake-consumer/main.go index 9f1ce3ae..5961c064 100644 --- a/cmd/fdsn-quake-consumer/main.go +++ b/cmd/fdsn-quake-consumer/main.go @@ -22,11 +22,10 @@ import ( "github.com/GeoNet/kit/cfg" "github.com/GeoNet/kit/health" "github.com/GeoNet/kit/metrics" - "github.com/GeoNet/kit/slogger" ) const ( - healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle) + healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting healthCheckTimeout = 30 * time.Second //health check timeout healthCheckService = ":7777" //end point to listen to for SOH checks @@ -38,14 +37,34 @@ var ( s3Client s3.S3 sqsClient sqs.SQS db *sql.DB - - sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages ) type notification struct { s3.Event } +// init and check aws clients +func initAwsClients() { + queueURL = os.Getenv("SQS_QUEUE_URL") + if queueURL == "" { + log.Fatal("SQS_QUEUE_URL is not set") + } + + var err error + s3Client, err = s3.NewWithMaxRetries(100) + if err != nil { + log.Fatalf("creating S3 client: %s", err) + } + + sqsClient, err = sqs.NewWithMaxRetries(100) + if err != nil { + log.Fatalf("creating SQS client: %s", err) + } + if err = sqsClient.CheckQueue(queueURL); err != nil { + log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) + } +} + func main() { //check health if health.RunningHealthCheck() { @@ -53,6 +72,8 @@ func main() { } //run as normal service + initAwsClients() + p, err := cfg.PostgresEnv() if err != nil { log.Fatalf("error reading DB config from the environment vars: %s", err) @@ -83,16 +104,6 @@ ping: break ping } - s3Client, err = s3.NewWithMaxRetries(100) - if err != nil { - log.Fatalf("creating S3 client: %s", err) - } - - sqsClient, err = sqs.NewWithMaxRetries(100) - if err != nil { - log.Fatalf("creating SQS client: %s", err) - } - log.Println("listening for messages") var r sqs.Raw @@ -103,31 +114,26 @@ ping: loop1: for { + health.Ok() // update soh + r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) if err != nil { switch { + case sqs.IsNoMessagesError(err): + continue case sqs.Cancelled(err): //stoped log.Println("##1 system stop... ") break loop1 - case sqs.IsNoMessagesError(err): - n := sLogger.Log(err) - if n%100 == 0 { //don't log all repeated error messages - log.Printf("no message received for %d times ", n) - } default: slog.Warn("problem receiving message, backing off", "err", err) time.Sleep(time.Second * 20) } - // update soh - health.Ok() continue } err = metrics.DoProcess(&n, []byte(r.Body)) if err != nil { log.Printf("problem processing message, skipping deletion for redelivery: %s", err) - // update soh - health.Ok() continue } @@ -135,8 +141,6 @@ loop1: if err != nil { log.Printf("problem deleting message, continuing: %s", err) } - // update soh - health.Ok() } } diff --git a/cmd/fdsn-ws/fdsn_dataselect.go b/cmd/fdsn-ws/fdsn_dataselect.go index 9e0c9790..153da405 100644 --- a/cmd/fdsn-ws/fdsn_dataselect.go +++ b/cmd/fdsn-ws/fdsn_dataselect.go @@ -63,6 +63,10 @@ func initDataselectTemplate() { log.Fatalf("error creating S3 client: %s", err) } s3Client = &s3c + + if err = s3Client.CheckBucket(S3_BUCKET); err != nil { + log.Fatalf("error checking S3_BUCKET %s: %s", S3_BUCKET, err.Error()) + } } // fdsnDataMetricsV1Handler handles all datametrics queries. diff --git a/go.mod b/go.mod index 87342e01..a77ad9fb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/GeoNet/fdsn go 1.21 require ( - github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 + github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c github.com/gorilla/schema v1.4.1 github.com/joho/godotenv v1.5.1 github.com/lib/pq v1.10.3 diff --git a/go.sum b/go.sum index a5c407fb..8e893879 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 h1:SeKMshwK+xOgKLKrMSPhYTQImmLop5tXXei/wOmgO80= -github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= +github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c h1:yrk9pbtLaPEWmmrx2v5U457PnEzIg1o+Q6X0hOZWWS0= +github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= github.com/aws/aws-sdk-go-v2 v1.25.3 h1:xYiLpZTQs1mzvz5PaI6uR0Wh57ippuEthxS4iK5v0n0= github.com/aws/aws-sdk-go-v2 v1.25.3/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU= diff --git a/vendor/github.com/GeoNet/kit/aws/s3/s3.go b/vendor/github.com/GeoNet/kit/aws/s3/s3.go index 1cd6bafe..9a0cdabe 100644 --- a/vendor/github.com/GeoNet/kit/aws/s3/s3.go +++ b/vendor/github.com/GeoNet/kit/aws/s3/s3.go @@ -271,6 +271,15 @@ func (s *S3) PutWithMetadata(bucket, key string, object []byte, metadata Meta) e return err } +// CheckBucket checks if the given S3 bucket exists and is accessible. +func (s *S3) CheckBucket(bucket string) error { + _, err := s.client.HeadBucket(context.TODO(), &s3.HeadBucketInput{ + Bucket: aws.String(bucket), + }) + + return err +} + // Exists checks if an object for key already exists in the bucket. func (s *S3) Exists(bucket, key string) (bool, error) { diff --git a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go index bd7f21c1..58c7a4b7 100644 --- a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go +++ b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go @@ -320,6 +320,18 @@ func (s *SQS) CreateQueue(queueName string, isFifoQueue bool) (string, error) { return aws.ToString(queue.QueueUrl), err } +// CheckQueue checks if the given SQS queue exists and is accessible. +func (s *SQS) CheckQueue(queueUrl string) error { + params := sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(queueUrl), + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameAll, + }, + } + _, err := s.client.GetQueueAttributes(context.TODO(), ¶ms) + return err +} + // DeleteQueue deletes an Amazon SQS queue. func (s *SQS) DeleteQueue(queueUrl string) error { _, err := s.client.DeleteQueue(context.TODO(), &sqs.DeleteQueueInput{ diff --git a/vendor/modules.txt b/vendor/modules.txt index 5dbd5322..447cf6a9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 +# github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c ## explicit; go 1.21 github.com/GeoNet/kit/aws/s3 github.com/GeoNet/kit/aws/sqs From c62f9eaee4cd144d6c6dc476cbda26e1f8f22189 Mon Sep 17 00:00:00 2001 From: bpeng Date: Thu, 12 Dec 2024 15:45:21 +1300 Subject: [PATCH 3/4] changes per review --- cmd/fdsn-holdings-consumer/main.go | 7 ++----- cmd/fdsn-quake-consumer/main.go | 2 -- cmd/fdsn-ws/server.go | 7 +++++-- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/cmd/fdsn-holdings-consumer/main.go b/cmd/fdsn-holdings-consumer/main.go index 2fb4a0f8..bd5e9a06 100644 --- a/cmd/fdsn-holdings-consumer/main.go +++ b/cmd/fdsn-holdings-consumer/main.go @@ -42,7 +42,7 @@ var ( db *sql.DB queueURL string sqsClient sqs.SQS - s3Client *s3.S3 + s3Client s3.S3 saveHoldings *sql.Stmt ) @@ -66,12 +66,10 @@ func initAwsClient() { log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) } - s3c, err := s3.NewWithMaxRetries(3) + s3Client, err = s3.NewWithMaxRetries(3) if err != nil { log.Fatalf("error creating S3 client: %s", err) } - s3Client = &s3c - } func main() { @@ -132,7 +130,6 @@ ping: err = db.Ping() if err != nil { log.Println("problem pinging DB sleeping and retrying") - health.Ok() //send heartbeat time.Sleep(time.Second * 30) continue ping } diff --git a/cmd/fdsn-quake-consumer/main.go b/cmd/fdsn-quake-consumer/main.go index 5961c064..9a42a126 100644 --- a/cmd/fdsn-quake-consumer/main.go +++ b/cmd/fdsn-quake-consumer/main.go @@ -96,8 +96,6 @@ ping: err = db.Ping() if err != nil { log.Println("problem pinging DB sleeping and retrying") - health.Ok() //send heartbeat - time.Sleep(time.Second * 30) continue ping } diff --git a/cmd/fdsn-ws/server.go b/cmd/fdsn-ws/server.go index a05ac9c0..c10ec500 100644 --- a/cmd/fdsn-ws/server.go +++ b/cmd/fdsn-ws/server.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "fmt" "log" "net/http" "os" @@ -16,6 +17,8 @@ import ( _ "github.com/lib/pq" ) +const servicePort = ":8080" //http service port + var ( db *sql.DB decoder = newDecoder() // decoder for URL queries. @@ -85,7 +88,7 @@ func main() { log.Println("starting server") server := &http.Server{ - Addr: ":8080", + Addr: servicePort, Handler: mux, ReadTimeout: 1 * time.Minute, WriteTimeout: 10 * time.Minute, @@ -100,7 +103,7 @@ func healthCheck() { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - msg, err := health.Check(ctx, ":8080/soh", timeout) + msg, err := health.Check(ctx, fmt.Sprintf("%s/soh", servicePort), timeout) if err != nil { log.Printf("status: %v", err) os.Exit(1) From 07e167abba9b347a8b11154b0ccc9dff7a8e3278 Mon Sep 17 00:00:00 2001 From: bpeng Date: Fri, 20 Dec 2024 09:23:43 +1300 Subject: [PATCH 4/4] temporarily drop checkQueue to allow merge safely --- cmd/fdsn-holdings-consumer/main.go | 6 +++--- cmd/fdsn-quake-consumer/main.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/fdsn-holdings-consumer/main.go b/cmd/fdsn-holdings-consumer/main.go index bd5e9a06..def3f45b 100644 --- a/cmd/fdsn-holdings-consumer/main.go +++ b/cmd/fdsn-holdings-consumer/main.go @@ -62,9 +62,9 @@ func initAwsClient() { if err != nil { log.Fatalf("error creating SQS client: %s", err) } - if err = sqsClient.CheckQueue(queueURL); err != nil { - log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) - } + // if err = sqsClient.CheckQueue(queueURL); err != nil { + // log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) + // } s3Client, err = s3.NewWithMaxRetries(3) if err != nil { diff --git a/cmd/fdsn-quake-consumer/main.go b/cmd/fdsn-quake-consumer/main.go index 9a42a126..a815298f 100644 --- a/cmd/fdsn-quake-consumer/main.go +++ b/cmd/fdsn-quake-consumer/main.go @@ -60,9 +60,9 @@ func initAwsClients() { if err != nil { log.Fatalf("creating SQS client: %s", err) } - if err = sqsClient.CheckQueue(queueURL); err != nil { - log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) - } + // if err = sqsClient.CheckQueue(queueURL); err != nil { + // log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) + // } } func main() {