Skip to content

Commit

Permalink
fix: do not fail the raft host if there is an invalid event in the log (
Browse files Browse the repository at this point in the history
#4314)

We should add proper pre log proposal validation to prevent these
  • Loading branch information
jvmakine authored Feb 5, 2025
1 parent 6d6d2a7 commit 12c5b87
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
24 changes: 16 additions & 8 deletions backend/schemaservice/changesetstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,31 @@ func TestChangesetState(t *testing.T) {
assert.Equal(t, 0, len(view.GetChangesets()))

t.Run("changeset must have id", func(t *testing.T) {
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
event := &schema.ChangesetCreatedEvent{
Changeset: &schema.Changeset{
CreatedAt: time.Now(),
Modules: []*schema.Module{module},
Error: "",
},
}})
}
sm := schemaservice.SchemaState{}
err = sm.ApplyEvent(ctx, event)
assert.Error(t, err)
})

t.Run("deployment must must have deployment key", func(t *testing.T) {
nm := reflect.DeepCopy(module)
nm.Runtime = nil
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
event := &schema.ChangesetCreatedEvent{
Changeset: &schema.Changeset{
Key: key.NewChangesetKey(),
CreatedAt: time.Now(),
Modules: []*schema.Module{nm},
Error: "",
},
}})
}
sm := schemaservice.SchemaState{}
err = sm.ApplyEvent(ctx, event)
assert.Error(t, err)
})

Expand Down Expand Up @@ -101,17 +105,21 @@ func TestChangesetState(t *testing.T) {

t.Run("test commit changeset in bad state", func(t *testing.T) {
// The deployment is not provisioned yet, this should fail
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCommittedEvent{
event := &schema.ChangesetCommittedEvent{
Key: changesetKey,
}})
}
sm := schemaservice.SchemaState{}
err = sm.ApplyEvent(ctx, event)
assert.Error(t, err)
})

t.Run("test prepare changeset in bad state", func(t *testing.T) {
// The deployment is not provisioned yet, this should fail
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetPreparedEvent{
event := &schema.ChangesetPreparedEvent{
Key: changesetKey,
}})
}
sm := schemaservice.SchemaState{}
err = sm.ApplyEvent(ctx, event)
assert.Error(t, err)
})

Expand Down
9 changes: 8 additions & 1 deletion backend/schemaservice/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/key"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/statemachine"
)

Expand Down Expand Up @@ -289,9 +290,15 @@ func (c *schemaStateMachine) Lookup(key struct{}) (SchemaState, error) {
func (c *schemaStateMachine) Publish(msg EventWrapper) error {
c.lock.Lock()
defer c.lock.Unlock()

logger := log.FromContext(c.runningCtx)

err := c.state.ApplyEvent(c.runningCtx, msg.Event)
if err != nil {
return fmt.Errorf("update: %w", err)
// TODO: we need to validate the events before they are
// committed to the log
logger.Errorf(err, "failed to apply event")
return nil
}
// Notify all subscribers using broadcaster
c.notifier.Notify(c.runningCtx)
Expand Down

0 comments on commit 12c5b87

Please sign in to comment.