Skip to content

Commit

Permalink
feat: adjust action poll timeout (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Nov 25, 2021
1 parent 57ce9fb commit c753e71
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 31 deletions.
29 changes: 15 additions & 14 deletions actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
)

type Config struct {
PollInterval time.Duration
PollTimeout time.Duration
AckTimeout time.Duration
AckRetriesCount int
AckRetryWait time.Duration
ClusterID string
PollWaitInterval time.Duration // How long to wait unit next long polling request.
PollTimeout time.Duration // hard timeout. Normally server should return empty result before this timeout.
AckTimeout time.Duration // How long to wait for ack request to complete.
AckRetriesCount int // Ack retry count.
AckRetryWait time.Duration // How long to wait before next ack retry request.
ClusterID string
}

type Service interface {
Expand Down Expand Up @@ -61,7 +61,7 @@ type service struct {
func (s *service) Run(ctx context.Context) {
for {
select {
case <-time.After(s.cfg.PollInterval):
case <-time.After(s.cfg.PollWaitInterval):
err := s.doWork(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -81,18 +81,19 @@ func (s *service) Run(ctx context.Context) {

func (s *service) doWork(ctx context.Context) error {
s.log.Debug("polling actions")
start := time.Now()
actions, err := s.pollActions(ctx)
if err != nil {
// Skip deadline errors. These are expected for long polling requests.
if errors.Is(err, context.DeadlineExceeded) {
s.log.Debugf("no actions returned in given duration=%s, will continue", s.cfg.PollTimeout)
return nil
}

return fmt.Errorf("polling actions: %w", err)
}

s.log.Debugf("received actions, len=%d", len(actions))
pollDuration := time.Now().Sub(start)
if len(actions) == 0 {
s.log.Debugf("no actions returned in %s", pollDuration)
return nil
}

s.log.Debugf("received %d actions in %s", len(actions), pollDuration)
if err := s.handleActions(ctx, actions); err != nil {
return fmt.Errorf("handling actions: %w", err)
}
Expand Down
25 changes: 19 additions & 6 deletions actions/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ func TestActions(t *testing.T) {
log := logrus.New()
log.SetLevel(logrus.DebugLevel)
cfg := Config{
PollInterval: 1 * time.Millisecond,
PollTimeout: 100 * time.Millisecond,
AckTimeout: 1 * time.Second,
AckRetriesCount: 3,
AckRetryWait: 1 * time.Millisecond,
ClusterID: uuid.New().String(),
PollWaitInterval: 1 * time.Millisecond,
PollTimeout: 100 * time.Millisecond,
AckTimeout: 1 * time.Second,
AckRetriesCount: 3,
AckRetryWait: 1 * time.Millisecond,
ClusterID: uuid.New().String(),
}

newTestService := func(handler ActionHandler, client castai.Client) Service {
Expand Down Expand Up @@ -81,6 +81,19 @@ func TestActions(t *testing.T) {
svc.Run(ctx)
})

t.Run("continue polling on api error", func(t *testing.T) {
client := mock.NewMockAPIClient([]*castai.ClusterAction{})
client.GetActionsErr = errors.New("ups")
handler := &mockAgentActionHandler{err: errors.New("ups")}
svc := newTestService(handler, client)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer func() {
cancel()
r.Len(client.Acks, 0)
}()
svc.Run(ctx)
})

t.Run("ack with error when action handler failed", func(t *testing.T) {
apiActions := []*castai.ClusterAction{
{
Expand Down
11 changes: 6 additions & 5 deletions castai/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ type mockAck struct {
}

type mockClient struct {
Actions []*castai.ClusterAction
Logs []*castai.LogEvent
Acks []*mockAck
mu sync.Mutex
Actions []*castai.ClusterAction
GetActionsErr error
Logs []*castai.LogEvent
Acks []*mockAck
mu sync.Mutex
}

func (m *mockClient) GetActions(_ context.Context) ([]*castai.ClusterAction, error) {
return m.Actions, nil
return m.Actions, m.GetActionsErr
}

func (m *mockClient) SendLogs(_ context.Context, req *castai.LogEvent) error {
Expand Down
12 changes: 6 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ func run(ctx context.Context, log logrus.FieldLogger, client castai.Client, logg
log = log.WithFields(fields)

actionsConfig := actions.Config{
PollInterval: 5 * time.Second,
PollTimeout: 1 * time.Minute,
AckTimeout: 30 * time.Second,
AckRetriesCount: 3,
AckRetryWait: 1 * time.Second,
ClusterID: cfg.ClusterID,
PollWaitInterval: 5 * time.Second,
PollTimeout: 5 * time.Minute,
AckTimeout: 30 * time.Second,
AckRetriesCount: 3,
AckRetryWait: 1 * time.Second,
ClusterID: cfg.ClusterID,
}
svc := actions.NewService(log, actionsConfig, clientset, client)
svc.Run(ctx)
Expand Down

0 comments on commit c753e71

Please sign in to comment.