Skip to content

Commit

Permalink
refactor opencdc record, include serializer, rename ctxutil to cconte…
Browse files Browse the repository at this point in the history
…xt, include csync functions from connector-sdk
  • Loading branch information
lovromazgon committed Nov 30, 2023
1 parent 1c5139d commit 469a7c0
Show file tree
Hide file tree
Showing 14 changed files with 479 additions and 207 deletions.
8 changes: 4 additions & 4 deletions ctxutil/context.go → ccontext/detach.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package ctxutil
package ccontext

import (
"context"
Expand Down Expand Up @@ -49,9 +49,9 @@ import (
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// DetachContext returns a context that keeps all the values of its parent
// context but detaches from the cancellation and error handling.
func DetachContext(ctx context.Context) context.Context { return detachedContext{ctx} }
// Detach returns a context that keeps all the values of its parent context but
// detaches from the cancellation and error handling.
func Detach(ctx context.Context) context.Context { return detachedContext{ctx} }

type detachedContext struct{ context.Context } //nolint:containedctx // we are wrapping a context

Expand Down
4 changes: 2 additions & 2 deletions ctxutil/context_test.go → ccontext/detach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package ctxutil
package ccontext

import (
"context"
Expand Down Expand Up @@ -45,7 +45,7 @@ func TestDetachContext(t *testing.T) {
defer cancel()

// detach context and assert it is detached
detachedCtx := DetachContext(ctx)
detachedCtx := Detach(ctx)
assertDetachedContext(detachedCtx)

// cancel parent context and assert again
Expand Down
47 changes: 47 additions & 0 deletions csync/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package csync

import (
"context"
"time"

"github.com/conduitio/conduit-commons/cchan"
)

// Run executes fn in a goroutine and waits for it to return. If the context
// gets canceled before that happens the method returns the context error.
//
// This is useful for executing long-running functions like sync.WaitGroup.Wait
// that don't take a context and can potentially block the execution forever.
func Run(ctx context.Context, fn func()) error {
done := make(chan struct{})
go func() {
defer close(done)
fn()
}()

_, _, err := cchan.ChanOut[struct{}](done).Recv(ctx)
return err //nolint:wrapcheck // errors from cchan are context errors and we don't wrap them
}

// RunTimeout executes fn in a goroutine and waits for it to return. If the
// context gets canceled before that happens or the timeout is reached the
// method returns the context error.
func RunTimeout(ctx context.Context, fn func(), timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return Run(ctx, fn)
}
55 changes: 55 additions & 0 deletions csync/run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package csync

import (
"context"
"testing"
"time"

"github.com/matryer/is"
)

func TestRun_Success(t *testing.T) {
is := is.New(t)
ctx := context.Background()

var executed bool
err := Run(ctx, func() { executed = true })
is.NoErr(err)
is.True(executed)
}

func TestRun_Canceled(t *testing.T) {
is := is.New(t)
ctx, cancel := context.WithCancel(context.Background())
cancel()

// run function that blocks for 1 second
err := Run(ctx, func() { <-time.After(time.Second) })
is.Equal(err, context.Canceled)
}

func TestRun_DeadlineReached(t *testing.T) {
is := is.New(t)
ctx := context.Background()

start := time.Now()
err := RunTimeout(ctx, func() { <-time.After(time.Second) }, time.Millisecond*100)
since := time.Since(start)

is.Equal(err, context.DeadlineExceeded)
is.True(since >= time.Millisecond*100)
}
100 changes: 76 additions & 24 deletions csync/valuewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"

"github.com/conduitio/conduit-commons/cchan"
"github.com/google/uuid"
)

Expand All @@ -32,11 +33,11 @@ type ValueWatcher[T any] struct {
listeners map[string]chan T
}

type ValueWatcherFunc[T any] func(val T) bool
type ValueWatcherCompareFunc[T any] func(val T) bool

// WatchValues is a utility function for creating a simple ValueWatcherFunc that
// WatchValues is a utility function for creating a simple ValueWatcherCompareFunc that
// waits for one of the supplied values.
func WatchValues[T comparable](want ...T) ValueWatcherFunc[T] {
func WatchValues[T comparable](want ...T) ValueWatcherCompareFunc[T] {
if len(want) == 0 {
// this would block forever, prevent misuse
panic("invalid use of WatchValues, need to supply at least one value")
Expand All @@ -58,16 +59,28 @@ func WatchValues[T comparable](want ...T) ValueWatcherFunc[T] {
}
}

// Set stores val in ValueWatcher and notifies all goroutines that called Watch
// about the new value, if such goroutines exists.
func (vw *ValueWatcher[T]) Set(val T) {
// CompareAndSwap allows the caller to peek at the currently stored value in
// ValueWatcher and decide if it should be swapped for a new val, all while
// holding the lock. If the value was swapped, it notifies all goroutines
// that called Watch about the new value and returns true. Otherwise it returns
// false.
func (vw *ValueWatcher[T]) CompareAndSwap(val T, shouldSwap func(T) bool) bool {
vw.m.Lock()
defer vw.m.Unlock()

vw.val = val
for _, l := range vw.listeners {
l <- val
if !shouldSwap(vw.val) {
return true
}

vw.val = val
vw.notify(val)
return false
}

// Set stores val in ValueWatcher and notifies all goroutines that called Watch
// about the new value, if such goroutines exists.
func (vw *ValueWatcher[T]) Set(val T) {
vw.CompareAndSwap(val, func(T) bool { return true })
}

// Get returns the current value stored in ValueWatcher.
Expand All @@ -79,34 +92,36 @@ func (vw *ValueWatcher[T]) Get() T {
}

// Watch blocks and calls f for every value that is put into the ValueWatcher.
// Once f returns true it stops blocking and returns nil. First call to f will
// Once f returns true it stops blocking and returns the value. First call to f will
// be with the current value stored in ValueWatcher. Note that if no value was
// stored in ValueWatcher yet, the zero value of type T will be passed to f.
// If the context gets canceled before the value is found, the function returns
// the last seen value and the context error.
//
// Watch can be safely called by multiple goroutines. If the context gets
// cancelled before f returns true, the function will return the context error.
func (vw *ValueWatcher[T]) Watch(ctx context.Context, f ValueWatcherFunc[T]) (T, error) {
val, found, listener, unsubscribe := vw.findOrSubscribe(f)
func (vw *ValueWatcher[T]) Watch(ctx context.Context, f ValueWatcherCompareFunc[T]) (T, error) {
lastVal, found, listener, unsubscribe := vw.findOrSubscribe(f)
if found {
return val, nil
return lastVal, nil
}
defer unsubscribe()

// val was not found yet, we need to keep watching
clistener := cchan.ChanOut[T](listener)
for {
select {
case <-ctx.Done():
var empty T
return empty, ctx.Err()
case val = <-listener:
if f(val) {
return val, nil
}
val, _, err := clistener.Recv(ctx)
if err != nil {
return lastVal, ctx.Err()
}
if f(val) {
return val, nil
}
lastVal = val
}
}

func (vw *ValueWatcher[T]) findOrSubscribe(f ValueWatcherFunc[T]) (T, bool, chan T, func()) {
func (vw *ValueWatcher[T]) findOrSubscribe(f ValueWatcherCompareFunc[T]) (T, bool, chan T, func()) {
vw.m.Lock()
defer vw.m.Unlock()

Expand All @@ -116,8 +131,7 @@ func (vw *ValueWatcher[T]) findOrSubscribe(f ValueWatcherFunc[T]) (T, bool, chan
}

listener, unsubscribe := vw.subscribe()
var empty T
return empty, false, listener, unsubscribe
return vw.val, false, listener, unsubscribe
}

// subscribe creates a channel that will receive changes and returns it
Expand Down Expand Up @@ -154,3 +168,41 @@ func (vw *ValueWatcher[T]) unsubscribe(id string, c chan T) {
close(c)
delete(vw.listeners, id)
}

func (vw *ValueWatcher[T]) notify(val T) {
for _, l := range vw.listeners {
l <- val
}
}

// Lock locks the ValueWatcher and returns an instance of LockedValueWatcher
// that allows one to set and get the value while holding the lock. After the
// caller executed the operations and doesn't need the lock anymore it should
// call Unlock on the LockedValueWatcher before discarding it.
func (vw *ValueWatcher[T]) Lock() *LockedValueWatcher[T] {
vw.m.Lock()
return &LockedValueWatcher[T]{vw: vw}
}

type LockedValueWatcher[T any] struct {
vw *ValueWatcher[T]
}

// Unlock unlocks the ValueWatcher. After this the LockedValueWatcher should be
// discarded.
func (lvw *LockedValueWatcher[T]) Unlock() *ValueWatcher[T] {
lvw.vw.m.Unlock()
return lvw.vw
}

// Set stores val in ValueWatcher and notifies all goroutines that called Watch
// about the new value, if such goroutines exists.
func (lvw *LockedValueWatcher[T]) Set(val T) {
lvw.vw.val = val
lvw.vw.notify(val)
}

// Get returns the current value stored in ValueWatcher.
func (lvw *LockedValueWatcher[T]) Get() T {
return lvw.vw.val
}
Loading

0 comments on commit 469a7c0

Please sign in to comment.