diff --git a/store/store.go b/store/store.go
index 4dd0fe10..74e2d487 100644
--- a/store/store.go
+++ b/store/store.go
@@ -376,6 +376,9 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
 // (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
 // (2) Batching header writes
 func (s *Store[H]) flushLoop() {
+	// advance based on what we have on disk.
+	s.doAdvanceContiguousHead(context.Background(), s.Height())
+
 	defer close(s.writesDn)
 	ctx := context.Background()
 	for headers := range s.writes {
@@ -383,7 +386,7 @@ func (s *Store[H]) flushLoop() {
 		s.pending.Append(headers...)
 		// try to advance contiguousHead if we don't have gaps.
 		// and notify waiters in heightSub.
-		s.advanceContiguousHead(ctx, headers...)
+		s.tryAdvanceContiguousHead(ctx, headers...)
 		// don't flush and continue if pending batch is not grown enough,
 		// and Store is not stopping(headers == nil)
 		if s.pending.Len() < s.Params.WriteBatchSize && headers != nil {
@@ -497,17 +500,19 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
 }
 
 // try advance contiguous head based on already written headers.
-func (s *Store[H]) advanceContiguousHead(ctx context.Context, headers ...H) {
+func (s *Store[H]) tryAdvanceContiguousHead(ctx context.Context, headers ...H) {
 	// always inform heightSub about new headers seen
 	for _, h := range headers {
 		s.heightSub.UnblockHeight(h.Height())
 	}
 
 	currHead := s.contiguousHead.Load()
-	if currHead == nil {
-		return
+	if currHead != nil {
+		s.doAdvanceContiguousHead(ctx, (*currHead).Height())
 	}
-	currHeight := (*currHead).Height()
+}
+
+func (s *Store[H]) doAdvanceContiguousHead(ctx context.Context, currHeight uint64) {
 	prevHeight := currHeight
 
 	// TODO(cristaloleg): benchmark this timeout or make it dynamic.