Import axon as an additional internal project, support JSONB fields in Changeset and expand demo #9

Merged
merged 33 commits into from
May 29, 2020
Changes from 3 commits
efdc763
chore: Import axon from commit 553527c
perangel Apr 14, 2020
458d5a7
chore: Add axon dependencies
13rac1 Apr 14, 2020
f06083a
feat: Support JSON/B fields in Changesets
13rac1 Apr 16, 2020
ee11232
feat: Expand Demo to sync data to target DB with axon
13rac1 Apr 16, 2020
9f23ed3
feat: Additional detail to SQL error messages
13rac1 Apr 16, 2020
6198f5e
feat: Block Target DB versions less than 9.5
13rac1 Apr 16, 2020
9e989ac
feat: Support empty array/slice fields in Changeset
13rac1 Apr 16, 2020
f570d62
fix: prepareDeleteQuery will find query args in changeset.OldValues
13rac1 Apr 16, 2020
b375b66
feat: Update SERIAL sequence after INSERT queries
13rac1 Apr 18, 2020
885aeb5
feat: Add sourceDB connection and differentiate targetDB
13rac1 Apr 20, 2020
8f94419
feat: Update orphaned sequences on every INSERT
13rac1 Apr 21, 2020
0a73e83
feat: Changeset count on start, move Signal handler, print args on err
13rac1 Apr 22, 2020
c3ad1af
fix: Handle changesets including arrays of strings
13rac1 Apr 22, 2020
dc2d02a
fixup! feat: Changeset count on start, move Signal handler, print arg…
13rac1 May 27, 2020
fce1fde
fixup! feat: Update SERIAL sequence after INSERT queries
13rac1 May 27, 2020
0f5da22
fixup! feat: Block Target DB versions less than 9.5
13rac1 May 27, 2020
3b5cf68
fixup! feat: Block Target DB versions less than 9.5
13rac1 May 27, 2020
1aa6b53
fixup! feat: Changeset count on start, move Signal handler, print arg…
13rac1 May 27, 2020
c7d7a3c
fixup! feat: Update SERIAL sequence after INSERT queries
13rac1 May 27, 2020
201c16f
fixup! feat: Update SERIAL sequence after INSERT queries
13rac1 May 27, 2020
aa3e175
fixup! feat: Update SERIAL sequence after INSERT queries
13rac1 May 27, 2020
7d186ba
fixup! feat: Update SERIAL sequence after INSERT queries
13rac1 May 27, 2020
8c6f48e
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
734ea16
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
a7e5fd2
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
438f6ad
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
3f7044e
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
343361c
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
145d83a
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
e51e722
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
808fc72
fixup! feat: Additional detail to SQL error messages
13rac1 May 27, 2020
0f22315
fixup! feat: Additional detail to SQL error messages
13rac1 May 27, 2020
7dc5d18
fixup! feat: Update orphaned sequences on every INSERT
13rac1 May 27, 2020
25 changes: 25 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

10 changes: 10 additions & 0 deletions changeset.go
@@ -3,6 +3,8 @@ package warppipe
import (
"strings"
"time"

"fmt"
)

// ChangesetKind is the type for changeset kinds
@@ -50,6 +52,14 @@ func (c *Changeset) getColumnValue(values []*ChangesetColumn, column string) (in
return nil, false
}

// String implements Stringer to create a useful string representation of a Changeset.
func (c *Changeset) String() string {
// While c.newValues is an ordered array, the original data source is a JSON
// object used as a hashmap, therefore the data is stored as a map in Go
// which means the field order in the array is randomized.
return fmt.Sprintf("{timestamp: %s, kind: %s, schema: %s, table: %s}", c.Timestamp, c.Kind, c.Schema, c.Table)
}

// GetNewColumnValue returns the current value for a column and a bool denoting
// whether a new value is present in the changeset.
func (c *Changeset) GetNewColumnValue(column string) (interface{}, bool) {
27 changes: 27 additions & 0 deletions cmd/axon/README.md
@@ -0,0 +1,27 @@
# axon

_NOTE: This is still a work-in-progress._

A tool for applying warp-pipe changesets across database replicas

## Encrypting an RDS instance

1. Install `warp-pipe`
    - `go install github.com/perangel/warp-pipe/...`

2. Prepare the source database
    - `warp-pipe setup-db -H <DB_HOST> -p <DB_PORT> -U <DB_USER> -d <DB_NAME> -P <DB_PASS>`

3. Snapshot the source database
    - *NOTE:* You must run step 2 before taking the snapshot

4. Copy the snapshot with encryption

5. Create a new instance from the snapshot
    - see the source database for configuration

6. Run `axon`

7. Validate the new database state

8. Stop `axon`
19 changes: 19 additions & 0 deletions cmd/axon/config.go
@@ -0,0 +1,19 @@
package main

// Config stores the configuration for axon
type Config struct {
// source db credentials
SourceDBHost string `envconfig:"source_db_host"`
SourceDBPort int `envconfig:"source_db_port"`
SourceDBName string `envconfig:"source_db_name"`
SourceDBUser string `envconfig:"source_db_user"`
SourceDBPass string `envconfig:"source_db_pass"`

// target db credentials
TargetDBHost string `envconfig:"target_db_host"`
TargetDBPort int `envconfig:"target_db_port"`
TargetDBName string `envconfig:"target_db_name"`
TargetDBUser string `envconfig:"target_db_user"`
TargetDBPass string `envconfig:"target_db_pass"`
TargetDBSchema string `envconfig:"target_db_schema" default:"public"`
}
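For orientation, the struct tags above translate into environment variable names once `envconfig.Process("axon", &config)` runs. The sketch below only illustrates the naming rule as I understand envconfig's documented convention (prefix, underscore, tag value, upper-cased); `envVarName` is a hypothetical helper written for this example and is not part of envconfig:

```go
package main

import (
	"fmt"
	"strings"
)

// envVarName sketches the variable name looked up for a field: the prefix
// passed to envconfig.Process, an underscore, and the `envconfig` tag value,
// all upper-cased. Hypothetical helper, for illustration only.
func envVarName(prefix, tag string) string {
	return strings.ToUpper(prefix + "_" + tag)
}

func main() {
	// Tag values copied from the Config struct above.
	tags := []string{
		"source_db_host", "source_db_port", "source_db_name",
		"source_db_user", "source_db_pass",
		"target_db_host", "target_db_port", "target_db_name",
		"target_db_user", "target_db_pass", "target_db_schema",
	}
	for _, tag := range tags {
		fmt.Println(envVarName("axon", tag))
	}
}
```

Under that assumption, `AXON_TARGET_DB_SCHEMA` is the only variable that may be left unset, since its `default:"public"` tag supplies a fallback.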
19 changes: 19 additions & 0 deletions cmd/axon/docker-compose.yml
@@ -0,0 +1,19 @@
version: '3'

services:
  source_db:
    image: postgres:9.5-alpine
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=user
      - POSTGRES_DB=user
  target_db:
    image: postgres:11-alpine
    ports:
      - "6432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=user
      - POSTGRES_DB=user
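With this compose file running, the two databases are reachable on host ports 5432 (source) and 6432 (target). A small sketch of the connection strings that would result, reusing the `getDBConnString` helper from `cmd/axon/main.go`; the `localhost` host is an assumption about where Docker publishes the ports:

```go
package main

import "fmt"

// getDBConnString is copied from cmd/axon/main.go in this PR.
func getDBConnString(host string, port int, name, user, pass string) string {
	return fmt.Sprintf("user=%s password=%s dbname=%s host=%s port=%d sslmode=%s",
		user, pass, name, host, port, "disable")
}

func main() {
	// Credentials and database name come from the compose file ("user" for
	// all three). "localhost" is an assumption for a local Docker setup.
	fmt.Println("source:", getDBConnString("localhost", 5432, "user", "user", "user"))
	fmt.Println("target:", getDBConnString("localhost", 6432, "user", "user", "user"))
}
```

Note that `sslmode=disable` is hard-coded in the helper, which suits this local demo but would probably need to become configurable for the RDS workflow described in the README.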
148 changes: 148 additions & 0 deletions cmd/axon/main.go
@@ -0,0 +1,148 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/jmoiron/sqlx"
"github.com/kelseyhightower/envconfig"
_ "github.com/lib/pq"
warppipe "github.com/perangel/warp-pipe"
"github.com/sirupsen/logrus"
)

var (
logger *logrus.Logger
)

func init() {
logger = logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{})
}

func getDBConnString(host string, port int, name, user, pass string) string {
return fmt.Sprintf("user=%s password=%s dbname=%s host=%s port=%d sslmode=%s",
user,
pass,
name,
host,
port,
"disable",
)
}

func parseConfig() *Config {
var config Config
err := envconfig.Process("axon", &config)
if err != nil {
logger.WithError(err).Fatal("failed to process environment config")
}
return &config
}

func main() {
config := parseConfig()
// signal.Notify never blocks when sending, so the channel must be buffered
shutdownCh := make(chan os.Signal, 1)
signal.Notify(shutdownCh, os.Interrupt, syscall.SIGTERM)

targetDBConn, err := sqlx.Open("postgres", getDBConnString(
config.TargetDBHost,
config.TargetDBPort,
config.TargetDBName,
config.TargetDBUser,
config.TargetDBPass,
))
if err != nil {
logger.WithError(err).Fatal("unable to connect to target database")
}

err = loadPrimaryKeys(targetDBConn)
if err != nil {
logger.WithError(err).Fatal("unable to load target DB primary keys")
}

// create a notify listener and start from changeset id 0
listener := warppipe.NewNotifyListener(warppipe.StartFromID(0))
wp := warppipe.NewWarpPipe(listener)
err = wp.Open(&warppipe.DBConfig{
Host: config.SourceDBHost,
Port: config.SourceDBPort,
Database: config.SourceDBName,
User: config.SourceDBUser,
Password: config.SourceDBPass,
})
if err != nil {
logger.WithError(err).
WithField("component", "warp_pipe").
Fatal("unable to connect to source database")
}

ctx, cancel := context.WithCancel(context.Background())
changes, errs := wp.ListenForChanges(ctx)

for {
select {
case change := <-changes:
processChange(targetDBConn, config.TargetDBSchema, change)
case err := <-errs:
logger.WithError(err).
WithField("component", "warp_pipe").
Error("received an error")
case <-shutdownCh:
logger.Info("shutting down...")
cancel()
wp.Close()
return
}
}
}

func processChange(conn *sqlx.DB, schema string, change *warppipe.Changeset) {
switch change.Kind {
case warppipe.ChangesetKindInsert:
processInsert(conn, schema, change)
case warppipe.ChangesetKindUpdate:
processUpdate(conn, schema, change)
case warppipe.ChangesetKindDelete:
processDelete(conn, schema, change)
}
}

func processDelete(conn *sqlx.DB, schema string, change *warppipe.Changeset) {
pk, err := getPrimaryKeyForChange(change)
if err != nil {
logger.WithError(err).WithField("table", change.Table).
Errorf("Error: unable to process DELETE for table '%s', changeset has no primary key", change.Table)
// without a primary key the row cannot be identified, so skip the DELETE
return
}

err = deleteRow(conn, schema, change, pk)
if err != nil {
logger.WithError(err).WithField("table", change.Table).
Errorf("Error: failed to DELETE row for table '%s' (pk: %s)", change.Table, pk)
}
}

func processInsert(conn *sqlx.DB, schema string, change *warppipe.Changeset) {
err := insertRow(conn, schema, change)
if err != nil {
logger.WithError(err).WithField("table", change.Table).
Errorf("Error: failed to INSERT row for table '%s'", change.Table)
}
}

func processUpdate(conn *sqlx.DB, schema string, change *warppipe.Changeset) {
pk, err := getPrimaryKeyForChange(change)
if err != nil {
logger.WithError(err).WithField("table", change.Table).
Errorf("Error: unable to process UPDATE for table '%s', changeset has no primary key", change.Table)
// without a primary key the row cannot be identified, so skip the UPDATE
return
}

err = updateRow(conn, schema, change, pk)
if err != nil {
logger.WithError(err).WithField("table", change.Table).
Errorf("Error: failed to UPDATE row for table '%s' (pk: %s)", change.Table, pk)
}
}
47 changes: 47 additions & 0 deletions cmd/axon/schema.go
@@ -0,0 +1,47 @@
package main

import (
"fmt"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
warppipe "github.com/perangel/warp-pipe"
)

// maps primary key columns by table
var primaryKeys = make(map[string][]string)

func loadPrimaryKeys(conn *sqlx.DB) error {
var rows []struct {
TableName string `db:"table_name"`
PrimaryKey pq.StringArray `db:"primary_key"`
}
err := conn.Select(&rows, `
SELECT
t.table_name,
string_to_array(string_agg(c.column_name, ','), ',') AS primary_key
Review comment (Owner): It's been a while since I wrote this, but IIRC this query is slightly different for other (I believe older) versions of PG. Just making a note, because I can't remember whether it was the metadata tables or the method for building the array that I had to modify at some point.

FROM information_schema.key_column_usage AS c
LEFT JOIN information_schema.table_constraints AS t
ON c.constraint_name = t.constraint_name
AND t.constraint_type = 'PRIMARY KEY'
WHERE t.table_name != ''
GROUP BY t.table_name`,
)
if err != nil {
return err
}

for _, r := range rows {
primaryKeys[r.TableName] = r.PrimaryKey
}

return nil
}

func getPrimaryKeyForChange(change *warppipe.Changeset) ([]string, error) {
col, ok := primaryKeys[change.Table]
if !ok {
return nil, fmt.Errorf("no primary key in mapping for table `%s`", change.Table)
}
return col, nil
}
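`getPrimaryKeyForChange` returns a slice, so composite primary keys are possible. The `deleteRow` and `updateRow` implementations are not shown in this part of the diff; purely as an illustration, here is one way such a column list could be turned into a parameterized `WHERE` clause. Both `quoteIdent` and `wherePrimaryKey` are hypothetical names invented for this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent double-quotes a SQL identifier and escapes embedded quotes.
// Hypothetical helper for this sketch.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// wherePrimaryKey builds a condition with one positional placeholder per
// primary key column, numbered from $1. Hypothetical helper, not taken
// from the PR.
func wherePrimaryKey(pk []string) string {
	conds := make([]string, len(pk))
	for i, col := range pk {
		conds[i] = fmt.Sprintf("%s = $%d", quoteIdent(col), i+1)
	}
	return strings.Join(conds, " AND ")
}

func main() {
	fmt.Println(wherePrimaryKey([]string{"id"}))
	fmt.Println(wherePrimaryKey([]string{"tenant_id", "user_id"}))
}
```

Keeping the values as query arguments rather than interpolating them into the SQL text avoids quoting problems with JSONB and array values.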