Skip to content

Commit

Permalink
remove removecallback from interface (viamrobotics#3893)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviamiller authored May 6, 2024
1 parent a543fe9 commit 92e400a
Show file tree
Hide file tree
Showing 20 changed files with 55 additions and 168 deletions.
3 changes: 0 additions & 3 deletions components/board/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ func (dic *digitalInterruptClient) Name() string {
return dic.digitalInterruptName
}

func (dic *digitalInterruptClient) RemoveCallback(ch chan Tick) {
}

func (c *client) StreamTicks(ctx context.Context, interrupts []DigitalInterrupt, ch chan Tick,
extra map[string]interface{},
) error {
Expand Down
9 changes: 3 additions & 6 deletions components/board/digital_interrupts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ type Tick struct {
// A DigitalInterrupt represents a configured interrupt on the board that
// when interrupted, calls the added callbacks.
type DigitalInterrupt interface {
// Name returns the name of the interrupt.
Name() string

// Value returns the current value of the interrupt which is
// based on the type of interrupt.
Value(ctx context.Context, extra map[string]interface{}) (int64, error)

// RemoveCallback removes a listener for interrupts.
RemoveCallback(c chan Tick)

// Name returns the name of the interrupt.
Name() string
}
4 changes: 0 additions & 4 deletions components/board/fake/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,6 @@ func (s *DigitalInterrupt) reset(conf board.DigitalInterruptConfig) {
s.conf = conf
}

// RemoveCallback removes a listener for interrupts.
func (s *DigitalInterrupt) RemoveCallback(c chan board.Tick) {
}

// Value returns the current value of the interrupt which is
// based on the type of interrupt.
func (s *DigitalInterrupt) Value(ctx context.Context, extra map[string]interface{}) (int64, error) {
Expand Down
18 changes: 16 additions & 2 deletions components/board/genericlinux/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/multierr"
pb "go.viam.com/api/component/board/v1"
goutils "go.viam.com/utils"
"go.viam.com/utils"

"go.viam.com/rdk/components/board"
"go.viam.com/rdk/components/board/genericlinux/buses"
Expand Down Expand Up @@ -368,7 +368,7 @@ func (a *wrappedAnalogReader) reset(ctx context.Context, chipSelect string, read
a.mu.Lock()
defer a.mu.Unlock()
if a.reader != nil {
goutils.UncheckedError(a.reader.Close(ctx))
utils.UncheckedError(a.reader.Close(ctx))
}
a.reader = reader
a.chipSelect = chipSelect
Expand Down Expand Up @@ -499,6 +499,20 @@ func (b *Board) StreamTicks(ctx context.Context, interrupts []board.DigitalInter
for _, i := range interrupts {
pinwrappers.AddCallback(i.(*pinwrappers.BasicDigitalInterrupt), ch)
}

b.activeBackgroundWorkers.Add(1)

utils.ManagedGo(func() {
// Wait until it's time to shut down then remove callbacks.
select {
case <-ctx.Done():
case <-b.cancelCtx.Done():
}
for _, i := range interrupts {
pinwrappers.RemoveCallback(i.(*pinwrappers.BasicDigitalInterrupt), ch)
}
}, b.activeBackgroundWorkers.Done)

return nil
}

Expand Down
32 changes: 0 additions & 32 deletions components/board/pi/impl/digital_interrupts.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,6 @@ func (i *BasicDigitalInterrupt) Value(ctx context.Context, extra map[string]inte
return count, nil
}

// Ticks is really just for testing.
func (i *BasicDigitalInterrupt) Ticks(ctx context.Context, num int, now uint64) error {
for x := 0; x < num; x++ {
if err := Tick(ctx, i, true, now+uint64(x)); err != nil {
return err
}
}
return nil
}

// Tick records an interrupt and notifies any interested callbacks. See comment on
// the DigitalInterrupt interface for caveats.
func Tick(ctx context.Context, i *BasicDigitalInterrupt, high bool, nanoseconds uint64) error {
Expand Down Expand Up @@ -136,21 +126,6 @@ func RemoveCallback(i *BasicDigitalInterrupt, c chan board.Tick) {
}
}

// RemoveCallback removes a listener for interrupts.
func (i *BasicDigitalInterrupt) RemoveCallback(c chan board.Tick) {
i.mu.Lock()
defer i.mu.Unlock()
for id := range i.callbacks {
if i.callbacks[id] == c {
// To remove this item, we replace it with the last item in the list, then truncate the
// list by 1.
i.callbacks[id] = i.callbacks[len(i.callbacks)-1]
i.callbacks = i.callbacks[:len(i.callbacks)-1]
break
}
}
}

// Name returns the name of the interrupt.
func (i *BasicDigitalInterrupt) Name() string {
i.mu.Lock()
Expand Down Expand Up @@ -207,13 +182,6 @@ func ServoTick(ctx context.Context, i *ServoDigitalInterrupt, high bool, now uin
return nil
}

// RemoveCallback currently panics.
func (i *ServoDigitalInterrupt) RemoveCallback(c chan board.Tick) {
i.mu.Lock()
defer i.mu.Unlock()
panic("servos can't have callback")
}

// Name returns the name of the interrupt.
func (i *ServoDigitalInterrupt) Name() string {
i.mu.Lock()
Expand Down
14 changes: 7 additions & 7 deletions components/board/pi/impl/digital_interrupts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestBasicDigitalInterrupt1(t *testing.T) {
test.That(t, intVal, test.ShouldEqual, int64(1))

c := make(chan board.Tick)
AddCallback(i.(*BasicDigitalInterrupt), c)
AddCallback(basicInterrupt, c)

timeNanoSec := nowNanosecondsTest()
go func() { Tick(context.Background(), basicInterrupt, true, timeNanoSec) }()
Expand All @@ -55,10 +55,10 @@ func TestBasicDigitalInterrupt1(t *testing.T) {
test.That(t, v.High, test.ShouldBeTrue)
test.That(t, v.TimestampNanosec, test.ShouldEqual, timeNanoSec)

i.RemoveCallback(c)
RemoveCallback(basicInterrupt, c)

c = make(chan board.Tick, 2)
AddCallback(i.(*BasicDigitalInterrupt), c)
AddCallback(basicInterrupt, c)
go func() {
Tick(context.Background(), basicInterrupt, true, uint64(1))
Tick(context.Background(), basicInterrupt, true, uint64(4))
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestRemoveCallbackDigitalInterrupt(t *testing.T) {

c1 := make(chan board.Tick)
test.That(t, c1, test.ShouldNotBeNil)
AddCallback(i.(*BasicDigitalInterrupt), c1)
AddCallback(basicInterrupt, c1)
var wg sync.WaitGroup
wg.Add(1)
ret := false
Expand All @@ -113,11 +113,11 @@ func TestRemoveCallbackDigitalInterrupt(t *testing.T) {
wg.Wait()
c2 := make(chan board.Tick)
test.That(t, c2, test.ShouldNotBeNil)
AddCallback(i.(*BasicDigitalInterrupt), c2)
AddCallback(basicInterrupt, c2)
test.That(t, ret, test.ShouldBeTrue)

i.RemoveCallback(c1)
i.RemoveCallback(c1)
RemoveCallback(basicInterrupt, c1)
RemoveCallback(basicInterrupt, c1)

ret2 := false
result := make(chan bool, 1)
Expand Down
13 changes: 2 additions & 11 deletions components/board/pinwrappers/digital_interrupts.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@ func (i *BasicDigitalInterrupt) Value(ctx context.Context, extra map[string]inte
return count, nil
}

// Ticks is really just for testing.
func (i *BasicDigitalInterrupt) Ticks(ctx context.Context, num int, now uint64) error {
for x := 0; x < num; x++ {
if err := Tick(ctx, i, true, now+uint64(x)); err != nil {
return err
}
}
return nil
}

// Tick records an interrupt and notifies any interested callbacks. See comment on
// the DigitalInterrupt interface for caveats.
func Tick(ctx context.Context, i *BasicDigitalInterrupt, high bool, nanoseconds uint64) error {
Expand All @@ -84,9 +74,10 @@ func AddCallback(i *BasicDigitalInterrupt, c chan board.Tick) {
}

// RemoveCallback removes a listener for interrupts.
func (i *BasicDigitalInterrupt) RemoveCallback(c chan board.Tick) {
func RemoveCallback(i *BasicDigitalInterrupt, c chan board.Tick) {
i.mu.Lock()
defer i.mu.Unlock()

for id := range i.callbacks {
if i.callbacks[id] == c {
// To remove this item, we replace it with the last item in the list, then truncate the
Expand Down
30 changes: 16 additions & 14 deletions components/board/pinwrappers/digital_interrupts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func TestBasicDigitalInterrupt1(t *testing.T) {
i, err := CreateDigitalInterrupt(config)
test.That(t, err, test.ShouldBeNil)

di := i.(*BasicDigitalInterrupt)

intVal, err := i.Value(context.Background(), nil)
test.That(t, err, test.ShouldBeNil)
test.That(t, intVal, test.ShouldEqual, int64(0))
Expand All @@ -36,28 +38,28 @@ func TestBasicDigitalInterrupt1(t *testing.T) {
test.That(t, intVal, test.ShouldEqual, int64(1))

c := make(chan board.Tick)
AddCallback(i.(*BasicDigitalInterrupt), c)
AddCallback(di, c)

timeNanoSec := nowNanosecondsTest()
go func() { Tick(context.Background(), i.(*BasicDigitalInterrupt), true, timeNanoSec) }()
go func() { Tick(context.Background(), di, true, timeNanoSec) }()
time.Sleep(1 * time.Microsecond)
v := <-c
test.That(t, v.High, test.ShouldBeTrue)
test.That(t, v.TimestampNanosec, test.ShouldEqual, timeNanoSec)

timeNanoSec = nowNanosecondsTest()
go func() { Tick(context.Background(), i.(*BasicDigitalInterrupt), true, timeNanoSec) }()
go func() { Tick(context.Background(), di, true, timeNanoSec) }()
v = <-c
test.That(t, v.High, test.ShouldBeTrue)
test.That(t, v.TimestampNanosec, test.ShouldEqual, timeNanoSec)

i.RemoveCallback(c)
RemoveCallback(di, c)

c = make(chan board.Tick, 2)
AddCallback(i.(*BasicDigitalInterrupt), c)
AddCallback(di, c)
go func() {
Tick(context.Background(), i.(*BasicDigitalInterrupt), true, uint64(1))
Tick(context.Background(), i.(*BasicDigitalInterrupt), true, uint64(4))
Tick(context.Background(), di, true, uint64(1))
Tick(context.Background(), di, true, uint64(4))
}()
v = <-c
v1 := <-c
Expand All @@ -72,17 +74,18 @@ func TestRemoveCallbackDigitalInterrupt(t *testing.T) {
}
i, err := CreateDigitalInterrupt(config)
test.That(t, err, test.ShouldBeNil)
di := i.(*BasicDigitalInterrupt)
intVal, err := i.Value(context.Background(), nil)
test.That(t, err, test.ShouldBeNil)
test.That(t, intVal, test.ShouldEqual, int64(0))
test.That(t, Tick(context.Background(), i.(*BasicDigitalInterrupt), true, nowNanosecondsTest()), test.ShouldBeNil)
test.That(t, Tick(context.Background(), di, true, nowNanosecondsTest()), test.ShouldBeNil)
intVal, err = i.Value(context.Background(), nil)
test.That(t, err, test.ShouldBeNil)
test.That(t, intVal, test.ShouldEqual, int64(1))

c1 := make(chan board.Tick)
test.That(t, c1, test.ShouldNotBeNil)
AddCallback(i.(*BasicDigitalInterrupt), c1)
AddCallback(di, c1)
var wg sync.WaitGroup
wg.Add(1)
ret := false
Expand All @@ -101,18 +104,17 @@ func TestRemoveCallbackDigitalInterrupt(t *testing.T) {
ret = tick.High
}
}()
test.That(t, Tick(context.Background(), i.(*BasicDigitalInterrupt), true, nowNanosecondsTest()), test.ShouldBeNil)
test.That(t, Tick(context.Background(), di, true, nowNanosecondsTest()), test.ShouldBeNil)
intVal, err = i.Value(context.Background(), nil)
test.That(t, err, test.ShouldBeNil)
test.That(t, intVal, test.ShouldEqual, int64(2))
wg.Wait()
c2 := make(chan board.Tick)
test.That(t, c2, test.ShouldNotBeNil)
AddCallback(i.(*BasicDigitalInterrupt), c2)
AddCallback(di, c2)
test.That(t, ret, test.ShouldBeTrue)

i.RemoveCallback(c1)
i.RemoveCallback(c1)
RemoveCallback(di, c1)

ret2 := false
result := make(chan bool, 1)
Expand All @@ -132,7 +134,7 @@ func TestRemoveCallbackDigitalInterrupt(t *testing.T) {
}()
wg.Add(1)
go func() {
err := Tick(context.Background(), i.(*BasicDigitalInterrupt), true, nowNanosecondsTest())
err := Tick(context.Background(), di, true, nowNanosecondsTest())
if err != nil {
result <- true
}
Expand Down
6 changes: 0 additions & 6 deletions components/board/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,6 @@ func (s *serviceServer) StreamTicks(
return err
}

defer func() {
for _, i := range interrupts {
i.RemoveCallback(ticksChan)
}
}()

// Send an empty response first so the client doesn't block while checking for errors.
err = server.Send(&pb.StreamTicksResponse{})
if err != nil {
Expand Down
15 changes: 0 additions & 15 deletions components/board/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,21 +825,6 @@ func TestStreamTicks(t *testing.T) {
}
return nil, nil
}
if tc.injectDigitalInterrupts != nil {
for _, i := range tc.injectDigitalInterrupts {
i.RemoveCallbackFunc = func(c chan board.Tick) {
for id := range callbacks {
if callbacks[id] == c {
// To remove this item, we replace it with the last item in the list, then truncate the
// list by 1.
callbacks[id] = callbacks[len(callbacks)-1]
callbacks = callbacks[:len(callbacks)-1]
break
}
}
}
}
}

cancelCtx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
3 changes: 0 additions & 3 deletions components/encoder/incremental/incremental_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,6 @@ func (e *Encoder) Start(ctx context.Context, b board.Board) {
e.activeBackgroundWorkers.Add(1)

utils.ManagedGo(func() {
// Remove the callbacks added by the interrupt stream.
defer e.A.RemoveCallback(ch)
defer e.B.RemoveCallback(ch)
for {
// This looks redundant with the other select statement below, but it's not: if we're
// supposed to return, we need to do that even if chanA and chanB are full of data, and
Expand Down
6 changes: 0 additions & 6 deletions components/encoder/incremental/incremental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,6 @@ func MakeBoard(t *testing.T) board.Board {
i2.ValueFunc = func(ctx context.Context, extra map[string]interface{}) (int64, error) {
return 0, nil
}
i1.RemoveCallbackFunc = func(c chan board.Tick) {
delete(callbacks, i1)
}
i2.RemoveCallbackFunc = func(c chan board.Tick) {
delete(callbacks, i2)
}

b.DigitalInterruptByNameFunc = func(name string) (board.DigitalInterrupt, error) {
if name == "11" {
Expand Down
2 changes: 0 additions & 2 deletions components/encoder/single/single_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ func (e *Encoder) Start(ctx context.Context, b board.Board) {
e.activeBackgroundWorkers.Add(1)

utils.ManagedGo(func() {
defer e.I.RemoveCallback(encoderChannel)

for {
select {
case <-e.cancelCtx.Done():
Expand Down
1 change: 0 additions & 1 deletion components/encoder/single/single_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ func MakeBoard(t *testing.T, name, pinname string) board.Board {
ch <- board.Tick{Name: i.Name(), High: high, TimestampNanosec: nanoseconds}
return nil
}
i.RemoveCallbackFunc = func(c chan board.Tick) {}

b.DigitalInterruptByNameFunc = func(name string) (board.DigitalInterrupt, error) {
return i, nil
Expand Down
Loading

0 comments on commit 92e400a

Please sign in to comment.