Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend: Add scalable PostgreSQL backend #74

Closed
wants to merge 66 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
5720905
last anchor cleanup
amass01 Aug 4, 2020
5177c57
add schema.md to describe sql tables
amass01 Aug 5, 2020
bca6341
add records table description
amass01 Aug 5, 2020
33df874
add anchors table
amass01 Aug 5, 2020
c566934
opps
amass01 Aug 5, 2020
b9dae82
typo
amass01 Aug 5, 2020
2533029
add postgres package & struct
amass01 Aug 7, 2020
52ec32f
add backend config & interface funcs signatures
amass01 Aug 7, 2020
a3bbf17
setup postgres when configured
amass01 Aug 7, 2020
62798d4
add postgres connection info to config and validate
amass01 Aug 8, 2020
73eb7fd
provide all postgres ssl vars & establish connection
amass01 Aug 8, 2020
3317ae5
add scripts to generate postgres ssl certs & initial setup
amass01 Aug 8, 2020
c7129c4
postgress connection last touches
amass01 Aug 8, 2020
49bcd14
add createTables func
amass01 Aug 9, 2020
086a6f7
create anchors table if doesn't exsist
amass01 Aug 9, 2020
9d38731
add pg log & create pg tables if doesn't exist
amass01 Aug 10, 2020
d9afd5a
Implement backend Put func
amass01 Aug 10, 2020
f6a96e0
polish Put func
amass01 Aug 11, 2020
708c017
add unique indexes for sql cols
amass01 Aug 11, 2020
07dc8c7
.md land
amass01 Aug 11, 2020
c16c9e8
add unique indexes in sql table def.
amass01 Aug 11, 2020
68add0f
move sql logic to separate file - postgres/sql.go
amass01 Aug 11, 2020
9f8c03e
move postgres user to postgres.go
amass01 Aug 11, 2020
dda5331
delete dbuser from log
amass01 Aug 11, 2020
f52214e
ditch `key` col & use digest as primary key in records table
amass01 Aug 11, 2020
b6cddb6
Implement Close interface func
amass01 Aug 11, 2020
0e893c4
add GetBalance impl.
amass01 Aug 11, 2020
7f3d4a7
get wip
amass01 Aug 12, 2020
6328827
handle digest doesn't exist case
amass01 Aug 12, 2020
34b3dfc
add missing cols to get & change merkle root col type to bytea
amass01 Aug 12, 2020
59c339a
ErrorCode = backend.ErrorOK when digest found in Get
amass01 Aug 13, 2020
5731cc0
add flush logic - collect unflushed reocrds and calc merkle
amass01 Aug 13, 2020
c742744
insert anchor data into db
amass01 Aug 14, 2020
0bac395
fix anchor merkle insert and update records' merkle col
amass01 Aug 14, 2020
e8d5d7c
lazy flush anchor on digest get when chain timestamp isn't
amass01 Aug 15, 2020
aef715e
cleanup lazyflush
amass01 Aug 16, 2020
6ea6e68
- Get DISTINCT timestamps when flushing,
amass01 Aug 17, 2020
8abb29b
implement GetTimestamps interface func
amass01 Aug 18, 2020
1beed9e
cleanup GetTimestamps func
amass01 Aug 18, 2020
74eb336
update schema col types
amass01 Aug 18, 2020
f369711
query string indentation & tx hash col type bytea
amass01 Aug 19, 2020
c0ee982
adjust code to tx hash col type to bytea
amass01 Aug 19, 2020
38f8adb
add LastAnchor interface func
amass01 Aug 19, 2020
eee5982
add Fsck :beach:
amass01 Aug 25, 2020
c52cd06
cleanup Fsck
amass01 Aug 26, 2020
7f271b1
add postgres backend support for dcrtime_dumpdb
amass01 Aug 27, 2020
9f88cf6
fix duplicates in postgres Dump and preprare for Restore
amass01 Aug 29, 2020
2e643a7
cleanup logs
amass01 Aug 29, 2020
5f8956b
fix cli
amass01 Aug 29, 2020
0c73d1f
add postgres Restore implementation
amass01 Aug 29, 2020
8fd25b4
cleanup
amass01 Aug 29, 2020
90f3674
delete commited log file
amass01 Aug 29, 2020
2bbdd99
log root only when filesystem is used
amass01 Aug 30, 2020
f5ea722
`dcrtime_fsck`: require -destination flag only for filesystem backend
amass01 Aug 30, 2020
67fb76b
add sql models
amass01 Sep 11, 2020
efaa1de
fix get timestamp
amass01 Sep 11, 2020
86248ab
dump hashes in reverse to keep insertion order
amass01 Sep 12, 2020
14cb20b
swallow "no rows in result" error when updating anchor's chain timestamp
amass01 Sep 12, 2020
7f78482
ditch testing flag from Postgres struct & add TestPostgres
amass01 Sep 12, 2020
45613db
check rows.Err()
amass01 Sep 12, 2020
6e8ff5d
satisfy linter
amass01 Sep 12, 2020
3642168
cleanup
amass01 Sep 13, 2020
1aebba6
opps
amass01 Sep 13, 2020
522cf11
add failing tests - chunk 1
amass01 Sep 13, 2020
ca27cd1
fix duplicated digest in tests
amass01 Sep 14, 2020
d3b8aaf
finish tests
amass01 Sep 14, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions cmd/dcrtime_dumpdb/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2020 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
"path/filepath"

"github.com/jessevdk/go-flags"
)

const defaultConfigFilename = "dcrtimed.conf"

var (
defaultConfigFile = filepath.Join(defaultHomeDir, defaultConfigFilename)
defaultBackend = "filesystem"
)

// config defines the configuration options for dcrtime_fsck
//
// See loadConfig for details on the configuration load process.
type config struct {
HomeDir string `short:"A" long:"appdata" description:"Path to application home directory"`
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid listing all dcrtimed params here and use only the subset we need ? 🤔

ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"`
ConfigFile string `short:"C" long:"configfile" description:"Path to configuration file"`
DataDir string `short:"b" long:"datadir" description:"Directory to store data"`
LogDir string `long:"logdir" description:"Directory to log output."`
TestNet bool `long:"testnet" description:"Use the test network"`
SimNet bool `long:"simnet" description:"Use the simulation test network"`
Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"`
CPUProfile string `long:"cpuprofile" description:"Write CPU profile to the specified file"`
MemProfile string `long:"memprofile" description:"Write mem profile to the specified file"`
DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"`
Listeners []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 49152, testnet: 59152)"`
WalletHost string `long:"wallethost" description:"Hostname for wallet server"`
WalletCert string `long:"walletcert" description:"Certificate path for wallet server"`
WalletPassphrase string `long:"walletpassphrase" description:"Passphrase for wallet server"`
Version string
HTTPSCert string `long:"httpscert" description:"File containing the https certificate file"`
HTTPSKey string `long:"httpskey" description:"File containing the https certificate key"`
StoreHost string `long:"storehost" description:"Enable proxy mode - send requests to the specified ip:port"`
StoreCert string `long:"storecert" description:"File containing the https certificate file for storehost"`
EnableCollections bool `long:"enablecollections" description:"Allow clients to query collection timestamps."`
APITokens []string `long:"apitoken" description:"Token used to grant access to privileged API resources"`
APIVersions string `long:"apiversions" description:"Enables API versions on the daemon"`
Backend string `long:"backend" description:"Sets the cache layer type 'filesystem'/'postgres'"`
PostgresHost string `long:"postgreshost" description:"Postgres ip:port"`
PostgresRootCert string `long:"postgresrootcert" description:"File containing the CA certificate for postgres"`
PostgresCert string `long:"postgrescert" description:"File containing the dcrtimed client certificate for postgres"`
PostgresKey string `long:"postgreskey" description:"File containing the dcrtimed client certificate key for postgres"`
}

// loadConfig initializes and parses the config using a config file
func loadConfig() (*config, error) {
// Default config.
cfg := config{
Backend: defaultBackend,
}

err := flags.IniParse(defaultConfigFile, &cfg)
if err != nil {
return nil, err
}

return &cfg, nil
}
59 changes: 40 additions & 19 deletions cmd/dcrtime_dumpdb/dcrtime_dumpdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (

"github.com/decred/dcrd/chaincfg/v2"
"github.com/decred/dcrd/dcrutil/v2"
"github.com/decred/dcrtime/dcrtimed/backend"
"github.com/decred/dcrtime/dcrtimed/backend/filesystem"
"github.com/decred/dcrtime/dcrtimed/backend/postgres"
)

var (
Expand All @@ -24,18 +26,9 @@ var (
func _main() error {
flag.Parse()

if *restore {
if *destination == "" {
return fmt.Errorf("-destination must be set")
}

fs, err := filesystem.NewRestore(*destination)
if err != nil {
return err
}
defer fs.Close()

return fs.Restore(os.Stdin, true, *destination)
loadedCfg, err := loadConfig()
if err != nil {
return fmt.Errorf("Could not load configuration file: %v", err)
}

root := *fsRoot
Expand All @@ -48,17 +41,45 @@ func _main() error {
}
}

// Dump

fmt.Printf("=== Root: %v\n", root)

fs, err := filesystem.NewDump(root)
var b backend.Backend
switch (*loadedCfg).Backend {
case "filesystem":
if *restore {
if *destination == "" {
return fmt.Errorf("-destination must be set")
}
b, err = filesystem.NewRestore(*destination)
break
}
b, err = filesystem.NewDump(root)
if !*dumpJSON {
fmt.Printf("=== Root: %v\n", root)
}
case "postgres":
var net string
switch loadedCfg.TestNet {
case true:
net = "testnet"
default:
net = "mainnet"
}
b, err = postgres.NewDB(loadedCfg.PostgresHost,
net,
loadedCfg.PostgresRootCert,
loadedCfg.PostgresCert,
loadedCfg.PostgresKey)
default:
err = fmt.Errorf("Unsupported backend type: %v", (*loadedCfg).Backend)
}
if err != nil {
return err
}
defer fs.Close()
defer b.Close()

return fs.Dump(os.Stdout, !*dumpJSON)
if *restore {
return b.Restore(os.Stdin, true, *destination)
}
return b.Dump(os.Stdout, !*dumpJSON)
}

func main() {
Expand Down
4 changes: 4 additions & 0 deletions cmd/dcrtime_fsck/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ dcrtime_fsck
The filesystem backend can under rare circumstances become incoherent. This
tool iterates over all timestamp directories and corrects known failures.

PostgreSQL backend also supported, `dcrtime_fsck` digs in `dcrtimed.conf`
file to determine which backend to run, and uses the provided db params
in case postgres backend is running.

## Flags

```
Expand Down
67 changes: 67 additions & 0 deletions cmd/dcrtime_fsck/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2020 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
"path/filepath"

"github.com/jessevdk/go-flags"
)

const defaultConfigFilename = "dcrtimed.conf"

var (
defaultConfigFile = filepath.Join(defaultHomeDir, defaultConfigFilename)
defaultBackend = "filesystem"
)

// config defines the configuration options for dcrtime_fsck
//
// See loadConfig for details on the configuration load process.
type config struct {
HomeDir string `short:"A" long:"appdata" description:"Path to application home directory"`
ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"`
ConfigFile string `short:"C" long:"configfile" description:"Path to configuration file"`
DataDir string `short:"b" long:"datadir" description:"Directory to store data"`
LogDir string `long:"logdir" description:"Directory to log output."`
TestNet bool `long:"testnet" description:"Use the test network"`
SimNet bool `long:"simnet" description:"Use the simulation test network"`
Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"`
CPUProfile string `long:"cpuprofile" description:"Write CPU profile to the specified file"`
MemProfile string `long:"memprofile" description:"Write mem profile to the specified file"`
DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"`
Listeners []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 49152, testnet: 59152)"`
WalletHost string `long:"wallethost" description:"Hostname for wallet server"`
WalletCert string `long:"walletcert" description:"Certificate path for wallet server"`
WalletPassphrase string `long:"walletpassphrase" description:"Passphrase for wallet server"`
Version string
HTTPSCert string `long:"httpscert" description:"File containing the https certificate file"`
HTTPSKey string `long:"httpskey" description:"File containing the https certificate key"`
StoreHost string `long:"storehost" description:"Enable proxy mode - send requests to the specified ip:port"`
StoreCert string `long:"storecert" description:"File containing the https certificate file for storehost"`
EnableCollections bool `long:"enablecollections" description:"Allow clients to query collection timestamps."`
APITokens []string `long:"apitoken" description:"Token used to grant access to privileged API resources"`
APIVersions string `long:"apiversions" description:"Enables API versions on the daemon"`
Backend string `long:"backend" description:"Sets the cache layer type 'filesystem'/'postgres'"`
PostgresHost string `long:"postgreshost" description:"Postgres ip:port"`
PostgresRootCert string `long:"postgresrootcert" description:"File containing the CA certificate for postgres"`
PostgresCert string `long:"postgrescert" description:"File containing the dcrtimed client certificate for postgres"`
PostgresKey string `long:"postgreskey" description:"File containing the dcrtimed client certificate key for postgres"`
}

// loadConfig initializes and parses the config using a config file
func loadConfig() (*config, error) {
// Default config.
cfg := config{
Backend: defaultBackend,
}

err := flags.IniParse(defaultConfigFile, &cfg)
if err != nil {
return nil, err
}

return &cfg, nil
}
31 changes: 28 additions & 3 deletions cmd/dcrtime_fsck/dcrtime_fsck.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/decred/dcrd/dcrutil/v2"
"github.com/decred/dcrtime/dcrtimed/backend"
"github.com/decred/dcrtime/dcrtimed/backend/filesystem"
"github.com/decred/dcrtime/dcrtimed/backend/postgres"
)

var (
Expand All @@ -28,6 +29,11 @@ var (
func _main() error {
flag.Parse()

loadedCfg, err := loadConfig()
if err != nil {
return fmt.Errorf("Could not load configuration file: %v", err)
}

root := *fsRoot
if root == "" {
root = filepath.Join(defaultHomeDir, "data")
Expand All @@ -52,13 +58,32 @@ func _main() error {

fmt.Printf("=== Root: %v\n", root)

fs, err := filesystem.NewDump(root)
var b backend.Backend
switch (*loadedCfg).Backend {
case "filesystem":
b, err = filesystem.NewDump(root)
case "postgres":
var net string
switch loadedCfg.TestNet {
case true:
net = "testnet"
default:
net = "mainnet"
}
b, err = postgres.NewDB(loadedCfg.PostgresHost,
net,
loadedCfg.PostgresRootCert,
loadedCfg.PostgresCert,
loadedCfg.PostgresKey)
default:
err = fmt.Errorf("Unsupported backend type: %v", (*loadedCfg).Backend)
}
if err != nil {
return err
}
defer fs.Close()
defer b.Close()

return fs.Fsck(&backend.FsckOptions{
return b.Fsck(&backend.FsckOptions{
Verbose: *verbose,
PrintHashes: *printHashes,
Fix: *fix,
Expand Down
2 changes: 1 addition & 1 deletion cmd/dcrtime_timestamp/dcrtime_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func _main() error {
continue
}

// Try timestam second
// Try timestamp second
timestamp, err := time.Parse(fStr, a)
if err == nil {
fmt.Printf("%v\n", timestamp.Unix())
Expand Down
45 changes: 23 additions & 22 deletions dcrtimed/backend/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,33 +934,34 @@ func (fs *FileSystem) LastAnchor() (*backend.LastAnchorResult, error) {
var fr *backend.FlushRecord
var me backend.LastAnchorResult
payload, err := db.Get([]byte(flushedKey), nil)
if err == nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed from:

if err == nil {

} else {

}

to:

if err != nil {

}

the else logic here...

fr, err = DecodeFlushRecord(payload)
if err != nil {
return &me, err
}
me.Tx = fr.Tx
if err != nil {
return &backend.LastAnchorResult{}, err
}

// Close db conection as we may
// write & update it
db.Close()
fr, err = DecodeFlushRecord(payload)
if err != nil {
return &me, err
}
me.Tx = fr.Tx

// Lookup anchored tx info,
// and update db if info changed.
txWalletInfo, err := fs.lazyFlush(flushedTs, fr)
// Close db conection as we may
// write & update it
db.Close()

// If no error, or no enough confirmations
// err continue, else return err.
if err != nil && err != errNotEnoughConfirmation {
return &backend.LastAnchorResult{}, err
}
me.ChainTimestamp = fr.ChainTimestamp
me.BlockHash = txWalletInfo.BlockHash.String()
me.BlockHeight = txWalletInfo.BlockHeight
return &me, nil
// Lookup anchored tx info,
// and update db if info changed.
txWalletInfo, err := fs.lazyFlush(flushedTs, fr)

// If no error, or no enough confirmations
// err continue, else return err.
if err != nil && err != errNotEnoughConfirmation {
return &backend.LastAnchorResult{}, err
}

return &backend.LastAnchorResult{}, err
me.ChainTimestamp = fr.ChainTimestamp
me.BlockHash = txWalletInfo.BlockHash.String()
me.BlockHeight = txWalletInfo.BlockHeight
return &me, nil
}

// GetBalance provides the balance of the wallet and satisfies the
Expand Down
2 changes: 1 addition & 1 deletion dcrtimed/backend/filesystem/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestGetDigests(t *testing.T) {
gr.ErrorCode != foundGlobal) {
t.Fatalf("invalid digest got %x want %x ErrorCode "+
"got %v want %v", gr.Digest[:], hashes[i][:],
gr.ErrorCode, foundLocal)
gr.ErrorCode, foundGlobal)
}
if i >= count && gr.ErrorCode != backend.ErrorNotFound {
t.Fatalf("invalid ErrorCode got %x want %x",
Expand Down
Loading