Skip to content

Commit

Permalink
feat: refactor instance entity and repo; add cleanup infra (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeii authored Jan 17, 2025
1 parent 4e7cc3e commit 40990ac
Show file tree
Hide file tree
Showing 28 changed files with 1,023 additions and 527 deletions.
5 changes: 0 additions & 5 deletions cmd/swat4master/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/sergeii/swat4master/cmd/swat4master/config"
"github.com/sergeii/swat4master/internal/core/usecases/addserver"
"github.com/sergeii/swat4master/internal/core/usecases/cleanservers"
"github.com/sergeii/swat4master/internal/core/usecases/getserver"
"github.com/sergeii/swat4master/internal/core/usecases/listservers"
"github.com/sergeii/swat4master/internal/core/usecases/probeserver"
Expand Down Expand Up @@ -44,7 +43,6 @@ func NewUseCaseConfigs(cfg config.Config) UseCaseConfigs {

type Container struct {
AddServer addserver.UseCase
CleanServers cleanservers.UseCase
GetServer getserver.UseCase
ListServers listservers.UseCase
ProbeServer probeserver.UseCase
Expand All @@ -57,7 +55,6 @@ type Container struct {

func NewContainer(
addServerUseCase addserver.UseCase,
cleanServersUseCase cleanservers.UseCase,
getServerUseCase getserver.UseCase,
listServersUseCase listservers.UseCase,
probeServerUseCase probeserver.UseCase,
Expand All @@ -69,7 +66,6 @@ func NewContainer(
) Container {
return Container{
AddServer: addServerUseCase,
CleanServers: cleanServersUseCase,
GetServer: getServerUseCase,
ListServers: listServersUseCase,
ProbeServer: probeServerUseCase,
Expand All @@ -88,7 +84,6 @@ var Module = fx.Module("container",
fx.Provide(reportserver.New),
fx.Provide(renewserver.New),
fx.Provide(removeserver.New),
fx.Provide(cleanservers.New),
fx.Provide(refreshservers.New),
fx.Provide(reviveservers.New),
fx.Provide(probeserver.New),
Expand Down
52 changes: 28 additions & 24 deletions cmd/swat4master/modules/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package cleaner

import (
"context"
"time"

"github.com/jonboulle/clockwork"
"github.com/rs/zerolog"
"go.uber.org/fx"

"github.com/sergeii/swat4master/cmd/swat4master/config"
"github.com/sergeii/swat4master/internal/core/usecases/cleanservers"
"github.com/sergeii/swat4master/internal/metrics"
"github.com/sergeii/swat4master/internal/cleanup"
"github.com/sergeii/swat4master/internal/cleanup/cleaners/instancecleaner"
"github.com/sergeii/swat4master/internal/cleanup/cleaners/servercleaner"
)

type Cleaner struct{}
Expand All @@ -20,8 +20,7 @@ func Run(
stopped chan struct{},
clock clockwork.Clock,
logger *zerolog.Logger,
metrics *metrics.Collector,
uc cleanservers.UseCase,
manager *cleanup.Manager,
cfg config.Config,
) {
ticker := clock.NewTicker(cfg.CleanInterval)
Expand All @@ -42,7 +41,7 @@ func Run(
close(stopped)
return
case <-tickerCh:
clean(ctx, clock, logger, metrics, uc, cfg.CleanRetention)
manager.Clean(ctx)
}
}
}
Expand All @@ -51,16 +50,15 @@ func NewCleaner(
lc fx.Lifecycle,
cfg config.Config,
clock clockwork.Clock,
metrics *metrics.Collector,
uc cleanservers.UseCase,
manager *cleanup.Manager,
logger *zerolog.Logger,
) *Cleaner {
stopped := make(chan struct{})
stop := make(chan struct{})

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go Run(stop, stopped, clock, logger, metrics, uc, cfg) // nolint: contextcheck
go Run(stop, stopped, clock, logger, manager, cfg) // nolint: contextcheck
return nil
},
OnStop: func(context.Context) error {
Expand All @@ -73,24 +71,30 @@ func NewCleaner(
return &Cleaner{}
}

func clean(
ctx context.Context,
clock clockwork.Clock,
logger *zerolog.Logger,
metrics *metrics.Collector,
uc cleanservers.UseCase,
retention time.Duration,
) {
resp, err := uc.Execute(ctx, clock.Now().Add(-retention))
if err != nil {
logger.Error().
Err(err).
Msg("Failed to clean outdated servers")
type Opts struct {
fx.Out

ServerCleanerOpts servercleaner.Opts
InstanceCleanerOpts instancecleaner.Opts
}

func provideCleanerConfigs(cfg config.Config) Opts {
return Opts{
ServerCleanerOpts: servercleaner.Opts{
Retention: cfg.CleanRetention,
},
InstanceCleanerOpts: instancecleaner.Opts{
Retention: cfg.CleanRetention,
},
}
metrics.CleanerRemovals.Add(float64(resp.Count))
metrics.CleanerErrors.Add(float64(resp.Errors))
}

var Module = fx.Module("cleaner",
fx.Provide(cleanup.NewManager),
fx.Provide(provideCleanerConfigs),
fx.Invoke(
servercleaner.New,
instancecleaner.New,
),
fx.Provide(NewCleaner),
)
31 changes: 31 additions & 0 deletions internal/cleanup/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package cleanup

import (
"context"
"sync"
)

type Cleaner interface {
Clean(ctx context.Context)
}

type Manager struct {
mutex sync.Mutex
cleaners []Cleaner
}

func NewManager() *Manager {
return &Manager{}
}

func (m *Manager) AddCleaner(c Cleaner) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.cleaners = append(m.cleaners, c)
}

func (m *Manager) Clean(ctx context.Context) {
for _, c := range m.cleaners {
go c.Clean(ctx)
}
}
63 changes: 63 additions & 0 deletions internal/cleanup/cleaners/instancecleaner/instancecleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package instancecleaner

import (
"context"
"time"

"github.com/jonboulle/clockwork"
"github.com/rs/zerolog"

"github.com/sergeii/swat4master/internal/cleanup"
"github.com/sergeii/swat4master/internal/core/entities/filterset"
"github.com/sergeii/swat4master/internal/core/repositories"
"github.com/sergeii/swat4master/internal/metrics"
)

type Opts struct {
Retention time.Duration
}

type InstanceCleaner struct {
opts Opts
instanceRepo repositories.InstanceRepository
clock clockwork.Clock
metrics *metrics.Collector
logger *zerolog.Logger
}

func New(
manager *cleanup.Manager,
opts Opts,
instanceRepo repositories.InstanceRepository,
clock clockwork.Clock,
metrics *metrics.Collector,
logger *zerolog.Logger,
) InstanceCleaner {
cleaner := InstanceCleaner{
opts: opts,
instanceRepo: instanceRepo,
clock: clock,
metrics: metrics,
logger: logger,
}
manager.AddCleaner(&cleaner)
return cleaner
}

func (c InstanceCleaner) Clean(ctx context.Context) {
// Calculate the cutoff time for cleaning instances.
cleanUntil := c.clock.Now().Add(-c.opts.Retention)
fs := filterset.NewInstanceFilterSet().UpdatedBefore(cleanUntil)

c.logger.Info().Stringer("until", cleanUntil).Msg("Starting to clean instances")

count, err := c.instanceRepo.Clear(ctx, fs)
if err != nil {
c.metrics.CleanerErrors.WithLabelValues("instances").Inc()
c.logger.Error().Err(err).Stringer("until", cleanUntil).Msg("Failed to clean instances")
return
}

c.metrics.CleanerRemovals.WithLabelValues("instances").Add(float64(count))
c.logger.Info().Stringer("until", cleanUntil).Int("removed", count).Msg("Finished cleaning instances")
}
111 changes: 111 additions & 0 deletions internal/cleanup/cleaners/instancecleaner/instancecleaner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package instancecleaner_test

import (
"context"
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/sergeii/swat4master/internal/cleanup"
"github.com/sergeii/swat4master/internal/cleanup/cleaners/instancecleaner"
"github.com/sergeii/swat4master/internal/core/entities/filterset"
"github.com/sergeii/swat4master/internal/core/repositories"
"github.com/sergeii/swat4master/internal/metrics"
)

type MockInstanceRepository struct {
mock.Mock
repositories.InstanceRepository
}

func (m *MockInstanceRepository) Clear(ctx context.Context, fs filterset.InstanceFilterSet) (int, error) {
args := m.Called(ctx, fs)
return args.Get(0).(int), args.Error(1) // nolint: forcetypeassert
}

func TestInstanceCleaner_Clean_OK(t *testing.T) {
ctx := context.TODO()

manager := cleanup.NewManager()
collector := metrics.New()
clock := clockwork.NewFakeClock()
logger := zerolog.Nop()
options := instancecleaner.Opts{
Retention: time.Hour,
}

instanceRepo := new(MockInstanceRepository)
instanceRepo.On("Clear", ctx, mock.Anything).Return(37, nil)

cleaner := instancecleaner.New(
manager,
options,
instanceRepo,
clock,
collector,
&logger,
)
cleaner.Clean(ctx)

instanceRepo.AssertCalled(
t,
"Clear",
ctx,
mock.MatchedBy(func(fs filterset.InstanceFilterSet) bool {
updatedBefore, ok := fs.GetUpdatedBefore()
wantUpdatedBefore := ok && updatedBefore.Equal(clock.Now().Add(-time.Hour))
return wantUpdatedBefore
}),
)

cleanerRemovalsWithInstancesValue := testutil.ToFloat64(collector.CleanerRemovals.WithLabelValues("instances"))
assert.Equal(t, float64(37), cleanerRemovalsWithInstancesValue)
cleanerErrorsWithInstancesValue := testutil.ToFloat64(collector.CleanerErrors.WithLabelValues("instances"))
assert.Equal(t, float64(0), cleanerErrorsWithInstancesValue)
}

func TestInstanceCleaner_Clean_RepoError(t *testing.T) {
ctx := context.TODO()

manager := cleanup.NewManager()
collector := metrics.New()
clock := clockwork.NewFakeClock()
logger := zerolog.Nop()
options := instancecleaner.Opts{
Retention: time.Hour,
}

instanceRepo := new(MockInstanceRepository)
instanceRepo.On("Clear", ctx, mock.Anything).Return(0, assert.AnError)

cleaner := instancecleaner.New(
manager,
options,
instanceRepo,
clock,
collector,
&logger,
)
cleaner.Clean(ctx)

instanceRepo.AssertCalled(
t,
"Clear",
ctx,
mock.MatchedBy(func(fs filterset.InstanceFilterSet) bool {
updatedBefore, ok := fs.GetUpdatedBefore()
wantUpdatedBefore := ok && updatedBefore.Equal(clock.Now().Add(-time.Hour))
return wantUpdatedBefore
}),
)

cleanerRemovalsWithInstancesValue := testutil.ToFloat64(collector.CleanerRemovals.WithLabelValues("instances"))
assert.Equal(t, float64(0), cleanerRemovalsWithInstancesValue)
cleanerErrorsWithInstancesValue := testutil.ToFloat64(collector.CleanerErrors.WithLabelValues("instances"))
assert.Equal(t, float64(1), cleanerErrorsWithInstancesValue)
}
Loading

0 comments on commit 40990ac

Please sign in to comment.