Skip to content

Commit

Permalink
chore: rename ModuleContext to DeploymentContext (#3614)
Browse files Browse the repository at this point in the history
Move this to it's own service, and also split out leases
  • Loading branch information
stuartwdouglas authored Dec 4, 2024
1 parent ac4aaf5 commit 411c570
Show file tree
Hide file tree
Showing 54 changed files with 2,044 additions and 1,796 deletions.
85 changes: 39 additions & 46 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,21 @@ import (
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
deploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect"
ftllease "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1"
leaseconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
frontend "github.com/TBD54566975/ftl/frontend/console"
"github.com/TBD54566975/ftl/internal/configuration"
cf "github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/deploymentcontext"
"github.com/TBD54566975/ftl/internal/dsn"
"github.com/TBD54566975/ftl/internal/log"
ftlmaps "github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/modulecontext"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
Expand Down Expand Up @@ -158,7 +162,8 @@ func Start(
g.Go(func() error {
return rpc.Serve(ctx, config.Bind,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewModuleServiceHandler, svc),
rpc.GRPC(deploymentconnect.NewDeploymentServiceHandler, svc),
rpc.GRPC(leaseconnect.NewLeaseServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewControllerServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewSchemaServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewAdminServiceHandler, admin),
Expand Down Expand Up @@ -718,41 +723,43 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
return connect.NewResponse(&ftlv1.PingResponse{NotReady: &msg}), nil
}

// GetModuleContext retrieves config, secrets and DSNs for a module.
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.GetModuleContextRequest], resp *connect.ServerStream[ftlv1.GetModuleContextResponse]) error {
name := req.Msg.Module
// GetDeploymentContext retrieves config, secrets and DSNs for a module.
func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request[ftldeployment.GetDeploymentContextRequest], resp *connect.ServerStream[ftldeployment.GetDeploymentContextResponse]) error {
depName := req.Msg.Deployment
if !strings.HasPrefix(depName, "dpl-") {
// For hot reload endponts we might not have a deployment key
deps, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
return fmt.Errorf("could not get active deployments: %w", err)
}
for _, dep := range deps {
if dep.Module == depName {
depName = dep.Key.String()
break
}
}
}
key, err := model.ParseDeploymentKey(depName)
if err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}
deployment, err := s.dal.GetDeployment(ctx, key)
if err != nil {
return fmt.Errorf("could not get deployment: %w", err)
}
module := deployment.Module

// Initialize checksum to -1; a zero checksum does occur when the context contains no settings
lastChecksum := int64(-1)

dbTypes := map[string]modulecontext.DBType{}
deps, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get deployments: %w", err))
}
for _, dep := range deps {
if dep.Module == name {
for _, decl := range dep.Schema.Decls {
if db, ok := decl.(*schema.Database); ok {
dbType, err := modulecontext.DBTypeFromString(db.Type)
if err != nil {
// Not much we can do here
continue
}
dbTypes[db.Name] = dbType
}
}
break
}
}
for {
h := sha.New()

configs, err := s.cm.MapForModule(ctx, name)
configs, err := s.cm.MapForModule(ctx, module)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get configs: %w", err))
}
secrets, err := s.sm.MapForModule(ctx, name)
secrets, err := s.sm.MapForModule(ctx, module)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get secrets: %w", err))
}
Expand All @@ -767,7 +774,7 @@ func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftl
checksum := int64(binary.BigEndian.Uint64((h.Sum(nil))[0:8]))

if checksum != lastChecksum {
response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).Build().ToProto()
response := deploymentcontext.NewBuilder(module).AddConfigs(configs).AddSecrets(secrets).Build().ToProto()

if err := resp.Send(response); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err))
Expand Down Expand Up @@ -798,25 +805,11 @@ func hashConfigurationMap(h hash.Hash, m map[string][]byte) error {
return nil
}

// hashDatabaseConfiguration computes an order invariant checksum on the database
// configuration settings supplied in the map.
func hashDatabaseConfiguration(h hash.Hash, m map[string]modulecontext.Database) error {
keys := maps.Keys(m)
sort.Strings(keys)
for _, k := range keys {
_, err := h.Write(append([]byte(k), []byte(m[k].DSN)...))
if err != nil {
return fmt.Errorf("error hashing database configuration: %w", err)
}
}
return nil
}

// AcquireLease acquires a lease on behalf of a module.
//
// This is a bidirectional stream where each request from the client must be
// responded to with an empty response.
func (s *Service) AcquireLease(ctx context.Context, stream *connect.BidiStream[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]) error {
func (s *Service) AcquireLease(ctx context.Context, stream *connect.BidiStream[ftllease.AcquireLeaseRequest, ftllease.AcquireLeaseResponse]) error {
var lease leases.Lease
for {
msg, err := stream.Receive()
Expand All @@ -836,7 +829,7 @@ func (s *Service) AcquireLease(ctx context.Context, stream *connect.BidiStream[f
}
defer lease.Release() //nolint:errcheck
}
if err = stream.Send(&ftlv1.AcquireLeaseResponse{}); err != nil {
if err = stream.Send(&ftllease.AcquireLeaseResponse{}); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send lease response: %w", err))
}
}
Expand All @@ -846,7 +839,7 @@ func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque
return s.callWithRequest(ctx, headers.CopyRequestForForwarding(req), optional.None[model.RequestKey](), optional.None[model.RequestKey](), "")
}

func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldeployment.PublishEventRequest]) (*connect.Response[ftldeployment.PublishEventResponse], error) {
// Publish the event.
now := time.Now().UTC()
pubishError := optional.None[string]()
Expand Down Expand Up @@ -881,7 +874,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.P
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to publish a event to topic %s:%s: %w", req.Msg.Topic.Module, req.Msg.Topic.Name, err))
}
return connect.NewResponse(&ftlv1.PublishEventResponse{}), nil
return connect.NewResponse(&ftldeployment.PublishEventResponse{}), nil
}

func (s *Service) callWithRequest(
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/timeline/events_pubsub_publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
ftlencryption "github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
deployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
)
Expand All @@ -32,7 +32,7 @@ type PubSubPublish struct {
Time time.Time
SourceVerb schema.Ref
Topic string
Request *ftlv1.PublishEventRequest
Request *deployment.PublishEventRequest
Error optional.Option[string]
}

Expand Down
Loading

0 comments on commit 411c570

Please sign in to comment.