Skip to content

Commit

Permalink
extract util functions
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Jun 9, 2022
1 parent f6d79d3 commit 1c1fbd2
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 37 deletions.
16 changes: 1 addition & 15 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (d ConfigurableAcceptanceTestDriver) Skip(t *testing.T) {

for _, skipRegex := range skipRegexs {
if skipRegex.MatchString(t.Name()) {
t.Skip(fmt.Sprintf("caller requested to skip tests that match the regex %q", skipRegex.String()))
t.Skipf("caller requested to skip tests that match the regex %q", skipRegex.String())
}
}
}
Expand Down Expand Up @@ -1129,17 +1129,3 @@ func (a acceptanceTest) isEqualData(is *is.I, want, got Data) {
is.Equal(want.Bytes(), got.Bytes()) // data did not match (want != got)
}
}

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}
25 changes: 3 additions & 22 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (a *destinationPluginAdapter) Run(ctx context.Context, stream cpluginv1.Des
if err == io.EOF {
// stream is closed
// wait for all acks to be sent back to Conduit
return a.waitForAcks(ctx)
return waitOrDone(ctx, &a.wgAckFuncs)
}
return fmt.Errorf("write stream error: %w", err)
}
Expand Down Expand Up @@ -235,25 +235,6 @@ func (a *destinationPluginAdapter) ackFunc(r Record, stream cpluginv1.Destinatio
}
}

func (a *destinationPluginAdapter) waitForAcks(ctx context.Context) error {
// wait for all acks to be sent back to Conduit
ackFuncsDone := make(chan struct{})
go func() {
a.wgAckFuncs.Wait()
close(ackFuncsDone)
}()
return a.waitForClose(ctx, ackFuncsDone)
}

func (a *destinationPluginAdapter) waitForClose(ctx context.Context, stop chan struct{}) error {
select {
case <-stop:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.DestinationStopRequest) (cpluginv1.DestinationStopResponse, error) {
// last thing we do is cancel context in Open
defer a.openCancel()
Expand All @@ -263,7 +244,7 @@ func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.Desti
defer cancel()

// wait for all acks to be sent back to Conduit
waitErr := a.waitForAcks(waitCtx)
waitErr := waitOrDone(waitCtx, &a.wgAckFuncs)
if waitErr != nil {
// just log error and continue to flush at least the processed records
Logger(ctx).Warn().Err(waitErr).Msg("failed to wait for all acks to be sent back to Conduit")
Expand All @@ -286,7 +267,7 @@ func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.Desti
// everything went as expected, let's cancel the context in Open and
// wait for Run to stop gracefully
a.openCancel()
err = a.waitForClose(ctx, a.runDone)
err = waitForClose(ctx, a.runDone)
return cpluginv1.DestinationStopResponse{}, err
}

Expand Down
52 changes: 52 additions & 0 deletions wait_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sdk

import (
"context"
"sync"
"time"
)

// waitTimeout returns true if the given WaitGroup's counter is zero
// before the given timeout is reached. Returns false otherwise.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
withTimeout, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return waitOrDone(withTimeout, wg) == nil
}

// waitTimeout returns nil if the given WaitGroup's counter is zero
// before the given context is done. Returns the context's Err() otherwise.
func waitOrDone(ctx context.Context, wg *sync.WaitGroup) error {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
return waitForClose(ctx, done)
}

// waitForClose waits until the given channel receives a struct or until the given context is done.
// If the channel receives a struct before the context is done, nil is returned.
// Returns context's Err() otherwise.
func waitForClose(ctx context.Context, stop chan struct{}) error {
select {
case <-stop:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

0 comments on commit 1c1fbd2

Please sign in to comment.