From dd66a19881bd7a917bfc0b05dfec7dcaca7a3917 Mon Sep 17 00:00:00 2001 From: sudipto baral Date: Fri, 8 Nov 2024 09:24:08 -0500 Subject: [PATCH] use fullstatedump for initial sync --- go.mod | 13 +- go.sum | 24 +- .../database/repository/mock_repository.go | 8 +- internal/database/repository/queue.go | 5 +- internal/database/repository/repository.go | 2 +- internal/yunikorn/client.go | 2 + internal/yunikorn/event_handler.go | 13 +- internal/yunikorn/mock_client.go | 16 + internal/yunikorn/rest.go | 20 + internal/yunikorn/service.go | 40 +- internal/yunikorn/sync.go | 99 +--- internal/yunikorn/sync_int_test.go | 517 ++++++------------ 12 files changed, 294 insertions(+), 465 deletions(-) diff --git a/go.mod b/go.mod index 73b984a..080c5b7 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/G-Research/unicorn-history-server go 1.22.4 require ( - github.com/G-Research/yunikorn-core v0.0.0-20241104160512-441cd84df3cb + github.com/G-Research/yunikorn-core v0.0.0-20241106152547-9cb4e802cd38 github.com/G-Research/yunikorn-scheduler-interface v0.0.0-20241010085204-da837381ae08 github.com/emicklei/go-restful-openapi/v2 v2.11.0 github.com/emicklei/go-restful/v3 v3.12.1 @@ -29,6 +29,8 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect @@ -39,6 +41,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/btree v1.1.2 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -50,9 +53,12 @@ require ( github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/lib/pq v1.10.9 // indirect + github.com/looplab/fsm v1.0.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -60,12 +66,17 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_golang v1.18.0 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect github.com/sasha-s/go-deadlock v0.3.5 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/x448/float16 v0.8.4 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/crypto v0.27.0 // indirect + golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sync v0.8.0 // indirect diff --git a/go.sum b/go.sum index dc75a7d..fc53760 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,15 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= -github.com/G-Research/yunikorn-core v0.0.0-20241104160512-441cd84df3cb h1:Iw8CXMw+jP+BEChtwLuNK1/hhFYHlMn5UT4rSCZO+sM= -github.com/G-Research/yunikorn-core v0.0.0-20241104160512-441cd84df3cb/go.mod h1:N5oF1SL8ZznKnwu227/wUhatP3EzWMb5wHgwZwNTAzw= +github.com/G-Research/yunikorn-core v0.0.0-20241106152547-9cb4e802cd38 h1:Tk+Ug6moEvi4Ltnj+nz3FIP4kxmwHGq9J5Z2WoXwT44= +github.com/G-Research/yunikorn-core v0.0.0-20241106152547-9cb4e802cd38/go.mod h1:N5oF1SL8ZznKnwu227/wUhatP3EzWMb5wHgwZwNTAzw= github.com/G-Research/yunikorn-scheduler-interface v0.0.0-20241010085204-da837381ae08 h1:uV9X13Of+UNRBhvhHzf6A1KT2j85emuVMNEua5rO8g4= github.com/G-Research/yunikorn-scheduler-interface v0.0.0-20241010085204-da837381ae08/go.mod h1:FQMPzj6bVpw0SLjDxVSdbp8DaQGeUrVD4WdxyI3go9Q= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -61,6 +65,8 @@ github.com/golang-migrate/migrate/v4 v4.18.1 h1:JML/k+t4tpHCpQTCAD62Nu43NUFzHY4C github.com/golang-migrate/migrate/v4 v4.18.1/go.mod h1:HAX6m3sQgcdO81tdjn5exv20+3Kb13cmGli1hrD6hks= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= +github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -94,6 +100,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= @@ -115,11 +123,15 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU= +github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -159,6 +171,14 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= +github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/internal/database/repository/mock_repository.go b/internal/database/repository/mock_repository.go index d429a72..5a5260c 100644 --- a/internal/database/repository/mock_repository.go +++ b/internal/database/repository/mock_repository.go @@ -83,17 +83,17 @@ func (mr *MockRepositoryMockRecorder) DeletePartitionsNotInIDs(arg0, arg1, arg2 } // DeleteQueuesNotInIDs mocks base method. -func (m *MockRepository) DeleteQueuesNotInIDs(arg0 context.Context, arg1 string, arg2 []string, arg3 int64) error { +func (m *MockRepository) DeleteQueuesNotInIDs(arg0 context.Context, arg1 []string, arg2 int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteQueuesNotInIDs", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "DeleteQueuesNotInIDs", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // DeleteQueuesNotInIDs indicates an expected call of DeleteQueuesNotInIDs. -func (mr *MockRepositoryMockRecorder) DeleteQueuesNotInIDs(arg0, arg1, arg2, arg3 any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) DeleteQueuesNotInIDs(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteQueuesNotInIDs", reflect.TypeOf((*MockRepository)(nil).DeleteQueuesNotInIDs), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteQueuesNotInIDs", reflect.TypeOf((*MockRepository)(nil).DeleteQueuesNotInIDs), arg0, arg1, arg2) } // GetAllApplications mocks base method. diff --git a/internal/database/repository/queue.go b/internal/database/repository/queue.go index 34aa22c..7edea6a 100644 --- a/internal/database/repository/queue.go +++ b/internal/database/repository/queue.go @@ -331,11 +331,11 @@ ORDER BY id DESC return queues, nil } -func (s *PostgresRepository) DeleteQueuesNotInIDs(ctx context.Context, partitionID string, ids []string, deletedAtNano int64) error { +func (s *PostgresRepository) DeleteQueuesNotInIDs(ctx context.Context, ids []string, deletedAtNano int64) error { const q = ` UPDATE queues SET deleted_at_nano = @deleted_at_nano -WHERE deleted_at_nano IS NULL AND NOT (id = ANY(@ids)) AND partition_id = @partition_id +WHERE deleted_at_nano IS NULL AND NOT (id = ANY(@ids)) ` _, err := s.dbpool.Exec( @@ -344,7 +344,6 @@ WHERE deleted_at_nano IS NULL AND NOT (id = ANY(@ids)) AND partition_id = @parti pgx.NamedArgs{ "ids": ids, "deleted_at_nano": deletedAtNano, - "partition_id": partitionID, }, ) diff --git a/internal/database/repository/repository.go b/internal/database/repository/repository.go index 2b70643..3d49259 100644 --- a/internal/database/repository/repository.go +++ b/internal/database/repository/repository.go @@ -33,5 +33,5 @@ type Repository interface { UpdateQueue(ctx context.Context, queue *model.Queue) error GetAllQueues(ctx context.Context) ([]*model.Queue, error) GetQueuesInPartition(ctx context.Context, partitionID string) ([]*model.Queue, error) - DeleteQueuesNotInIDs(ctx context.Context, partitionID string, ids []string, deletedAtNano int64) error + DeleteQueuesNotInIDs(ctx context.Context, ids []string, deletedAtNano int64) error } diff --git a/internal/yunikorn/client.go b/internal/yunikorn/client.go index 7210457..8c40512 100644 --- a/internal/yunikorn/client.go +++ b/internal/yunikorn/client.go @@ -4,6 +4,7 @@ import ( "context" "net/http" + "github.com/G-Research/yunikorn-core/pkg/webservice" "github.com/G-Research/yunikorn-core/pkg/webservice/dao" ) @@ -11,6 +12,7 @@ import ( // //go:generate mockgen -destination=mock_client.go -package=yunikorn github.com/G-Research/unicorn-history-server/internal/yunikorn Client type Client interface { + GetFullStateDump(ctx context.Context) (*webservice.AggregatedStateInfo, error) GetPartitions(ctx context.Context) ([]*dao.PartitionInfo, error) GetPartitionQueues(ctx context.Context, partitionName string) (*dao.PartitionQueueDAOInfo, error) GetPartitionQueue(ctx context.Context, partitionName, queueName string) (*dao.PartitionQueueDAOInfo, error) diff --git a/internal/yunikorn/event_handler.go b/internal/yunikorn/event_handler.go index fc3e844..f40a0a9 100644 --- a/internal/yunikorn/event_handler.go +++ b/internal/yunikorn/event_handler.go @@ -89,12 +89,21 @@ func (s *Service) handleQueueEvent(ctx context.Context, ev *si.EventRecord) { return } + // Sync partitions before syncing queues + partitions, err := s.client.GetPartitions(ctx) + if err != nil { + logger.Errorf("could not get partitions: %v", err) + } + err = s.syncPartitions(ctx, partitions) + if err != nil { + logger.Errorf("could not sync partitions: %v", err) + } + isNew := ev.GetEventChangeType() == si.EventRecord_ADD && (ev.GetEventChangeDetail() == si.EventRecord_DETAILS_NONE || ev.GetEventChangeDetail() == si.EventRecord_QUEUE_DYNAMIC) var queue *model.Queue if isNew { - s.partitionAccumulator.add(ev) queue = &model.Queue{ Metadata: model.Metadata{ CreatedAtNano: ev.TimestampNano, @@ -110,7 +119,7 @@ func (s *Service) handleQueueEvent(ctx context.Context, ev *si.EventRecord) { return } - queue, err := s.repo.GetQueue(ctx, daoQueue.ID) + queue, err = s.repo.GetQueue(ctx, daoQueue.ID) if err != nil { logger.Errorf("could not get queue by partition name and queue name: %v", err) return diff --git a/internal/yunikorn/mock_client.go b/internal/yunikorn/mock_client.go index 0b5730b..01012d5 100644 --- a/internal/yunikorn/mock_client.go +++ b/internal/yunikorn/mock_client.go @@ -14,6 +14,7 @@ import ( http "net/http" reflect "reflect" + webservice "github.com/G-Research/yunikorn-core/pkg/webservice" dao "github.com/G-Research/yunikorn-core/pkg/webservice/dao" gomock "go.uber.org/mock/gomock" ) @@ -116,6 +117,21 @@ func (mr *MockClientMockRecorder) GetEventStream(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEventStream", reflect.TypeOf((*MockClient)(nil).GetEventStream), arg0) } +// GetFullStateDump mocks base method. +func (m *MockClient) GetFullStateDump(arg0 context.Context) (*webservice.AggregatedStateInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetFullStateDump", arg0) + ret0, _ := ret[0].(*webservice.AggregatedStateInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetFullStateDump indicates an expected call of GetFullStateDump. +func (mr *MockClientMockRecorder) GetFullStateDump(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFullStateDump", reflect.TypeOf((*MockClient)(nil).GetFullStateDump), arg0) +} + // GetPartitionNodes mocks base method. func (m *MockClient) GetPartitionNodes(arg0 context.Context, arg1 string) ([]*dao.NodeDAOInfo, error) { m.ctrl.T.Helper() diff --git a/internal/yunikorn/rest.go b/internal/yunikorn/rest.go index 6c67244..6d3ad84 100644 --- a/internal/yunikorn/rest.go +++ b/internal/yunikorn/rest.go @@ -8,6 +8,7 @@ import ( "net/http" "github.com/G-Research/unicorn-history-server/internal/config" + "github.com/G-Research/yunikorn-core/pkg/webservice" "github.com/G-Research/unicorn-history-server/internal/log" @@ -19,6 +20,7 @@ const ( endpointPartitions = "/ws/v1/partitions" endpointAppsHistory = "/ws/v1/history/apps" endpointContainersHistory = "/ws/v1/history/containers" + endpointFullStateDump = "/ws/v1/fullstatedump" endpointHealthcheck = "/ws/v1/scheduler/healthcheck" ) @@ -41,6 +43,24 @@ func NewRESTClient(cfg *config.YunikornConfig) *RESTClient { } } +func (c *RESTClient) GetFullStateDump(ctx context.Context) (*webservice.AggregatedStateInfo, error) { + resp, err := c.get(ctx, endpointFullStateDump) + if err != nil { + return nil, err + } + defer closeBody(ctx, resp) + + if resp.StatusCode != 200 { + return nil, handleNonOKResponse(ctx, resp) + } + + var state webservice.AggregatedStateInfo + if err = unmarshallBody(ctx, resp, &state); err != nil { + return nil, err + } + return &state, nil +} + func (c *RESTClient) GetPartitions(ctx context.Context) ([]*dao.PartitionInfo, error) { resp, err := c.get(ctx, endpointPartitions) if err != nil { diff --git a/internal/yunikorn/service.go b/internal/yunikorn/service.go index dcc3078..2b6cdb9 100644 --- a/internal/yunikorn/service.go +++ b/internal/yunikorn/service.go @@ -7,7 +7,6 @@ import ( "time" "github.com/G-Research/yunikorn-core/pkg/webservice/dao" - "github.com/G-Research/yunikorn-scheduler-interface/lib/go/si" "github.com/oklog/run" "github.com/G-Research/unicorn-history-server/internal/database/repository" @@ -21,8 +20,6 @@ type Service struct { client Client // eventHandler is a function that handles events from the Yunikorn event stream. eventHandler EventHandler - // partitionAccumulator accumulates new queue events and synchronizes partitions after a certain interval. - partitionAccumulator *accumulator // appMap is a map of application IDs to their respective DAOs. appMap map[string]*dao.ApplicationDAOInfo // workqueue processes jobs which store data in database during data sync and retries them with exponential backoff. @@ -40,17 +37,6 @@ func NewService(repository repository.Repository, eventRepository repository.Eve workqueue: workqueue.NewWorkQueue(workqueue.WithName("yunikorn_data_sync")), } s.eventHandler = s.handleEvent - s.partitionAccumulator = newAccumulator( - func(ctx context.Context, event []*si.EventRecord) { - logger := log.FromContext(ctx) - _, err := s.syncPartitions(ctx) - if err != nil { - logger.Errorf("error syncing partitions: %v", err) - return - } - }, - 2*time.Second, - ) for _, opt := range opts { opt(s) } @@ -66,26 +52,28 @@ func (s *Service) Run(ctx context.Context) error { }, func(err error) {}, ) - g.Add(func() error { - return s.partitionAccumulator.run(ctx) - }, func(err error) {}, - ) - - partitions, err := s.syncPartitions(ctx) + fullState, err := s.client.GetFullStateDump(ctx) if err != nil { - return err + return fmt.Errorf("could not get full state dump: %v", err) + } + + if err := s.syncPartitions(ctx, fullState.Partitions); err != nil { + return fmt.Errorf("error syncing partitions: %v", err) } - if err := s.syncQueues(ctx, partitions); err != nil { + if err := s.syncQueues(ctx, fullState.Queues); err != nil { return fmt.Errorf("error syncing queues: %v", err) } - if err := s.syncApplications(ctx); err != nil { + if err := s.syncApplications(ctx, fullState.Applications); err != nil { return fmt.Errorf("error syncing applications: %v", err) } - if err := s.syncNodes(ctx, partitions); err != nil { + if err := s.syncNodes(ctx, fullState.Nodes); err != nil { return fmt.Errorf("error syncing nodes: %v", err) } - if err := s.syncHistory(ctx); err != nil { - return fmt.Errorf("error syncing app and container history: %v", err) + if err := s.syncAppHistory(ctx, fullState.AppHistory); err != nil { + return fmt.Errorf("error syncing app history: %v", err) + } + if err := s.syncContainerHistory(ctx, fullState.ContainerHistory); err != nil { + return fmt.Errorf("error syncing container history: %v", err) } g.Add(func() error { diff --git a/internal/yunikorn/sync.go b/internal/yunikorn/sync.go index 28294d1..f835f41 100644 --- a/internal/yunikorn/sync.go +++ b/internal/yunikorn/sync.go @@ -9,19 +9,12 @@ import ( "github.com/G-Research/yunikorn-core/pkg/webservice/dao" "github.com/oklog/ulid/v2" - "github.com/G-Research/unicorn-history-server/internal/log" "github.com/G-Research/unicorn-history-server/internal/model" "github.com/G-Research/unicorn-history-server/internal/util" ) // syncPartitions fetches partitions from the Yunikorn API and syncs them into the database -func (s *Service) syncPartitions(ctx context.Context) ([]*model.Partition, error) { - // Get partitions from Yunikorn API and upsert into DB - partitions, err := s.client.GetPartitions(ctx) - if err != nil { - return nil, fmt.Errorf("could not get partitions: %v", err) - } - +func (s *Service) syncPartitions(ctx context.Context, partitions []*dao.PartitionInfo) error { ids := make([]string, 0, len(partitions)) for _, p := range partitions { ids = append(ids, p.ID) @@ -29,10 +22,9 @@ func (s *Service) syncPartitions(ctx context.Context) ([]*model.Partition, error now := time.Now().UnixNano() if err := s.repo.DeletePartitionsNotInIDs(ctx, ids, now); err != nil { - return nil, fmt.Errorf("could not delete partitions not in IDs: %w", err) + return fmt.Errorf("could not delete partitions not in IDs: %w", err) } - result := make([]*model.Partition, 0, len(partitions)) for _, p := range partitions { current, err := s.repo.GetPartitionByID(ctx, p.ID) fmt.Printf("Getting partition resulted in current: %+v, err: %v\n", current, err) @@ -46,48 +38,24 @@ func (s *Service) syncPartitions(ctx context.Context) ([]*model.Partition, error } if err := s.repo.InsertPartition(ctx, partition); err != nil { - return nil, fmt.Errorf("could not insert partition: %w", err) + return fmt.Errorf("could not insert partition: %w", err) } - - result = append(result, partition) continue } current.MergeFrom(p) if err := s.repo.UpdatePartition(ctx, current); err != nil { - return nil, fmt.Errorf("could not update partition: %w", err) + return fmt.Errorf("could not update partition: %w", err) } - - result = append(result, current) } - - return result, nil + return nil } -// syncQueues fetches queues for each partition and upserts them into the database -func (s *Service) syncQueues(ctx context.Context, partitions []*model.Partition) error { - logger := log.FromContext(ctx) - +func (s *Service) syncQueues(ctx context.Context, clientQueues []dao.PartitionQueueDAOInfo) error { var errs []error - for _, p := range partitions { - logger.Info("syncing queues for partition", "partition", p.Name) - err := s.syncPartitionQueues(ctx, p) - if err != nil { - errs = append(errs, fmt.Errorf("syncing queues for partition %q failed: %v", p.Name, err)) - } - } - return errors.Join(errs...) -} - -func (s *Service) syncPartitionQueues(ctx context.Context, partition *model.Partition) error { - clientQueues, err := s.client.GetPartitionQueues(ctx, partition.Name) - if err != nil { - return fmt.Errorf("could not get queues for partition %s: %v", partition.Name, err) - } - - queues := flattenQueues([]*dao.PartitionQueueDAOInfo{clientQueues}) + queues := flattenQueues(util.ToPtrSlice(clientQueues)) ids := make([]string, 0, len(queues)) for _, q := range queues { @@ -95,7 +63,7 @@ func (s *Service) syncPartitionQueues(ctx context.Context, partition *model.Part } now := time.Now().UnixNano() - if err := s.repo.DeleteQueuesNotInIDs(ctx, partition.Name, ids, now); err != nil { + if err := s.repo.DeleteQueuesNotInIDs(ctx, ids, now); err != nil { return fmt.Errorf("could not delete queues not in IDs: %w", err) } @@ -109,18 +77,17 @@ func (s *Service) syncPartitionQueues(ctx context.Context, partition *model.Part PartitionQueueDAOInfo: *q, } if err := s.repo.InsertQueue(ctx, queue); err != nil { - return fmt.Errorf("could not insert queue: %w", err) + errs = append(errs, fmt.Errorf("could not insert queue: %w", err)) } continue } current.MergeFrom(q) if err := s.repo.UpdateQueue(ctx, current); err != nil { - return fmt.Errorf("could not update queue: %w", err) + errs = append(errs, fmt.Errorf("could not update queue: %w", err)) } } - - return nil + return errors.Join(errs...) } // flattenQueues returns a list of all queues in the hierarchy in a flat array. @@ -131,24 +98,16 @@ func flattenQueues(qs []*dao.PartitionQueueDAOInfo) []*dao.PartitionQueueDAOInfo for _, q := range qs { queues = append(queues, q) if len(q.Children) > 0 { - // update partitionName for children #148 - for i := range q.Children { - q.Children[i].PartitionID = q.PartitionID - } queues = append(queues, flattenQueues(util.ToPtrSlice(q.Children))...) } } return queues } -func (s *Service) syncNodes(ctx context.Context, partitions []*model.Partition) error { +func (s *Service) syncNodes(ctx context.Context, daoNodes []*dao.NodesDAOInfo) error { var errs []error - for _, p := range partitions { - nodes, err := s.client.GetPartitionNodes(ctx, p.Name) - if err != nil { - return fmt.Errorf("could not get nodes for partition %s: %v", p.Name, err) - } - + for _, nodesInfo := range daoNodes { + nodes := nodesInfo.Nodes ids := make([]string, 0, len(nodes)) for _, n := range nodes { ids = append(ids, n.ID) @@ -184,11 +143,7 @@ func (s *Service) syncNodes(ctx context.Context, partitions []*model.Partition) } // syncApplications fetches applications for each queue and upserts them into the database -func (s *Service) syncApplications(ctx context.Context) error { - applications, err := s.client.GetApplications(ctx, "", "") - if err != nil { - return fmt.Errorf("could not get applications: %v", err) - } +func (s *Service) syncApplications(ctx context.Context, applications []*dao.ApplicationDAOInfo) error { ids := make([]string, 0, len(applications)) for _, a := range applications { @@ -224,23 +179,13 @@ func (s *Service) syncApplications(ctx context.Context) error { return nil } -func (s *Service) syncHistory(ctx context.Context) error { - - appsHistory, err := s.client.GetAppsHistory(ctx) - if err != nil { - return fmt.Errorf("could not get apps history: %w", err) - } - containersHistory, err := s.client.GetContainersHistory(ctx) - if err != nil { - return fmt.Errorf("could not get containers history: %w", err) - } - - now := time.Now().UnixNano() +func (s *Service) syncAppHistory(ctx context.Context, appsHistory []*dao.ApplicationHistoryDAOInfo) error { var errs []error + nowNano := time.Now().UnixNano() for _, ah := range appsHistory { history := &model.AppHistory{ Metadata: model.Metadata{ - CreatedAtNano: now, + CreatedAtNano: nowNano, }, ID: ulid.Make().String(), ApplicationHistoryDAOInfo: *ah, @@ -249,11 +194,16 @@ func (s *Service) syncHistory(ctx context.Context) error { errs = append(errs, fmt.Errorf("could not insert app history: %v", err)) } } + return errors.Join(errs...) +} +func (s *Service) syncContainerHistory(ctx context.Context, containersHistory []*dao.ContainerHistoryDAOInfo) error { + var errs []error + nowNano := time.Now().UnixNano() for _, ch := range containersHistory { history := &model.ContainerHistory{ Metadata: model.Metadata{ - CreatedAtNano: now, + CreatedAtNano: nowNano, }, ID: ulid.Make().String(), ContainerHistoryDAOInfo: *ch, @@ -262,6 +212,5 @@ func (s *Service) syncHistory(ctx context.Context) error { errs = append(errs, fmt.Errorf("could not insert container history: %v", err)) } } - return errors.Join(errs...) } diff --git a/internal/yunikorn/sync_int_test.go b/internal/yunikorn/sync_int_test.go index f98802a..6dfa0e2 100644 --- a/internal/yunikorn/sync_int_test.go +++ b/internal/yunikorn/sync_int_test.go @@ -2,10 +2,6 @@ package yunikorn import ( "context" - "net/http" - "net/http/httptest" - "sort" - "strings" "testing" "time" @@ -38,28 +34,19 @@ func TestSync_syncNodes_Integration(t *testing.T) { tests := []struct { name string - setup func() *httptest.Server - partitions []*model.Partition + stateNodes []*dao.NodesDAOInfo existingNodes []*model.Node expectedNodes []*model.Node wantErr bool }{ { name: "Sync nodes with no existing nodes", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - partitionName := extractPartitionNameFromURL(r.URL.Path) - response := []dao.NodeDAOInfo{ - {ID: "1", NodeID: "node-1", Partition: partitionName, HostName: "host-1"}, - {ID: "2", NodeID: "node-2", Partition: partitionName, HostName: "host-2"}, - } - writeResponse(t, w, response) - })) - }, - partitions: []*model.Partition{ + stateNodes: []*dao.NodesDAOInfo{ { - PartitionInfo: dao.PartitionInfo{ - Name: "default", + PartitionName: "default", + Nodes: []*dao.NodeDAOInfo{ + {ID: "1", NodeID: "node-1", Partition: "default", HostName: "host-1"}, + {ID: "2", NodeID: "node-2", Partition: "default", HostName: "host-2"}, }, }, }, @@ -72,19 +59,11 @@ func TestSync_syncNodes_Integration(t *testing.T) { }, { name: "Sync nodes with existing nodes in DB", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - partitionName := extractPartitionNameFromURL(r.URL.Path) - response := []dao.NodeDAOInfo{ - {ID: "2", NodeID: "node-2", Partition: partitionName, HostName: "host-2-updated"}, - } - writeResponse(t, w, response) - })) - }, - partitions: []*model.Partition{ + stateNodes: []*dao.NodesDAOInfo{ { - PartitionInfo: dao.PartitionInfo{ - Name: "default", + PartitionName: "default", + Nodes: []*dao.NodeDAOInfo{ + {ID: "2", NodeID: "node-2", Partition: "default", HostName: "host-2-updated"}, }, }, }, @@ -116,13 +95,9 @@ func TestSync_syncNodes_Integration(t *testing.T) { require.NoError(t, err) } - ts := tt.setup() - defer ts.Close() - - client := NewRESTClient(getMockServerYunikornConfig(t, ts.URL)) - s := NewService(repo, eventRepository, client) + s := NewService(repo, eventRepository, nil) - err := s.syncNodes(ctx, tt.partitions) + err := s.syncNodes(ctx, tt.stateNodes) if tt.wantErr { require.Error(t, err) return @@ -159,49 +134,36 @@ func TestSync_syncQueues_Integration(t *testing.T) { tests := []struct { name string - setup func() *httptest.Server - partitions []*model.Partition + stateQueues []dao.PartitionQueueDAOInfo existingQueues []*model.Queue expected []*model.Queue wantErr bool }{ { name: "Sync queues with no existing queues", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - partitionName := extractPartitionNameFromURL(r.URL.Path) - response := dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: partitionName, - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "2", - QueueName: "root.child-1", - PartitionID: partitionName, - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "3", - QueueName: "root.child-1.1", - PartitionID: partitionName, - }, - { - ID: "4", - QueueName: "root.child-1.2", - PartitionID: partitionName, - }, + stateQueues: []dao.PartitionQueueDAOInfo{ + { + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "3", + QueueName: "root.child-1.1", + PartitionID: "1", + }, + { + ID: "4", + QueueName: "root.child-1.2", + PartitionID: "1", }, }, }, - } - writeResponse(t, w, response) - })) - }, - partitions: []*model.Partition{ - { - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", }, }, }, @@ -240,38 +202,25 @@ func TestSync_syncQueues_Integration(t *testing.T) { }, { name: "Sync queues with existing queues in DB", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - partitionName := extractPartitionNameFromURL(r.URL.Path) - - response := dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: partitionName, - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "2", - QueueName: "root.child-1", - PartitionID: partitionName, - }, - { - ID: "3", - QueueName: "root.child-2", - PartitionID: partitionName, - }, - }, - } - writeResponse(t, w, response) - })) - }, - partitions: []*model.Partition{ - { - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", + stateQueues: []dao.PartitionQueueDAOInfo{{ + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + { + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", }, }, }, + }, + existingQueues: []*model.Queue{ { Metadata: model.Metadata{ @@ -311,29 +260,17 @@ func TestSync_syncQueues_Integration(t *testing.T) { }, { name: "Sync queues when queue is deleted", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - partitionName := extractPartitionNameFromURL(r.URL.Path) - response := dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: partitionName, - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "3", - QueueName: "root.child-2", - PartitionID: partitionName, - }, - }, - } - writeResponse(t, w, response) - })) - }, - partitions: []*model.Partition{ + stateQueues: []dao.PartitionQueueDAOInfo{ { - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", + }, }, }, }, @@ -377,65 +314,41 @@ func TestSync_syncQueues_Integration(t *testing.T) { }, wantErr: false, }, - { - name: "Sync queues with HTTP error", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "internal server error", http.StatusInternalServerError) - })) - }, - partitions: []*model.Partition{{ - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", - }, - }}, - existingQueues: nil, - expected: nil, - wantErr: true, - }, { name: "Sync queues with multiple partitions", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - partitionName := extractPartitionNameFromURL(r.URL.Path) - response := dao.PartitionQueueDAOInfo{ - ID: partitionName + "1", - QueueName: "root", - PartitionID: partitionName, - Children: []dao.PartitionQueueDAOInfo{ - { - ID: partitionName + "2", - QueueName: "root.child-1", - PartitionID: partitionName, - }, - { - ID: partitionName + "3", - QueueName: "root.child-2", - PartitionID: partitionName, - }, - }, - } - writeResponse(t, w, response) - })) - }, - partitions: []*model.Partition{ - { - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", - }, - }, + stateQueues: []dao.PartitionQueueDAOInfo{ { - PartitionInfo: dao.PartitionInfo{ - ID: "2", - Name: "2", + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + }, + { + ID: "3", + QueueName: "root.child-2", + PartitionID: "1", + }, }, }, { - PartitionInfo: dao.PartitionInfo{ - ID: "3", - Name: "3", + ID: "4", + QueueName: "root", + PartitionID: "2", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "5", + QueueName: "root.child-1", + PartitionID: "2", + }, + { + ID: "6", + QueueName: "root.child-2", + PartitionID: "2", + }, }, }, }, @@ -443,117 +356,85 @@ func TestSync_syncQueues_Integration(t *testing.T) { expected: []*model.Queue{ { PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "11", + ID: "1", QueueName: "root", PartitionID: "1", }, }, { PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "12", + ID: "2", QueueName: "root.child-1", PartitionID: "1", }, }, { PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "13", + ID: "3", QueueName: "root.child-2", PartitionID: "1", }, }, { PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "21", + ID: "4", QueueName: "root", PartitionID: "2", }, }, { PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "22", + ID: "5", QueueName: "root.child-1", PartitionID: "2", }, }, { PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "23", + ID: "6", QueueName: "root.child-2", PartitionID: "2", }, }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "31", - QueueName: "root", - PartitionID: "3", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "32", - QueueName: "root.child-1", - PartitionID: "3", - }, - }, - { - PartitionQueueDAOInfo: dao.PartitionQueueDAOInfo{ - ID: "33", - QueueName: "root.child-2", - PartitionID: "3", - }, - }, }, wantErr: false, }, { name: "Sync queues with deeply nested queues", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - partitionName := extractPartitionNameFromURL(r.URL.Path) - - response := dao.PartitionQueueDAOInfo{ - ID: "1", - QueueName: "root", - PartitionID: partitionName, - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "2", - QueueName: "root.child-1", - PartitionID: partitionName, - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "3", - QueueName: "root.child-1.1", - PartitionID: partitionName, - Children: []dao.PartitionQueueDAOInfo{ - { - ID: "4", - QueueName: "root.child-1.1.1", - PartitionID: partitionName, - }, - { - ID: "5", - QueueName: "root.child-1.1.2", - PartitionID: partitionName, - }, + stateQueues: []dao.PartitionQueueDAOInfo{ + { + ID: "1", + QueueName: "root", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "2", + QueueName: "root.child-1", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "3", + QueueName: "root.child-1.1", + PartitionID: "1", + Children: []dao.PartitionQueueDAOInfo{ + { + ID: "4", + QueueName: "root.child-1.1.1", + PartitionID: "1", + }, + { + ID: "5", + QueueName: "root.child-1.1.2", + PartitionID: "1", }, }, }, }, }, - } - writeResponse(t, w, response) - })) - }, - partitions: []*model.Partition{{ - PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", + }, }, - }}, + }, existingQueues: nil, expected: []*model.Queue{ { @@ -609,21 +490,9 @@ func TestSync_syncQueues_Integration(t *testing.T) { require.NoError(t, err) } - ts := tt.setup() - defer ts.Close() + s := NewService(repo, eventRepository, nil) - client := NewRESTClient(getMockServerYunikornConfig(t, ts.URL)) - s := NewService(repo, eventRepository, client) - - // Create a cancellable context for this specific service - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Start the service in a goroutine - go func() { - _ = s.Run(ctx) - }() - - err := s.syncQueues(context.Background(), tt.partitions) + err := s.syncQueues(context.Background(), tt.stateQueues) if tt.wantErr { require.Error(t, err) return @@ -656,58 +525,49 @@ func TestSync_syncPartitions_Integration(t *testing.T) { tests := []struct { name string - setup func() *httptest.Server + statePartitions []*dao.PartitionInfo existingPartitions []*model.Partition expected []*model.Partition wantErr bool }{ { name: "Sync partition with no existing partitions in DB", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - response := []*dao.PartitionInfo{ - { - ID: "1", - Name: "1", - }, - { - ID: "2", - Name: "2", - }, - } - writeResponse(t, w, response) - })) + statePartitions: []*dao.PartitionInfo{ + { + ID: "1", + Name: "1", + }, + { + ID: "2", + Name: "2", + }, }, existingPartitions: nil, expected: []*model.Partition{ { PartitionInfo: dao.PartitionInfo{ - ID: "1", - Name: "1", + ID: "2", + Name: "2", }, }, { PartitionInfo: dao.PartitionInfo{ - ID: "2", - Name: "2", + ID: "1", + Name: "1", }, }, }, wantErr: false, }, { - name: "Should mark secondary partition as deleted in DB", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - response := []*dao.PartitionInfo{ - { - ID: "1", - Name: "1", - }, - } - writeResponse(t, w, response) - })) + name: "Should mark partition 2 as deleted in DB", + statePartitions: []*dao.PartitionInfo{ + { + ID: "1", + Name: "1", + }, }, + existingPartitions: []*model.Partition{ { Metadata: model.Metadata{ @@ -741,6 +601,7 @@ func TestSync_syncPartitions_Integration(t *testing.T) { }, wantErr: false, }, + // TODO: test syncPartition when statePartitions is nil } for _, tt := range tests { @@ -756,56 +617,37 @@ func TestSync_syncPartitions_Integration(t *testing.T) { require.NoError(t, err) } - ts := tt.setup() - defer ts.Close() - - client := NewRESTClient(getMockServerYunikornConfig(t, ts.URL)) - s := NewService(repo, eventRepository, client) + s := NewService(repo, eventRepository, nil) // Start the service ctx, cancel := context.WithCancel(context.Background()) defer cancel() - partitions, err := s.syncPartitions(ctx) + err := s.syncPartitions(ctx, tt.statePartitions) if tt.wantErr { require.Error(t, err) return } require.NoError(t, err) - sort.Slice(partitions, func(i, j int) bool { - return partitions[i].Name < partitions[j].Name - }) - var partitionsInDB []*model.Partition partitionsInDB, err = s.repo.GetAllPartitions(ctx, repository.PartitionFilters{}) require.NoError(t, err) - sort.Slice(partitionsInDB, func(i, j int) bool { - return partitionsInDB[i].Name < partitionsInDB[j].Name - }) - - i := 0 - j := 0 - for i < len(partitions) && j < len(partitionsInDB) { - newPartition := partitions[i] - dbPartition := partitionsInDB[j] - if newPartition.ID == dbPartition.ID { - assert.Equal(t, newPartition.PartitionInfo, dbPartition.PartitionInfo) - assert.Nil(t, newPartition.DeletedAtNano) - i++ - j++ - continue + for _, dbPartition := range partitionsInDB { + found := false + for _, expectedPartition := range tt.expected { + if dbPartition.ID == expectedPartition.ID { + assert.Equal(t, expectedPartition.PartitionInfo, dbPartition.PartitionInfo) + assert.Nil(t, expectedPartition.DeletedAtNano) + found = true + } + } + if !found { + assert.NotNil(t, dbPartition.DeletedAtNano) } - assert.NotNil(t, dbPartition.DeletedAtNano) - j++ } - assert.Equal(t, i, len(partitions)) - assert.Equal(t, len(partitions), i) - for i := j; i < len(partitionsInDB); i++ { - assert.NotNil(t, partitionsInDB[i].DeletedAtNano) - } }) } } @@ -825,7 +667,7 @@ func TestSync_syncApplications_Integration(t *testing.T) { now := time.Now().UnixNano() tests := []struct { name string - setup func() *httptest.Server + stateApplications []*dao.ApplicationDAOInfo existingApplications []*model.Application expectedLive []*model.Application expectedDeleted []*model.Application @@ -833,14 +675,9 @@ func TestSync_syncApplications_Integration(t *testing.T) { }{ { name: "Sync applications with no existing applications in DB", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - response := []*dao.ApplicationDAOInfo{ - {ID: "1", ApplicationID: "app-1"}, - {ID: "2", ApplicationID: "app-2"}, - } - writeResponse(t, w, response) - })) + stateApplications: []*dao.ApplicationDAOInfo{ + {ID: "1", ApplicationID: "app-1"}, + {ID: "2", ApplicationID: "app-2"}, }, existingApplications: nil, expectedLive: []*model.Application{ @@ -867,13 +704,8 @@ func TestSync_syncApplications_Integration(t *testing.T) { }, { name: "Should mark application as deleted in DB", - setup: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - response := []*dao.ApplicationDAOInfo{ - {ID: "1", ApplicationID: "app-1"}, - } - writeResponse(t, w, response) - })) + stateApplications: []*dao.ApplicationDAOInfo{ + {ID: "1", ApplicationID: "app-1"}, }, existingApplications: []*model.Application{ { @@ -934,20 +766,13 @@ func TestSync_syncApplications_Integration(t *testing.T) { require.NoError(t, err) } - ts := tt.setup() - defer ts.Close() - - client := NewRESTClient(getMockServerYunikornConfig(t, ts.URL)) - s := NewService(repo, eventRepository, client) + s := NewService(repo, eventRepository, nil) // Start the service ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - _ = s.Run(ctx) - }() - err := s.syncApplications(ctx) + err := s.syncApplications(ctx, tt.stateApplications) if tt.wantErr { require.Error(t, err) return @@ -1000,16 +825,6 @@ func isQueuePresent(queuesInDB []*model.Queue, targetQueue *model.Queue) bool { return false } -// Helper function to extract partition name from the URL -func extractPartitionNameFromURL(urlPath string) string { - // Assume URL is like: /ws/v1/partition/{partitionName}/queues - parts := strings.Split(urlPath, "/") - if len(parts) > 4 { - return parts[4] - } - return "" -} - func setupDatabase(t *testing.T, ctx context.Context) (*pgxpool.Pool, repository.Repository, func()) { schema := database.CreateTestSchema(ctx, t) cfg := config.GetTestPostgresConfig()