Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vmercierfr committed Apr 16, 2024
1 parent 3c2aca2 commit c89fb58
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 71 deletions.
6 changes: 3 additions & 3 deletions internal/infra/postgresql/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestPartitionName(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.partition.GeneratePartition(tc.time)
result, _ := tc.partition.GeneratePartition(tc.time)
assert.Equal(t, tc.expected.Schema, result.Schema, "Schema don't match")
assert.Equal(t, tc.expected.Name, result.Name, "Table name don't match")
assert.Equal(t, tc.expected.LowerBound, result.LowerBound, "Lower bound don't match")
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestRetentionTableNames(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tables := tc.partition.GetRetentionPartitions(tc.time)
tables, _ := tc.partition.GetRetentionPartitions(tc.time)

assert.DeepEqual(t, tables, tc.expected)
})
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestPreProvisionedTableNames(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tables := tc.partition.GetPreProvisionedPartitions(tc.time)
tables, _ := tc.partition.GetPreProvisionedPartitions(tc.time)

assert.DeepEqual(t, tables, tc.expected)
})
Expand Down
71 changes: 49 additions & 22 deletions internal/infra/postgresql/partitionconfiguration.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package postgresql

import (
"errors"
"fmt"
"time"
)
Expand All @@ -20,6 +21,8 @@ const (
daysInAweek int = 7
)

var ErrUnsupportedInterval = errors.New("unsupported partition interval")

type PartitionConfiguration struct {
Schema string `mapstructure:"schema" validate:"required"`
Table string `mapstructure:"table" validate:"required"`
Expand All @@ -30,7 +33,7 @@ type PartitionConfiguration struct {
CleanupPolicy CleanupPolicy `mapstructure:"cleanupPolicy" validate:"required,oneof=drop detach"`
}

func (p PartitionConfiguration) GeneratePartition(forDate time.Time) Partition {
func (p PartitionConfiguration) GeneratePartition(forDate time.Time) (Partition, error) {
var suffix string

var upperBound, lowerBound any
Expand All @@ -49,6 +52,8 @@ func (p PartitionConfiguration) GeneratePartition(forDate time.Time) Partition {
case YearlyInterval:
suffix = forDate.Format("2006")
lowerBound, upperBound = getYearlyBounds(forDate)
default:
return Partition{}, ErrUnsupportedInterval
}

partition := Partition{
Expand All @@ -59,65 +64,87 @@ func (p PartitionConfiguration) GeneratePartition(forDate time.Time) Partition {
LowerBound: lowerBound,
}

return partition
return partition, nil
}

func (p PartitionConfiguration) GetRetentionPartitions(forDate time.Time) []Partition {
func (p PartitionConfiguration) GetRetentionPartitions(forDate time.Time) ([]Partition, error) {
partitions := make([]Partition, p.Retention)

for i := 1; i <= p.Retention; i++ {
prevDate := p.getPrevDate(forDate, i)
partitions[i-1] = p.GeneratePartition(prevDate)
prevDate, err := p.getPrevDate(forDate, i)
if err != nil {
return nil, fmt.Errorf("could not compute previous date: %w", err)
}

partition, err := p.GeneratePartition(prevDate)
if err != nil {
return nil, fmt.Errorf("could not generate partition: %w", err)
}

partitions[i-1] = partition
}

return partitions
return partitions, nil
}

func (p PartitionConfiguration) GetPreProvisionedPartitions(forDate time.Time) []Partition {
func (p PartitionConfiguration) GetPreProvisionedPartitions(forDate time.Time) ([]Partition, error) {
partitions := make([]Partition, p.PreProvisioned)

for i := 1; i <= p.PreProvisioned; i++ {
nextDate := p.getNextDate(forDate, i)
partitions[i-1] = p.GeneratePartition(nextDate)
nextDate, err := p.getNextDate(forDate, i)
if err != nil {
return nil, fmt.Errorf("could not compute next date: %w", err)
}

partition, err := p.GeneratePartition(nextDate)
if err != nil {
return nil, fmt.Errorf("could not generate partition: %w", err)
}

partitions[i-1] = partition
}

return partitions
return partitions, nil
}

func (p PartitionConfiguration) getPrevDate(forDate time.Time, i int) time.Time {
func (p PartitionConfiguration) getPrevDate(forDate time.Time, i int) (t time.Time, err error) {
switch p.Interval {
case DailyInterval:
return forDate.AddDate(0, 0, -i)
t = forDate.AddDate(0, 0, -i)
case WeeklyInterval:
return forDate.AddDate(0, 0, -i*daysInAweek)
t = forDate.AddDate(0, 0, -i*daysInAweek)
case MonthlyInterval:
year, month, _ := forDate.Date()

return time.Date(year, month-time.Month(i), 1, 0, 0, 0, 0, forDate.Location())
t = time.Date(year, month-time.Month(i), 1, 0, 0, 0, 0, forDate.Location())
case YearlyInterval:
year, _, _ := forDate.Date()

return time.Date(year-i, 1, 1, 0, 0, 0, 0, forDate.Location())
t = time.Date(year-i, 1, 1, 0, 0, 0, 0, forDate.Location())
default:
return time.Time{} // TODO Raise an error
return time.Time{}, ErrUnsupportedInterval
}

return t, nil
}

func (p PartitionConfiguration) getNextDate(forDate time.Time, i int) time.Time {
func (p PartitionConfiguration) getNextDate(forDate time.Time, i int) (t time.Time, err error) {
switch p.Interval {
case DailyInterval:
return forDate.AddDate(0, 0, i)
t = forDate.AddDate(0, 0, i)
case WeeklyInterval:
return forDate.AddDate(0, 0, i*daysInAweek)
t = forDate.AddDate(0, 0, i*daysInAweek)
case MonthlyInterval:
year, month, _ := forDate.Date()

return time.Date(year, month+time.Month(i), 1, 0, 0, 0, 0, forDate.Location())
t = time.Date(year, month+time.Month(i), 1, 0, 0, 0, 0, forDate.Location())
case YearlyInterval:
year, _, _ := forDate.Date()

return time.Date(year+i, 1, 1, 0, 0, 0, 0, forDate.Location())
t = time.Date(year+i, 1, 1, 0, 0, 0, 0, forDate.Location())
default:
return time.Time{} // TODO Raise an error
return time.Time{}, ErrUnsupportedInterval
}

return t, nil
}
5 changes: 4 additions & 1 deletion pkg/ppm/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ func (p *PPM) checkPartitionsConfiguration(partition postgresql.PartitionConfigu

currentTime := time.Now()

expectedPartitions := getExpectedPartitions(partition, currentTime)
expectedPartitions, err := getExpectedPartitions(partition, currentTime)
if err != nil {
return fmt.Errorf("could not generate expected partitions: %w", err)
}

foundPartitions, err := p.db.ListPartitions(postgresql.Table{Schema: partition.Schema, Name: partition.Table})
if err != nil {
Expand Down
38 changes: 24 additions & 14 deletions pkg/ppm/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,40 @@ func TestCheckPartitions(t *testing.T) {

var tables []postgresql.Partition

var partition postgresql.Partition

for i := 0; i <= p.Retention; i++ {
switch p.Interval {
case postgresql.DailyInterval:
tables = append(tables, p.GeneratePartition(time.Now().AddDate(0, 0, -i)))
partition, _ = p.GeneratePartition(time.Now().AddDate(0, 0, -i))
case postgresql.WeeklyInterval:
tables = append(tables, p.GeneratePartition(time.Now().AddDate(0, 0, -i*7)))
partition, _ = p.GeneratePartition(time.Now().AddDate(0, 0, -i*7))
case postgresql.MonthlyInterval:
tables = append(tables, p.GeneratePartition(time.Now().AddDate(0, -i, 0)))
partition, _ = p.GeneratePartition(time.Now().AddDate(0, -i, 0))
case postgresql.YearlyInterval:
tables = append(tables, p.GeneratePartition(time.Now().AddDate(-i, 0, 0)))
partition, _ = p.GeneratePartition(time.Now().AddDate(-i, 0, 0))
default:
t.Errorf("unuspported partition interval in retention table mock")
}

tables = append(tables, partition)
}

for i := 0; i <= p.PreProvisioned; i++ {
switch p.Interval {
case postgresql.DailyInterval:
tables = append(tables, p.GeneratePartition(time.Now().AddDate(0, 0, i)))
partition, _ = p.GeneratePartition(time.Now().AddDate(0, 0, i))
case postgresql.WeeklyInterval:
tables = append(tables, p.GeneratePartition(time.Now().AddDate(0, 0, i*7)))
partition, _ = p.GeneratePartition(time.Now().AddDate(0, 0, i*7))
case postgresql.MonthlyInterval:
tables = append(tables, p.GeneratePartition(time.Now().AddDate(0, i, 0)))
partition, _ = p.GeneratePartition(time.Now().AddDate(0, i, 0))
case postgresql.YearlyInterval:
tables = append(tables, p.GeneratePartition(time.Now().AddDate(i, 0, 0)))
partition, _ = p.GeneratePartition(time.Now().AddDate(i, 0, 0))
default:
t.Errorf("unuspported partition interval in preprovisonned table mock")
}

tables = append(tables, partition)
}
postgreSQLMock.On("ListPartitions", postgresql.Table{Schema: p.Schema, Name: p.Table}).Return(tables, nil).Once()
}
Expand All @@ -106,29 +112,33 @@ func TestCheckMissingPartitions(t *testing.T) {
PreProvisioned: 2,
}

todayPartition, _ := partition.GeneratePartition(today)
yesterdayPartition, _ := partition.GeneratePartition(yesterday)
tomorrowPartition, _ := partition.GeneratePartition(tomorrow)

testCases := []struct {
name string
tables []postgresql.Partition
}{
{
"Missing Yesterday retention partition",
[]postgresql.Partition{
partition.GeneratePartition(today),
partition.GeneratePartition(yesterday),
todayPartition,
yesterdayPartition,
},
},
{
"Missing Tomorrow partition",
[]postgresql.Partition{
partition.GeneratePartition(today),
partition.GeneratePartition(tomorrow),
todayPartition,
tomorrowPartition,
},
},
{
"Missing Today partition",
[]postgresql.Partition{
partition.GeneratePartition(yesterday),
partition.GeneratePartition(tomorrow),
yesterdayPartition,
tomorrowPartition,
},
},
}
Expand Down
14 changes: 5 additions & 9 deletions pkg/ppm/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,18 @@ import (

var ErrPartitionCleanupFailed = errors.New("at least one partition contains could not be cleaned")

func getExpectedPartitions(partition postgresql.PartitionConfiguration, currentTime time.Time) (partitions []postgresql.Partition) {
partitions = append(partitions, partition.GetRetentionPartitions(currentTime)...)
partitions = append(partitions, partition.GeneratePartition(currentTime))
partitions = append(partitions, partition.GetPreProvisionedPartitions(currentTime)...)

return
}

func (p PPM) CleanupPartitions() error {
currentTime := time.Now()
partitionContainAnError := false

for name, config := range p.partitions {
p.logger.Info("Cleaning partition", "partition", name)

expectedPartitions := getExpectedPartitions(config, currentTime)
expectedPartitions, err := getExpectedPartitions(config, currentTime)
if err != nil {
return fmt.Errorf("could not generate expected partitions: %w", err)
}

parentTable := postgresql.Table{Schema: config.Schema, Name: config.Table}

foundPartitions, err := p.db.ListPartitions(parentTable)
Expand Down
32 changes: 19 additions & 13 deletions pkg/ppm/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ var OneDayPartitionConfiguration = postgresql.PartitionConfiguration{
var ErrFake = errors.New("fake error")

func TestCleanupPartitions(t *testing.T) {
yearBeforePartition := OneDayPartitionConfiguration.GeneratePartition(dayBeforeYesterday.AddDate(-1, 0, 0))
dayBeforeYesterdayPartition := OneDayPartitionConfiguration.GeneratePartition(dayBeforeYesterday)
yesterdayPartition := OneDayPartitionConfiguration.GeneratePartition(yesterday)
currentPartition := OneDayPartitionConfiguration.GeneratePartition(today)
tomorrowPartition := OneDayPartitionConfiguration.GeneratePartition(tomorrow)
dayAfterTomorrowPartition := OneDayPartitionConfiguration.GeneratePartition(dayAfterTomorrow)
yearBeforePartition, _ := OneDayPartitionConfiguration.GeneratePartition(dayBeforeYesterday.AddDate(-1, 0, 0))
dayBeforeYesterdayPartition, _ := OneDayPartitionConfiguration.GeneratePartition(dayBeforeYesterday)
yesterdayPartition, _ := OneDayPartitionConfiguration.GeneratePartition(yesterday)
currentPartition, _ := OneDayPartitionConfiguration.GeneratePartition(today)
tomorrowPartition, _ := OneDayPartitionConfiguration.GeneratePartition(tomorrow)
dayAfterTomorrowPartition, _ := OneDayPartitionConfiguration.GeneratePartition(dayAfterTomorrow)

detachPartitionConfiguration := OneDayPartitionConfiguration
detachPartitionConfiguration.CleanupPolicy = "detach"
Expand Down Expand Up @@ -114,27 +114,33 @@ func TestCleanupPartitionsFailover(t *testing.T) {

for _, config := range configuration {
table := postgresql.Table{Schema: config.Schema, Name: config.Table}

dayBeforeYesterdayPartition, _ := config.GeneratePartition(dayBeforeYesterday)
yesterdayPartitionPartition, _ := config.GeneratePartition(yesterday)
todayPartitionPartition, _ := config.GeneratePartition(today)
tomorrowPartitionPartition, _ := config.GeneratePartition(tomorrow)

partitions := []postgresql.Partition{
config.GeneratePartition(dayBeforeYesterday), // This partition should be removed by the cleanup
config.GeneratePartition(yesterday),
config.GeneratePartition(today),
config.GeneratePartition(tomorrow),
dayBeforeYesterdayPartition, // This partition should be removed by the cleanup
yesterdayPartitionPartition,
todayPartitionPartition,
tomorrowPartitionPartition,
}

postgreSQLMock.On("ListPartitions", table).Return(partitions, nil).Once()
}

// Undetachable partition will return an error on detach operation
undetachablePartition := undetachablePartitionConfiguration.GeneratePartition(dayBeforeYesterday)
undetachablePartition, _ := undetachablePartitionConfiguration.GeneratePartition(dayBeforeYesterday)
postgreSQLMock.On("DetachPartition", undetachablePartition).Return(ErrFake).Once()

// Undropable partition will return an error on drop operation
undropablePartition := undropablePartitionConfiguration.GeneratePartition(dayBeforeYesterday)
undropablePartition, _ := undropablePartitionConfiguration.GeneratePartition(dayBeforeYesterday)
postgreSQLMock.On("DetachPartition", undropablePartition).Return(nil).Once()
postgreSQLMock.On("DeletePartition", undropablePartition).Return(ErrFake).Once()

// Detach and drop will succeed
successPartition := successPartitionConfiguration.GeneratePartition(dayBeforeYesterday)
successPartition, _ := successPartitionConfiguration.GeneratePartition(dayBeforeYesterday)
postgreSQLMock.On("DetachPartition", successPartition).Return(nil).Once()
postgreSQLMock.On("DeletePartition", successPartition).Return(nil).Once()

Expand Down
24 changes: 24 additions & 0 deletions pkg/ppm/ppm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ppm

import (
"context"
"fmt"
"log/slog"
"time"

Expand Down Expand Up @@ -34,3 +35,26 @@ func New(context context.Context, logger slog.Logger, db PostgreSQLClient, parti
logger: logger,
}
}

func getExpectedPartitions(partition postgresql.PartitionConfiguration, currentTime time.Time) (partitions []postgresql.Partition, err error) {
retentions, err := partition.GetRetentionPartitions(currentTime)
if err != nil {
return partitions, fmt.Errorf("could not generate retention partitions: %w", err)
}

current, err := partition.GeneratePartition(currentTime)
if err != nil {
return partitions, fmt.Errorf("could not generate current partition: %w", err)
}

future, err := partition.GetPreProvisionedPartitions(currentTime)
if err != nil {
return partitions, fmt.Errorf("could not generate preProvisionned partition: %w", err)
}

partitions = append(partitions, retentions...)
partitions = append(partitions, current)
partitions = append(partitions, future...)

return
}
Loading

0 comments on commit c89fb58

Please sign in to comment.