Skip to content

Commit

Permalink
Fix/connection issues (#1331)
Browse files Browse the repository at this point in the history
* update healthchecks across services

* fix typo

* fix linter

* Auto commit - update kustomization.yaml

* remove peer logging from contextbox

* update min time

* Auto commit - update kustomization.yaml

* disable idle timeout

* Auto commit - update kustomization.yaml

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: CI/CD pipeline <CI/[email protected]>
  • Loading branch information
3 people authored Apr 10, 2024
1 parent 4f26daa commit 56ef409
Show file tree
Hide file tree
Showing 17 changed files with 221 additions and 139 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090
golang.org/x/sync v0.6.0
google.golang.org/api v0.169.0
google.golang.org/grpc v1.62.1
google.golang.org/grpc v1.63.0
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.29.3
Expand Down Expand Up @@ -138,7 +138,7 @@ require (
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
k8s.io/client-go v0.29.0
k8s.io/klog/v2 v2.110.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y=
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo=
google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 h1:rIo7ocm2roD9DcFIX67Ym8icoGCKSARAiPljFhh5suQ=
google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2/go.mod h1:O1cOfN1Cy6QEYr7VxtjOyP5AdAuR0aJ/MYZaaof623Y=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304161311-37d4d3c04a78 h1:Xs9lu+tLXxLIfuci70nG4cpwaRC+mRQPUL7LoIeDJC4=
Expand All @@ -435,8 +435,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8=
google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
91 changes: 91 additions & 0 deletions internal/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package healthcheck

import (
"fmt"
"net"
"net/http"
"sync"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -51,3 +55,90 @@ func (s *ClientHealthChecker) health(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(200)
writeMsg(w, "ok")
}

type HealthCheck struct {
Ping func() error
ServiceName string
timeSinceFailure *time.Time
}

type HealthChecker struct {
services []HealthCheck
logger *zerolog.Logger
lock sync.Mutex
}

func NewHealthCheck(logger *zerolog.Logger, interval time.Duration, services []HealthCheck) *HealthChecker {
hc := &HealthChecker{
services: services,
logger: logger,
}

hc.check() // perform initial check

go func() {
ticker := time.NewTicker(interval)
for range ticker.C {
hc.check()
}
}()

return hc
}

func (c *HealthChecker) check() {
c.lock.Lock()
defer c.lock.Unlock()

updateTimeSinceFailure := func(n string, now *time.Time, t **time.Time, err error) {
if err == nil {
if *t != nil {
c.logger.Debug().Msgf("service:%v is healthy again", n)
*t = nil
}
return
}
if *t == nil {
c.logger.Debug().Msgf("service:%v return error in ping: %v", n, err)
*t = now
}
}

now := time.Now()
for i := range c.services {
updateTimeSinceFailure(c.services[i].ServiceName, &now, &c.services[i].timeSinceFailure, c.services[i].Ping())
}
}

func (c *HealthChecker) CheckForFailures() error {
c.lock.Lock()
defer c.lock.Unlock()

var err error
for _, svc := range c.services {
err = c.checkFailure(svc.timeSinceFailure, svc.ServiceName, err)
}
return err
}

func (c *HealthChecker) checkFailure(t *time.Time, service string, perr error) error {
if t != nil && time.Since(*t) >= 4*time.Minute {
if perr != nil {
return fmt.Errorf("%w; %s is unhealthy", perr, service)
}
return fmt.Errorf("%s is unhealthy", service)
}
return perr
}

func (c *HealthChecker) AnyServiceUnhealthy() bool {
c.lock.Lock()
defer c.lock.Unlock()

ok := false
for _, svc := range c.services {
ok = ok || (svc.timeSinceFailure != nil)
}

return ok
}
25 changes: 22 additions & 3 deletions internal/utils/grpc.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package utils

import (
"context"
"errors"
"fmt"
"math"
"time"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/peer"
)

var ErrConnectionNotReady = errors.New("unhealthy gRPC connection")
Expand All @@ -24,11 +27,26 @@ func CloseClientConnection(connection *grpc.ClientConn) {
}
}

func PeerInfoInterceptor(logger *zerolog.Logger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
p, ok := peer.FromContext(ctx)
if ok {
peerAddr := p.Addr.String()
logger.Debug().Msgf("incoming request: %v from peer connected with addr: %s\n", info.FullMethod, peerAddr)
}
if !ok {
logger.Debug().Msg("Peer information cannot be extracted")
}

return handler(ctx, req)
}
}

func NewGRPCServer(opts ...grpc.ServerOption) *grpc.Server {
opts = append(opts,
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 20 * time.Second, // If a client pings more than once every 20 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
MinTime: 5 * time.Second, // If a client doesn't wait at least 5 seconds before a ping terminate.
PermitWithoutStream: true, // Allow pings even when there are no active streams
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: math.MaxInt64, // If a client is idle for INFINITE seconds, send a GOAWAY.
Expand Down Expand Up @@ -59,9 +77,10 @@ func GrpcDialWithRetryAndBackoff(serviceName, serviceURL string) (*grpc.ClientCo
grpc_retry.WithCodes(codes.Unavailable),
}

conn, err := grpc.Dial(
conn, err := grpc.NewClient(
serviceURL,
grpc.WithKeepaliveParams(kacp),
grpc.WithIdleTimeout(0), // Disable idle timeout will try to keep the connection active at all times.
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(opts...)),
Expand Down
16 changes: 8 additions & 8 deletions manifests/claudie/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,20 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: ghcr.io/berops/claudie/ansibler
newTag: 81b00f5-2706
newTag: dc35dd7-2715
- name: ghcr.io/berops/claudie/autoscaler-adapter
newTag: 688726a-1932
- name: ghcr.io/berops/claudie/builder
newTag: 4f4c7c9-2693
newTag: dc35dd7-2715
- name: ghcr.io/berops/claudie/claudie-operator
newTag: 4f4c7c9-2693
newTag: dc35dd7-2715
- name: ghcr.io/berops/claudie/context-box
newTag: 4f4c7c9-2693
newTag: dc35dd7-2715
- name: ghcr.io/berops/claudie/kube-eleven
newTag: 4f4c7c9-2693
newTag: dc35dd7-2715
- name: ghcr.io/berops/claudie/kuber
newTag: 4f4c7c9-2693
newTag: dc35dd7-2715
- name: ghcr.io/berops/claudie/scheduler
newTag: 4f4c7c9-2693
newTag: dc35dd7-2715
- name: ghcr.io/berops/claudie/terraformer
newTag: 4f4c7c9-2693
newTag: dc35dd7-2715
2 changes: 1 addition & 1 deletion manifests/testing-framework/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ secretGenerator:

images:
- name: ghcr.io/berops/claudie/testing-framework
newTag: 4f4c7c9-2693
newTag: dc35dd7-2715
5 changes: 4 additions & 1 deletion services/ansibler/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func main() {
&usecases.Usecases{
SpawnProcessLimit: make(chan struct{}, usecases.SpawnProcessLimit),
},
grpc2.UnaryInterceptor(metrics.MetricsMiddleware),
grpc2.ChainUnaryInterceptor(
metrics.MetricsMiddleware,
utils.PeerInfoInterceptor(&log.Logger),
),
)

metricsServer := &http.Server{Addr: fmt.Sprintf(":%s", utils.GetEnvDefault("PROMETHEUS_PORT", defaultPrometheusPort))}
Expand Down
5 changes: 4 additions & 1 deletion services/autoscaler-adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/berops/claudie/internal/utils"
"github.com/berops/claudie/services/autoscaler-adapter/claudie_provider"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/externalgrpc/protos"
)

Expand All @@ -21,7 +22,9 @@ func main() {
}
utils.InitLog(fmt.Sprintf("%s-%s", "autoscaler-adapter", clusterName))

server := utils.NewGRPCServer()
server := utils.NewGRPCServer(
grpc.ChainUnaryInterceptor(utils.PeerInfoInterceptor(&log.Logger)),
)

// Listen
serviceAddr := net.JoinHostPort("0.0.0.0", port)
Expand Down
Loading

0 comments on commit 56ef409

Please sign in to comment.