Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import axon as an additional internal project, support JSONB fields in Changeset and expand demo #9

Merged
merged 33 commits into from
May 29, 2020

Conversation

13rac1
Copy link
Collaborator

@13rac1 13rac1 commented Apr 16, 2020

Demo run:

$ make demo
./scripts/setup_demo.sh
Creating warp-pipe_target_db_1 ... done
Creating warp-pipe_source_db_1 ... done
Creating warp-pipe_demo_service_1 ... done
db available
Setup Schema on Source
CREATE TABLE
Setup Schema on Target
CREATE TABLE
INFO[0000] registering trigger for table public.users   
INFO[0000] registering trigger for table public.pets    
Successfully created `warp_pipe` schema
Load the websocket demo app: http://localhost:8080/
Then press <Enter> to import SQL fixtures to see Warp Pipe Changesets get created...
Add data to Source
INSERT 0 5
Run axon to sync all changes. CTRL-C to exit when done.
INFO[0000] Starting notify listener for `warp_pipe_new_changeset`  component=listener
2020/04/15 20:38:33 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: users}
2020/04/15 20:38:33 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: users}
2020/04/15 20:38:33 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: users}
2020/04/15 20:38:33 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: users}
2020/04/15 20:38:33 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: users}
2020/04/15 20:38:33 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: pets}
2020/04/15 20:38:33 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: pets}
2020/04/15 20:38:33 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: pets}
2020/04/15 20:38:34 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: pets}
2020/04/15 20:38:34 row inserted: {timestamp: 2020-04-15 20:38:33.157167 -0700 PDT, kind: insert, schema: public, table: pets}

perangel and others added 3 commits April 14, 2020 13:56
Axon is a separate project which should be integrated into Warp Pipe
JSON/B fields are stored in the NewValue ChangesetColumn as a
map[string]interface {} which the "sql" package does not support.
Solution: Attempt to convert the map to a string or Fatal.
@13rac1 13rac1 force-pushed the import-axon branch 2 times, most recently from dc71281 to ff9ed0f Compare April 16, 2020 03:45
}
log.Printf("row inserted: %s", change)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These may be too noisy. Perhaps should be a roll up reported every 10-30 seconds? See #1

13rac1 added 2 commits April 17, 2020 23:57
Resolves error from sql package:
unsupported type []interface {}, a slice of interface.
Resolves:
could not find name id in map[string]interface {}
@13rac1 13rac1 force-pushed the import-axon branch 2 times, most recently from 01f9c73 to 781fc4c Compare April 18, 2020 20:23
Copy link
Contributor

@bernielomax bernielomax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks pretty good to me, plus it has already been proven. Just some minor comments.

// TODO: More thorough version checks
major, err := strconv.Atoi(version[0])
if err != nil {
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these could be panics, or have some context added to the messages.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely. There's a lot of UX to add. I'll add this as a todo.

cmd/axon/main.go Outdated Show resolved Hide resolved

for _, r := range rows {
colDefault := strings.Split(r.ColumnDefault, "'")
sequenceName := colDefault[1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im guessing it is safe not to check the length?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of additional error checking needed in this entire codebase, but, yes, unless there is a database is broken this should never fail.

cmd/axon/sql.go Outdated Show resolved Hide resolved
13rac1 added 5 commits May 8, 2020 16:08
Sequence values are not updated when a row is inserted including the
sequenced value, often the ID.

Checks for sequenced columns on startup and then runs setval() after
each insert to manually update the value for that column.
InsertRow needs to load data from the sourceDB in the next commit
There is no way to detect sequence updates directly, so we'll assume
they are changed on INSERT and update the orphaned sequences.
Handles: unsupported type []interface {}, a slice of interface
when the array is not empty
Copy link
Contributor

@bernielomax bernielomax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

cmd/axon/main.go Show resolved Hide resolved
var primaryKeys = make(map[string][]string)

// maps serial key columns by table
var columnSequences = make(map[string]string)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: perhaps rename this since it read as if its the order of the columns in a table, and not the colums that are of a sequence type. maybe something like sequenceColumns instead

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swapped variable name

cmd/axon/schema.go Show resolved Hide resolved
err := conn.Select(&rows, `
SELECT
t.table_name,
string_to_array(string_agg(c.column_name, ','), ',') AS primary_key
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been a while since I wrote this, but iirc this query is slightly different for other versions of PG (older, I believe). Just making note because i can't remember if its the meta-data tables or the method for going making the array that I had to modify at some point.

return "", false
}

func updateColumnSequence(conn *sqlx.DB, table string, columns []*warppipe.ChangesetColumn) error {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pluralize the name since it takes multiple columns s/updateColumnSequence/updateColumnSequences/g

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return nil
}

// loadUnconnectedSequences loads all sequences in the source database not
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring and function name do not match. Should be loadOrphanSequences.

for _, sequenceName := range orphanSequences {
var lastVal int64 // PG bigint is 8bit

err := sourceDB.QueryRow(`
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could also do

err := sourceDB.Get(&lastVal, "SELECT last_value FROM " + sequenceName)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless the target pointer needs to be a struct, in which case nvm what I said :P

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah your version is cleaner, switching

}

var setVal string
err = targetDB.QueryRow(`
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here for all the other QueryRow calls

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

cmd/axon/sql.go Outdated
warppipe "github.com/perangel/warp-pipe"
)

var regexOneSpace = regexp.MustCompile(`\s+`)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/regexOneSpace/regexSpace/g since this regex is technically 1 or more spaces

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

db/schema.go Outdated
log.Printf("%v+", pgErr)
}
// Failure case resolution:
// GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO master;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is RDS specific, please remove this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@perangel perangel merged commit 49f5fe5 into perangel:master May 29, 2020
13rac1 pushed a commit to 13rac1/warp-pipe that referenced this pull request May 29, 2020
Import axon as an additional internal project, support JSONB fields in Changeset and expand demo
@13rac1
Copy link
Collaborator Author

13rac1 commented May 29, 2020

fixups have been rebased out. Merge commit changed to 02f4352

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants