Skip to content

Commit

Permalink
fix(exporter): only commit messages after flushed successfully
Browse files Browse the repository at this point in the history
  • Loading branch information
huantt committed Dec 15, 2022
1 parent 832373b commit efed011
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions impl/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
signal.Notify(cx, os.Interrupt, syscall.SIGTERM)
go func() {
<-cx
err = e.onShutdown()
err = e.flushData()
if err != nil {
panic(err)
}
os.Exit(1)
}()
defer func() {
err = e.onShutdown()
err = e.flushData()
if err != nil {
panic(err)
}
Expand All @@ -76,11 +76,6 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
if err != nil {
return exportedCount, err
}
_, err = e.consumer.Commit()
if err != nil {
err = errors.Wrap(err, "Failed to commit messages")
return exportedCount, err
}
exportedCount++
log.Infof("Exported message: %v (Total: %d)", msg.TopicPartition, exportedCount)
if e.options != nil && exportedCount == e.options.Limit {
Expand All @@ -90,10 +85,15 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
}
}

func (e *Exporter) onShutdown() error {
func (e *Exporter) flushData() error {
err := e.writer.Flush()
if err != nil {
return errors.Wrap(err, "Failed to flush writer")
}
_, err = e.consumer.Commit()
if err != nil {
err = errors.Wrap(err, "Failed to commit messages")
return err
}
return nil
}

0 comments on commit efed011

Please sign in to comment.