Skip to content

Commit

Permalink
chore: delete controllers table (#3637)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Dec 5, 2024
1 parent 25ee4ef commit 2cefbb2
Show file tree
Hide file tree
Showing 9 changed files with 8 additions and 232 deletions.
37 changes: 1 addition & 36 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ func New(

// Parallel tasks.
parallelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5)
parallelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5)
parallelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5)
parallelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10)

// This should be a singleton task, but because this is the task that
Expand All @@ -331,7 +329,6 @@ func New(
parallelTask(svc.expireStaleLeases, "expire-stale-leases", time.Second*2, time.Second, time.Second*5)

// Singleton tasks use leases to only run on a single controller.
singletonTask(svc.reapStaleControllers, "reap-stale-controllers", time.Second*2, time.Second*20, time.Second*20)
singletonTask(svc.reapStaleRunners, "reap-stale-runners", time.Second*2, time.Second, time.Second*10)
singletonTask(svc.reapCallEvents, "reap-call-events", time.Minute*5, time.Minute, time.Minute*30)
singletonTask(svc.reapAsyncCalls, "reap-async-calls", time.Second*5, time.Second, time.Second*5)
Expand Down Expand Up @@ -374,7 +371,7 @@ func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.Pr
}

func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusRequest]) (*connect.Response[ftlv1.StatusResponse], error) {
status, err := s.dal.GetStatus(ctx)
status, err := s.dal.GetStatus(ctx, dalmodel.Controller{Key: s.key, Endpoint: s.config.Bind.String()})
if err != nil {
return nil, fmt.Errorf("could not get status: %w", err)
}
Expand Down Expand Up @@ -1454,38 +1451,6 @@ func (s *Service) expireStaleLeases(ctx context.Context) (time.Duration, error)
return time.Second * 1, nil
}

// Periodically remove stale (ie. have not heartbeat recently) controllers from the database.
func (s *Service) reapStaleControllers(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
count, err := s.dal.KillStaleControllers(context.Background(), s.config.ControllerTimeout)
if err != nil {
return 0, fmt.Errorf("failed to delete stale controllers: %w", err)
} else if count > 0 {
logger.Debugf("Reaped %d stale controllers", count)
}
return time.Second * 10, nil
}

// Periodically update the DB with the current state of the controller.
func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error) {
_, err := s.dal.UpsertController(ctx, s.key, s.config.Advertise.String())
if err != nil {
return 0, fmt.Errorf("failed to heartbeat controller: %w", err)
}
return time.Second * 3, nil
}

func (s *Service) updateControllersList(ctx context.Context) (time.Duration, error) {
controllers, err := s.dal.GetActiveControllers(ctx)
if err != nil {
return 0, err
}
for _, listener := range s.controllerListListeners {
listener.UpdatedControllerList(ctx, controllers)
}
return time.Second * 5, nil
}

func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(response *ftlv1.PullSchemaResponse) error) error {
logger := log.FromContext(ctx)
type moduleStateEntry struct {
Expand Down
32 changes: 2 additions & 30 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,7 @@ type DAL struct {
DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification]
}

func (d *DAL) GetActiveControllers(ctx context.Context) ([]dalmodel.Controller, error) {
controllers, err := d.db.GetActiveControllers(ctx)
if err != nil {
return nil, libdal.TranslatePGError(err)
}
return slices.Map(controllers, func(in dalsql.Controller) dalmodel.Controller {
return dalmodel.Controller{
Key: in.Key,
Endpoint: in.Endpoint,
}
}), nil
}

func (d *DAL) GetStatus(ctx context.Context) (dalmodel.Status, error) {
controllers, err := d.GetActiveControllers(ctx)
if err != nil {
return dalmodel.Status{}, fmt.Errorf("could not get control planes: %w", libdal.TranslatePGError(err))
}
func (d *DAL) GetStatus(ctx context.Context, controller dalmodel.Controller) (dalmodel.Status, error) {
runners, err := d.db.GetActiveRunners(ctx)
if err != nil {
return dalmodel.Status{}, fmt.Errorf("could not get active runners: %w", libdal.TranslatePGError(err))
Expand Down Expand Up @@ -149,7 +132,7 @@ func (d *DAL) GetStatus(ctx context.Context) (dalmodel.Status, error) {
return dalmodel.Status{}, fmt.Errorf("could not parse runners: %w", err)
}
return dalmodel.Status{
Controllers: controllers,
Controllers: []dalmodel.Controller{controller},
Deployments: statusDeployments,
Runners: domainRunners,
}, nil
Expand Down Expand Up @@ -306,12 +289,6 @@ func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, e
return count, err
}

// KillStaleControllers deletes controllers that have not had heartbeats for the given duration.
func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int64, error) {
count, err := d.db.KillStaleControllers(ctx, sqltypes.Duration(age))
return count, err
}

// DeregisterRunner deregisters the given runner.
func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error {
count, err := d.db.DeregisterRunner(ctx, key)
Expand Down Expand Up @@ -676,11 +653,6 @@ func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr stri
return nil
}

func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, addr string) (int64, error) {
id, err := d.db.UpsertController(ctx, key, addr)
return id, libdal.TranslatePGError(err)
}

func (d *DAL) GetActiveRunners(ctx context.Context) ([]dalmodel.Runner, error) {
rows, err := d.db.GetActiveRunners(ctx)
if err != nil {
Expand Down
51 changes: 0 additions & 51 deletions backend/controller/dal/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 0 additions & 24 deletions backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -176,30 +176,6 @@ WHERE d.key = sqlc.arg('key')::deployment_key;
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);

-- name: UpsertController :one
INSERT INTO controllers (key, endpoint)
VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET state = 'live',
endpoint = $2,
last_seen = NOW() AT TIME ZONE 'utc'
RETURNING id;

-- name: KillStaleControllers :one
-- Mark any controller entries that haven't been updated recently as dead.
WITH matches AS (
UPDATE controllers
SET state = 'dead'
WHERE state <> 'dead' AND last_seen < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL
RETURNING 1)
SELECT COUNT(*)
FROM matches;

-- name: GetActiveControllers :many
SELECT *
FROM controllers c
WHERE c.state <> 'dead'
ORDER BY c.key;

-- name: SucceedAsyncCall :one
UPDATE async_calls
SET
Expand Down
71 changes: 0 additions & 71 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 0 additions & 13 deletions backend/controller/dal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal/internal/sql"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
Expand Down Expand Up @@ -57,20 +56,8 @@ type Reconciliation struct {

type ControllerState string

// Controller states.
const (
ControllerStateLive = ControllerState(sql.ControllerStateLive)
ControllerStateDead = ControllerState(sql.ControllerStateDead)
)

type RequestOrigin string

const (
RequestOriginIngress = RequestOrigin(sql.OriginIngress)
RequestOriginCron = RequestOrigin(sql.OriginCron)
RequestOriginPubsub = RequestOrigin(sql.OriginPubsub)
)

type Deployment struct {
Key model.DeploymentKey
Language string
Expand Down
3 changes: 0 additions & 3 deletions backend/controller/sql/databasetesting/devel.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*stdsql.DB,
), deleted_leases AS (
DELETE FROM leases
RETURNING 1
), deleted_controllers AS (
DELETE FROM controllers
RETURNING 1
), deleted_runners AS (
DELETE FROM runners
RETURNING 1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- migrate:up
DROP TABLE controllers;

-- migrate:down

0 comments on commit 2cefbb2

Please sign in to comment.