Skip to content

Commit

Permalink
Add last state change timestamp to UserTasks (#50199)
Browse files Browse the repository at this point in the history
This PR adds a new field to the UserTask Status.
It will register the timestamp of when the UserTask state was last
changed.
  • Loading branch information
marcoandredinis authored Dec 16, 2024
1 parent 087d926 commit ff40022
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 147 deletions.
328 changes: 198 additions & 130 deletions api/gen/proto/go/teleport/usertasks/v1/user_tasks.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions api/proto/teleport/usertasks/v1/user_tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ message UserTask {
teleport.header.v1.Metadata metadata = 4;
// The configured properties of UserTask.
UserTaskSpec spec = 5;
// The current status for this UserTask.
UserTaskStatus status = 6;
}

// UserTaskSpec contains the properties of the UserTask.
Expand All @@ -62,6 +64,12 @@ message UserTaskSpec {
DiscoverEKS discover_eks = 6;
}

// UserTaskStatus contains the current status for the UserTask.
message UserTaskStatus {
// LastStateChange is the timestamp when the UserTask state was last modified.
google.protobuf.Timestamp last_state_change = 1;
}

// DiscoverEC2 contains the instances that failed to auto-enroll into the cluster.
message DiscoverEC2 {
// Instances maps an instance id to the result of enrolling that instance into teleport.
Expand Down
37 changes: 32 additions & 5 deletions lib/auth/usertasks/usertasksv1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -48,6 +49,9 @@ type ServiceConfig struct {
// Cache is the cache for storing UserTask.
Cache Reader

// Clock is used to control time - mainly used for testing.
Clock clockwork.Clock

// UsageReporter is the reporter for sending usage without it be related to an API call.
UsageReporter func() usagereporter.UsageReporter

Expand All @@ -74,6 +78,9 @@ func (s *ServiceConfig) CheckAndSetDefaults() error {
if s.Emitter == nil {
return trace.BadParameter("emitter is required")
}
if s.Clock == nil {
s.Clock = clockwork.NewRealClock()
}

return nil
}
Expand All @@ -92,6 +99,7 @@ type Service struct {
authorizer authz.Authorizer
backend services.UserTasks
cache Reader
clock clockwork.Clock
usageReporter func() usagereporter.UsageReporter
emitter apievents.Emitter
}
Expand All @@ -106,6 +114,7 @@ func NewService(cfg ServiceConfig) (*Service, error) {
authorizer: cfg.Authorizer,
backend: cfg.Backend,
cache: cfg.Cache,
clock: cfg.Clock,
usageReporter: cfg.UsageReporter,
emitter: cfg.Emitter,
}, nil
Expand All @@ -122,6 +131,8 @@ func (s *Service) CreateUserTask(ctx context.Context, req *usertasksv1.CreateUse
return nil, trace.Wrap(err)
}

s.updateStatus(req.UserTask)

rsp, err := s.backend.CreateUserTask(ctx, req.UserTask)
s.emitCreateAuditEvent(ctx, rsp, authCtx, err)
if err != nil {
Expand Down Expand Up @@ -249,13 +260,19 @@ func (s *Service) UpdateUserTask(ctx context.Context, req *usertasksv1.UpdateUse
return nil, trace.Wrap(err)
}

stateChanged := existingUserTask.GetSpec().GetState() != req.GetUserTask().GetSpec().GetState()

if stateChanged {
s.updateStatus(req.UserTask)
}

rsp, err := s.backend.UpdateUserTask(ctx, req.UserTask)
s.emitUpdateAuditEvent(ctx, existingUserTask, req.GetUserTask(), authCtx, err)
if err != nil {
return nil, trace.Wrap(err)
}

if existingUserTask.GetSpec().GetState() != req.GetUserTask().GetSpec().GetState() {
if stateChanged {
s.usageReporter().AnonymizeAndSubmit(userTaskToUserTaskStateEvent(req.GetUserTask()))
}

Expand Down Expand Up @@ -299,18 +316,22 @@ func (s *Service) UpsertUserTask(ctx context.Context, req *usertasksv1.UpsertUse
return nil, trace.Wrap(err)
}

var emitStateChangeEvent bool
var stateChanged bool

existingUserTask, err := s.backend.GetUserTask(ctx, req.GetUserTask().GetMetadata().GetName())
switch {
case trace.IsNotFound(err):
emitStateChangeEvent = true
stateChanged = true

case err != nil:
return nil, trace.Wrap(err)

default:
emitStateChangeEvent = existingUserTask.GetSpec().GetState() != req.GetUserTask().GetSpec().GetState()
stateChanged = existingUserTask.GetSpec().GetState() != req.GetUserTask().GetSpec().GetState()
}

if stateChanged {
s.updateStatus(req.UserTask)
}

rsp, err := s.backend.UpsertUserTask(ctx, req.UserTask)
Expand All @@ -319,13 +340,19 @@ func (s *Service) UpsertUserTask(ctx context.Context, req *usertasksv1.UpsertUse
return nil, trace.Wrap(err)
}

if emitStateChangeEvent {
if stateChanged {
s.usageReporter().AnonymizeAndSubmit(userTaskToUserTaskStateEvent(req.GetUserTask()))
}

return rsp, nil
}

func (s *Service) updateStatus(ut *usertasksv1.UserTask) {
ut.Status = &usertasksv1.UserTaskStatus{
LastStateChange: timestamppb.New(s.clock.Now()),
}
}

func (s *Service) emitUpsertAuditEvent(ctx context.Context, old, new *usertasksv1.UserTask, authCtx *authz.Context, err error) {
if old == nil {
s.emitCreateAuditEvent(ctx, new, authCtx, err)
Expand Down
38 changes: 31 additions & 7 deletions lib/auth/usertasks/usertasksv1/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"

usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -88,7 +90,7 @@ func TestServiceAccess(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
for _, verbs := range utils.Combinations(tt.allowedVerbs) {
t.Run(fmt.Sprintf("verbs=%v", verbs), func(t *testing.T) {
service := newService(t, fakeChecker{allowedVerbs: verbs}, testReporter, &libevents.DiscardEmitter{})
service := newService(t, fakeChecker{allowedVerbs: verbs}, testReporter, &libevents.DiscardEmitter{}, clockwork.NewFakeClock())
err := callMethod(t, service, tt.name)
// expect access denied except with full set of verbs.
if len(verbs) == len(tt.allowedVerbs) {
Expand Down Expand Up @@ -119,7 +121,8 @@ func TestEvents(t *testing.T) {
rwVerbs := []string{types.VerbList, types.VerbCreate, types.VerbRead, types.VerbUpdate, types.VerbDelete}
testReporter := &mockUsageReporter{}
auditEventsSink := eventstest.NewChannelEmitter(10)
service := newService(t, fakeChecker{allowedVerbs: rwVerbs}, testReporter, auditEventsSink)
fakeClock := clockwork.NewFakeClock()
service := newService(t, fakeChecker{allowedVerbs: rwVerbs}, testReporter, auditEventsSink, fakeClock)
ctx := context.Background()

ut1, err := usertasks.NewDiscoverEC2UserTask(&usertasksv1.UserTaskSpec{
Expand All @@ -142,29 +145,37 @@ func TestEvents(t *testing.T) {
require.NoError(t, err)
userTaskName := ut1.GetMetadata().GetName()

_, err = service.CreateUserTask(ctx, &usertasksv1.CreateUserTaskRequest{UserTask: ut1})
createUserTaskResp, err := service.CreateUserTask(ctx, &usertasksv1.CreateUserTaskRequest{UserTask: ut1})
require.NoError(t, err)
// Usage reporting happens when user task is created, so we expect to see an event.
require.Len(t, testReporter.emittedEvents, 1)
consumeAssertEvent(t, auditEventsSink.C(), auditEventFor(userTaskName, "create", "", ""))
// LastStateChange is updated.
require.Equal(t, timestamppb.New(fakeClock.Now()), createUserTaskResp.Status.LastStateChange)

ut1.Spec.DiscoverEc2.Instances["i-345"] = &usertasksv1.DiscoverEC2Instance{
InstanceId: "i-345",
DiscoveryConfig: "dc01",
DiscoveryGroup: "dg01",
}
_, err = service.UpsertUserTask(ctx, &usertasksv1.UpsertUserTaskRequest{UserTask: ut1})
fakeClock.Advance(1 * time.Minute)
upsertUserTaskResp, err := service.UpsertUserTask(ctx, &usertasksv1.UpsertUserTaskRequest{UserTask: ut1})
require.NoError(t, err)
// State was not updated, so usage events must not increase.
require.Len(t, testReporter.emittedEvents, 1)
consumeAssertEvent(t, auditEventsSink.C(), auditEventFor(userTaskName, "update", "OPEN", "OPEN"))
// LastStateChange is not updated.
require.Equal(t, createUserTaskResp.Status.LastStateChange, upsertUserTaskResp.Status.LastStateChange)

ut1.Spec.State = "RESOLVED"
_, err = service.UpdateUserTask(ctx, &usertasksv1.UpdateUserTaskRequest{UserTask: ut1})
fakeClock.Advance(1 * time.Minute)
updateUserTaskResp, err := service.UpdateUserTask(ctx, &usertasksv1.UpdateUserTaskRequest{UserTask: ut1})
require.NoError(t, err)
// State was updated, so usage events include this new usage report.
require.Len(t, testReporter.emittedEvents, 2)
consumeAssertEvent(t, auditEventsSink.C(), auditEventFor(userTaskName, "update", "OPEN", "RESOLVED"))
// LastStateChange was updated because the state changed.
require.Equal(t, timestamppb.New(fakeClock.Now()), updateUserTaskResp.Status.LastStateChange)

_, err = service.DeleteUserTask(ctx, &usertasksv1.DeleteUserTaskRequest{Name: userTaskName})
require.NoError(t, err)
Expand Down Expand Up @@ -241,9 +252,21 @@ func consumeAssertEvent(t *testing.T, q <-chan apievents.AuditEvent, expectedEve

// callMethod calls a method with given name in the UserTask service
func callMethod(t *testing.T, service *Service, method string) error {
emptyUserTask := &usertasksv1.UserTask{
Spec: &usertasksv1.UserTaskSpec{},
}

for _, desc := range usertasksv1.UserTaskService_ServiceDesc.Methods {
if desc.MethodName == method {
_, err := desc.Handler(service, context.Background(), func(_ any) error { return nil }, nil)
_, err := desc.Handler(service, context.Background(), func(arg any) error {
switch arg := arg.(type) {
case *usertasksv1.CreateUserTaskRequest:
arg.UserTask = emptyUserTask
case *usertasksv1.UpsertUserTaskRequest:
arg.UserTask = emptyUserTask
}
return nil
}, nil)
return err
}
}
Expand All @@ -266,7 +289,7 @@ func (f fakeChecker) CheckAccessToRule(_ services.RuleContext, _ string, resourc
return trace.AccessDenied("access denied to rule=%v/verb=%v", resource, verb)
}

func newService(t *testing.T, checker services.AccessChecker, usageReporter usagereporter.UsageReporter, emitter apievents.Emitter) *Service {
func newService(t *testing.T, checker services.AccessChecker, usageReporter usagereporter.UsageReporter, emitter apievents.Emitter, clock clockwork.Clock) *Service {
t.Helper()

b, err := memory.New(memory.Config{})
Expand Down Expand Up @@ -297,6 +320,7 @@ func newService(t *testing.T, checker services.AccessChecker, usageReporter usag
Cache: backendService,
UsageReporter: func() usagereporter.UsageReporter { return usageReporter },
Emitter: emitter,
Clock: clock,
})
require.NoError(t, err)
return service
Expand Down
15 changes: 10 additions & 5 deletions lib/web/ui/usertask.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package ui

import (
"time"

"github.com/gravitational/trace"

usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
Expand All @@ -37,6 +39,8 @@ type UserTask struct {
IssueType string `json:"issueType,omitempty"`
// Integration is the Integration Name this User Task refers to.
Integration string `json:"integration,omitempty"`
// LastStateChange indicates when the current's user task state was last changed.
LastStateChange time.Time `json:"lastStateChange,omitempty"`
}

// UserTaskDetail contains all the details for a User Task.
Expand Down Expand Up @@ -94,10 +98,11 @@ func MakeDetailedUserTask(ut *usertasksv1.UserTask) UserTaskDetail {
// MakeUserTask creates a UI UserTask representation.
func MakeUserTask(ut *usertasksv1.UserTask) UserTask {
return UserTask{
Name: ut.GetMetadata().GetName(),
TaskType: ut.GetSpec().GetTaskType(),
State: ut.GetSpec().GetState(),
IssueType: ut.GetSpec().GetIssueType(),
Integration: ut.GetSpec().GetIntegration(),
Name: ut.GetMetadata().GetName(),
TaskType: ut.GetSpec().GetTaskType(),
State: ut.GetSpec().GetState(),
IssueType: ut.GetSpec().GetIssueType(),
Integration: ut.GetSpec().GetIntegration(),
LastStateChange: ut.GetStatus().GetLastStateChange().AsTime(),
}
}
4 changes: 4 additions & 0 deletions lib/web/usertasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func TestUserTask(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "OPEN", userTaskDetailResp.State)
require.NotEmpty(t, userTaskDetailResp.DiscoverEC2)
lastStateChangeT0 := userTaskDetailResp.LastStateChange

// Mark it as resolved.
_, err = pack.clt.PutJSON(ctx, updateStateEndpoint(userTaskName), ui.UpdateUserTaskStateRequest{
Expand All @@ -153,5 +154,8 @@ func TestUserTask(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, userTaskDetailResp.DiscoverEC2)
require.Equal(t, "RESOLVED", userTaskDetailResp.State)
// Its last changed state should be updated.
lastStateChangeT1 := userTaskDetailResp.LastStateChange
require.True(t, lastStateChangeT1.After(lastStateChangeT0), "last state change was not updated after changing the UserTask state")
})
}

0 comments on commit ff40022

Please sign in to comment.