Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow: Rename Record to Run #15

Merged
merged 5 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ type MyType struct {
func Workflow() *workflow.Workflow[MyType, Step] {
b := workflow.NewBuilder[MyType, Step]("my workflow name")

b.AddStep(StepOne, func(ctx context.Context, r *workflow.Record[MyType, Step]) (Step, error) {
b.AddStep(StepOne, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
r.Object.Field = "Hello,"
return StepTwo, nil
}, StepTwo)

b.AddStep(StepTwo, func(ctx context.Context, r *workflow.Record[MyType, Step]) (Step, error) {
b.AddStep(StepTwo, func(ctx context.Context, r *workflow.Run[MyType, Step]) (Step, error) {
r.Object.Field += " world!"
return StepThree, nil
}, StepThree)
Expand Down Expand Up @@ -181,8 +181,6 @@ title: Diagram the run states of a workflow
stateDiagram-v2
direction LR

[*]-->Initiated

Initiated-->Running

Running-->Completed
Expand All @@ -199,7 +197,6 @@ stateDiagram-v2

DataDeleted-->RequestedDataDeleted
RequestedDataDeleted-->DataDeleted
DataDeleted-->[*]
}
```

Expand Down
2 changes: 1 addition & 1 deletion adapters/adaptertest/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func RunConnectorTest(t *testing.T, maker func(seedEvents []workflow.ConnectorEv
)

builder.AddStep(SyncStatusStarted,
func(ctx context.Context, r *workflow.Record[User, SyncStatus]) (SyncStatus, error) {
func(ctx context.Context, r *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
r.Object.Email = "[email protected]"
return SyncStatusEmailSet, nil
},
Expand Down
16 changes: 8 additions & 8 deletions adapters/adaptertest/eventstreaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ func RunEventStreamerTest(t *testing.T, constructor workflow.EventStreamer) {
require.NotEmpty(t, record.Object.UID)
}

func setEmail() func(ctx context.Context, t *workflow.Record[User, SyncStatus]) (SyncStatus, error) {
return func(ctx context.Context, t *workflow.Record[User, SyncStatus]) (SyncStatus, error) {
func setEmail() func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
return func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
t.Object.Email = "[email protected]"
return SyncStatusEmailSet, nil
}
}

func coolDownTimerFunc() func(ctx context.Context, r *workflow.Record[User, SyncStatus], now time.Time) (time.Time, error) {
return func(ctx context.Context, r *workflow.Record[User, SyncStatus], now time.Time) (time.Time, error) {
func coolDownTimerFunc() func(ctx context.Context, r *workflow.Run[User, SyncStatus], now time.Time) (time.Time, error) {
return func(ctx context.Context, r *workflow.Run[User, SyncStatus], now time.Time) (time.Time, error) {
// Place a 1-hour cool down period for Great Britain users
if r.Object.CountryCode == "GB" {
return now.Add(time.Hour), nil
Expand All @@ -124,8 +124,8 @@ func coolDownTimerFunc() func(ctx context.Context, r *workflow.Record[User, Sync
}
}

func coolDownTimeout() func(ctx context.Context, r *workflow.Record[User, SyncStatus], now time.Time) (SyncStatus, error) {
return func(ctx context.Context, r *workflow.Record[User, SyncStatus], now time.Time) (SyncStatus, error) {
func coolDownTimeout() func(ctx context.Context, r *workflow.Run[User, SyncStatus], now time.Time) (SyncStatus, error) {
return func(ctx context.Context, r *workflow.Run[User, SyncStatus], now time.Time) (SyncStatus, error) {
if r.Object.Email == "[email protected]" {
return SyncStatusRegulationTimeout, nil
}
Expand All @@ -134,8 +134,8 @@ func coolDownTimeout() func(ctx context.Context, r *workflow.Record[User, SyncSt
}
}

func generateUserID() func(ctx context.Context, t *workflow.Record[User, SyncStatus]) (SyncStatus, error) {
return func(ctx context.Context, t *workflow.Record[User, SyncStatus]) (SyncStatus, error) {
func generateUserID() func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
return func(ctx context.Context, t *workflow.Run[User, SyncStatus]) (SyncStatus, error) {
t.Object.UID = uuid.New().String()
return SyncStatusCompleted, nil
}
Expand Down
2 changes: 1 addition & 1 deletion adapters/adaptertest/recordstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func testDeleteOutboxEvent(t *testing.T, factory func() workflow.RecordStore) {
expected := dummyWireRecord(t)

maker := func(recordID int64) (workflow.OutboxEventData, error) {
// Record ID would not have been set if it is a new record. Assign the recordID that the Store provides
// Run ID would not have been set if it is a new record. Assign the recordID that the Store provides
expected.ID = recordID
return workflow.WireRecordToOutboxEventData(*expected, workflow.RunStateInitiated)
}
Expand Down
8 changes: 4 additions & 4 deletions adapters/reflexstreamer/reflex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/luno/reflex"
"github.com/luno/reflex/rpatterns"
"github.com/luno/reflex/rsql"
"github.com/stretchr/testify/require"

"github.com/luno/workflow"
"github.com/luno/workflow/adapters/adaptertest"
"github.com/luno/workflow/adapters/memrecordstore"
"github.com/luno/workflow/adapters/memrolescheduler"
"github.com/luno/workflow/adapters/memtimeoutstore"
"github.com/stretchr/testify/require"

andrewwormald marked this conversation as resolved.
Show resolved Hide resolved
"github.com/luno/workflow/adapters/reflexstreamer"
)

Expand All @@ -35,15 +35,15 @@ func TestStreamFunc(t *testing.T) {
b := workflow.NewBuilder[string, status](workflowName)
b.AddStep(
statusStart,
func(ctx context.Context, r *workflow.Record[string, status]) (status, error) {
func(ctx context.Context, r *workflow.Run[string, status]) (status, error) {
*r.Object = "Started and "
return statusMiddle, nil
},
statusMiddle,
)
b.AddStep(
statusMiddle,
func(ctx context.Context, r *workflow.Record[string, status]) (status, error) {
func(ctx context.Context, r *workflow.Run[string, status]) (status, error) {
*r.Object += "Completed in a Workflow"
return statusEnd, nil
},
Expand Down
6 changes: 3 additions & 3 deletions await.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/luno/jettison/errors"
)

func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Record[Type, Status], error) {
func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Run[Type, Status], error) {
var opt awaitOpts
for _, option := range opts {
option(&opt)
Expand All @@ -23,7 +23,7 @@ func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID str
return awaitWorkflowStatusByForeignID[Type, Status](ctx, w, status, foreignID, runID, role, pollFrequency)
}

func awaitWorkflowStatusByForeignID[Type any, Status StatusType](ctx context.Context, w *Workflow[Type, Status], status Status, foreignID, runID string, role string, pollFrequency time.Duration) (*Record[Type, Status], error) {
func awaitWorkflowStatusByForeignID[Type any, Status StatusType](ctx context.Context, w *Workflow[Type, Status], status Status, foreignID, runID string, role string, pollFrequency time.Duration) (*Run[Type, Status], error) {
topic := Topic(w.Name, int(status))
stream, err := w.eventStreamer.NewConsumer(
ctx,
Expand Down Expand Up @@ -77,7 +77,7 @@ func awaitWorkflowStatusByForeignID[Type any, Status StatusType](ctx context.Con
return nil, err
}

return &Record[Type, Status]{
return &Run[Type, Status]{
WireRecord: *r,
Status: Status(r.Status),
Object: &t,
Expand Down
2 changes: 1 addition & 1 deletion await_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestAwait(t *testing.T) {
b := workflow.NewBuilder[string, status]("consumer lag")
b.AddStep(
StatusStart,
func(ctx context.Context, r *workflow.Record[string, status]) (status, error) {
func(ctx context.Context, r *workflow.Run[string, status]) (status, error) {
*r.Object = "hello world"
return StatusEnd, nil
},
Expand Down
4 changes: 2 additions & 2 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,13 @@ func (b *Builder[Type, Status]) determineEndPoints(graph map[int][]int) map[Stat
}

func DurationTimerFunc[Type any, Status StatusType](duration time.Duration) TimerFunc[Type, Status] {
return func(ctx context.Context, r *Record[Type, Status], now time.Time) (time.Time, error) {
return func(ctx context.Context, r *Run[Type, Status], now time.Time) (time.Time, error) {
return now.Add(duration), nil
}
}

func TimeTimerFunc[Type any, Status StatusType](t time.Time) TimerFunc[Type, Status] {
return func(ctx context.Context, r *Record[Type, Status], now time.Time) (time.Time, error) {
return func(ctx context.Context, r *Run[Type, Status], now time.Time) (time.Time, error) {
return t, nil
}
}
18 changes: 9 additions & 9 deletions builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestWithClock(t *testing.T) {
}

func TestAddingCallbacks(t *testing.T) {
var exampleFn CallbackFunc[string, testStatus] = func(ctx context.Context, s *Record[string, testStatus], r io.Reader) (testStatus, error) {
var exampleFn CallbackFunc[string, testStatus] = func(ctx context.Context, s *Run[string, testStatus], r io.Reader) (testStatus, error) {
return statusEnd, nil
}

Expand All @@ -124,7 +124,7 @@ func TestWithTimeoutErrBackOff(t *testing.T) {
b.AddTimeout(
statusStart,
DurationTimerFunc[string, testStatus](time.Hour),
func(ctx context.Context, t *Record[string, testStatus], now time.Time) (testStatus, error) {
func(ctx context.Context, t *Run[string, testStatus], now time.Time) (testStatus, error) {
return statusEnd, nil
},
statusEnd,
Expand All @@ -140,7 +140,7 @@ func TestWithTimeoutPollingFrequency(t *testing.T) {
b.AddTimeout(
statusStart,
DurationTimerFunc[string, testStatus](time.Hour),
func(ctx context.Context, t *Record[string, testStatus], now time.Time) (testStatus, error) {
func(ctx context.Context, t *Run[string, testStatus], now time.Time) (testStatus, error) {
return statusEnd, nil
},
statusEnd,
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestWithStepLagAlert(t *testing.T) {
b := NewBuilder[string, testStatus]("consumer lag alert")
b.AddStep(
statusStart,
func(ctx context.Context, r *Record[string, testStatus]) (testStatus, error) {
func(ctx context.Context, r *Run[string, testStatus]) (testStatus, error) {
return statusEnd, nil
},
statusEnd,
Expand All @@ -261,7 +261,7 @@ func TestAddStepSingleUseValidation(t *testing.T) {
b := NewBuilder[string, testStatus]("consumer lag")
b.AddStep(
statusStart,
func(ctx context.Context, r *Record[string, testStatus]) (testStatus, error) {
func(ctx context.Context, r *Run[string, testStatus]) (testStatus, error) {
return statusEnd, nil
},
statusEnd,
Expand All @@ -273,7 +273,7 @@ func TestAddStepSingleUseValidation(t *testing.T) {
func() {
b.AddStep(
statusStart,
func(ctx context.Context, r *Record[string, testStatus]) (testStatus, error) {
func(ctx context.Context, r *Run[string, testStatus]) (testStatus, error) {
return statusEnd, nil
},
statusEnd,
Expand All @@ -286,7 +286,7 @@ func TestConfigureTimeoutWithoutTimeoutStore(t *testing.T) {
b.AddTimeout(
statusStart,
DurationTimerFunc[string, testStatus](time.Hour),
func(ctx context.Context, r *Record[string, testStatus], now time.Time) (testStatus, error) {
func(ctx context.Context, r *Run[string, testStatus], now time.Time) (testStatus, error) {
return statusEnd, nil
},
statusEnd,
Expand All @@ -309,7 +309,7 @@ func TestWithStepConsumerLag(t *testing.T) {
b := NewBuilder[string, testStatus]("consumer lag")
b.AddStep(
statusStart,
func(ctx context.Context, r *Record[string, testStatus]) (testStatus, error) {
func(ctx context.Context, r *Run[string, testStatus]) (testStatus, error) {
return statusEnd, nil
},
statusEnd,
Expand All @@ -325,7 +325,7 @@ func TestWithDefaultOptions(t *testing.T) {
b := NewBuilder[string, testStatus]("consumer lag")
b.AddStep(
statusStart,
func(ctx context.Context, r *Record[string, testStatus]) (testStatus, error) {
func(ctx context.Context, r *Run[string, testStatus]) (testStatus, error) {
return statusEnd, nil
},
statusEnd,
Expand Down
2 changes: 1 addition & 1 deletion callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type callback[Type any, Status StatusType] struct {
CallbackFunc CallbackFunc[Type, Status]
}

type CallbackFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status], reader io.Reader) (Status, error)
type CallbackFunc[Type any, Status StatusType] func(ctx context.Context, r *Run[Type, Status], reader io.Reader) (Status, error)

func (w *Workflow[Type, Status]) Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error {
updateFn := newUpdater[Type, Status](w.recordStore.Lookup, w.recordStore.Store, w.statusGraph, w.clock)
Expand Down
14 changes: 7 additions & 7 deletions callback_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ func TestProcessCallback(t *testing.T) {
Object: b,
}

callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Record[string, testStatus], reader io.Reader) (testStatus, error) {
callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) {
calls["callbackFunc"] += 1
*r.Object = "new data"
return statusEnd, nil
})

updater := func(ctx context.Context, current testStatus, next testStatus, record *Record[string, testStatus]) error {
updater := func(ctx context.Context, current testStatus, next testStatus, record *Run[string, testStatus]) error {
calls["updater"] += 1
require.Equal(t, "new data", *record.Object)
return nil
Expand Down Expand Up @@ -96,13 +96,13 @@ func TestProcessCallback(t *testing.T) {
"latestLookup": 0,
}

callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Record[string, testStatus], reader io.Reader) (testStatus, error) {
callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) {
calls["callbackFunc"] += 1
*r.Object = "new data"
return statusEnd, nil
})

updater := func(ctx context.Context, current testStatus, next testStatus, record *Record[string, testStatus]) error {
updater := func(ctx context.Context, current testStatus, next testStatus, record *Run[string, testStatus]) error {
calls["updater"] += 1
require.Equal(t, "new data", *record.Object)
return nil
Expand Down Expand Up @@ -136,12 +136,12 @@ func TestProcessCallback(t *testing.T) {
"latestLookup": 0,
}

callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Record[string, testStatus], reader io.Reader) (testStatus, error) {
callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) {
calls["callbackFunc"] += 1
return testStatus(SkipTypeDefault), nil
})

updater := func(ctx context.Context, current testStatus, next testStatus, record *Record[string, testStatus]) error {
updater := func(ctx context.Context, current testStatus, next testStatus, record *Run[string, testStatus]) error {
calls["updater"] += 1
return nil
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestProcessCallback(t *testing.T) {
return current, nil
}

callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Record[string, testStatus], reader io.Reader) (testStatus, error) {
callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) {
return 0, errors.New("test error")
})

Expand Down
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// the record will not be stored and the event will be skipped and move onto the next event. If a non-nil error is
// returned then the consumer will back off and try again until a nil error occurs or the retry max has been reached
// if a Dead Letter Queue has been configured for the workflow.
type ConsumerFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status]) (Status, error)
type ConsumerFunc[Type any, Status StatusType] func(ctx context.Context, r *Run[Type, Status]) (Status, error)

type consumerConfig[Type any, Status StatusType] struct {
pollingFrequency time.Duration
Expand Down Expand Up @@ -248,7 +248,7 @@ func consume[Type any, Status StatusType](
})
}

// Record paused - now clear the error counter.
// Run paused - now clear the error counter.
w.errorCounter.Clear(originalErr, processName, record.RunID)
return ack()
}
Expand Down
Loading
Loading