Skip to content

Commit

Permalink
indexer: backups are now slower but deterministic
Browse files Browse the repository at this point in the history
they are a (zstd compressed) set of SQL statements

also, relevant methods now use io.Reader and io.Writer
  • Loading branch information
altergui committed Jun 17, 2024
1 parent d53df69 commit 47767ee
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 71 deletions.
12 changes: 12 additions & 0 deletions data/compressor/compression.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package compressor

import (
"io"
"time"

"github.com/klauspost/compress/zstd"
"go.vocdoni.io/dvote/log"
)

// NewWriter creates a new writer that uses zstd
func NewWriter(w io.Writer) (io.WriteCloser, error) {
return zstd.NewWriter(w)
}

// NewReader creates a new reader that uses zstd
func NewReader(r io.Reader) (io.ReadCloser, error) {
zr, err := zstd.NewReader(r)
return zr.IOReadCloser(), err
}

// Compressor is a data compressor that uses zstd.
type Compressor struct {
encoder *zstd.Encoder
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
github.com/pressly/goose/v3 v3.20.0
github.com/prometheus/client_golang v1.19.0
github.com/rs/zerolog v1.31.0
github.com/schollz/sqlite3dump v1.3.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,7 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
Expand Down Expand Up @@ -1326,6 +1327,8 @@ github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXn
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/schollz/sqlite3dump v1.3.1 h1:QXizJ7XEJ7hggjqjZ3YRtF3+javm8zKtzNByYtEkPRA=
github.com/schollz/sqlite3dump v1.3.1/go.mod h1:mzSTjZpJH4zAb1FN3iNlhWPbbdyeBpOaTW0hukyMHyI=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
Expand Down
44 changes: 2 additions & 42 deletions service/indexer.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package service

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"

"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/snapshot"
Expand All @@ -29,43 +24,8 @@ func (vs *VocdoniService) VochainIndexer() error {
// launch the indexer after sync routine (executed when the blockchain is ready)
go vs.Indexer.AfterSyncBootstrap(false)

snapshot.SetFnImportIndexer(func(r io.Reader) error {
log.Debugf("restoring indexer backup")

file, err := os.CreateTemp("", "indexer.sqlite3")
if err != nil {
return fmt.Errorf("creating tmpfile: %w", err)
}
defer func() {
if err := file.Close(); err != nil {
log.Warnw("error closing tmpfile", "path", file.Name(), "err", err)
}
if err := os.Remove(file.Name()); err != nil {
log.Warnw("error removing tmpfile", "path", file.Name(), "err", err)
}
}()

if _, err := io.Copy(file, r); err != nil {
return fmt.Errorf("writing tmpfile: %w", err)
}

return vs.Indexer.RestoreBackup(file.Name())
})

snapshot.SetFnExportIndexer(func(w io.Writer) error {
log.Debugf("saving indexer backup")

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
data, err := vs.Indexer.ExportBackupAsBytes(ctx)
if err != nil {
return fmt.Errorf("creating indexer backup: %w", err)
}
if _, err := w.Write(data); err != nil {
return fmt.Errorf("writing data: %w", err)
}
return nil
})
snapshot.SetFnImportIndexer(vs.Indexer.ImportBackup)
snapshot.SetFnExportIndexer(vs.Indexer.ExportBackup)

if vs.Config.Indexer.ArchiveURL != "" && vs.Config.Indexer.ArchiveURL != "none" {
log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL)
Expand Down
127 changes: 103 additions & 24 deletions vochain/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package indexer

import (
"bufio"
"bytes"
"context"
"database/sql"
"embed"
Expand All @@ -16,6 +18,7 @@ import (
"sync"
"time"

"go.vocdoni.io/dvote/data/compressor"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/statedb"
"go.vocdoni.io/dvote/types"
Expand All @@ -30,6 +33,7 @@ import (
"github.com/pressly/goose/v3"
"golang.org/x/exp/maps"

"github.com/schollz/sqlite3dump"
// modernc is a pure-Go version, but its errors have less useful info.
// We use mattn while developing and testing, and we can swap them later.
// _ "modernc.org/sqlite"
Expand Down Expand Up @@ -233,7 +237,8 @@ func (idx *Indexer) Close() error {
return nil
}

// BackupPath restores the database from a backup created via SaveBackup.
// RestoreBackup restores the indexer by copying the file (raw binary format)
// from the passed path.
// Note that this must be called with ExpectBackupRestore set to true,
// and before any indexing or queries happen.
func (idx *Indexer) RestoreBackup(path string) error {
Expand All @@ -249,37 +254,111 @@ func (idx *Indexer) RestoreBackup(path string) error {
return nil
}

// SaveBackup backs up the database to a file on disk.
// Note that writes to the database may be blocked until the backup finishes,
// and an error may occur if a file at path already exists.
//
// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database.
func (idx *Indexer) SaveBackup(ctx context.Context, path string) error {
_, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, path)
return err
// ImportBackup restores the database from a backup created via ExportBackup.
// Note that this must be called with ExpectBackupRestore set to true,
// and before any indexing or queries happen.
func (idx *Indexer) ImportBackup(r io.Reader) error {
if idx.readWriteDB != nil {
panic("Indexer.RestoreBackup called after the database was initialized")
}
log.Debugf("restoring indexer backup")

zr, err := compressor.NewReader(r)
if err != nil {
return fmt.Errorf("could not init decompressor: %w", err)
}
defer zr.Close()

if err := restoreDBFromSQLDump(idx.dbPath, zr); err != nil {
return fmt.Errorf("could not restore indexer backup: %w", err)
}
if err := idx.startDB(); err != nil {
return err
}
return nil
}

func restoreDBFromSQLDump(dbPath string, r io.Reader) error {
db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal&_foreign_keys=true", dbPath))
if err != nil {
return fmt.Errorf("could not open indexer db: %w", err)
}
defer db.Close()

scanner := bufio.NewScanner(r)
var statement strings.Builder
for scanner.Scan() {
line := scanner.Text()
statement.WriteString(line)
statement.WriteString("\n")

if strings.HasSuffix(line, ";") {
_, err := db.Exec(statement.String())
if err != nil {
return fmt.Errorf("failed to execute statement: %s (error: %w)", statement.String(), err)
}
statement.Reset()
}
}

if err := scanner.Err(); err != nil {
return fmt.Errorf("error during restore: %w", err)
}

return nil
}

// ExportBackup exports a (compressed) deterministic set of SQL statements.
// Note that writes to the database may be blocked until the method finishes.
func (idx *Indexer) ExportBackup(w io.Writer) error {
log.Debugf("exporting indexer backup")
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

tmpDB, err := os.CreateTemp("", "indexer*.sqlite3")
if err != nil {
return fmt.Errorf("could not create tmpdb file: %w", err)
}
defer os.Remove(tmpDB.Name())

if _, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, tmpDB.Name()); err != nil {
return fmt.Errorf("could not vacuum into tmpdb: %w", err)
}

db, err := sql.Open("sqlite3", tmpDB.Name())
if err != nil {
return fmt.Errorf("could not open tmpDB: %w", err)
}
defer db.Close()

// first drop stats table
if _, err := db.ExecContext(ctx, `DROP TABLE IF EXISTS sqlite_stat1;`); err != nil {
return fmt.Errorf("could not drop table sqlite_stat1: %w", err)
}

// make goose_db_version table deterministic
if _, err := db.ExecContext(ctx, `UPDATE goose_db_version SET tstamp = '1970-01-01 00:00:00';`); err != nil {
return fmt.Errorf("could not update goose_db_version: %w", err)
}

zw, err := compressor.NewWriter(w)
if err != nil {
return fmt.Errorf("could not init compressor: %w", err)
}
defer zw.Close()

return sqlite3dump.DumpDB(db, zw)
}

// ExportBackupAsBytes backs up the database, and returns the contents as []byte.
//
// Note that writes to the database may be blocked until the backup finishes.
//
// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database.
func (idx *Indexer) ExportBackupAsBytes(ctx context.Context) ([]byte, error) {
tmpDir, err := os.MkdirTemp("", "indexer")
if err != nil {
return nil, fmt.Errorf("error creating tmpDir: %w", err)

}
tmpFilePath := filepath.Join(tmpDir, "indexer.sqlite3")
if err := idx.SaveBackup(ctx, tmpFilePath); err != nil {
var buf bytes.Buffer
if err := idx.ExportBackup(&buf); err != nil {
return nil, fmt.Errorf("error saving indexer backup: %w", err)
}
defer func() {
if err := os.Remove(tmpFilePath); err != nil {
log.Warnw("error removing indexer backup file", "path", tmpFilePath, "err", err)
}
}()
return os.ReadFile(tmpFilePath)
return buf.Bytes(), nil
}

// blockTxQueries assumes that lockPool is locked.
Expand Down
8 changes: 3 additions & 5 deletions vochain/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package indexer

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
stdlog "log"
"math/big"
"path/filepath"
"testing"

qt "github.com/frankban/quicktest"
Expand Down Expand Up @@ -88,8 +86,8 @@ func TestBackup(t *testing.T) {
wantTotalVotes(10)

// Back up the database.
backupPath := filepath.Join(t.TempDir(), "backup")
err = idx.SaveBackup(context.TODO(), backupPath)
var bkp bytes.Buffer
err = idx.ExportBackup(&bkp)
qt.Assert(t, err, qt.IsNil)

// Add another 5 votes which aren't in the backup.
Expand All @@ -110,7 +108,7 @@ func TestBackup(t *testing.T) {
idx.Close()
idx, err = New(app, Options{DataDir: t.TempDir(), ExpectBackupRestore: true})
qt.Assert(t, err, qt.IsNil)
err = idx.RestoreBackup(backupPath)
err = idx.ImportBackup(&bkp)
qt.Assert(t, err, qt.IsNil)
wantTotalVotes(10)

Expand Down

0 comments on commit 47767ee

Please sign in to comment.