Skip to content

Commit

Permalink
workflow: Remove record ID and simplify record store (#62)
Browse files Browse the repository at this point in the history
* workflow: Remove record ID and simplify record store

* workflow: Remove record ID and simplify record store

* workflow: Remove record ID and simplify record store

* workflow: Remove record ID and simplify record store

* workflow: Remove record ID and simplify record store

* clean up
  • Loading branch information
andrewwormald authored Nov 1, 2024
1 parent cc11a65 commit 81ae418
Show file tree
Hide file tree
Showing 51 changed files with 299 additions and 403 deletions.
2 changes: 2 additions & 0 deletions _examples/webui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func main() {
http.HandleFunc(paths.ObjectData, webui.ObjectDataHandlerFunc(recordStore))
http.HandleFunc(paths.Update, webui.UpdateHandlerFunc(recordStore))

fmt.Println("Head on over to 'http://localhost:9492' to view!")

err := http.ListenAndServe("localhost:9492", nil)
if err != nil {
panic(err)
Expand Down
74 changes: 25 additions & 49 deletions adapters/adaptertest/recordstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -34,10 +35,9 @@ func testLatest(t *testing.T, factory func() workflow.RecordStore) {
t.Run("Latest", func(t *testing.T) {
store := factory()
ctx := context.Background()
expected := dummyWireRecordWithID(t, "my_workflow", 1)
maker := func(recordID int64) (workflow.OutboxEventData, error) { return workflow.OutboxEventData{}, nil }
expected := dummyWireRecord(t, "my_workflow")

err := store.Store(ctx, expected, maker)
err := store.Store(ctx, expected)
require.Nil(t, err)

latest, err := store.Latest(ctx, expected.WorkflowName, expected.ForeignID)
Expand All @@ -47,7 +47,7 @@ func testLatest(t *testing.T, factory func() workflow.RecordStore) {

expected.Status = int(statusEnd)
expected.RunState = workflow.RunStateCompleted
err = store.Store(ctx, expected, maker)
err = store.Store(ctx, expected)
require.Nil(t, err)

latest, err = store.Latest(ctx, expected.WorkflowName, expected.ForeignID)
Expand All @@ -60,13 +60,12 @@ func testLookup(t *testing.T, factory func() workflow.RecordStore) {
t.Run("Lookup", func(t *testing.T) {
store := factory()
ctx := context.Background()
expected := dummyWireRecordWithID(t, "my_workflow", 1)
maker := func(recordID int64) (workflow.OutboxEventData, error) { return workflow.OutboxEventData{}, nil }
expected := dummyWireRecord(t, "my_workflow")

err := store.Store(ctx, expected, maker)
err := store.Store(ctx, expected)
require.Nil(t, err)

latest, err := store.Lookup(ctx, 1)
latest, err := store.Lookup(ctx, expected.RunID)
require.Nil(t, err)

recordIsEqual(t, *expected, *latest)
Expand All @@ -77,30 +76,29 @@ func testStore(t *testing.T, factory func() workflow.RecordStore) {
t.Run("RecordStore", func(t *testing.T) {
store := factory()
ctx := context.Background()
expected := dummyWireRecordWithID(t, "my_workflow", 1)
maker := func(recordID int64) (workflow.OutboxEventData, error) { return workflow.OutboxEventData{}, nil }
expected := dummyWireRecord(t, "my_workflow")

err := store.Store(ctx, expected, maker)
err := store.Store(ctx, expected)
require.Nil(t, err)

latest, err := store.Lookup(ctx, 1)
latest, err := store.Lookup(ctx, expected.RunID)
require.Nil(t, err)

latest.Status = int(statusMiddle)
expected.Status = int(statusMiddle)

err = store.Store(ctx, latest, maker)
err = store.Store(ctx, latest)
require.Nil(t, err)

recordIsEqual(t, *expected, *latest)

latest, err = store.Lookup(ctx, 1)
latest, err = store.Lookup(ctx, expected.RunID)
require.Nil(t, err)

latest.Status = int(statusEnd)
expected.Status = int(statusEnd)

err = store.Store(ctx, latest, maker)
err = store.Store(ctx, latest)
require.Nil(t, err)

recordIsEqual(t, *expected, *latest)
Expand All @@ -113,21 +111,15 @@ func testListOutboxEvents(t *testing.T, factory func() workflow.RecordStore) {
ctx := context.Background()
expected := dummyWireRecord(t, "my_workflow")

maker := func(recordID int64) (workflow.OutboxEventData, error) {
// Record ID would not have been set if it is a new record. Assign the recordID that the Store provides
expected.ID = recordID
return workflow.RecordToOutboxEventData(*expected, workflow.RunStateInitiated)
}

err := store.Store(ctx, expected, maker)
err := store.Store(ctx, expected)
require.Nil(t, err)

ls, err := store.ListOutboxEvents(ctx, expected.WorkflowName, 1000)
require.Nil(t, err)

require.Equal(t, 1, len(ls))

require.Equal(t, int64(1), ls[0].ID)
require.True(t, strings.Contains(ls[0].ID, "outbox-id-"))
require.Equal(t, expected.WorkflowName, ls[0].WorkflowName)

var r outboxpb.OutboxRecord
Expand All @@ -147,16 +139,10 @@ func testDeleteOutboxEvent(t *testing.T, factory func() workflow.RecordStore) {
ctx := context.Background()
expected := dummyWireRecord(t, "my_workflow")

maker := func(recordID int64) (workflow.OutboxEventData, error) {
// Run ID would not have been set if it is a new record. Assign the recordID that the Store provides
expected.ID = recordID
return workflow.RecordToOutboxEventData(*expected, workflow.RunStateInitiated)
}

err := store.Store(ctx, expected, maker)
err := store.Store(ctx, expected)
require.Nil(t, err)

latest, err := store.Lookup(ctx, 1)
latest, err := store.Lookup(ctx, expected.RunID)
require.Nil(t, err)

latest.Status = int(statusMiddle)
Expand All @@ -177,7 +163,6 @@ func testDeleteOutboxEvent(t *testing.T, factory func() workflow.RecordStore) {
func testList(t *testing.T, factory func() workflow.RecordStore) {
workflowName := "my_workflow"
secondWorkflowName := "my_second_workflow"
maker := func(recordID int64) (workflow.OutboxEventData, error) { return workflow.OutboxEventData{}, nil }

t.Run("List with no workflow specified returns all", func(t *testing.T) {
store := factory()
Expand All @@ -189,7 +174,7 @@ func testList(t *testing.T, factory func() workflow.RecordStore) {
if i > seedCount/2 {
name = secondWorkflowName
}
err := store.Store(ctx, dummyWireRecord(t, name), maker)
err := store.Store(ctx, dummyWireRecord(t, name))
require.Nil(t, err)
}

Expand All @@ -202,25 +187,23 @@ func testList(t *testing.T, factory func() workflow.RecordStore) {
require.Equal(t, 100, len(ls2))

// Make sure the last of the first page is not the same as the first of the next page
require.NotEqual(t, ls[52].ID, ls2[0])
require.NotEqual(t, ls[52].RunID, ls2[0].RunID)

ls3, err := store.List(ctx, "", 153, seedCount-153, workflow.OrderTypeAscending)
require.Nil(t, err)
require.Equal(t, seedCount-153, len(ls3))

// Make sure the last of the first page is not the same as the first of the next page.
require.NotEqual(t, ls3[152].ID, ls3[0])
require.NotEqual(t, ls3[152].RunID, ls3[0].RunID)

// Make sure that if 950 is the offset, and we only have 1000 then only 50 items will be returned.
lastPageAsc, err := store.List(ctx, "", 950, 1000, workflow.OrderTypeAscending)
require.Nil(t, err)
require.Equal(t, 50, len(lastPageAsc))
require.Equal(t, int64(1000), lastPageAsc[len(lastPageAsc)-1].ID)

lastPageDesc, err := store.List(ctx, "", 950, 1000, workflow.OrderTypeDescending)
require.Nil(t, err)
require.Equal(t, 50, len(lastPageDesc))
require.Equal(t, int64(1000), lastPageDesc[0].ID)
})

t.Run("List - WorkflowName", func(t *testing.T) {
Expand All @@ -236,21 +219,20 @@ func testList(t *testing.T, factory func() workflow.RecordStore) {
newRecord := dummyWireRecord(t, workflowName)
newRecord.Status = int(status)

err := store.Store(ctx, newRecord, maker)
err := store.Store(ctx, newRecord)
require.Nil(t, err)

secondRecord := dummyWireRecord(t, secondWorkflowName)
newRecord.Status = int(status)

err = store.Store(ctx, secondRecord, maker)
err = store.Store(ctx, secondRecord)
require.Nil(t, err)
}
}

for status, count := range config {
ls, err := store.List(ctx, workflowName, 0, 100, workflow.OrderTypeAscending, workflow.FilterByStatus(int64(status)))
require.Nil(t, err)
fmt.Println(count, len(ls))
require.Equal(t, count, len(ls))

for _, l := range ls {
Expand All @@ -268,7 +250,7 @@ func testList(t *testing.T, factory func() workflow.RecordStore) {
wr := dummyWireRecord(t, workflowName)
wr.ForeignID = foreignID

err := store.Store(ctx, wr, maker)
err := store.Store(ctx, wr)
require.Nil(t, err)
}
}
Expand Down Expand Up @@ -302,7 +284,7 @@ func testList(t *testing.T, factory func() workflow.RecordStore) {
wr := dummyWireRecord(t, workflowName)
wr.RunState = runState

err := store.Store(ctx, wr, maker)
err := store.Store(ctx, wr)
require.Nil(t, err)
}
}
Expand Down Expand Up @@ -331,7 +313,7 @@ func testList(t *testing.T, factory func() workflow.RecordStore) {
newRecord := dummyWireRecord(t, workflowName)
newRecord.Status = int(status)

err := store.Store(ctx, newRecord, maker)
err := store.Store(ctx, newRecord)
require.Nil(t, err)
}
}
Expand All @@ -349,10 +331,6 @@ func testList(t *testing.T, factory func() workflow.RecordStore) {
}

func dummyWireRecord(t *testing.T, workflowName string) *workflow.Record {
return dummyWireRecordWithID(t, workflowName, 0)
}

func dummyWireRecordWithID(t *testing.T, workflowName string, id int64) *workflow.Record {
foreignID := "Andrew Wormald"
runID, err := uuid.NewUUID()
require.Nil(t, err)
Expand All @@ -368,7 +346,6 @@ func dummyWireRecordWithID(t *testing.T, workflowName string, id int64) *workflo
createdAt := time.Now()

return &workflow.Record{
ID: id,
WorkflowName: workflowName,
ForeignID: foreignID,
RunID: runID.String(),
Expand All @@ -381,7 +358,6 @@ func dummyWireRecordWithID(t *testing.T, workflowName string, id int64) *workflo
}

func recordIsEqual(t *testing.T, a, b workflow.Record) {
require.Equal(t, a.ID, b.ID)
require.Equal(t, a.WorkflowName, b.WorkflowName)
require.Equal(t, a.ForeignID, b.ForeignID)
require.Equal(t, a.RunID, b.RunID)
Expand Down
12 changes: 3 additions & 9 deletions adapters/kafkastreamer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Producer struct {

var _ workflow.Producer = (*Producer)(nil)

func (p *Producer) Send(ctx context.Context, recordID int64, statusType int, headers map[workflow.Header]string) error {
func (p *Producer) Send(ctx context.Context, foreignID string, statusType int, headers map[workflow.Header]string) error {
for ctx.Err() == nil {
ctx, cancel := context.WithTimeout(ctx, p.WriterTimeout)
defer cancel()
Expand All @@ -56,9 +56,8 @@ func (p *Producer) Send(ctx context.Context, recordID int64, statusType int, hea
})
}

key := strconv.FormatInt(int64(recordID), 10)
msg := kafka.Message{
Key: []byte(key),
Key: []byte(foreignID),
Value: []byte(strconv.FormatInt(int64(statusType), 10)),
Headers: kHeaders,
}
Expand Down Expand Up @@ -128,11 +127,6 @@ func (c *Consumer) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err
// Append the message to the commit slice to ensure we send all messages that have been processed
commit = append(commit, m)

recordID, err := strconv.ParseInt(string(m.Key), 10, 64)
if err != nil {
return nil, nil, err
}

statusType, err := strconv.ParseInt(string(m.Value), 10, 64)
if err != nil {
return nil, nil, err
Expand All @@ -145,7 +139,7 @@ func (c *Consumer) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, err

event := &workflow.Event{
ID: m.Offset,
ForeignID: recordID,
ForeignID: string(m.Key),
Type: int(statusType),
Headers: headers,
CreatedAt: m.Time,
Expand Down
Loading

0 comments on commit 81ae418

Please sign in to comment.