forked from superfly/litefs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.go
3821 lines (3249 loc) · 121 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package litefs
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"io/fs"
"log"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/superfly/litefs/internal"
"github.com/superfly/ltx"
)
// WaitInterval is the time between checking if the DB has reached a position in DB.Wait().
const WaitInterval = 100 * time.Microsecond
// DB represents a SQLite database.
type DB struct {
store *Store // parent store
name string // name of database
path string // full on-disk path
pageSize uint32 // database page size, if known
pageN atomic.Uint32 // database size, in pages
pos atomic.Value // current tx position (Pos)
timestamp int64 // ms since epoch from last ltx
hwm atomic.Uint64 // high-water mark
mode atomic.Value // database journaling mode (rollback, wal)
// Halt lock prevents writes or checkpoints on the primary so that
// replica nodes can perform writes and send them back to the primary.
//
haltLockAndGuard atomic.Value // local halt lock & guard, if currently held
remoteHaltLock atomic.Value // remote halt lock, if currently held
chksums struct { // database page checksums
mu sync.Mutex
pages []ltx.Checksum // individual database page checksums
blocks []ltx.Checksum // aggregated database page checksums; grouped by ChecksumBlockSize
}
dirtyPageSet map[uint32]struct{}
wal struct {
offset int64 // offset of the start of the transaction
byteOrder binary.ByteOrder // determine by WAL header magic
salt1, salt2 uint32 // current WAL header salt values
chksum1, chksum2 uint32 // WAL checksum values at wal.offset
frameOffsets map[uint32]int64 // WAL frame offset of the last version of a given pgno before current tx
chksums map[uint32][]ltx.Checksum // wal page checksums
}
shmMu sync.Mutex // prevents updateSHM() from being called concurrently
updatingSHM atomic.Bool // marks when updateSHM is being called so SHM writes are prevented
// Collection of outstanding guard sets, protected by a mutex.
guardSets struct {
mu sync.Mutex
m map[uint64]*GuardSet
}
// SQLite database locks
pendingLock RWMutex
sharedLock RWMutex
reservedLock RWMutex
// SQLite WAL locks
writeLock RWMutex
ckptLock RWMutex
recoverLock RWMutex
read0Lock RWMutex
read1Lock RWMutex
read2Lock RWMutex
read3Lock RWMutex
read4Lock RWMutex
dmsLock RWMutex
// Returns the current time. Used for mocking time in tests.
Now func() time.Time
}
// NewDB returns a new instance of DB.
func NewDB(store *Store, name string, path string) *DB {
db := &DB{
store: store,
name: name,
path: path,
dirtyPageSet: make(map[uint32]struct{}),
Now: time.Now,
}
db.pos.Store(ltx.Pos{})
db.mode.Store(DBModeRollback)
db.haltLockAndGuard.Store((*haltLockAndGuard)(nil))
db.remoteHaltLock.Store((*HaltLock)(nil))
db.wal.frameOffsets = make(map[uint32]int64)
db.wal.chksums = make(map[uint32][]ltx.Checksum)
db.guardSets.m = make(map[uint64]*GuardSet)
return db
}
// Name of the database name.
func (db *DB) Name() string { return db.name }
// Store returns the store that the database is a member of.
func (db *DB) Store() *Store { return db.store }
// Path of the database's data directory.
func (db *DB) Path() string { return db.path }
// LTXDir returns the path to the directory of LTX transaction files.
func (db *DB) LTXDir() string { return filepath.Join(db.path, "ltx") }
// LTXPath returns the path of an LTX file.
func (db *DB) LTXPath(minTXID, maxTXID ltx.TXID) string {
return filepath.Join(db.LTXDir(), ltx.FormatFilename(minTXID, maxTXID))
}
// ReadLTXDir returns DirEntry for every LTX file.
func (db *DB) ReadLTXDir() ([]fs.DirEntry, error) {
ents, err := os.ReadDir(db.LTXDir())
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("readdir: %w", err)
}
for i := 0; i < len(ents); i++ {
if _, _, err := ltx.ParseFilename(ents[i].Name()); err != nil {
ents, i = append(ents[:i], ents[i+1:]...), i-1
}
}
// Ensure results are in sorted order.
sort.Slice(ents, func(i, j int) bool { return ents[i].Name() < ents[j].Name() })
return ents, nil
}
// DatabasePath returns the path to the underlying database file.
func (db *DB) DatabasePath() string { return filepath.Join(db.path, "database") }
// JournalPath returns the path to the underlying journal file.
func (db *DB) JournalPath() string { return filepath.Join(db.path, "journal") }
// WALPath returns the path to the underlying WAL file.
func (db *DB) WALPath() string { return filepath.Join(db.path, "wal") }
// SHMPath returns the path to the underlying shared memory file.
func (db *DB) SHMPath() string { return filepath.Join(db.path, "shm") }
// PageN returns the number of pages in the database.
func (db *DB) PageN() uint32 { return db.pageN.Load() }
// Pos returns the current transaction position of the database.
func (db *DB) Pos() ltx.Pos {
return db.pos.Load().(ltx.Pos)
}
// setPos sets the current transaction position of the database.
func (db *DB) setPos(pos ltx.Pos, ts int64) error {
db.pos.Store(pos)
atomic.StoreInt64(&db.timestamp, ts)
// Invalidate page cache.
if invalidator := db.store.Invalidator; invalidator != nil {
if err := invalidator.InvalidatePos(db); err != nil {
return fmt.Errorf("invalidate pos: %w", err)
}
}
// Update metrics.
dbTXIDMetricVec.WithLabelValues(db.name).Set(float64(pos.TXID))
return nil
}
// Timestamp is the timestamp from the last applied ltx.
func (db *DB) Timestamp() time.Time {
return time.UnixMilli(atomic.LoadInt64(&db.timestamp))
}
// HWM returns the current high-water mark from the backup service.
func (db *DB) HWM() ltx.TXID {
return ltx.TXID(db.hwm.Load())
}
// SetHWM sets the current high-water mark.
func (db *DB) SetHWM(txID ltx.TXID) {
db.hwm.Store(uint64(txID))
}
// Mode returns the journaling mode for the database (DBModeWAL or DBModeRollback).
func (db *DB) Mode() DBMode {
return db.mode.Load().(DBMode)
}
// AcquireHaltLock acquires the halt lock locally.
// This implicitly acquires locks required for locking & performs a checkpoint.
func (db *DB) AcquireHaltLock(ctx context.Context, lockID int64) (_ *HaltLock, retErr error) {
if lockID == 0 {
return nil, fmt.Errorf("halt lock id required")
}
var msg string
TraceLog.Printf("[AcquireHaltLock(%s)]: lockID=%d", db.name, lockID)
defer func() {
TraceLog.Printf("[AcquireHaltLock.Done(%s)]: lockID=%d msg=%q %s", db.name, lockID, msg, errorKeyValue(retErr))
}()
acquireCtx, cancel := context.WithTimeout(ctx, db.store.HaltAcquireTimeout)
defer cancel()
// Acquire a write lock before setting the halt lock. This can cause a race
// by the replica when a FUSE call is interrupted and the call is retried.
// We check for the same lockID already being acquired and releasing if that
// is the case.
var currHaltLock HaltLock
guardSet, err := db.AcquireWriteLock(acquireCtx, func() error {
if curr := db.haltLockAndGuard.Load().(*haltLockAndGuard); curr != nil && curr.haltLock.ID == lockID {
msg = "lock-already-acquired"
currHaltLock = *curr.haltLock
return errHaltLockAlreadyAcquired
}
return nil
})
if err == errHaltLockAlreadyAcquired {
return &currHaltLock, nil
} else if err != nil {
return nil, err
}
defer func() {
if retErr != nil {
guardSet.Unlock()
}
}()
// Perform a recovery to clear out journal & WAL files.
if err := db.recover(ctx); err != nil {
return nil, fmt.Errorf("recovery: %w", err)
}
// Generate a random identifier for the lock so it can be referenced by clients.
expires := time.Now().Add(db.store.HaltLockTTL)
haltLock := &HaltLock{
ID: lockID,
Pos: db.Pos(),
Expires: &expires,
}
// There shouldn't be an existing halt lock but clear it just in case.
prev := db.haltLockAndGuard.Load().(*haltLockAndGuard)
if prev != nil {
prev.guardSet.Unlock()
}
// Ensure we're swapping out the one we just unlocked so there's no race.
if !db.haltLockAndGuard.CompareAndSwap(prev, &haltLockAndGuard{
haltLock: haltLock,
guardSet: guardSet,
}) {
return nil, fmt.Errorf("halt lock conflict")
}
other := *haltLock
return &other, nil
}
// This is a marker error and should not be propagated to the client.
var errHaltLockAlreadyAcquired = errors.New("litefs: halt lock already acquired")
// ReleaseHaltLock releases a halt lock by identifier. If the current halt lock
// does not match the identifier then it has already been released.
func (db *DB) ReleaseHaltLock(ctx context.Context, id int64) {
TraceLog.Printf("[ReleaseHaltLock(%s)]:", db.name)
curr := db.haltLockAndGuard.Load().(*haltLockAndGuard)
if curr == nil {
TraceLog.Printf("[ReleaseHaltLock.Done(%s)]: no-lock", db.name)
return // no current lock
} else if curr.haltLock.ID != id {
TraceLog.Printf("[ReleaseHaltLock.Done(%s)]: not-current-lock", db.name)
return // not the current lock
}
// Remove as the current halt lock. Ignore the swapped return since that
// just means that a concurrent release already took care of it.
db.haltLockAndGuard.CompareAndSwap(curr, (*haltLockAndGuard)(nil))
// Release the guard set so the database can write again.
curr.guardSet.Unlock()
TraceLog.Printf("[ReleaseHaltLock.Done(%s)]:", db.name)
}
// EnforceHaltLockExpiration unsets the HALT lock if it has expired.
func (db *DB) EnforceHaltLockExpiration(ctx context.Context) {
curr := db.haltLockAndGuard.Load().(*haltLockAndGuard)
if curr == nil {
return
} else if curr.haltLock.Expires == nil || curr.haltLock.Expires.After(time.Now()) {
return
}
TraceLog.Printf("[ExpireHaltLock(%s)]: id=%d", db.name, curr.haltLock.ID)
// Clear lock & unlock its guards.
db.haltLockAndGuard.CompareAndSwap(curr, (*haltLockAndGuard)(nil))
curr.guardSet.Unlock()
}
// AcquireRemoteHaltLock acquires the remote lock and syncs the database to its
// position before returning to the caller. Caller should provide a random lock
// identifier so that the primary can deduplicate retry requests.
func (db *DB) AcquireRemoteHaltLock(ctx context.Context, lockID int64) (_ *HaltLock, retErr error) {
cctx, cancel := context.WithTimeout(ctx, db.store.HaltAcquireTimeout)
defer cancel()
TraceLog.Printf("[AcquireRemoteHaltLock(%s)]: id=%d", db.name, lockID)
defer func() {
TraceLog.Printf("[AcquireRemoteHaltLock.Done(%s)]: id=%d %s", db.name, lockID, errorKeyValue(retErr))
}()
if lockID == 0 {
return nil, fmt.Errorf("remote halt lock id required")
}
isPrimary, info := db.store.PrimaryInfo()
if isPrimary {
return nil, ErrNoHaltPrimary
} else if info == nil {
return nil, fmt.Errorf("no primary available for remote transaction")
}
// Request the remote lock from the primary node.
haltLock, err := db.store.Client.AcquireHaltLock(cctx, info.AdvertiseURL, db.store.ID(), db.name, lockID)
if err != nil {
return nil, fmt.Errorf("remote begin: %w", err)
}
defer func() {
if retErr != nil {
if err := db.store.Client.ReleaseHaltLock(ctx, info.AdvertiseURL, db.store.ID(), db.name, haltLock.ID); err != nil {
log.Printf("cannot release remote halt lock after acquisition error: %s", err)
}
}
}()
// Store the remote lock so we can use it for commits. This may overwrite
// but there should only be one halt lock at any time since there can only
// be one primary. If a race condition occurs and the halt lock is replaced
// with a dead one then the next commit will simply be rejected.
db.remoteHaltLock.Store(haltLock)
// Wait for local node to catch up to remote position.
if err := db.WaitPosExact(cctx, haltLock.Pos); err != nil {
return nil, fmt.Errorf("wait: %w", err)
}
other := *haltLock
return &other, nil
}
// ReleaseRemoteHaltLock releases the current remote lock from the primary.
func (db *DB) ReleaseRemoteHaltLock(ctx context.Context, lockID int64) (retErr error) {
if err := db.UnsetRemoteHaltLock(ctx, lockID); err != nil {
return err
}
isPrimary, info := db.store.PrimaryInfo()
if isPrimary {
return nil // no remote halting on primary
} else if info == nil {
return fmt.Errorf("no primary available to release remote halt lock")
}
if err := db.store.Client.ReleaseHaltLock(ctx, info.AdvertiseURL, db.store.ID(), db.name, lockID); err != nil {
return err
}
return nil
}
// UnsetRemoteHaltLock releases the current remote lock because of expiration.
// This only removes the reference locally as it's assumed it has already been
// removed on the primary.
func (db *DB) UnsetRemoteHaltLock(ctx context.Context, lockID int64) (retErr error) {
TraceLog.Printf("[UnsetRemoteHaltLock(%s)]:", db.name)
haltLock := db.remoteHaltLock.Load().(*HaltLock)
if haltLock == nil {
TraceLog.Printf("[UnsetRemoteHaltLock.Done(%s)]: id=%d no-lock", db.name, lockID)
return nil
} else if haltLock.ID != lockID {
TraceLog.Printf("[UnsetRemoteHaltLock.Done(%s)]: id=%d curr=%d not-current-lock", db.name, lockID, haltLock.ID)
return nil
}
defer func() {
TraceLog.Printf("[UnsetRemoteHaltLock.Done(%s)]: %s", db.name, errorKeyValue(retErr))
}()
// Checkpoint when we release the remote lock.
if err := db.Recover(ctx); err != nil {
return fmt.Errorf("recovery: %w", err)
}
// Clear local reference before releasing from primary.
// This avoids a race condition where the next LTX file comes in before release confirmation.
db.remoteHaltLock.CompareAndSwap(haltLock, (*HaltLock)(nil))
return nil
}
// WaitPosExact returns once db has reached the target position.
// Returns an error if ctx is done, TXID is exceeded, or on checksum mismatch.
func (db *DB) WaitPosExact(ctx context.Context, target ltx.Pos) error {
ticker := time.NewTicker(WaitInterval)
defer ticker.Stop()
// TODO(fwd): Check for primary change.
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case <-ticker.C:
pos := db.Pos()
if pos.TXID < target.TXID {
continue // not there yet, try again
}
if pos.TXID > target.TXID {
return fmt.Errorf("target transaction id exceeded: %s > %s", pos.TXID.String(), target.TXID.String())
}
if pos.PostApplyChecksum != target.PostApplyChecksum {
return fmt.Errorf("target checksum mismatch: %s != %s", pos.PostApplyChecksum, target.PostApplyChecksum)
}
return nil
}
}
}
// RemoteHaltLock returns a copy of the current remote lock, if any.
func (db *DB) RemoteHaltLock() *HaltLock {
value := db.remoteHaltLock.Load().(*HaltLock)
if value == nil {
return nil
}
other := *value
return &other
}
// HasRemoteHaltLock returns true if the node currently has the remote lock acquired.
func (db *DB) HasRemoteHaltLock() bool {
return db.remoteHaltLock.Load().(*HaltLock) != nil
}
// Writeable returns true if the node is the primary or if we've acquire the
// HALT lock from the primary.
func (db *DB) Writeable() bool {
return db.HasRemoteHaltLock() || db.store.IsPrimary()
}
// TXID returns the current transaction ID.
func (db *DB) TXID() ltx.TXID { return db.Pos().TXID }
// Open initializes the database from files in its data directory.
func (db *DB) Open() error {
// Read page size & page count from database file.
if err := db.initFromDatabaseHeader(); err != nil {
return fmt.Errorf("init from database header: %w", err)
}
// Ensure "ltx" directory exists.
if err := os.MkdirAll(db.LTXDir(), 0o777); err != nil {
return err
}
// Remove all SHM files on start up.
if err := os.Remove(db.SHMPath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove shm: %w", err)
}
// Determine the last LTX file to replay from, if any.
ltxFilename, err := db.maxLTXFile(context.Background())
if err != nil {
return fmt.Errorf("max ltx file: %w", err)
}
// Sync up WAL and last LTX file, if they both exist.
if ltxFilename != "" {
if err := db.syncWALToLTX(context.Background(), ltxFilename); err != nil {
return fmt.Errorf("sync wal to ltx: %w", err)
}
}
// Reset the rollback journal and/or WAL.
if err := db.recover(context.Background()); err != nil {
return fmt.Errorf("recover: %w", err)
}
// Verify database header & initialize checksums.
if err := db.initDatabaseFile(); err != nil {
return fmt.Errorf("init database file: %w", err)
}
// Apply the last LTX file so our checksums match if there was a failure in
// between the LTX commit and journal/WAL commit.
if ltxFilename != "" {
if err := db.ApplyLTX(context.Background(), ltxFilename); err != nil {
return fmt.Errorf("recover ltx: %w", err)
}
}
return nil
}
// initFromDatabaseHeader reads the page size & page count from the database file header.
func (db *DB) initFromDatabaseHeader() error {
f, err := os.Open(db.DatabasePath())
if os.IsNotExist(err) {
return nil // no database file yet, skip
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Read page size into memory.
hdr, _, err := readSQLiteDatabaseHeader(f)
if err == io.EOF {
return nil
} else if err == errInvalidDatabaseHeader { // invalid file
log.Printf("invalid database header on %q, clearing data files", db.name)
if err := db.clean(); err != nil {
return fmt.Errorf("clean: %w", err)
}
return nil
} else if err != nil {
return err
}
db.pageSize = hdr.PageSize
db.pageN.Store(hdr.PageN)
// Initialize database mode.
if hdr.WriteVersion == 2 && hdr.ReadVersion == 2 {
db.mode.Store(DBModeWAL)
} else {
db.mode.Store(DBModeRollback)
}
return nil
}
// Recover forces a rollback (journal) or checkpoint (wal).
func (db *DB) Recover(ctx context.Context) error {
guard, err := db.AcquireWriteLock(ctx, nil)
if err != nil {
return err
}
defer guard.Unlock()
return db.recover(ctx)
}
func (db *DB) recover(ctx context.Context) (err error) {
defer func() {
TraceLog.Printf("[Recover(%s)]: %s", db.name, errorKeyValue(err))
}()
// If a journal file exists, rollback the last transaction so that we
// are in a consistent state before applying our last LTX file. Otherwise
// we could have a partial transaction in our database, apply our LTX, and
// then the SQLite client will recover from the journal and corrupt everything.
//
// See: https://github.com/superfly/litefs/issues/134
if err := db.rollbackJournal(ctx); err != nil {
return fmt.Errorf("rollback journal: %w", err)
}
// Copy the WAL file back to the main database. This ensures that we can
// compute the checksum only using the database file instead of having to
// first compute the latest page set from the WAL to overlay.
if err := db.CheckpointNoLock(ctx); err != nil {
return fmt.Errorf("checkpoint: %w", err)
}
return nil
}
// rollbackJournal copies all the pages from an existing rollback journal back
// to the database file. This is called on startup so that we can be in a
// consistent state in order to verify our checksums.
func (db *DB) rollbackJournal(ctx context.Context) error {
journalFile, err := os.OpenFile(db.JournalPath(), os.O_RDWR, 0o666)
if os.IsNotExist(err) {
return nil // no journal file, skip
} else if err != nil {
return err
}
defer func() { _ = journalFile.Close() }()
dbFile, err := os.OpenFile(db.DatabasePath(), os.O_RDWR, 0o666)
if err != nil {
return err
}
defer func() { _ = dbFile.Close() }()
// Copy every journal page back into the main database file.
r := NewJournalReader(journalFile, db.pageSize)
for i := 0; ; i++ {
if err := r.Next(); err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("next segment(%d): %w", i, err)
}
if err := db.rollbackJournalSegment(ctx, r, dbFile); err != nil {
return fmt.Errorf("segment(%d): %w", i, err)
}
}
// Resize database to size before journal transaction, if a valid header exists.
if r.IsValid() {
if err := db.truncateDatabase(dbFile, r.commit); err != nil {
return err
}
db.pageN.Store(r.commit)
}
if err := dbFile.Sync(); err != nil {
return err
} else if err := dbFile.Close(); err != nil {
return err
}
if err := journalFile.Close(); err != nil {
return err
} else if err := os.Remove(db.JournalPath()); err != nil {
return err
}
if invalidator := db.store.Invalidator; invalidator != nil {
if err := invalidator.InvalidateEntry(db.name + "-journal"); err != nil {
return fmt.Errorf("invalidate journal: %w", err)
}
}
return nil
}
func (db *DB) rollbackJournalSegment(ctx context.Context, r *JournalReader, dbFile *os.File) error {
for i := 0; ; i++ {
pgno, data, err := r.ReadFrame()
if err == io.EOF {
return nil
} else if err != nil {
return fmt.Errorf("read frame(%d): %w", i, err)
}
// Write data to the database file.
if err := db.writeDatabasePage(dbFile, pgno, data, true); err != nil {
return fmt.Errorf("write to database (pgno=%d): %w", pgno, err)
}
}
}
// Checkpoint acquires locks and copies pages from the WAL into the database and truncates the WAL.
func (db *DB) Checkpoint(ctx context.Context) (err error) {
guard, err := db.AcquireWriteLock(ctx, nil)
if err != nil {
return err
}
defer guard.Unlock()
return db.CheckpointNoLock(ctx)
}
// CheckpointNoLock copies pages from the WAL into the database and truncates the WAL.
// Appropriate locks must be held by the caller.
func (db *DB) CheckpointNoLock(ctx context.Context) (err error) {
TraceLog.Printf("[CheckpointBegin(%s)]", db.name)
defer func() {
TraceLog.Printf("[CheckpointDone(%s)] %v", db.name, err)
}()
// Open the database file we'll checkpoint into. Skip if this hasn't been created.
dbFile, err := os.OpenFile(db.DatabasePath(), os.O_RDWR, 0o666)
if os.IsNotExist(err) {
return nil // no database file yet, skip
} else if err != nil {
return err
}
defer func() { _ = dbFile.Close() }()
// Open the WAL file that we'll copy from. Skip if it was cleanly closed and removed.
walFile, err := os.Open(db.WALPath())
if os.IsNotExist(err) {
return nil // no WAL file, skip
} else if err != nil {
return err
}
defer func() { _ = walFile.Close() }()
offsets, commit, err := db.readWALPageOffsets(walFile)
if err != nil {
return fmt.Errorf("read wal page offsets: %w", err)
}
// Copy pages from the WAL to the main database file & resize db file.
if len(offsets) > 0 {
buf := make([]byte, db.pageSize)
for pgno, offset := range offsets {
if _, err := walFile.Seek(offset+WALFrameHeaderSize, io.SeekStart); err != nil {
return fmt.Errorf("seek wal: %w", err)
} else if _, err := io.ReadFull(walFile, buf); err != nil {
return fmt.Errorf("read wal: %w", err)
}
if err := db.writeDatabasePage(dbFile, pgno, buf, true); err != nil {
return fmt.Errorf("write db page %d: %w", pgno, err)
}
}
if err := db.truncateDatabase(dbFile, commit); err != nil {
return fmt.Errorf("truncate: %w", err)
}
// Save the size of the database, in pages, based on last commit.
db.pageN.Store(commit)
}
// Remove WAL file.
if err := db.TruncateWAL(ctx, 0); err != nil {
return fmt.Errorf("truncate wal: %w", err)
}
// Clear per-page checksums within WAL.
db.wal.chksums = make(map[uint32][]ltx.Checksum)
// Update the SHM file.
if err := db.updateSHM(ctx); err != nil {
return fmt.Errorf("update shm: %w", err)
}
return nil
}
// readWALPageOffsets returns a map of the offsets of the last committed version
// of each page in the WAL. Also returns the commit size of the last transaction.
func (db *DB) readWALPageOffsets(f *os.File) (_ map[uint32]int64, lastCommit uint32, _ error) {
r := NewWALReader(f)
if err := r.ReadHeader(); err == io.EOF {
return nil, 0, nil
}
// Read the offset of the last version of each page in the WAL.
offsets := make(map[uint32]int64)
txOffsets := make(map[uint32]int64)
buf := make([]byte, r.PageSize())
for {
pgno, commit, err := r.ReadFrame(buf)
if err == io.EOF {
break
} else if err != nil {
return nil, 0, err
}
// Save latest offset for each page version.
txOffsets[pgno] = r.Offset()
// If this is not a committing frame, continue to next frame.
if commit == 0 {
continue
}
// At the end of each transaction, copy offsets to main map.
lastCommit = commit
for k, v := range txOffsets {
offsets[k] = v
}
txOffsets = make(map[uint32]int64)
}
return offsets, lastCommit, nil
}
// maxLTXFile returns the filename of the highest LTX file.
func (db *DB) maxLTXFile(ctx context.Context) (string, error) {
ents, err := os.ReadDir(db.LTXDir())
if err != nil {
return "", err
}
var max ltx.TXID
var filename string
for _, ent := range ents {
_, maxTXID, err := ltx.ParseFilename(ent.Name())
if err != nil {
continue
} else if maxTXID <= max {
continue
}
// Save filename with the highest TXID.
max, filename = maxTXID, filepath.Join(db.LTXDir(), ent.Name())
}
return filename, nil
}
// syncWALToLTX truncates the WAL file to the last LTX file if the WAL info
// in the LTX header does not match. This protects against a hard shutdown
// where a WAL file was sync'd past the last LTX file.
func (db *DB) syncWALToLTX(ctx context.Context, ltxFilename string) error {
// Open last LTX file.
ltxFile, err := os.Open(ltxFilename)
if err != nil {
return err
}
defer func() { _ = ltxFile.Close() }()
// Read header from LTX file to determine WAL fields.
// This also validates the LTX file before it gets processed by ApplyLTX().
dec := ltx.NewDecoder(ltxFile)
if err := dec.Verify(); err != nil {
return fmt.Errorf("validate ltx: %w", err)
}
ltxWALSize := dec.Header().WALOffset + dec.Header().WALSize
// Open WAL file, ignore if it doesn't exist.
walFile, err := os.OpenFile(db.WALPath(), os.O_RDWR, 0o666)
if os.IsNotExist(err) {
log.Printf("wal-sync: no wal file exists on %q, skipping sync with ltx", db.name)
return nil // no wal file, nothing to do
} else if err != nil {
return err
}
defer func() { _ = walFile.Close() }()
// Determine WAL size.
fi, err := walFile.Stat()
if err != nil {
return err
}
// Read WAL header.
hdr := make([]byte, WALHeaderSize)
if _, err := internal.ReadFullAt(walFile, hdr, 0); err == io.EOF || err == io.ErrUnexpectedEOF {
log.Printf("wal-sync: short wal file exists on %q, skipping sync with ltx", db.name)
return nil // short WAL header, skip
} else if err != nil {
return err
}
// If WAL salt doesn't match the LTX WAL salt then the WAL has been
// restarted and we need to remove it. We are just renaming it for now so
// we can debug in case this happens.
salt1 := binary.BigEndian.Uint32(hdr[16:])
salt2 := binary.BigEndian.Uint32(hdr[20:])
if salt1 != dec.Header().WALSalt1 || salt2 != dec.Header().WALSalt2 {
log.Printf("wal-sync: wal salt mismatch on %q, removing wal", db.name)
if err := os.Rename(db.WALPath(), db.WALPath()+".removed"); err != nil {
return fmt.Errorf("wal-sync: rename wal file with salt mismatch: %w", err)
}
return nil
}
// If the salt matches then we need to make sure we are at least up to the
// start of the last LTX transaction.
if fi.Size() < dec.Header().WALOffset {
return fmt.Errorf("wal-sync: short wal size (%d bytes) for %q, last ltx offset at %d bytes", fi.Size(), db.name, dec.Header().WALOffset)
}
// Resize WAL back to size in the LTX file.
if fi.Size() > ltxWALSize {
log.Printf("wal-sync: truncating wal of %q from %d bytes to %d bytes to match ltx", db.name, fi.Size(), ltxWALSize)
if err := walFile.Truncate(ltxWALSize); err != nil {
return fmt.Errorf("truncate wal: %w", err)
}
return nil
}
log.Printf("wal-sync: database %q has wal size of %d bytes within range of ltx file (@%d, %d bytes)", db.name, fi.Size(), dec.Header().WALOffset, dec.Header().WALSize)
return nil
}
// initDatabaseFile opens and validates the database file, if it exists.
// The journal & WAL should not exist at this point. The journal should be
// rolled back and the WAL should be checkpointed.
func (db *DB) initDatabaseFile() error {
f, err := os.Open(db.DatabasePath())
if os.IsNotExist(err) {
log.Printf("database file does not exist on initialization: %s", db.DatabasePath())
return nil // no database file yet
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()
hdr, _, err := readSQLiteDatabaseHeader(f)
if err == io.EOF {
log.Printf("database file is zero length on initialization: %s", db.DatabasePath())
return nil // no contents yet
} else if err != nil {
return fmt.Errorf("cannot read database header: %w", err)
}
db.pageSize = hdr.PageSize
db.pageN.Store(hdr.PageN)
assert(db.pageSize > 0, "page size must be greater than zero")
db.chksums.mu.Lock()
defer db.chksums.mu.Unlock()
// Build per-page checksum map for existing pages. The database could be
// short compared to the page count in the header so just checksum what we
// can. The database may recover in applyLTX() so we'll do validation then.
buf := make([]byte, db.pageSize)
db.chksums.pages = make([]ltx.Checksum, db.PageN())
db.chksums.blocks = make([]ltx.Checksum, pageChksumBlock(db.PageN()))
for pgno := uint32(1); pgno <= db.PageN(); pgno++ {
offset := int64(pgno-1) * int64(db.pageSize)
if _, err := internal.ReadFullAt(f, buf, offset); err == io.EOF || err == io.ErrUnexpectedEOF {
log.Printf("database checksum ending early at page %d of %d ", pgno-1, db.PageN())
break
} else if err != nil {
return fmt.Errorf("read database page %d: %w", pgno, err)
}
chksum := ltx.ChecksumPage(pgno, buf)
db.setDatabasePageChecksum(pgno, chksum)
}
return nil
}
// clean deletes and recreates the database data directory.
func (db *DB) clean() error {
if err := os.RemoveAll(db.path); err != nil && !os.IsNotExist(err) {
return err
}
return os.Mkdir(db.path, 0o777)
}
// OpenLTXFile returns a file handle to an LTX file that contains the given TXID.
func (db *DB) OpenLTXFile(txID ltx.TXID) (*os.File, error) {
return os.Open(db.LTXPath(txID, txID))
}
// OpenDatabase returns a handle for the database file.
func (db *DB) OpenDatabase(ctx context.Context) (*os.File, error) {
f, err := os.OpenFile(db.DatabasePath(), os.O_RDWR, 0o666)
TraceLog.Printf("[OpenDatabase(%s)]: %s", db.name, errorKeyValue(err))
return f, err
}
// CloseDatabase closes a handle associated with the database file.
func (db *DB) CloseDatabase(ctx context.Context, f *os.File, owner uint64) error {
err := f.Close()
TraceLog.Printf("[CloseDatabase(%s)]: owner=%d %s", db.name, owner, errorKeyValue(err))
return err
}
// TruncateDatabase sets the size of the database file.
func (db *DB) TruncateDatabase(ctx context.Context, size int64) (err error) {
// Require the page size because we need to check against the page count & checksums.
if db.pageSize == 0 {
return fmt.Errorf("page size required on database truncation")
} else if size%int64(db.pageSize) != 0 {
return fmt.Errorf("size must be page-aligned (%d bytes)", db.pageSize)
}
// Verify new size matches the database size specified in the header.
pageN := uint32(size / int64(db.pageSize))
if pageN != db.PageN() {
return fmt.Errorf("truncation size (%d pages) does not match database header size (%d pages)", pageN, db.PageN())
}
// Process the actual file system truncation.
if f, err := os.OpenFile(db.DatabasePath(), os.O_RDWR, 0o666); err != nil {
return err
} else if err := db.truncateDatabase(f, pageN); err != nil {
_ = f.Close()
return err
} else if err := f.Close(); err != nil {
return err
}
return nil
}
// truncateDatabase truncates the database to a given page count.
func (db *DB) truncateDatabase(f *os.File, pageN uint32) (err error) {