Skip to content

Commit

Permalink
history: handle gracefulstop when history is active
Browse files Browse the repository at this point in the history
When GracefulStop is called gRPC waits for current requests to finish
before closing. While this is generally the behavior we want, it is
not always same for the History.Listen endpoint. That endpoint is
usually open even if buildkit is not actively processing any builds,
because client may be waiting for new events.

The new logic is that if GracefulStop will happen, history will
close active listeners if there are no active builds. If there are
active builds then active listeners will be closed after all the
active builds have completed their finalizers.

Signed-off-by: Tonis Tiigi <[email protected]>
  • Loading branch information
tonistiigi committed Nov 15, 2024
1 parent 65f5dad commit b241581
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 2 deletions.
5 changes: 3 additions & 2 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func main() {
return err
}

controller, err := newController(c, &cfg)
controller, err := newController(ctx, c, &cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -758,7 +758,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
return tlsConf, nil
}

func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) {
func newController(ctx context.Context, c *cli.Context, cfg *config.Config) (*control.Controller, error) {
sessionManager, err := session.NewManager()
if err != nil {
return nil, err
Expand Down Expand Up @@ -851,6 +851,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
ContentStore: w.ContentStore(),
HistoryConfig: cfg.History,
GarbageCollect: w.GarbageCollect,
GracefulStop: ctx.Done(),
})
}

Expand Down
2 changes: 2 additions & 0 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Opt struct {
ContentStore *containerdsnapshot.Store
HistoryConfig *config.HistoryConfig
GarbageCollect func(context.Context) error
GracefulStop <-chan struct{}
}

type Controller struct { // TODO: ControlService
Expand All @@ -95,6 +96,7 @@ func NewController(opt Opt) (*Controller, error) {
ContentStore: opt.ContentStore,
CleanConfig: opt.HistoryConfig,
GarbageCollect: opt.GarbageCollect,
GracefulStop: opt.GracefulStop,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create history queue")
Expand Down
31 changes: 31 additions & 0 deletions solver/llbsolver/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type HistoryQueueOpt struct {
ContentStore *containerdsnapshot.Store
CleanConfig *config.HistoryConfig
GarbageCollect func(context.Context) error
GracefulStop <-chan struct{}
}

type HistoryQueue struct {
Expand Down Expand Up @@ -137,6 +138,16 @@ func NewHistoryQueue(opt HistoryQueueOpt) (*HistoryQueue, error) {
}
}()

go func() {
<-h.opt.GracefulStop
h.mu.Lock()
defer h.mu.Unlock()
// if active builds then close will happen in finalizer
if len(h.finalizers) == 0 && len(h.active) == 0 {
go h.ps.Close()
}
}()

return h, nil
}

Expand Down Expand Up @@ -637,6 +648,14 @@ func (h *HistoryQueue) AcquireFinalizer(ref string) (<-chan struct{}, func()) {
<-f.done
h.mu.Lock()
delete(h.finalizers, ref)
// if gracefulstop then release listeners after finalize
if len(h.finalizers) == 0 {
select {
case <-h.opt.GracefulStop:
go h.ps.Close()
default:
}
}
h.mu.Unlock()
}()
return trigger, sync.OnceFunc(func() {
Expand Down Expand Up @@ -1032,6 +1051,18 @@ func (p *pubsub[T]) Send(v T) {
p.mu.Unlock()
}

func (p *pubsub[T]) Close() {
p.mu.Lock()
channels := make([]*channel[T], 0, len(p.m))
for c := range p.m {
channels = append(channels, c)
}
p.mu.Unlock()
for _, c := range channels {
c.close()
}
}

type channel[T any] struct {
ps *pubsub[T]
ch chan T
Expand Down

0 comments on commit b241581

Please sign in to comment.