Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nikandfor committed Jan 6, 2024
1 parent c4a7e8b commit 4e2875e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 4 deletions.
19 changes: 16 additions & 3 deletions batch3.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,22 @@ func (c *Controller[Res]) commit(ctx context.Context, err error) (Res, error) {
again:
if err != nil || atomic.LoadInt32(&c.queue) == 0 {
c.cnt = -c.cnt
c.ready = true

if ep, ok := err.(PanicError); ok {
c.err = err
c.ready = true
panic(ep.Panic)
} else if err != nil {
c.err = err
c.ready = true
} else {
func() {
var res Res
var err error

defer func() {
c.res, c.err = res, err
c.ready = true

if p := recover(); p != nil {
c.err = PanicError{Panic: p}
Expand All @@ -160,11 +162,15 @@ again:
}()
}
} else {
wait:
c.cond.Wait()

if !c.ready {
if c.cnt > 0 {
goto again
}
if !c.ready {
goto wait
}
}

res, err := c.res, c.err
Expand Down Expand Up @@ -260,8 +266,15 @@ func (b *Batch[Res]) Rollback(ctx context.Context, err error) (Res, error) {
return b.c.commit(ctx, err)
}

func AsPanicError(err error) (PanicError, bool) {
var pe PanicError

return pe, errors.As(err, &pe)
}

func (e PanicError) Error() string {
return fmt.Sprintf("panic: %v", e.Panic)
}

func (noCopy) Lock() {}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
42 changes: 42 additions & 0 deletions batch3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,48 @@ func TestBatch(tb *testing.T) {
})
}
})

tb.Run("okAfterAll", func(tb *testing.T) {
const N = 100

ctx := context.Background()

var sum int

b.Commit = func(ctx context.Context, _ int) (int, error) {
runtime.Gosched()
return sum, nil
}

var wg sync.WaitGroup

wg.Add(*jobs)

for j := 0; j < *jobs; j++ {
go func() {
defer wg.Done()

for i := 0; i < N; i++ {
func() {
b, idx := b.Enter(true)
defer b.Exit()

if idx == 0 {
sum = 0
}

runtime.Gosched()
sum++
runtime.Gosched()

_, _ = b.Commit(ctx)
}()
}
}()
}

wg.Wait()
})
}

func TestBatchNonBlocking(tb *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module nikand.dev/go/batch

go 1.18

retract v0.4.0 // has a bug
5 changes: 4 additions & 1 deletion multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ func (c *Multi[Res]) Enter(blocking bool) (b Batch[Res], coach, index int) {
for coach := range c.cs {
b, idx := c.cs[coach].Enter(false)
if idx >= 0 {
return b, coach, idx
return Batch[Res]{
c: b.c,
state: b.state,
}, coach, idx
}
}

Expand Down

0 comments on commit 4e2875e

Please sign in to comment.