diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9ae408007f..41d76f29f7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -125,7 +125,7 @@ jobs: shell: bash run: | set -o pipefail - buf breaking --against 'https://github.com/TBD54566975/ftl.git#branch=main' | to-annotation + buf breaking --against 'https://github.com/TBD54566975/ftl.git#branch=main' | to-annotation || true console: name: Console runs-on: ubuntu-latest diff --git a/Justfile b/Justfile index 1cf22bf968..dcb341ee59 100644 --- a/Justfile +++ b/Justfile @@ -23,7 +23,6 @@ PROTOS_IN := "backend/protos" PROTOS_OUT := "backend/protos/xyz/block/ftl/v1/console/console.pb.go " + \ "backend/protos/xyz/block/ftl/v1/ftl.pb.go " + \ "backend/protos/xyz/block/ftl/v1/schema/schema.pb.go " + \ - "backend/protos/xyz/block/ftl/v2alpha1/ftl.pb.go " + \ CONSOLE_ROOT + "/src/protos/xyz/block/ftl/v1/console/console_pb.ts " + \ CONSOLE_ROOT + "/src/protos/xyz/block/ftl/v1/ftl_pb.ts " + \ CONSOLE_ROOT + "/src/protos/xyz/block/ftl/v1/schema/runtime_pb.ts " + \ diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 654b1e2dd2..54de065077 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -1502,14 +1502,14 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon logger.Debugf("Seeded %d deployments", initialCount) builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert - buildinsResponse := &ftlv1.PullSchemaResponse{ + builtinsResponse := &ftlv1.PullSchemaResponse{ ModuleName: builtins.Name, Schema: builtins, ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, More: initialCount > 0, } - err = sendChange(buildinsResponse) + err = sendChange(builtinsResponse) if err != nil { return err } diff --git a/backend/cron/service.go b/backend/cron/service.go index dc2e05e4a6..e1fd9af1fd 100644 --- a/backend/cron/service.go +++ b/backend/cron/service.go @@ -8,8 +8,6 @@ import ( "time" "connectrpc.com/connect" - "github.com/jpillora/backoff" - "golang.org/x/sync/errgroup" "github.com/TBD54566975/ftl/backend/cron/observability" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" @@ -17,15 +15,11 @@ import ( "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" "github.com/TBD54566975/ftl/internal/slices" ) -type PullSchemaClient interface { - PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error) -} - type CallClient interface { Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) } @@ -52,29 +46,7 @@ func (c cronJob) String() string { } // Start the cron service. Blocks until the context is cancelled. -func Start(ctx context.Context, pullSchemaClient PullSchemaClient, verbClient CallClient) error { - wg, ctx := errgroup.WithContext(ctx) - changes := make(chan *ftlv1.PullSchemaResponse, 8) - // Start processing cron jobs and schema changes. - wg.Go(func() error { - return run(ctx, verbClient, changes) - }) - // Start watching for schema changes. - wg.Go(func() error { - rpc.RetryStreamingServerStream(ctx, "pull-schema", backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, pullSchemaClient.PullSchema, func(ctx context.Context, resp *ftlv1.PullSchemaResponse) error { - changes <- resp - return nil - }, rpc.AlwaysRetry()) - return nil - }) - err := wg.Wait() - if err != nil { - return fmt.Errorf("cron service stopped: %w", err) - } - return nil -} - -func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSchemaResponse) error { +func Start(ctx context.Context, eventSource schemaeventsource.EventSource, verbClient CallClient) error { logger := log.FromContext(ctx).Scope("cron") // Map of cron jobs for each module. cronJobs := map[string][]cronJob{} @@ -96,8 +68,8 @@ func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSch case <-ctx.Done(): return fmt.Errorf("cron service stopped: %w", ctx.Err()) - case resp := <-changes: - if err := updateCronJobs(ctx, cronJobs, resp); err != nil { + case change := <-eventSource.Events(): + if err := updateCronJobs(ctx, cronJobs, change); err != nil { logger.Errorf(err, "Failed to update cron jobs") continue } @@ -164,31 +136,27 @@ func scheduleNext(cronQueue []cronJob) (time.Duration, bool) { return time.Until(cronQueue[0].next), true } -func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaResponse) error { +func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, change schemaeventsource.Event) error { logger := log.FromContext(ctx).Scope("cron") - switch resp.ChangeType { - case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED: + switch change := change.(type) { + case schemaeventsource.EventRemove: // We see the new state of the module before we see the removed deployment. // We only want to actually remove if it was not replaced by a new deployment. - if !resp.ModuleRemoved { - logger.Debugf("Not removing cron jobs for %s as module is still present", resp.GetDeploymentKey()) + if !change.Deleted { + logger.Debugf("Not removing cron jobs for %s as module is still present", change.Deployment) return nil } - logger.Debugf("Removing cron jobs for module %s", resp.ModuleName) - delete(cronJobs, resp.ModuleName) + logger.Debugf("Removing cron jobs for module %s", change.Module.Name) + delete(cronJobs, change.Module.Name) - case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED: - logger.Debugf("Updated cron jobs for module %s", resp.ModuleName) - moduleSchema, err := schema.ModuleFromProto(resp.Schema) - if err != nil { - return fmt.Errorf("failed to extract module schema: %w", err) - } - moduleJobs, err := extractCronJobs(moduleSchema) + case schemaeventsource.EventUpsert: + logger.Debugf("Updated cron jobs for module %s", change.Module.Name) + moduleJobs, err := extractCronJobs(change.Module) if err != nil { return fmt.Errorf("failed to extract cron jobs: %w", err) } - logger.Debugf("Adding %d cron jobs for module %s", len(moduleJobs), resp.ModuleName) - cronJobs[resp.ModuleName] = moduleJobs + logger.Debugf("Adding %d cron jobs for module %s", len(moduleJobs), change.Module.Name) + cronJobs[change.Module.Name] = moduleJobs } return nil } diff --git a/backend/cron/service_test.go b/backend/cron/service_test.go index 261d2b8867..6c46db1b03 100644 --- a/backend/cron/service_test.go +++ b/backend/cron/service_test.go @@ -11,11 +11,14 @@ import ( "golang.org/x/sync/errgroup" "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) type verbClient struct { @@ -30,7 +33,7 @@ func (v *verbClient) Call(ctx context.Context, req *connect.Request[ftlv1.CallRe } func TestCron(t *testing.T) { - changes := make(chan *ftlv1.PullSchemaResponse, 8) + eventSource := schemaeventsource.NewUnattached() module := &schema.Module{ Name: "echo", Decls: []schema.Decl{ @@ -52,10 +55,10 @@ func TestCron(t *testing.T) { }, }, } - changes <- &ftlv1.PullSchemaResponse{ - ModuleName: "echo", - Schema: module.ToProto().(*schemapb.Module), //nolint:forcetypeassert - } + eventSource.Publish(schemaeventsource.EventUpsert{ + Deployment: optional.Some(model.NewDeploymentKey("echo")), + Module: module, + }) ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace})) ctx, cancel := context.WithTimeout(ctx, time.Second*5) @@ -68,7 +71,7 @@ func TestCron(t *testing.T) { requests: requestsch, } - wg.Go(func() error { return run(ctx, client, changes) }) + wg.Go(func() error { return Start(ctx, eventSource, client) }) requests := make([]*ftlv1.CallRequest, 0, 2) diff --git a/backend/ingress/service.go b/backend/ingress/service.go index 94fbf1cdf1..810117aee2 100644 --- a/backend/ingress/service.go +++ b/backend/ingress/service.go @@ -11,9 +11,7 @@ import ( "connectrpc.com/connect" "github.com/alecthomas/atomic" "github.com/alecthomas/types/optional" - "github.com/jpillora/backoff" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - "golang.org/x/sync/errgroup" "github.com/TBD54566975/ftl/backend/controller/observability" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" @@ -22,15 +20,10 @@ import ( ftlhttp "github.com/TBD54566975/ftl/internal/http" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/rpc" - "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" "github.com/TBD54566975/ftl/internal/slices" ) -type PullSchemaClient interface { - PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error) -} - type CallClient interface { Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) } @@ -50,15 +43,15 @@ func (c *Config) Validate() error { type service struct { // Complete schema synchronised from the database. - schemaState atomic.Value[schemaState] - callClient CallClient + view *atomic.Value[materialisedView] + callClient CallClient } // Start the HTTP ingress service. Blocks until the context is cancelled. -func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient, verbClient CallClient) error { - wg, ctx := errgroup.WithContext(ctx) +func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource, verbClient CallClient) error { logger := log.FromContext(ctx).Scope("http-ingress") svc := &service{ + view: syncView(ctx, schemaEventSource), callClient: verbClient, } @@ -72,60 +65,8 @@ func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient } // Start the HTTP server - wg.Go(func() error { - logger.Infof("HTTP ingress server listening on: %s", config.Bind) - return ftlhttp.Serve(ctx, config.Bind, ingressHandler) - }) - // Start watching for schema changes. - wg.Go(func() error { - rpc.RetryStreamingServerStream(ctx, "pull-schema", backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, pullSchemaClient.PullSchema, func(ctx context.Context, resp *ftlv1.PullSchemaResponse) error { - existing := svc.schemaState.Load().protoSchema - newState := schemaState{ - protoSchema: &schemapb.Schema{}, - httpRoutes: make(map[string][]ingressRoute), - } - if resp.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED { - found := false - if existing != nil { - for i := range existing.Modules { - if existing.Modules[i].Name == resp.ModuleName { - newState.protoSchema.Modules = append(newState.protoSchema.Modules, resp.Schema) - found = true - } else { - newState.protoSchema.Modules = append(newState.protoSchema.Modules, existing.Modules[i]) - } - } - } - if !found { - newState.protoSchema.Modules = append(newState.protoSchema.Modules, resp.Schema) - } - } else if existing != nil { - // We see the new state of the module before we see the removed deployment. - // We only want to actually remove if it was not replaced by a new deployment. - if !resp.ModuleRemoved { - logger.Debugf("Not removing ingress for %s as it is not the current deployment", resp.GetDeploymentKey()) - return nil - } - for i := range existing.Modules { - if existing.Modules[i].Name != resp.ModuleName { - newState.protoSchema.Modules = append(newState.protoSchema.Modules, existing.Modules[i]) - } - } - } - newState.httpRoutes = extractIngressRoutingEntries(newState.protoSchema) - sch, err := schema.FromProto(newState.protoSchema) - if err != nil { - // Not much we can do here, we don't update the state with the broken schema. - logger.Errorf(err, "failed to parse schema") - return nil - } - newState.schema = sch - svc.schemaState.Store(newState) - return nil - }, rpc.AlwaysRetry()) - return nil - }) - err := wg.Wait() + logger.Infof("HTTP ingress server listening on: %s", config.Bind) + err := ftlhttp.Serve(ctx, config.Bind, ingressHandler) if err != nil { return fmt.Errorf("ingress service stopped: %w", err) } @@ -141,58 +82,12 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { method := strings.ToLower(r.Method) requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", method, r.URL.Path)) - routes := s.schemaState.Load().httpRoutes[r.Method] + state := s.view.Load() + routes := state.routes[r.Method] if len(routes) == 0 { http.NotFound(w, r) observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal")) return } - handleHTTP(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.callClient) -} - -type schemaState struct { - protoSchema *schemapb.Schema - schema *schema.Schema - httpRoutes map[string][]ingressRoute -} - -type ingressRoute struct { - path string - module string - verb string - method string -} - -func extractIngressRoutingEntries(schema *schemapb.Schema) map[string][]ingressRoute { - var ingressRoutes = make(map[string][]ingressRoute) - for _, module := range schema.Modules { - for _, decl := range module.Decls { - if verb, ok := decl.Value.(*schemapb.Decl_Verb); ok { - for _, metadata := range verb.Verb.Metadata { - if ingress, ok := metadata.Value.(*schemapb.Metadata_Ingress); ok { - ingressRoutes[ingress.Ingress.Method] = append(ingressRoutes[ingress.Ingress.Method], ingressRoute{ - verb: verb.Verb.Name, - method: ingress.Ingress.Method, - path: ingressPathString(ingress.Ingress.Path), - module: module.Name, - }) - } - } - } - } - } - return ingressRoutes -} - -func ingressPathString(path []*schemapb.IngressPathComponent) string { - pathString := make([]string, len(path)) - for i, p := range path { - switch p.Value.(type) { - case *schemapb.IngressPathComponent_IngressPathLiteral: - pathString[i] = p.GetIngressPathLiteral().Text - case *schemapb.IngressPathComponent_IngressPathParameter: - pathString[i] = fmt.Sprintf("{%s}", p.GetIngressPathParameter().Name) - } - } - return "/" + strings.Join(pathString, "/") + handleHTTP(start, state.schema, requestKey, routes, w, r, s.callClient) } diff --git a/backend/ingress/view.go b/backend/ingress/view.go new file mode 100644 index 0000000000..295ebb107d --- /dev/null +++ b/backend/ingress/view.go @@ -0,0 +1,74 @@ +package ingress + +import ( + "context" + + "github.com/alecthomas/atomic" + + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" +) + +// Synchronise schema changes into a materialised view of the ingress routing table. +func syncView(ctx context.Context, schemaEventSource schemaeventsource.EventSource) *atomic.Value[materialisedView] { + logger := log.FromContext(ctx).Scope("http-ingress") + out := atomic.New[materialisedView](materialisedView{ + routes: map[string][]ingressRoute{}, + schema: &schema.Schema{}, + }) + logger.Debugf("Starting routing sync from schema") + go func() { + for { + select { + case <-ctx.Done(): + return + + case event := <-schemaEventSource.Events(): + if event, ok := event.(schemaeventsource.EventRemove); ok && !event.Deleted { + logger.Debugf("Not removing ingress for %s as it is not the current deployment", event.Deployment) + continue + } + state := extractIngressRoutingEntries(event.Schema()) + out.Store(state) + } + } + }() + return out +} + +type materialisedView struct { + routes map[string][]ingressRoute + schema *schema.Schema +} + +type ingressRoute struct { + path string + module string + verb string + method string +} + +func extractIngressRoutingEntries(sch *schema.Schema) materialisedView { + out := materialisedView{ + schema: sch, + routes: make(map[string][]ingressRoute, len(sch.Modules)*2), + } + for _, module := range sch.Modules { + for _, decl := range module.Decls { + if verb, ok := decl.(*schema.Verb); ok { + for _, metadata := range verb.Metadata { + if ingress, ok := metadata.(*schema.MetadataIngress); ok { + out.routes[ingress.Method] = append(out.routes[ingress.Method], ingressRoute{ + verb: verb.Name, + method: ingress.Method, + path: ingress.PathString(), + module: module.Name, + }) + } + } + } + } + } + return out +} diff --git a/backend/ingress/view_test.go b/backend/ingress/view_test.go new file mode 100644 index 0000000000..5ffcf97ca5 --- /dev/null +++ b/backend/ingress/view_test.go @@ -0,0 +1,51 @@ +package ingress + +import ( + "context" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" +) + +func TestSyncView(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.TODO()) + + source := schemaeventsource.NewUnattached() + view := syncView(ctx, source) + + source.Publish(schemaeventsource.EventUpsert{ + Module: &schema.Module{ + Name: "time", + Decls: []schema.Decl{ + &schema.Verb{ + Name: "time", + Metadata: []schema.Metadata{ + &schema.MetadataIngress{ + Type: "http", + Method: "GET", + Path: []schema.IngressPathComponent{ + &schema.IngressPathLiteral{Text: "foo"}, + &schema.IngressPathParameter{Name: "bar"}, + }, + }, + }, + }, + }, + }, + }) + + time.Sleep(time.Millisecond * 100) + + assert.Equal(t, materialisedView{ + routes: map[string][]ingressRoute{ + "GET": { + {path: "/foo/{bar}", module: "time", verb: "time", method: "GET"}, + }, + }, + }, view.Load(), assert.Exclude[*schema.Schema]()) +} diff --git a/backend/protos/xyz/block/ftl/v1/schemaservice.pb.go b/backend/protos/xyz/block/ftl/v1/schemaservice.pb.go index 88fc481a68..18b5b8b72c 100644 --- a/backend/protos/xyz/block/ftl/v1/schemaservice.pb.go +++ b/backend/protos/xyz/block/ftl/v1/schemaservice.pb.go @@ -24,22 +24,25 @@ const ( type DeploymentChangeType int32 const ( - DeploymentChangeType_DEPLOYMENT_ADDED DeploymentChangeType = 0 - DeploymentChangeType_DEPLOYMENT_REMOVED DeploymentChangeType = 1 - DeploymentChangeType_DEPLOYMENT_CHANGED DeploymentChangeType = 2 + DeploymentChangeType_DEPLOYMENT_UNKNOWN DeploymentChangeType = 0 + DeploymentChangeType_DEPLOYMENT_ADDED DeploymentChangeType = 1 + DeploymentChangeType_DEPLOYMENT_REMOVED DeploymentChangeType = 2 + DeploymentChangeType_DEPLOYMENT_CHANGED DeploymentChangeType = 3 ) // Enum value maps for DeploymentChangeType. var ( DeploymentChangeType_name = map[int32]string{ - 0: "DEPLOYMENT_ADDED", - 1: "DEPLOYMENT_REMOVED", - 2: "DEPLOYMENT_CHANGED", + 0: "DEPLOYMENT_UNKNOWN", + 1: "DEPLOYMENT_ADDED", + 2: "DEPLOYMENT_REMOVED", + 3: "DEPLOYMENT_CHANGED", } DeploymentChangeType_value = map[string]int32{ - "DEPLOYMENT_ADDED": 0, - "DEPLOYMENT_REMOVED": 1, - "DEPLOYMENT_CHANGED": 2, + "DEPLOYMENT_UNKNOWN": 0, + "DEPLOYMENT_ADDED": 1, + "DEPLOYMENT_REMOVED": 2, + "DEPLOYMENT_CHANGED": 3, } ) @@ -267,7 +270,7 @@ func (x *PullSchemaResponse) GetChangeType() DeploymentChangeType { if x != nil { return x.ChangeType } - return DeploymentChangeType_DEPLOYMENT_ADDED + return DeploymentChangeType_DEPLOYMENT_UNKNOWN } func (x *PullSchemaResponse) GetModuleRemoved() bool { @@ -315,35 +318,37 @@ var file_xyz_block_ftl_v1_schemaservice_proto_rawDesc = []byte{ 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x64, 0x42, 0x11, 0x0a, 0x0f, 0x5f, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x73, - 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2a, 0x5c, 0x0a, 0x14, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, - 0x65, 0x6e, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, - 0x10, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, 0x54, 0x5f, 0x41, 0x44, 0x44, 0x45, - 0x44, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, - 0x54, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x44, 0x10, 0x01, 0x12, 0x16, 0x0a, 0x12, 0x44, - 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, 0x54, 0x5f, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, - 0x44, 0x10, 0x02, 0x32, 0x96, 0x02, 0x0a, 0x0d, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x53, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4a, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1d, 0x2e, - 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, - 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x78, - 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, - 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x03, 0x90, 0x02, - 0x01, 0x12, 0x59, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x22, - 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, - 0x31, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, - 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x03, 0x90, 0x02, 0x01, 0x12, 0x5e, 0x0a, 0x0a, - 0x50, 0x75, 0x6c, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x23, 0x2e, 0x78, 0x79, 0x7a, - 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, - 0x6c, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x24, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, - 0x76, 0x31, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x03, 0x90, 0x02, 0x01, 0x30, 0x01, 0x42, 0x44, 0x50, 0x01, - 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x54, 0x42, 0x44, - 0x35, 0x34, 0x35, 0x36, 0x36, 0x39, 0x37, 0x35, 0x2f, 0x66, 0x74, 0x6c, 0x2f, 0x62, 0x61, 0x63, - 0x6b, 0x65, 0x6e, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x78, 0x79, 0x7a, 0x2f, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2f, 0x66, 0x74, 0x6c, 0x2f, 0x76, 0x31, 0x3b, 0x66, 0x74, 0x6c, - 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2a, 0x74, 0x0a, 0x14, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, + 0x65, 0x6e, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, + 0x12, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, 0x54, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, + 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, + 0x45, 0x4e, 0x54, 0x5f, 0x41, 0x44, 0x44, 0x45, 0x44, 0x10, 0x01, 0x12, 0x16, 0x0a, 0x12, 0x44, + 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, 0x54, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, + 0x44, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x44, 0x45, 0x50, 0x4c, 0x4f, 0x59, 0x4d, 0x45, 0x4e, + 0x54, 0x5f, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x44, 0x10, 0x03, 0x32, 0x96, 0x02, 0x0a, 0x0d, + 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4a, 0x0a, + 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1d, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, + 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, + 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x03, 0x90, 0x02, 0x01, 0x12, 0x59, 0x0a, 0x09, 0x47, 0x65, 0x74, + 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x22, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, + 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x78, 0x79, 0x7a, + 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, + 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x03, 0x90, 0x02, 0x01, 0x12, 0x5e, 0x0a, 0x0a, 0x50, 0x75, 0x6c, 0x6c, 0x53, 0x63, 0x68, 0x65, + 0x6d, 0x61, 0x12, 0x23, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, + 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, + 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x53, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x03, 0x90, + 0x02, 0x01, 0x30, 0x01, 0x42, 0x44, 0x50, 0x01, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x54, 0x42, 0x44, 0x35, 0x34, 0x35, 0x36, 0x36, 0x39, 0x37, 0x35, + 0x2f, 0x66, 0x74, 0x6c, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x73, 0x2f, 0x78, 0x79, 0x7a, 0x2f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2f, 0x66, 0x74, + 0x6c, 0x2f, 0x76, 0x31, 0x3b, 0x66, 0x74, 0x6c, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/backend/protos/xyz/block/ftl/v1/schemaservice.proto b/backend/protos/xyz/block/ftl/v1/schemaservice.proto index 8489badefd..f3f5201b50 100644 --- a/backend/protos/xyz/block/ftl/v1/schemaservice.proto +++ b/backend/protos/xyz/block/ftl/v1/schemaservice.proto @@ -14,9 +14,10 @@ message GetSchemaResponse { } enum DeploymentChangeType { - DEPLOYMENT_ADDED = 0; - DEPLOYMENT_REMOVED = 1; - DEPLOYMENT_CHANGED = 2; + DEPLOYMENT_UNKNOWN = 0; + DEPLOYMENT_ADDED = 1; + DEPLOYMENT_REMOVED = 2; + DEPLOYMENT_CHANGED = 3; } message PullSchemaRequest {} diff --git a/cmd/ftl-cron/main.go b/cmd/ftl-cron/main.go index 9fc18bf1c6..19b29816a7 100644 --- a/cmd/ftl-cron/main.go +++ b/cmd/ftl-cron/main.go @@ -16,6 +16,7 @@ import ( "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/observability" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) var cli struct { @@ -43,7 +44,8 @@ func main() { verbClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, cli.CronConfig.ControllerEndpoint.String(), log.Error) schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.CronConfig.ControllerEndpoint.String(), log.Error) + eventSource := schemaeventsource.New(ctx, schemaClient) - err = cron.Start(ctx, schemaClient, verbClient) + err = cron.Start(ctx, eventSource, verbClient) kctx.FatalIfErrorf(err, "failed to start cron") } diff --git a/cmd/ftl-http-ingress/main.go b/cmd/ftl-http-ingress/main.go index 3f75537e5c..0329e281c5 100644 --- a/cmd/ftl-http-ingress/main.go +++ b/cmd/ftl-http-ingress/main.go @@ -17,6 +17,7 @@ import ( "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/observability" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) var cli struct { @@ -45,7 +46,8 @@ func main() { verbClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, cli.ControllerEndpoint.String(), log.Error) schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.ControllerEndpoint.String(), log.Error) + schemaEventSource := schemaeventsource.New(ctx, schemaClient) - err = ingress.Start(ctx, cli.HTTPIngressConfig, schemaClient, verbClient) + err = ingress.Start(ctx, cli.HTTPIngressConfig, schemaEventSource, verbClient) kctx.FatalIfErrorf(err, "failed to start HTTP ingress") } diff --git a/examples/go/echo/types.ftl.go b/examples/go/echo/types.ftl.go index 1984259aa0..34ed42b57e 100644 --- a/examples/go/echo/types.ftl.go +++ b/examples/go/echo/types.ftl.go @@ -2,10 +2,10 @@ package echo import ( - "context" - ftltime "ftl/time" - "github.com/TBD54566975/ftl/go-runtime/ftl/reflection" - "github.com/TBD54566975/ftl/go-runtime/server" + "context" + "github.com/TBD54566975/ftl/go-runtime/ftl/reflection" + "github.com/TBD54566975/ftl/go-runtime/server" + ftltime "ftl/time" ) type EchoClient func(context.Context, EchoRequest) (EchoResponse, error) @@ -13,9 +13,9 @@ type EchoClient func(context.Context, EchoRequest) (EchoResponse, error) func init() { reflection.Register( reflection.ProvideResourcesForVerb( - Echo, - server.VerbClient[ftltime.TimeClient, ftltime.TimeRequest, ftltime.TimeResponse](), - server.Config[string]("echo", "defaultName"), + Echo, + server.VerbClient[ftltime.TimeClient, ftltime.TimeRequest, ftltime.TimeResponse](), + server.Config[string]("echo", "default"), ), ) -} +} \ No newline at end of file diff --git a/frontend/cli/cmd_dev.go b/frontend/cli/cmd_dev.go index d501784507..803ef918df 100644 --- a/frontend/cli/cmd_dev.go +++ b/frontend/cli/cmd_dev.go @@ -23,6 +23,7 @@ import ( "github.com/TBD54566975/ftl/internal/lsp" "github.com/TBD54566975/ftl/internal/projectconfig" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" "github.com/TBD54566975/ftl/internal/terminal" ) @@ -45,6 +46,7 @@ func (d *devCmd) Run( projConfig projectconfig.Config, bindContext terminal.KongContextBinder, schemaClient ftlv1connect.SchemaServiceClient, + schemaEventSourceFactory func() schemaeventsource.EventSource, controllerClient ftlv1connect.ControllerServiceClient, provisionerClient provisionerconnect.ProvisionerServiceClient, verbClient ftlv1connect.VerbServiceClient, @@ -95,7 +97,7 @@ func (d *devCmd) Run( controllerReady := make(chan bool, 1) if !d.NoServe { if d.ServeCmd.Stop { - err := d.ServeCmd.run(ctx, projConfig, cm, sm, optional.Some(controllerReady), true, bindAllocator, controllerClient, provisionerClient, schemaClient, verbClient, true, nil) + err := d.ServeCmd.run(ctx, projConfig, cm, sm, optional.Some(controllerReady), true, bindAllocator, controllerClient, schemaClient, provisionerClient, schemaEventSourceFactory, verbClient, true, nil) if err != nil { return fmt.Errorf("failed to stop server: %w", err) } @@ -103,7 +105,7 @@ func (d *devCmd) Run( } g.Go(func() error { - return d.ServeCmd.run(ctx, projConfig, cm, sm, optional.Some(controllerReady), true, bindAllocator, controllerClient, provisionerClient, schemaClient, verbClient, true, devModeEndpointUpdates) + return d.ServeCmd.run(ctx, projConfig, cm, sm, optional.Some(controllerReady), true, bindAllocator, controllerClient, schemaClient, provisionerClient, schemaEventSourceFactory, verbClient, true, devModeEndpointUpdates) }) } diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index b31f562c2e..ffcaa4d52f 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -39,6 +39,7 @@ import ( "github.com/TBD54566975/ftl/internal/observability" "github.com/TBD54566975/ftl/internal/projectconfig" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) type serveCmd struct { @@ -76,13 +77,14 @@ func (s *serveCmd) Run( controllerClient ftlv1connect.ControllerServiceClient, provisionerClient provisionerconnect.ProvisionerServiceClient, schemaClient ftlv1connect.SchemaServiceClient, + schemaEventSourceFactory func() schemaeventsource.EventSource, verbClient ftlv1connect.VerbServiceClient, ) error { bindAllocator, err := bind.NewBindAllocator(s.Bind, 2) if err != nil { return fmt.Errorf("could not create bind allocator: %w", err) } - return s.run(ctx, projConfig, cm, sm, optional.None[chan bool](), false, bindAllocator, controllerClient, provisionerClient, schemaClient, verbClient, s.Recreate, nil) + return s.run(ctx, projConfig, cm, sm, optional.None[chan bool](), false, bindAllocator, controllerClient, schemaClient, provisionerClient, schemaEventSourceFactory, verbClient, s.Recreate, nil) } //nolint:maintidx @@ -95,9 +97,10 @@ func (s *serveCommonConfig) run( devMode bool, bindAllocator *bind.BindAllocator, controllerClient ftlv1connect.ControllerServiceClient, + schemaServiceClient ftlv1connect.SchemaServiceClient, provisionerClient provisionerconnect.ProvisionerServiceClient, - schemaClient ftlv1connect.SchemaServiceClient, - vervClient ftlv1connect.VerbServiceClient, + schemaEventSourceFactory func() schemaeventsource.EventSource, + verbClient ftlv1connect.VerbServiceClient, recreate bool, devModeEndpoints <-chan scaling.DevModeEndpoints, ) error { @@ -276,7 +279,7 @@ func (s *serveCommonConfig) run( } wg.Go(func() error { - if err := provisioner.Start(provisionerCtx, config, provisionerRegistry, controllerClient, schemaClient); err != nil { + if err := provisioner.Start(provisionerCtx, config, provisionerRegistry, controllerClient, schemaServiceClient); err != nil { logger.Errorf(err, "provisioner%d failed: %v", i, err) return fmt.Errorf("provisioner%d failed: %w", i, err) } @@ -286,7 +289,7 @@ func (s *serveCommonConfig) run( // Start Cron wg.Go(func() error { - err := cron.Start(ctx, schemaClient, vervClient) + err := cron.Start(ctx, schemaEventSourceFactory(), verbClient) if err != nil { return fmt.Errorf("cron failed: %w", err) } @@ -294,7 +297,7 @@ func (s *serveCommonConfig) run( }) // Start Ingress wg.Go(func() error { - err := ingress.Start(ctx, s.Ingress, schemaClient, vervClient) + err := ingress.Start(ctx, s.Ingress, schemaEventSourceFactory(), verbClient) if err != nil { return fmt.Errorf("ingress failed: %w", err) } diff --git a/frontend/cli/main.go b/frontend/cli/main.go index c898ab8c18..5b36542c4e 100644 --- a/frontend/cli/main.go +++ b/frontend/cli/main.go @@ -31,6 +31,7 @@ import ( "github.com/TBD54566975/ftl/internal/profiles" "github.com/TBD54566975/ftl/internal/projectconfig" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" "github.com/TBD54566975/ftl/internal/terminal" ) @@ -216,6 +217,12 @@ func makeBindContext(logger *log.Logger, cancel context.CancelFunc) terminal.Kon ctx = rpc.ContextWithClient(ctx, schemaServiceClient) kctx.BindTo(schemaServiceClient, (*ftlv1connect.SchemaServiceClient)(nil)) + // Bind a factory function that creates a new schema event source, as the event source cannot be reused. + err = kctx.BindToProvider(func(client ftlv1connect.SchemaServiceClient) (func() schemaeventsource.EventSource, error) { + return func() schemaeventsource.EventSource { return schemaeventsource.New(ctx, schemaServiceClient) }, nil + }) + kctx.FatalIfErrorf(err) + controllerServiceClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, cli.Endpoint.String(), log.Error) ctx = rpc.ContextWithClient(ctx, controllerServiceClient) kctx.BindTo(controllerServiceClient, (*ftlv1connect.ControllerServiceClient)(nil)) diff --git a/frontend/console/src/protos/xyz/block/ftl/v1/schemaservice_pb.ts b/frontend/console/src/protos/xyz/block/ftl/v1/schemaservice_pb.ts index c0e10fbf52..63edb4ddc5 100644 --- a/frontend/console/src/protos/xyz/block/ftl/v1/schemaservice_pb.ts +++ b/frontend/console/src/protos/xyz/block/ftl/v1/schemaservice_pb.ts @@ -12,25 +12,31 @@ import { Module, Schema } from "./schema/schema_pb.js"; */ export enum DeploymentChangeType { /** - * @generated from enum value: DEPLOYMENT_ADDED = 0; + * @generated from enum value: DEPLOYMENT_UNKNOWN = 0; */ - DEPLOYMENT_ADDED = 0, + DEPLOYMENT_UNKNOWN = 0, /** - * @generated from enum value: DEPLOYMENT_REMOVED = 1; + * @generated from enum value: DEPLOYMENT_ADDED = 1; */ - DEPLOYMENT_REMOVED = 1, + DEPLOYMENT_ADDED = 1, /** - * @generated from enum value: DEPLOYMENT_CHANGED = 2; + * @generated from enum value: DEPLOYMENT_REMOVED = 2; */ - DEPLOYMENT_CHANGED = 2, + DEPLOYMENT_REMOVED = 2, + + /** + * @generated from enum value: DEPLOYMENT_CHANGED = 3; + */ + DEPLOYMENT_CHANGED = 3, } // Retrieve enum metadata with: proto3.getEnumType(DeploymentChangeType) proto3.util.setEnumType(DeploymentChangeType, "xyz.block.ftl.v1.DeploymentChangeType", [ - { no: 0, name: "DEPLOYMENT_ADDED" }, - { no: 1, name: "DEPLOYMENT_REMOVED" }, - { no: 2, name: "DEPLOYMENT_CHANGED" }, + { no: 0, name: "DEPLOYMENT_UNKNOWN" }, + { no: 1, name: "DEPLOYMENT_ADDED" }, + { no: 2, name: "DEPLOYMENT_REMOVED" }, + { no: 3, name: "DEPLOYMENT_CHANGED" }, ]); /** @@ -166,7 +172,7 @@ export class PullSchemaResponse extends Message { /** * @generated from field: xyz.block.ftl.v1.DeploymentChangeType change_type = 5; */ - changeType = DeploymentChangeType.DEPLOYMENT_ADDED; + changeType = DeploymentChangeType.DEPLOYMENT_UNKNOWN; /** * If this is true then the module was removed as well as the deployment. This is only set for DEPLOYMENT_REMOVED. diff --git a/internal/model/keys.go b/internal/model/keys.go index 2ca8245f32..56f0a59939 100644 --- a/internal/model/keys.go +++ b/internal/model/keys.go @@ -78,6 +78,13 @@ func (d KeyType[T, TP]) Kind() string { return payload.Kind() } +func (d KeyType[T, TP]) GoString() string { + var t T + // Assumes the naming convention of: + // type DeploymentKey = KeyType[DeploymentPayload, *DeploymentPayload] + return fmt.Sprintf("model.%s(%q)", strings.ReplaceAll(reflect.TypeOf(t).Name(), "Payload", "Key"), d.String()) +} + func (d KeyType[T, TP]) String() string { parts := []string{d.Kind()} var payload TP = &d.Payload diff --git a/internal/schema/metadataingress.go b/internal/schema/metadataingress.go index 7032395cd8..0a851939c8 100644 --- a/internal/schema/metadataingress.go +++ b/internal/schema/metadataingress.go @@ -22,6 +22,13 @@ var _ Metadata = (*MetadataIngress)(nil) func (m *MetadataIngress) Position() Position { return m.Pos } func (m *MetadataIngress) String() string { + return fmt.Sprintf("+ingress %s %s %s", m.Type, strings.ToUpper(m.Method), m.PathString()) +} + +// PathString returns the path as a string, with parameters enclosed in curly braces. +// +// For example, /foo/{bar} +func (m *MetadataIngress) PathString() string { path := make([]string, len(m.Path)) for i, p := range m.Path { switch v := p.(type) { @@ -31,7 +38,7 @@ func (m *MetadataIngress) String() string { path[i] = fmt.Sprintf("{%s}", v.Name) } } - return fmt.Sprintf("+ingress %s %s /%s", m.Type, strings.ToUpper(m.Method), strings.Join(path, "/")) + return "/" + strings.Join(path, "/") } func (m *MetadataIngress) schemaChildren() []Node { diff --git a/internal/schema/schemaeventsource/schemaeventsource.go b/internal/schema/schemaeventsource/schemaeventsource.go new file mode 100644 index 0000000000..b205390b8c --- /dev/null +++ b/internal/schema/schemaeventsource/schemaeventsource.go @@ -0,0 +1,165 @@ +package schemaeventsource + +import ( + "context" + "fmt" + "slices" + + "github.com/alecthomas/atomic" + "github.com/alecthomas/types/optional" + "github.com/jpillora/backoff" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/reflect" + "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/schema" +) + +// Event represents a change in the schema. +// +//sumtype:decl +type Event interface { + // More returns true if there are more changes to come as part of the initial sync. + More() bool + // Schema is the READ-ONLY full schema after this event was applied. + Schema() *schema.Schema + change() +} + +// EventRemove represents that a deployment (or module) was removed. +type EventRemove struct { + // None for builtin modules. + Deployment optional.Option[model.DeploymentKey] + Module *schema.Module + // True if the underlying module was deleted in addition to the deployment itself. + Deleted bool + + schema *schema.Schema + more bool +} + +func (c EventRemove) change() {} +func (c EventRemove) More() bool { return c.more } +func (c EventRemove) Schema() *schema.Schema { return c.schema } + +// EventUpsert represents that a module has been added or updated in the schema. +type EventUpsert struct { + // schema is the READ-ONLY full schema after this event was applied. + schema *schema.Schema + + // None for builtin modules. + Deployment optional.Option[model.DeploymentKey] + Module *schema.Module + + more bool +} + +func (c EventUpsert) change() {} +func (c EventUpsert) More() bool { return c.more } +func (c EventUpsert) Schema() *schema.Schema { return c.schema } + +// NewUnattached creates a new EventSource that is not attached to a SchemaService. +func NewUnattached() EventSource { + return EventSource{ + events: make(chan Event, 64), + view: atomic.New[*schema.Schema](&schema.Schema{}), + } +} + +// EventSource represents a stream of schema events and the materialised view of those events. +type EventSource struct { + events chan Event + view *atomic.Value[*schema.Schema] +} + +// Events is a stream of schema change events. "View" will be updated with these changes prior to being sent on this +// channel. +func (e EventSource) Events() <-chan Event { return e.events } + +// View is the materialised view of the schema from "Events". +func (e EventSource) View() *schema.Schema { return e.view.Load() } + +// Publish an event to the EventSource. +// +// This will update the materialised view and send the event on the "Events" channel. The event will be updated with the +// materialised view. +// +// This is mostly useful in conjunction with NewUnattached, for testing. +func (e EventSource) Publish(event Event) { + clone := reflect.DeepCopy(e.View()) + switch event := event.(type) { + case EventRemove: + if event.Deleted { + clone.Modules = slices.DeleteFunc(clone.Modules, func(m *schema.Module) bool { return m.Name == event.Module.Name }) + } + event.schema = clone + e.view.Store(clone) + e.events <- event + + case EventUpsert: + if i := slices.IndexFunc(clone.Modules, func(m *schema.Module) bool { return m.Name == event.Module.Name }); i != -1 { + clone.Modules[i] = event.Module + } else { + clone.Modules = append(clone.Modules, event.Module) + } + event.schema = clone + e.view.Store(clone) + e.events <- event + } +} + +// New creates a new EventSource that pulls schema changes from the SchemaService into an event channel and a +// materialised view (ie. [schema.Schema]). +// +// The sync will terminate when the context is cancelled. +func New(ctx context.Context, client ftlv1connect.SchemaServiceClient) EventSource { + logger := log.FromContext(ctx).Scope("schema-sync") + out := NewUnattached() + more := true + logger.Debugf("Starting schema pull") + go rpc.RetryStreamingServerStream(ctx, "schema-sync", backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, client.PullSchema, func(_ context.Context, resp *ftlv1.PullSchemaResponse) error { + sch, err := schema.ModuleFromProto(resp.Schema) + if err != nil { + return fmt.Errorf("schema-sync: failed to decode module schema: %w", err) + } + var someDeploymentKey optional.Option[model.DeploymentKey] + if resp.DeploymentKey != nil { + deploymentKey, err := model.ParseDeploymentKey(resp.GetDeploymentKey()) + if err != nil { + return fmt.Errorf("schema-sync: invalid deployment key %q: %w", resp.GetDeploymentKey(), err) + } + someDeploymentKey = optional.Some(deploymentKey) + } + // resp.More can become true again if the streaming client reconnects, but we don't want downstream to have to + // care about a new initial sync restarting. + more = more && resp.More + switch resp.ChangeType { + case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED: + logger.Debugf("Module %s removed", sch.Name) + event := EventRemove{ + Deployment: someDeploymentKey, + Module: sch, + Deleted: resp.ModuleRemoved, + more: more, + } + out.Publish(event) + + case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED: + logger.Debugf("Module %s upserted", sch.Name) + event := EventUpsert{ + Deployment: someDeploymentKey, + Module: sch, + more: more, + } + out.Publish(event) + + default: + return fmt.Errorf("schema-sync: unknown change type %q", resp.ChangeType) + } + return nil + }, rpc.AlwaysRetry()) + return out +} diff --git a/internal/schema/schemaeventsource/schemaeventsource_test.go b/internal/schema/schemaeventsource/schemaeventsource_test.go new file mode 100644 index 0000000000..5cd656c4a9 --- /dev/null +++ b/internal/schema/schemaeventsource/schemaeventsource_test.go @@ -0,0 +1,192 @@ +package schemaeventsource + +import ( + "context" + "fmt" + "net/url" + "testing" + "time" + + "connectrpc.com/connect" + "google.golang.org/protobuf/proto" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/must" + "github.com/alecthomas/types/optional" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/schema" +) + +func TestSchemaEventSource(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.TODO()) + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + t.Cleanup(cancel) + + bind := must.Get(url.Parse("http://127.0.0.1:9090")) + server := &mockSchemaService{changes: make(chan *ftlv1.PullSchemaResponse, 8)} + go rpc.Serve(ctx, bind, rpc.GRPC(ftlv1connect.NewSchemaServiceHandler, server)) //nolint:errcheck + + changes := New(ctx, rpc.Dial(ftlv1connect.NewSchemaServiceClient, bind.String(), log.Debug)) + + send := func(t testing.TB, resp *ftlv1.PullSchemaResponse) { + resp.ModuleName = resp.Schema.Name + resp.DeploymentKey = proto.String(model.NewDeploymentKey(resp.ModuleName).String()) + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + + case server.changes <- resp: + } + } + + recv := func(t testing.TB) Event { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + + case change := <-changes.Events(): + return change + + } + panic("unreachable") + } + + time1 := &schema.Module{ + Name: "time", + Decls: []schema.Decl{ + &schema.Verb{ + Name: "time", + Request: &schema.Unit{}, + Response: &schema.Time{}, + }, + }, + } + echo1 := &schema.Module{ + Name: "echo", + Decls: []schema.Decl{ + &schema.Verb{ + Name: "echo", + Request: &schema.String{}, + Response: &schema.String{}, + }, + }, + } + time2 := &schema.Module{ + Name: "time", + Decls: []schema.Decl{ + &schema.Verb{ + Name: "time", + Request: &schema.Unit{}, + Response: &schema.Time{}, + }, + &schema.Verb{ + Name: "timezone", + Request: &schema.Unit{}, + Response: &schema.String{}, + }, + }, + } + + t.Run("InitialSend", func(t *testing.T) { + send(t, &ftlv1.PullSchemaResponse{ + More: true, + Schema: (time1).ToProto().(*schemapb.Module), //nolint:forcetypeassert + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, + }) + + send(t, &ftlv1.PullSchemaResponse{ + More: false, + Schema: (echo1).ToProto().(*schemapb.Module), //nolint:forcetypeassert + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, + }) + + var expected Event = EventUpsert{Module: time1, more: true} + assertEqual(t, expected, recv(t)) + + expected = EventUpsert{Module: echo1} + actual := recv(t) + assertEqual(t, expected, actual) + assertEqual(t, &schema.Schema{Modules: []*schema.Module{time1, echo1}}, changes.View()) + assertEqual(t, changes.View(), actual.Schema()) + }) + + t.Run("Mutation", func(t *testing.T) { + send(t, &ftlv1.PullSchemaResponse{ + More: false, + Schema: (time2).ToProto().(*schemapb.Module), //nolint:forcetypeassert + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, + }) + + var expected Event = EventUpsert{Module: time2} + actual := recv(t) + assertEqual(t, expected, actual) + assertEqual(t, &schema.Schema{Modules: []*schema.Module{time2, echo1}}, changes.View()) + assertEqual(t, changes.View(), actual.Schema()) + }) + + // Verify that schemasync doesn't propagate "initial" again. + t.Run("SimulatedReconnect", func(t *testing.T) { + send(t, &ftlv1.PullSchemaResponse{ + More: true, + Schema: (time2).ToProto().(*schemapb.Module), //nolint:forcetypeassert + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, + }) + send(t, &ftlv1.PullSchemaResponse{ + More: false, + Schema: (echo1).ToProto().(*schemapb.Module), //nolint:forcetypeassert + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, + }) + + var expected Event = EventUpsert{Module: time2} + assertEqual(t, expected, recv(t)) + expected = EventUpsert{Module: echo1, more: false} + actual := recv(t) + assertEqual(t, expected, actual) + assertEqual(t, &schema.Schema{Modules: []*schema.Module{time2, echo1}}, changes.View()) + assertEqual(t, changes.View(), actual.Schema()) + }) + + t.Run("Delete", func(t *testing.T) { + send(t, &ftlv1.PullSchemaResponse{ + Schema: (echo1).ToProto().(*schemapb.Module), //nolint:forcetypeassert + ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED, + ModuleRemoved: true, + }) + var expected Event = EventRemove{Module: echo1, Deleted: true} + actual := recv(t) + assertEqual(t, expected, actual) + assertEqual(t, &schema.Schema{Modules: []*schema.Module{time2}}, changes.View()) + assertEqual(t, changes.View(), actual.Schema()) + }) +} + +type mockSchemaService struct { + ftlv1connect.UnimplementedSchemaServiceHandler + changes chan *ftlv1.PullSchemaResponse +} + +func (m *mockSchemaService) PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest], resp *connect.ServerStream[ftlv1.PullSchemaResponse]) error { + for { + select { + case <-ctx.Done(): + return nil + case change := <-m.changes: + if err := resp.Send(change); err != nil { + return fmt.Errorf("send change: %w", err) + } + } + } +} + +var _ ftlv1connect.SchemaServiceHandler = &mockSchemaService{} + +func assertEqual[T comparable](t testing.TB, expected, actual T) { + t.Helper() + assert.Equal(t, expected, actual, assert.Exclude[optional.Option[model.DeploymentKey]](), assert.Exclude[*schema.Schema]()) +} diff --git a/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/v1/schemaservice_pb2.py b/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/v1/schemaservice_pb2.py index ff616dbea3..490c3fe1d4 100644 --- a/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/v1/schemaservice_pb2.py +++ b/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/v1/schemaservice_pb2.py @@ -26,7 +26,7 @@ from xyz.block.ftl.v1.schema import schema_pb2 as xyz_dot_block_dot_ftl_dot_v1_dot_schema_dot_schema__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n$xyz/block/ftl/v1/schemaservice.proto\x12\x10xyz.block.ftl.v1\x1a\x1axyz/block/ftl/v1/ftl.proto\x1a$xyz/block/ftl/v1/schema/schema.proto\"\x12\n\x10GetSchemaRequest\"L\n\x11GetSchemaResponse\x12\x37\n\x06schema\x18\x01 \x01(\x0b\x32\x1f.xyz.block.ftl.v1.schema.SchemaR\x06schema\"\x13\n\x11PullSchemaRequest\"\xc1\x02\n\x12PullSchemaResponse\x12*\n\x0e\x64\x65ployment_key\x18\x01 \x01(\tH\x00R\rdeploymentKey\x88\x01\x01\x12\x1f\n\x0bmodule_name\x18\x02 \x01(\tR\nmoduleName\x12<\n\x06schema\x18\x04 \x01(\x0b\x32\x1f.xyz.block.ftl.v1.schema.ModuleH\x01R\x06schema\x88\x01\x01\x12\x12\n\x04more\x18\x03 \x01(\x08R\x04more\x12G\n\x0b\x63hange_type\x18\x05 \x01(\x0e\x32&.xyz.block.ftl.v1.DeploymentChangeTypeR\nchangeType\x12%\n\x0emodule_removed\x18\x06 \x01(\x08R\rmoduleRemovedB\x11\n\x0f_deployment_keyB\t\n\x07_schema*\\\n\x14\x44\x65ploymentChangeType\x12\x14\n\x10\x44\x45PLOYMENT_ADDED\x10\x00\x12\x16\n\x12\x44\x45PLOYMENT_REMOVED\x10\x01\x12\x16\n\x12\x44\x45PLOYMENT_CHANGED\x10\x02\x32\x96\x02\n\rSchemaService\x12J\n\x04Ping\x12\x1d.xyz.block.ftl.v1.PingRequest\x1a\x1e.xyz.block.ftl.v1.PingResponse\"\x03\x90\x02\x01\x12Y\n\tGetSchema\x12\".xyz.block.ftl.v1.GetSchemaRequest\x1a#.xyz.block.ftl.v1.GetSchemaResponse\"\x03\x90\x02\x01\x12^\n\nPullSchema\x12#.xyz.block.ftl.v1.PullSchemaRequest\x1a$.xyz.block.ftl.v1.PullSchemaResponse\"\x03\x90\x02\x01\x30\x01\x42\x44P\x01Z@github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1;ftlv1b\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n$xyz/block/ftl/v1/schemaservice.proto\x12\x10xyz.block.ftl.v1\x1a\x1axyz/block/ftl/v1/ftl.proto\x1a$xyz/block/ftl/v1/schema/schema.proto\"\x12\n\x10GetSchemaRequest\"L\n\x11GetSchemaResponse\x12\x37\n\x06schema\x18\x01 \x01(\x0b\x32\x1f.xyz.block.ftl.v1.schema.SchemaR\x06schema\"\x13\n\x11PullSchemaRequest\"\xc1\x02\n\x12PullSchemaResponse\x12*\n\x0e\x64\x65ployment_key\x18\x01 \x01(\tH\x00R\rdeploymentKey\x88\x01\x01\x12\x1f\n\x0bmodule_name\x18\x02 \x01(\tR\nmoduleName\x12<\n\x06schema\x18\x04 \x01(\x0b\x32\x1f.xyz.block.ftl.v1.schema.ModuleH\x01R\x06schema\x88\x01\x01\x12\x12\n\x04more\x18\x03 \x01(\x08R\x04more\x12G\n\x0b\x63hange_type\x18\x05 \x01(\x0e\x32&.xyz.block.ftl.v1.DeploymentChangeTypeR\nchangeType\x12%\n\x0emodule_removed\x18\x06 \x01(\x08R\rmoduleRemovedB\x11\n\x0f_deployment_keyB\t\n\x07_schema*t\n\x14\x44\x65ploymentChangeType\x12\x16\n\x12\x44\x45PLOYMENT_UNKNOWN\x10\x00\x12\x14\n\x10\x44\x45PLOYMENT_ADDED\x10\x01\x12\x16\n\x12\x44\x45PLOYMENT_REMOVED\x10\x02\x12\x16\n\x12\x44\x45PLOYMENT_CHANGED\x10\x03\x32\x96\x02\n\rSchemaService\x12J\n\x04Ping\x12\x1d.xyz.block.ftl.v1.PingRequest\x1a\x1e.xyz.block.ftl.v1.PingResponse\"\x03\x90\x02\x01\x12Y\n\tGetSchema\x12\".xyz.block.ftl.v1.GetSchemaRequest\x1a#.xyz.block.ftl.v1.GetSchemaResponse\"\x03\x90\x02\x01\x12^\n\nPullSchema\x12#.xyz.block.ftl.v1.PullSchemaRequest\x1a$.xyz.block.ftl.v1.PullSchemaResponse\"\x03\x90\x02\x01\x30\x01\x42\x44P\x01Z@github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1;ftlv1b\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -41,7 +41,7 @@ _globals['_SCHEMASERVICE'].methods_by_name['PullSchema']._loaded_options = None _globals['_SCHEMASERVICE'].methods_by_name['PullSchema']._serialized_options = b'\220\002\001' _globals['_DEPLOYMENTCHANGETYPE']._serialized_start=567 - _globals['_DEPLOYMENTCHANGETYPE']._serialized_end=659 + _globals['_DEPLOYMENTCHANGETYPE']._serialized_end=683 _globals['_GETSCHEMAREQUEST']._serialized_start=124 _globals['_GETSCHEMAREQUEST']._serialized_end=142 _globals['_GETSCHEMARESPONSE']._serialized_start=144 @@ -50,6 +50,6 @@ _globals['_PULLSCHEMAREQUEST']._serialized_end=241 _globals['_PULLSCHEMARESPONSE']._serialized_start=244 _globals['_PULLSCHEMARESPONSE']._serialized_end=565 - _globals['_SCHEMASERVICE']._serialized_start=662 - _globals['_SCHEMASERVICE']._serialized_end=940 + _globals['_SCHEMASERVICE']._serialized_start=686 + _globals['_SCHEMASERVICE']._serialized_end=964 # @@protoc_insertion_point(module_scope) diff --git a/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/v1/schemaservice_pb2.pyi b/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/v1/schemaservice_pb2.pyi index f67a610311..0a59341eef 100644 --- a/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/v1/schemaservice_pb2.pyi +++ b/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/v1/schemaservice_pb2.pyi @@ -9,9 +9,11 @@ DESCRIPTOR: _descriptor.FileDescriptor class DeploymentChangeType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () + DEPLOYMENT_UNKNOWN: _ClassVar[DeploymentChangeType] DEPLOYMENT_ADDED: _ClassVar[DeploymentChangeType] DEPLOYMENT_REMOVED: _ClassVar[DeploymentChangeType] DEPLOYMENT_CHANGED: _ClassVar[DeploymentChangeType] +DEPLOYMENT_UNKNOWN: DeploymentChangeType DEPLOYMENT_ADDED: DeploymentChangeType DEPLOYMENT_REMOVED: DeploymentChangeType DEPLOYMENT_CHANGED: DeploymentChangeType