Skip to content

Commit

Permalink
NEOS-1747, NEOS-1754: Updates to on conflict and other async sql mechs (
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Feb 17, 2025
1 parent aef64bb commit 486375f
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export default function MssqlDBDestinationOptionsForm(
<FormItemContainer>
<FormHeader
title="Batch Count"
description="The max allowed per batch before flushing to S3. 0 to disable count-based batching."
description="The max allowed per batch before flushing to the database. 0 to disable count-based batching."
isErrored={!!errors?.batch?.count}
/>
<FormInputContainer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export default function MysqlDBDestinationOptionsForm(
<FormItemContainer>
<FormHeader
title="Batch Count"
description="The max allowed per batch before flushing to S3. 0 to disable count-based batching."
description="The max allowed per batch before flushing to the database. 0 to disable count-based batching."
isErrored={!!errors?.batch?.count}
/>
<FormInputContainer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ export default function PostgresDBDestinationOptionsForm(
<FormItemContainer>
<FormHeader
title="Batch Count"
description="The max allowed per batch before flushing to S3. 0 to disable count-based batching."
description="The max allowed per batch before flushing to the database. 0 to disable count-based batching."
isErrored={!!errors?.batch?.count}
/>
<FormInputContainer>
Expand Down
4 changes: 2 additions & 2 deletions frontend/apps/web/yup-validations/transformer-validations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ const generateInt64Config = Yup.object().shape({

const generateStringPhoneNumberConfig = Yup.object().shape({
min: getBigIntValidator({
range: [8, 12],
range: [8, 14],
}).test(
'is-less-than-or-equal-to-max',
'Min must be less than or equal to Max',
Expand All @@ -158,7 +158,7 @@ const generateStringPhoneNumberConfig = Yup.object().shape({
}
),
max: getBigIntValidator({
range: [8, 12],
range: [8, 14],
}).test(
'is-greater-than-or-equal-to-min',
'Max must be greater than or equal to Min',
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0
github.com/Blank-Xu/sql-adapter v1.1.1
github.com/Jeffail/gabs/v2 v2.7.0
github.com/Jeffail/shutdown v1.0.0
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/auth0/go-auth0 v1.14.0
github.com/auth0/go-jwt-middleware/v2 v2.2.2
Expand Down Expand Up @@ -137,6 +136,7 @@ require (
github.com/IBM/sarama v1.43.3 // indirect
github.com/Jeffail/checkpoint v1.0.1 // indirect
github.com/Jeffail/grok v1.1.0 // indirect
github.com/Jeffail/shutdown v1.0.0 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/Masterminds/squirrel v1.5.4 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
Expand Down
62 changes: 16 additions & 46 deletions worker/pkg/benthos/sql/input_sql_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"sync"

"github.com/Jeffail/shutdown"

mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql"
continuation_token "github.com/nucleuscloud/neosync/internal/continuation-token"
database_record_mapper "github.com/nucleuscloud/neosync/internal/database-record-mapper"
Expand Down Expand Up @@ -63,8 +61,6 @@ type pooledInput struct {

recordMapper record_mapper_builder.DatabaseRecordMapper[any]

shutSig *shutdown.Signaller

stopActivityChannel chan<- error
onHasMorePages OnHasMorePagesFn
expectedTotalRows *int
Expand Down Expand Up @@ -127,7 +123,6 @@ func newInput(

return &pooledInput{
logger: mgr.Logger(),
shutSig: shutdown.NewSignaller(),
connectionId: connectionId,
driver: driver,
queryStatic: queryStatic,
Expand All @@ -150,13 +145,9 @@ func (s *pooledInput) Connect(ctx context.Context) error {
s.dbMut.Lock()
defer s.dbMut.Unlock()

if s.db != nil {
return nil
}

db, err := s.provider.GetDb(ctx, s.connectionId)
if err != nil {
return nil
return err
}
s.db = db
s.logger.Debug(fmt.Sprintf("connected to database %s", s.connectionId))
Expand Down Expand Up @@ -202,20 +193,6 @@ func (s *pooledInput) Connect(ctx context.Context) error {

s.rows = rows
s.rowsRead = 0
go func() {
<-s.shutSig.HardStopChan()

s.dbMut.Lock()
if s.rows != nil {
_ = s.rows.Close()
s.rows = nil
}
// not closing the connection here as that is managed by an outside force
s.db = nil
s.dbMut.Unlock()

s.shutSig.TriggerHasStopped()
}()
return nil
}

Expand All @@ -239,19 +216,18 @@ func (s *pooledInput) Read(ctx context.Context) (*service.Message, service.AckFu
}
if !s.rows.Next() {
// Check if any error occurred.
if err := s.rows.Err(); err != nil {
_ = s.rows.Close()
s.rows = nil
rows := s.rows
s.rows = nil
_ = rows.Close()
if err := rows.Err(); err != nil {
return nil, nil, err
}
// For non-Postgres drivers, simply close and return EndOfInput
_ = s.rows.Close()
s.rows = nil

if s.expectedTotalRows != nil && s.onHasMorePages != nil && len(s.orderByColumns) > 0 {
// emit order by column values if ok
s.logger.Debug(fmt.Sprintf("[ROW END] rows read: %d, expected total rows: %d", s.rowsRead, *s.expectedTotalRows))
if s.rowsRead >= *s.expectedTotalRows {
s.logger.Debug("[ROW END] emitting order by column values")
s.logger.Debug("[ROW END] emitting onHasMorePages as rows read >= expected total rows")
s.onHasMorePages(s.lastReadOrderValues)
}
}
Expand Down Expand Up @@ -288,35 +264,29 @@ func (s *pooledInput) Read(ctx context.Context) (*service.Message, service.AckFu
return msg, emptyAck, nil
}

// emptyAck is a no-op ack function
// Original benthos input returns this, which causes downstream messages to continually be retried.
// Not sure we really want to ever enable this though because it's not clear how to handle the case in our system today.
// Also our broker will retry messages endlessly until we enable the escape hatch (critical errors).
//
// // return service.AutoRetryNacksToggled(conf, input)
func emptyAck(ctx context.Context, err error) error {
// Nacks are handled by AutoRetryNacks because we don't have an explicit
// ack mechanism right now.
return nil
}

func (s *pooledInput) Close(ctx context.Context) error {
s.shutSig.TriggerHardStop()
s.dbMut.Lock()

isNil := s.db == nil
if isNil {
s.dbMut.Unlock()
return nil
}
defer s.dbMut.Unlock()

if s.rows != nil {
_ = s.rows.Close()
s.rows = nil
}

s.db = nil // not closing here since it's managed by the pool

s.dbMut.Unlock()

select {
case <-s.shutSig.HasStoppedChan():
case <-ctx.Done():
return ctx.Err()
if s.db != nil {
s.db = nil // not closing here since it's managed by the pool
}
return nil
}
62 changes: 26 additions & 36 deletions worker/pkg/benthos/sql/output_sql_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log/slog"
"sync"

"github.com/Jeffail/shutdown"
_ "github.com/doug-martin/goqu/v9/dialect/mysql"
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql"
Expand Down Expand Up @@ -71,9 +70,7 @@ type pooledInsertOutput struct {
onConflictDoNothing bool
skipForeignKeyViolations bool
queryBuilder querybuilder.InsertQueryBuilder

shutSig *shutdown.Signaller
isRetry bool
isRetry bool
}

func newInsertOutput(conf *service.ParsedConfig, mgr *service.Resources, provider ConnectionProvider, isRetry bool, logger *slog.Logger) (*pooledInsertOutput, error) {
Expand Down Expand Up @@ -145,11 +142,10 @@ func newInsertOutput(conf *service.ParsedConfig, mgr *service.Resources, provide
querybuilder.WithSuffix(suffix),
}

if onConflictDoNothing || isRetry {
options = append(options, querybuilder.WithOnConflictDoNothing())
}
if onConflictDoUpdate {
options = append(options, querybuilder.WithOnConflictDoUpdate(primaryKeyColumns))
} else if onConflictDoNothing || isRetry {
options = append(options, querybuilder.WithOnConflictDoNothing())
}
if shouldOverrideColumnDefault {
options = append(options, querybuilder.WithShouldOverrideColumnDefault())
Expand All @@ -170,7 +166,6 @@ func newInsertOutput(conf *service.ParsedConfig, mgr *service.Resources, provide
driver: driver,
logger: mgr.Logger(),
slogger: logger,
shutSig: shutdown.NewSignaller(),
provider: provider,
schema: schema,
table: table,
Expand All @@ -195,22 +190,16 @@ func (s *pooledInsertOutput) Connect(ctx context.Context) error {
return err
}
s.db = db

go func() {
<-s.shutSig.HardStopChan()

s.dbMut.Lock()
// not closing the connection here as that is managed by an outside force
s.db = nil
s.dbMut.Unlock()

s.shutSig.TriggerHasStopped()
}()
return nil
}

func (s *pooledInsertOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
s.dbMut.RLock()
if s.db == nil {
s.dbMut.RUnlock()
return service.ErrNotConnected
}
db := s.db
defer s.dbMut.RUnlock()

batchLen := len(batch)
Expand All @@ -237,26 +226,28 @@ func (s *pooledInsertOutput) WriteBatch(ctx context.Context, batch service.Messa
if err != nil {
return fmt.Errorf("failed to build insert query: %w", err)
}

if _, err := s.db.ExecContext(ctx, insertQuery, args...); err != nil {
if _, err := db.ExecContext(ctx, insertQuery, args...); err != nil {
shouldRetry := neosync_benthos.ShouldRetryInsert(err.Error(), s.skipForeignKeyViolations)
if !shouldRetry {
return fmt.Errorf("failed to execute insert query: %w", err)
}
s.logger.Infof("received error during batch write that is retryable, proceeding with row by row insert: %s", err.Error())

err = s.RetryInsertRowByRow(ctx, s.queryBuilder, rows)
err = retryInsertRowByRow(ctx, db, s.queryBuilder, rows, s.skipForeignKeyViolations, s.logger)
if err != nil {
return fmt.Errorf("failed to retry insert query: %w", err)
}
}
return nil
}

func (s *pooledInsertOutput) RetryInsertRowByRow(
func retryInsertRowByRow(
ctx context.Context,
db mysql_queries.DBTX,
builder querybuilder.InsertQueryBuilder,
rows []map[string]any,
skipForeignKeyViolations bool,
logger *service.Logger,
) error {
fkErrorCount := 0
otherErrorCount := 0
Expand All @@ -266,37 +257,36 @@ func (s *pooledInsertOutput) RetryInsertRowByRow(
if err != nil {
return err
}
_, err = s.db.ExecContext(ctx, insertQuery, args...)
_, err = db.ExecContext(ctx, insertQuery, args...)
if err != nil {
if !neosync_benthos.ShouldRetryInsert(err.Error(), s.skipForeignKeyViolations) {
if !neosync_benthos.ShouldRetryInsert(err.Error(), skipForeignKeyViolations) {
return fmt.Errorf("failed to retry insert query: %w", err)
} else if neosync_benthos.IsForeignKeyViolationError(err.Error()) {
fkErrorCount++
} else {
otherErrorCount++
s.logger.Warnf("received retryable error during row by row insert. skipping row: %s", err.Error())
logger.Warnf("received retryable error during row by row insert. skipping row: %s", err.Error())
}
}
if err == nil {
insertCount++
}
}
s.logger.Infof("Completed batch insert with %d foreign key violations. Total Skipped rows: %d, Successfully inserted: %d", fkErrorCount, otherErrorCount, insertCount)
logger.Infof("Completed row-by-row insert with %d foreign key violations. Total Skipped rows: %d, Successfully inserted: %d", fkErrorCount, otherErrorCount, insertCount)
return nil
}

func (s *pooledInsertOutput) Close(ctx context.Context) error {
s.shutSig.TriggerHardStop()
s.dbMut.RLock()
isNil := s.db == nil
s.dbMut.RUnlock()
if isNil {
if s.db == nil {
s.dbMut.RUnlock()
return nil
}
select {
case <-s.shutSig.HasStoppedChan():
case <-ctx.Done():
return ctx.Err()
}
s.dbMut.RUnlock()

// Take write lock to null out the connection
s.dbMut.Lock()
s.db = nil
s.dbMut.Unlock()
return nil
}
Loading

0 comments on commit 486375f

Please sign in to comment.