Skip to content

Commit

Permalink
Merge pull request #345 from lovoo/visitor-tool-comments
Browse files Browse the repository at this point in the history
fixed comments from PR #341, added unit tests for action
  • Loading branch information
frairon authored Sep 13, 2021
2 parents 15746b5 + 470f61d commit 2b16f64
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 20 deletions.
6 changes: 3 additions & 3 deletions examples/10-visit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (

var (
brokers = []string{"localhost:9092"}
topic goka.Stream = "example-migration-clicks-input"
group goka.Group = "example-migration-group"
topic goka.Stream = "example-visit-clicks-input"
group goka.Group = "example-visit-group"

tmc *goka.TopicManagerConfig
)
Expand Down Expand Up @@ -61,7 +61,7 @@ func main() {
}
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
log.Fatalf("Error creating kafka topic %s: %v", topic, err)
}

sigs := make(chan os.Signal)
Expand Down
2 changes: 1 addition & 1 deletion mockautoconsumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (cg *MockConsumerGroup) markMessage(topic string, partition int32, offset i
cnt := cg.messages[offset-1]

if cnt == 0 {
panic(fmt.Errorf("Cannot mark message with offest %d, it's not a valid offset or was already marked", offset))
panic(fmt.Errorf("Cannot mark message with offset %d, it's not a valid offset or was already marked", offset))
}

cg.messages[offset] = cnt - 1
Expand Down
6 changes: 3 additions & 3 deletions partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (pp *PartitionProcessor) run(ctx context.Context) (rerr error) {
}
err := pp.processVisit(ctx, &wg, visit, syncFailer, asyncFailer)
if err != nil {
return fmt.Errorf("Error migrating %s for %s: %v", visit.name, visit.key, err)
return fmt.Errorf("Error visiting %s for %s: %v", visit.name, visit.key, err)
}
case <-asyncErrs:
pp.log.Debugf("Errors occurred asynchronously. Will exit partition processor")
Expand Down Expand Up @@ -548,9 +548,9 @@ func (pp *PartitionProcessor) enqueueTrackOutputStats(ctx context.Context, topic

func (pp *PartitionProcessor) processVisit(ctx context.Context, wg *sync.WaitGroup, v *visit, syncFailer func(err error), asyncFailer func(err error)) error {
cb, ok := pp.visitCallbacks[v.name]
// no callback registered for migration
// no callback registered for visit
if !ok {
return fmt.Errorf("no callback registered for migration")
return fmt.Errorf("no callback registered for visit named '%s'", v.name)
}

msgContext := &cbContext{
Expand Down
23 changes: 17 additions & 6 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,8 @@ func (g *Processor) Setup(session sarama.ConsumerGroupSession) error {
for partition := range assignment {
// create partition processor for our partition
pproc, err := g.createPartitionProcessor(session.Context(), partition, runModeActive,
func(msg *message, metadata string) {
// We have to commit the offset that we want to read next, not the one that we just have
// processed, therefore msg.offset+1 is required to get the next message.
// This has the same semantics as sarama's implementation of session.MarkMessage (which calls MarkOffset with offset+1)
session.MarkOffset(msg.topic, msg.partition, msg.offset+1, metadata)
})
createMessageCommitter(session),
)
if err != nil {
return fmt.Errorf("Error creating partition processor for %s/%d: %v", g.Graph().Group(), partition, err)
}
Expand Down Expand Up @@ -974,3 +970,18 @@ func ensureCopartitioned(tm TopicManager, topics []string) (int, error) {
}
return npar, nil
}

// createMessageCommitter returns a commitCallback that allows to commit a message into the passed
// sarama.ConsumerGroupSession.
//
// Note that the offset to be committed must be the offset that the consumer expects to consume next, not the offset of the message.
// See documentation at https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html which says:
//
// Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed.
//
// This has the same semantics as sarama's implementation of session.MarkMessage (which calls MarkOffset with offset+1)
func createMessageCommitter(session sarama.ConsumerGroupSession) commitCallback {
return func(msg *message, metadata string) {
session.MarkOffset(msg.topic, msg.partition, msg.offset+1, metadata)
}
}
44 changes: 37 additions & 7 deletions web/actions/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type action struct {
name string
actor Actor
cancel context.CancelFunc
done chan struct{}
started time.Time
finished time.Time
runErr error
Expand All @@ -21,16 +22,28 @@ func (a *action) Name() string {
return a.name
}

// IsRunning returns whether the action is currently running.
func (a *action) IsRunning() bool {
a.RLock()
defer a.RUnlock()
return !a.started.IsZero() && a.finished.IsZero()

// if there's a "done"-channel and it's not closed, the action is running
if a.done != nil {
select {
case <-a.done:
return false
default:
return true
}
}
return false
}

func (a *action) Description() string {
return a.actor.Description()
}

// StartTime returns a rfc3339 format of the start time or "not started" if it was not started yet
func (a *action) StartTime() string {
a.RLock()
defer a.RUnlock()
Expand All @@ -39,6 +52,8 @@ func (a *action) StartTime() string {
}
return a.started.Format(time.RFC3339)
}

// FinishedTime returns a rfc3339 format of the finish time or "not finished" if it was not finished yet.
func (a *action) FinishedTime() string {
a.RLock()
defer a.RUnlock()
Expand All @@ -48,6 +63,7 @@ func (a *action) FinishedTime() string {
return a.finished.Format(time.RFC3339)
}

// Error returns the error of the last invocation
func (a *action) Error() error {

a.RLock()
Expand All @@ -56,30 +72,44 @@ func (a *action) Error() error {
}

// Start starts the action in a separate goroutine.
// Note that stopping the action won't wait for it to be finished/shutdown,
// so starting it a second time might lead to overwriting the finish time if the first run
// stops and writes "finish" again
// If Start is called while the action is running, `Stop` will be called first.
func (a *action) Start(value string) {
// stop the action (noop if it's not running)
a.Stop()

a.Lock()
defer a.Unlock()

a.started = time.Now()
a.finished = time.Time{}
ctx, cancel := context.WithCancel(context.Background())
a.ctx, a.cancel = ctx, cancel
done := make(chan struct{})
a.done = done
go func() {
defer cancel()
err := a.actor.RunAction(ctx, value)
defer close(done)

a.Lock()
defer a.Unlock()
a.finished = time.Now()
a.runErr = err
a.Unlock()

}()
}

// Stop stops the action. It waits for the action to be completed
func (a *action) Stop() {
a.Lock()
defer a.Unlock()
a.RLock()
done := a.done
if a.cancel != nil {
a.cancel()
}
a.RUnlock()

// if there was something to wait on, let's wait.
if done != nil {
<-done
}
}
117 changes: 117 additions & 0 deletions web/actions/action_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package actions

import (
"context"
"log"
"testing"
"time"

"github.com/lovoo/goka/internal/test"
)

var (
actionRuntime = 10 * time.Millisecond
)

func TestActionStart(t *testing.T) {

var run int64
actor := FuncActor("sleep", func(ctx context.Context, value string) error {
select {
case <-ctx.Done():
log.Printf("ctx done")
return ctx.Err()
case <-time.After(actionRuntime):
run++
}
return nil
})

t.Run("no-run", func(t *testing.T) {
a := &action{
name: "test",
actor: actor,
}

test.AssertEqual(t, a.Name(), "test")
test.AssertEqual(t, a.Description(), "sleep")
test.AssertTrue(t, a.Error() == nil)
test.AssertFalse(t, a.IsRunning())
test.AssertEqual(t, a.StartTime(), "not started")
test.AssertEqual(t, a.FinishedTime(), "not finished")
})

t.Run("stop-only", func(t *testing.T) {
a := &action{
name: "test",
actor: actor,
}

a.Stop()
})

t.Run("start-stop", func(t *testing.T) {
run = 0
a := &action{
name: "test",
actor: actor,
}

// start and check it's running
a.Start("")
test.AssertTrue(t, a.IsRunning())
test.AssertNotEqual(t, a.StartTime(), "not started")
test.AssertEqual(t, a.FinishedTime(), "not finished")

a.Stop()
test.AssertFalse(t, a.IsRunning())
test.AssertNotEqual(t, a.StartTime(), "not started")
// it's finished
test.AssertNotEqual(t, a.FinishedTime(), "not finished")
test.AssertTrue(t, a.Error() != nil)
test.AssertEqual(t, a.Error().Error(), context.Canceled.Error())
test.AssertEqual(t, run, int64(0))
})

t.Run("start-finish", func(t *testing.T) {
run = 0
a := &action{
name: "test",
actor: actor,
}

// start and check it's running
a.Start("")
time.Sleep(actionRuntime * 2)
test.AssertFalse(t, a.IsRunning())
test.AssertNotEqual(t, a.StartTime(), "not started")
test.AssertNotEqual(t, a.FinishedTime(), "not finished")
test.AssertTrue(t, a.Error() == nil)
test.AssertEqual(t, run, int64(1))
})

t.Run("start-restart-finish", func(t *testing.T) {
run = 0
a := &action{
name: "test",
actor: actor,
}

// start + stop immediately
a.Start("")
a.Stop()

// start and keep it running
a.Start("")
time.Sleep(actionRuntime * 2)

a.Start("")
time.Sleep(actionRuntime * 2)
test.AssertFalse(t, a.IsRunning())
test.AssertNotEqual(t, a.StartTime(), "not started")
test.AssertNotEqual(t, a.FinishedTime(), "not finished")
test.AssertTrue(t, a.Error() == nil)
test.AssertEqual(t, run, int64(2))
})

}

0 comments on commit 2b16f64

Please sign in to comment.