Skip to content

Commit

Permalink
move Watchable behind a go1.19 build tag because it uses atomic.Pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
bradenaw committed Nov 25, 2023
1 parent c413889 commit 239d32f
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 92 deletions.
31 changes: 31 additions & 0 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,37 @@ func (s *flattenStream[T]) Close() {
s.inner.Close()
}

func FlattenSlices[T any](s Stream[[]T]) Stream[T] {
return &flattenSlicesStream[T]{
inner: s,
}
}

type flattenSlicesStream[T any] struct {
inner Stream[[]T]
buffer []T
}

func (s *flattenSlicesStream[T]) Next(ctx context.Context) (T, error) {
var zero T
for {
if len(s.buffer) > 0 {
item := s.buffer[0]
s.buffer[0] = zero
s.buffer = s.buffer[1:]
return item, nil
}

var err error
s.buffer, err = s.inner.Next(ctx)
if err != nil {
return zero, err
}
}
}

func (s *flattenSlicesStream[T]) Close() { s.inner.Close() }

// Join returns a Stream that yields all elements from streams[0], then all elements from
// streams[1], and so on.
func Join[T any](streams ...Stream[T]) Stream[T] {
Expand Down
64 changes: 0 additions & 64 deletions xsync/xsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"math/rand"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -321,66 +320,3 @@ func (f *Future[T]) WaitContext(ctx context.Context) (T, error) {
}
return f.x, nil
}

// Watchable contains a value. It is similar to an atomic.Pointer[T] but allows notifying callers
// that a new value has been set.
type Watchable[T any] struct {
p atomic.Pointer[watchableInner[T]]
}

type watchableInner[T any] struct {
t T
c chan struct{}
}

// Set sets the value in w and notifies callers of Value() that there is a new value.
func (w *Watchable[T]) Set(t T) {
newInner := &watchableInner[T]{
t: t,
c: make(chan struct{}),
}
oldInner := w.p.Swap(newInner)
if oldInner != nil {
close(oldInner.c)
}
}

// Value returns the current value inside w and a channel that will be closed when w is Set() to a
// newer value than the returned one.
//
// If called before the first Set(), returns the zero value of T.
//
// Normal usage has an observer waiting for new values in a loop:
//
// for {
// v, changed := w.Value()
//
// // do something with v
//
// <-changed
// }
//
// Note that the value in w may have changed multiple times between successive calls to Value(),
// Value() only ever returns the last-set value. This is by design so that slow observers cannot
// block Set(), unlike sending values on a channel.
func (w *Watchable[T]) Value() (T, chan struct{}) {
inner := w.p.Load()
if inner == nil {
// There's no inner, meaning w has not been Set() yet. Try filling it with an empty inner,
// so that we have a channel to listen on.
c := make(chan struct{})
emptyInner := &watchableInner[T]{
c: c,
}
// CompareAndSwap so we don't accidentally smash a real value that got put between our Load
// and here.
if w.p.CompareAndSwap(nil, emptyInner) {
var zero T
return zero, c
}
// If we fell through to here somebody Set() while we were trying to do this, so there's
// definitely an inner now.
inner = w.p.Load()
}
return inner.t, inner.c
}
70 changes: 70 additions & 0 deletions xsync/xsync_go1.19.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//go:build go1.19

package xsync

import (
"sync/atomic"
)

// Watchable contains a value. It is similar to an atomic.Pointer[T] but allows notifying callers
// that a new value has been set.
type Watchable[T any] struct {
p atomic.Pointer[watchableInner[T]]
}

type watchableInner[T any] struct {
t T
c chan struct{}
}

// Set sets the value in w and notifies callers of Value() that there is a new value.
func (w *Watchable[T]) Set(t T) {
newInner := &watchableInner[T]{
t: t,
c: make(chan struct{}),
}
oldInner := w.p.Swap(newInner)
if oldInner != nil {
close(oldInner.c)
}
}

// Value returns the current value inside w and a channel that will be closed when w is Set() to a
// newer value than the returned one.
//
// If called before the first Set(), returns the zero value of T.
//
// Normal usage has an observer waiting for new values in a loop:
//
// for {
// v, changed := w.Value()
//
// // do something with v
//
// <-changed
// }
//
// Note that the value in w may have changed multiple times between successive calls to Value(),
// Value() only ever returns the last-set value. This is by design so that slow observers cannot
// block Set(), unlike sending values on a channel.
func (w *Watchable[T]) Value() (T, chan struct{}) {
inner := w.p.Load()
if inner == nil {
// There's no inner, meaning w has not been Set() yet. Try filling it with an empty inner,
// so that we have a channel to listen on.
c := make(chan struct{})
emptyInner := &watchableInner[T]{
c: c,
}
// CompareAndSwap so we don't accidentally smash a real value that got put between our Load
// and here.
if w.p.CompareAndSwap(nil, emptyInner) {
var zero T
return zero, c
}
// If we fell through to here somebody Set() while we were trying to do this, so there's
// definitely an inner now.
inner = w.p.Load()
}
return inner.t, inner.c
}
36 changes: 36 additions & 0 deletions xsync/xsync_go1.19_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//go:build go1.19

package xsync

import (
"fmt"
"time"
)

func ExampleWatchable() {
start := time.Now()

var w Watchable[int]
w.Set(0)
go func() {
for i := 1; i < 20; i++ {
w.Set(i)
fmt.Printf("set %d at %s\n", i, time.Since(start).Round(time.Millisecond))
time.Sleep(5 * time.Millisecond)
}
}()

for {
v, changed := w.Value()
if v == 19 {
return
}

fmt.Printf("observed %d at %s\n", v, time.Since(start).Round(time.Millisecond))

// Sleep for longer between iterations to show that we don't slow down the setter.
time.Sleep(17 * time.Millisecond)

<-changed
}
}
28 changes: 0 additions & 28 deletions xsync/xsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,31 +97,3 @@ func TestGroup(t *testing.T) {
// Jank, but just in case we'd be safe from the above panic just because the test is over.
time.Sleep(200 * time.Millisecond)
}

func ExampleWatchable() {
start := time.Now()

var w Watchable[int]
w.Set(0)
go func() {
for i := 1; i < 20; i++ {
w.Set(i)
fmt.Printf("set %d at %s\n", i, time.Since(start).Round(time.Millisecond))
time.Sleep(5 * time.Millisecond)
}
}()

for {
v, changed := w.Value()
if v == 19 {
return
}

fmt.Printf("observed %d at %s\n", v, time.Since(start).Round(time.Millisecond))

// Sleep for longer between iterations to show that we don't slow down the setter.
time.Sleep(17 * time.Millisecond)

<-changed
}
}

0 comments on commit 239d32f

Please sign in to comment.