Skip to content

Commit

Permalink
use fullstatedump for initial sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sudiptob2 committed Nov 8, 2024
1 parent afa19ef commit dd66a19
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 465 deletions.
13 changes: 12 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/G-Research/unicorn-history-server
go 1.22.4

require (
github.com/G-Research/yunikorn-core v0.0.0-20241104160512-441cd84df3cb
github.com/G-Research/yunikorn-core v0.0.0-20241106152547-9cb4e802cd38
github.com/G-Research/yunikorn-scheduler-interface v0.0.0-20241010085204-da837381ae08
github.com/emicklei/go-restful-openapi/v2 v2.11.0
github.com/emicklei/go-restful/v3 v3.12.1
Expand All @@ -29,6 +29,8 @@ require (
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
Expand All @@ -39,6 +41,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand All @@ -50,22 +53,30 @@ require (
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/looplab/fsm v1.0.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/sasha-s/go-deadlock v0.3.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
24 changes: 22 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/G-Research/yunikorn-core v0.0.0-20241104160512-441cd84df3cb h1:Iw8CXMw+jP+BEChtwLuNK1/hhFYHlMn5UT4rSCZO+sM=
github.com/G-Research/yunikorn-core v0.0.0-20241104160512-441cd84df3cb/go.mod h1:N5oF1SL8ZznKnwu227/wUhatP3EzWMb5wHgwZwNTAzw=
github.com/G-Research/yunikorn-core v0.0.0-20241106152547-9cb4e802cd38 h1:Tk+Ug6moEvi4Ltnj+nz3FIP4kxmwHGq9J5Z2WoXwT44=
github.com/G-Research/yunikorn-core v0.0.0-20241106152547-9cb4e802cd38/go.mod h1:N5oF1SL8ZznKnwu227/wUhatP3EzWMb5wHgwZwNTAzw=
github.com/G-Research/yunikorn-scheduler-interface v0.0.0-20241010085204-da837381ae08 h1:uV9X13Of+UNRBhvhHzf6A1KT2j85emuVMNEua5rO8g4=
github.com/G-Research/yunikorn-scheduler-interface v0.0.0-20241010085204-da837381ae08/go.mod h1:FQMPzj6bVpw0SLjDxVSdbp8DaQGeUrVD4WdxyI3go9Q=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -61,6 +65,8 @@ github.com/golang-migrate/migrate/v4 v4.18.1 h1:JML/k+t4tpHCpQTCAD62Nu43NUFzHY4C
github.com/golang-migrate/migrate/v4 v4.18.1/go.mod h1:HAX6m3sQgcdO81tdjn5exv20+3Kb13cmGli1hrD6hks=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down Expand Up @@ -94,6 +100,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
Expand All @@ -115,11 +123,15 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU=
github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
Expand Down Expand Up @@ -159,6 +171,14 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
8 changes: 4 additions & 4 deletions internal/database/repository/mock_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions internal/database/repository/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,11 @@ ORDER BY id DESC
return queues, nil
}

func (s *PostgresRepository) DeleteQueuesNotInIDs(ctx context.Context, partitionID string, ids []string, deletedAtNano int64) error {
func (s *PostgresRepository) DeleteQueuesNotInIDs(ctx context.Context, ids []string, deletedAtNano int64) error {
const q = `
UPDATE queues
SET deleted_at_nano = @deleted_at_nano
WHERE deleted_at_nano IS NULL AND NOT (id = ANY(@ids)) AND partition_id = @partition_id
WHERE deleted_at_nano IS NULL AND NOT (id = ANY(@ids))
`

_, err := s.dbpool.Exec(
Expand All @@ -344,7 +344,6 @@ WHERE deleted_at_nano IS NULL AND NOT (id = ANY(@ids)) AND partition_id = @parti
pgx.NamedArgs{
"ids": ids,
"deleted_at_nano": deletedAtNano,
"partition_id": partitionID,
},
)

Expand Down
2 changes: 1 addition & 1 deletion internal/database/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ type Repository interface {
UpdateQueue(ctx context.Context, queue *model.Queue) error
GetAllQueues(ctx context.Context) ([]*model.Queue, error)
GetQueuesInPartition(ctx context.Context, partitionID string) ([]*model.Queue, error)
DeleteQueuesNotInIDs(ctx context.Context, partitionID string, ids []string, deletedAtNano int64) error
DeleteQueuesNotInIDs(ctx context.Context, ids []string, deletedAtNano int64) error
}
2 changes: 2 additions & 0 deletions internal/yunikorn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"net/http"

"github.com/G-Research/yunikorn-core/pkg/webservice"
"github.com/G-Research/yunikorn-core/pkg/webservice/dao"
)

// Client defines the interface for interacting with the Yunikorn REST API.
//
//go:generate mockgen -destination=mock_client.go -package=yunikorn github.com/G-Research/unicorn-history-server/internal/yunikorn Client
type Client interface {
GetFullStateDump(ctx context.Context) (*webservice.AggregatedStateInfo, error)
GetPartitions(ctx context.Context) ([]*dao.PartitionInfo, error)
GetPartitionQueues(ctx context.Context, partitionName string) (*dao.PartitionQueueDAOInfo, error)
GetPartitionQueue(ctx context.Context, partitionName, queueName string) (*dao.PartitionQueueDAOInfo, error)
Expand Down
13 changes: 11 additions & 2 deletions internal/yunikorn/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,21 @@ func (s *Service) handleQueueEvent(ctx context.Context, ev *si.EventRecord) {
return
}

// Sync partitions before syncing queues
partitions, err := s.client.GetPartitions(ctx)
if err != nil {
logger.Errorf("could not get partitions: %v", err)
}
err = s.syncPartitions(ctx, partitions)
if err != nil {
logger.Errorf("could not sync partitions: %v", err)
}

isNew := ev.GetEventChangeType() == si.EventRecord_ADD &&
(ev.GetEventChangeDetail() == si.EventRecord_DETAILS_NONE || ev.GetEventChangeDetail() == si.EventRecord_QUEUE_DYNAMIC)

var queue *model.Queue
if isNew {
s.partitionAccumulator.add(ev)
queue = &model.Queue{
Metadata: model.Metadata{
CreatedAtNano: ev.TimestampNano,
Expand All @@ -110,7 +119,7 @@ func (s *Service) handleQueueEvent(ctx context.Context, ev *si.EventRecord) {
return
}

queue, err := s.repo.GetQueue(ctx, daoQueue.ID)
queue, err = s.repo.GetQueue(ctx, daoQueue.ID)
if err != nil {
logger.Errorf("could not get queue by partition name and queue name: %v", err)
return
Expand Down
16 changes: 16 additions & 0 deletions internal/yunikorn/mock_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions internal/yunikorn/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"

"github.com/G-Research/unicorn-history-server/internal/config"
"github.com/G-Research/yunikorn-core/pkg/webservice"

"github.com/G-Research/unicorn-history-server/internal/log"

Expand All @@ -19,6 +20,7 @@ const (
endpointPartitions = "/ws/v1/partitions"
endpointAppsHistory = "/ws/v1/history/apps"
endpointContainersHistory = "/ws/v1/history/containers"
endpointFullStateDump = "/ws/v1/fullstatedump"
endpointHealthcheck = "/ws/v1/scheduler/healthcheck"
)

Expand All @@ -41,6 +43,24 @@ func NewRESTClient(cfg *config.YunikornConfig) *RESTClient {
}
}

func (c *RESTClient) GetFullStateDump(ctx context.Context) (*webservice.AggregatedStateInfo, error) {
resp, err := c.get(ctx, endpointFullStateDump)
if err != nil {
return nil, err
}
defer closeBody(ctx, resp)

if resp.StatusCode != 200 {
return nil, handleNonOKResponse(ctx, resp)
}

var state webservice.AggregatedStateInfo
if err = unmarshallBody(ctx, resp, &state); err != nil {
return nil, err
}
return &state, nil
}

func (c *RESTClient) GetPartitions(ctx context.Context) ([]*dao.PartitionInfo, error) {
resp, err := c.get(ctx, endpointPartitions)
if err != nil {
Expand Down
40 changes: 14 additions & 26 deletions internal/yunikorn/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/G-Research/yunikorn-core/pkg/webservice/dao"
"github.com/G-Research/yunikorn-scheduler-interface/lib/go/si"
"github.com/oklog/run"

"github.com/G-Research/unicorn-history-server/internal/database/repository"
Expand All @@ -21,8 +20,6 @@ type Service struct {
client Client
// eventHandler is a function that handles events from the Yunikorn event stream.
eventHandler EventHandler
// partitionAccumulator accumulates new queue events and synchronizes partitions after a certain interval.
partitionAccumulator *accumulator
// appMap is a map of application IDs to their respective DAOs.
appMap map[string]*dao.ApplicationDAOInfo
// workqueue processes jobs which store data in database during data sync and retries them with exponential backoff.
Expand All @@ -40,17 +37,6 @@ func NewService(repository repository.Repository, eventRepository repository.Eve
workqueue: workqueue.NewWorkQueue(workqueue.WithName("yunikorn_data_sync")),
}
s.eventHandler = s.handleEvent
s.partitionAccumulator = newAccumulator(
func(ctx context.Context, event []*si.EventRecord) {
logger := log.FromContext(ctx)
_, err := s.syncPartitions(ctx)
if err != nil {
logger.Errorf("error syncing partitions: %v", err)
return
}
},
2*time.Second,
)
for _, opt := range opts {
opt(s)
}
Expand All @@ -66,26 +52,28 @@ func (s *Service) Run(ctx context.Context) error {
}, func(err error) {},
)

g.Add(func() error {
return s.partitionAccumulator.run(ctx)
}, func(err error) {},
)

partitions, err := s.syncPartitions(ctx)
fullState, err := s.client.GetFullStateDump(ctx)
if err != nil {
return err
return fmt.Errorf("could not get full state dump: %v", err)
}

if err := s.syncPartitions(ctx, fullState.Partitions); err != nil {
return fmt.Errorf("error syncing partitions: %v", err)
}
if err := s.syncQueues(ctx, partitions); err != nil {
if err := s.syncQueues(ctx, fullState.Queues); err != nil {
return fmt.Errorf("error syncing queues: %v", err)
}
if err := s.syncApplications(ctx); err != nil {
if err := s.syncApplications(ctx, fullState.Applications); err != nil {
return fmt.Errorf("error syncing applications: %v", err)
}
if err := s.syncNodes(ctx, partitions); err != nil {
if err := s.syncNodes(ctx, fullState.Nodes); err != nil {
return fmt.Errorf("error syncing nodes: %v", err)
}
if err := s.syncHistory(ctx); err != nil {
return fmt.Errorf("error syncing app and container history: %v", err)
if err := s.syncAppHistory(ctx, fullState.AppHistory); err != nil {
return fmt.Errorf("error syncing app history: %v", err)
}
if err := s.syncContainerHistory(ctx, fullState.ContainerHistory); err != nil {
return fmt.Errorf("error syncing container history: %v", err)
}

g.Add(func() error {
Expand Down
Loading

0 comments on commit dd66a19

Please sign in to comment.