Skip to content

Commit

Permalink
feature: Implement support for DML
Browse files Browse the repository at this point in the history
  • Loading branch information
jtwatson committed Nov 29, 2024
1 parent 4574170 commit c60aa87
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 57 deletions.
10 changes: 2 additions & 8 deletions database/spanner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,11 @@ as described in [README.md#database-urls](../../README.md#database-urls)
> 1496601752/u add_index_on_user_emails (2m12.155787369s)
> 1496602638/u create_books_table (2m30.77299181s)
## DDL with comments
## DDL & DML with comments
At the moment the GCP Spanner backend does not seem to allow comments (see https://issuetracker.google.com/issues/159730604),
so in order to use migrations with DDL containing comments, the `x-clean-statements` flag is required.
## Multiple statements
In order to be able to use more than 1 DDL statement in the same migration file, the file has to be parsed and therefore the `x-clean-statements` flag is required
## Testing
To unit test the `spanner` driver, `SPANNER_DATABASE` needs to be set. You'll
need to sign-up to Google Cloud Platform (GCP) and have a running Spanner
instance since it is not possible to run Google Spanner outside GCP.
In order to use more than one DDL or DML statement in the same migration file, the file has to be parsed, and therefore the `x-clean-statements` flag is required.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Spanner refuses to drop a table that still has an index on it,
-- so the unique e-mail index has to go first.
DROP INDEX UsersEmailIndex;
DROP TABLE Users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Create a table
CREATE TABLE Users (
  UserId INT64,
  Name STRING(40),
  Email STRING(83)
) PRIMARY KEY(UserId /* even inline comments */);

CREATE UNIQUE INDEX UsersEmailIndex ON Users (Email);

-- Comments are okay

-- Seed rows; every Email must be distinct because of UsersEmailIndex.
INSERT INTO Users(UserId, Name, Email)
VALUES
  (100, "Username", "username@example.com"),
  (200, "Username2", "username2@example.com");
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE Users DROP COLUMN city;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE Users ADD COLUMN city STRING(100);
149 changes: 125 additions & 24 deletions database/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (

"cloud.google.com/go/spanner"
sdb "cloud.google.com/go/spanner/admin/database/apiv1"
"cloud.google.com/go/spanner/spansql"

"github.com/cloudspannerecosystem/memefish"
"github.com/cloudspannerecosystem/memefish/token"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"

Expand Down Expand Up @@ -60,11 +61,9 @@ type Config struct {

// Spanner implements database.Driver for Google Cloud Spanner
type Spanner struct {
	// db holds the Spanner clients: db.admin is used for schema (DDL)
	// updates and db.data for read-write transactions.
	db *DB

	// config carries the driver settings, e.g. DatabaseName and
	// CleanStatements.
	config *Config

	// lock is an atomic flag; presumably it backs the driver's Lock/Unlock
	// methods — confirm against their implementation.
	lock *uatomic.Uint32
}

type DB struct {
Expand Down Expand Up @@ -179,26 +178,65 @@ func (s *Spanner) Run(migration io.Reader) error {
return err
}

stmts := []string{string(migr)}
if s.config.CleanStatements {
stmts, err = cleanStatements(migr)
if err != nil {
return err
ctx := context.Background()

if !s.config.CleanStatements {
return s.runDdl(ctx, []string{string(migr)})
}

stmtGroups, err := statementGroups(migr)
if err != nil {
return err
}

for _, group := range stmtGroups {
switch group.typ {
case statementTypeDDL:
if err := s.runDdl(ctx, group.stmts); err != nil {
return err
}
case statementTypeDML:
if err := s.runDml(ctx, group.stmts); err != nil {
return err
}
default:
return fmt.Errorf("unknown statement type: %s", group.typ)
}
}

ctx := context.Background()
return nil
}

// runDdl sends stmts to the database in a single UpdateDatabaseDdl request
// and waits for the returned operation to complete.
//
// A failure either to start the update or while waiting for it is reported
// as a *database.Error whose Query is the statements joined with ";\n".
func (s *Spanner) runDdl(ctx context.Context, stmts []string) error {
	op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
		Database:   s.config.DatabaseName,
		Statements: stmts,
	})
	if err == nil {
		// The request was accepted; the schema change itself may still fail.
		err = op.Wait(ctx)
	}
	if err != nil {
		return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(strings.Join(stmts, ";\n"))}
	}

	return nil
}

func (s *Spanner) runDml(ctx context.Context, stmts []string) error {
_, err := s.db.data.ReadWriteTransaction(ctx,
func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
for _, s := range stmts {
_, err := txn.Update(ctx, spanner.Statement{SQL: s})
if err != nil {
return err
}
}
return nil
})
if err != nil {
return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(strings.Join(stmts, ";\n"))}
}

return nil
Expand Down Expand Up @@ -345,17 +383,80 @@ func (s *Spanner) ensureVersionTable() (err error) {
return nil
}

func cleanStatements(migration []byte) ([]string, error) {
// The Spanner GCP backend does not yet support comments for the UpdateDatabaseDdl RPC
// (see https://issuetracker.google.com/issues/159730604), so we use
// spansql to parse the DDL and output valid statements without comments.
ddl, err := spansql.ParseDDL("", string(migration))
if err != nil {
return nil, err
type (
	// statementType tells which kind of SQL a migration statement is, and
	// therefore which execution path (runDdl or runDml) it must take.
	statementType string

	// statementGroup is a run of consecutive statements that share one type
	// and can be executed together.
	statementGroup struct {
		typ   statementType // kind shared by every entry in stmts
		stmts []string      // statement texts, in migration order
	}
)

const (
	// statementTypeUnknown is the zero value: not classified yet.
	statementTypeUnknown statementType = ""
	// statementTypeDDL marks schema statements.
	statementTypeDDL statementType = "DDL"
	// statementTypeDML marks data statements.
	statementTypeDML statementType = "DML"
)

func statementGroups(migr []byte) (groups []*statementGroup, err error) {
lex := &memefish.Lexer{
File: &token.File{Buffer: string(migr)},
}
stmts := make([]string, 0, len(ddl.List))
for _, stmt := range ddl.List {
stmts = append(stmts, stmt.SQL())

group := &statementGroup{}
var stmtTyp statementType
var stmt strings.Builder
for {
if err := lex.NextToken(); err != nil {
return nil, err
}

if stmtTyp == statementTypeUnknown {
switch {
case lex.Token.IsKeywordLike("INSERT") || lex.Token.IsKeywordLike("DELETE") || lex.Token.IsKeywordLike("UPDATE"):
stmtTyp = statementTypeDML
default:
stmtTyp = statementTypeDDL
}
if group.typ != stmtTyp {
if len(group.stmts) > 0 {
groups = append(groups, group)
}
group = &statementGroup{typ: stmtTyp}
}
}

if lex.Token.Kind == token.TokenEOF || lex.Token.Kind == ";" {
if stmt.Len() > 0 {
group.stmts = append(group.stmts, stmt.String())
}
stmtTyp = statementTypeUnknown
stmt.Reset()

if lex.Token.Kind == token.TokenEOF {
if len(group.stmts) > 0 {
groups = append(groups, group)
}

break
}

continue
}

if len(lex.Token.Comments) > 0 {
// preserve newline where comments are removed
if _, err := stmt.WriteString("\n"); err != nil {
return nil, err
}
}
if stmt.Len() > 0 {
if _, err := stmt.WriteString(lex.Token.Space); err != nil {
return nil, err
}
}
if _, err := stmt.WriteString(lex.Token.Raw); err != nil {
return nil, err
}
}
return stmts, nil

return groups, nil
}
Loading

0 comments on commit c60aa87

Please sign in to comment.