Skip to content

Commit

Permalink
refactor: make durations a persistent yet ephemeral database entity f…
Browse files Browse the repository at this point in the history
…or query filter speedup (resolve #716)
  • Loading branch information
muety committed Feb 20, 2025
1 parent abf269a commit 924f561
Show file tree
Hide file tree
Showing 15 changed files with 1,010 additions and 672 deletions.
1,234 changes: 650 additions & 584 deletions coverage/coverage.out

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
keyValueRepository repositories.IKeyValueRepository
diagnosticsRepository repositories.IDiagnosticsRepository
metricsRepository *repositories.MetricsRepository
durationRepository *repositories.DurationRepository
)

var (
Expand Down Expand Up @@ -171,6 +172,7 @@ func main() {
keyValueRepository = repositories.NewKeyValueRepository(db)
diagnosticsRepository = repositories.NewDiagnosticsRepository(db)
metricsRepository = repositories.NewMetricsRepository(db)
durationRepository = repositories.NewDurationRepository(db)

// Services
mailService = mail.NewMailService()
Expand All @@ -180,7 +182,7 @@ func main() {
languageMappingService = services.NewLanguageMappingService(languageMappingRepository)
projectLabelService = services.NewProjectLabelService(projectLabelRepository)
heartbeatService = services.NewHeartbeatService(heartbeatRepository, languageMappingService)
durationService = services.NewDurationService(heartbeatService)
durationService = services.NewDurationService(durationRepository, heartbeatService)
summaryService = services.NewSummaryService(summaryRepository, heartbeatService, durationService, aliasService, projectLabelService)
aggregationService = services.NewAggregationService(userService, summaryService, heartbeatService)
reportService = services.NewReportService(summaryService, userService, mailService)
Expand Down
7 changes: 5 additions & 2 deletions migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
)

type gormMigrationFunc func(db *gorm.DB) error
// GormMigrationFunc is the signature of a schema migration routine: it
// receives a gorm database handle and returns an error.
type GormMigrationFunc func(db *gorm.DB) error

type migrationFunc struct {
f func(db *gorm.DB, cfg *config.Config) error
Expand All @@ -23,7 +23,7 @@ var (
postMigrations migrationFuncs
)

func GetMigrationFunc(cfg *config.Config) gormMigrationFunc {
func GetMigrationFunc(cfg *config.Config) GormMigrationFunc {
switch cfg.Db.Dialect {
default:
return func(db *gorm.DB) error {
Expand Down Expand Up @@ -57,6 +57,9 @@ func GetMigrationFunc(cfg *config.Config) gormMigrationFunc {
if err := db.AutoMigrate(&models.LeaderboardItem{}); err != nil && !cfg.Db.AutoMigrateFailSilently {
return err
}
if err := db.AutoMigrate(&models.Duration{}); err != nil && !cfg.Db.AutoMigrateFailSilently {
return err
}
return nil
}
}
Expand Down
32 changes: 32 additions & 0 deletions mocks/duration_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package mocks

import (
"github.com/muety/wakapi/models"
"github.com/stretchr/testify/mock"
"time"
)

// DurationRepositoryMock is a test double for a duration repository. It embeds
// BaseRepositoryMock and testify's mock.Mock; its methods forward their
// arguments to mock.Mock.Called and return the values configured for the call.
type DurationRepositoryMock struct {
BaseRepositoryMock
mock.Mock
}

// InsertBatch records the call with the given durations and returns the error
// configured as the first return value of the expectation.
func (m *DurationRepositoryMock) InsertBatch(d []*models.Duration) error {
	return m.Called(d).Error(0)
}

// GetAllWithin records the call and returns the durations and error configured
// for the expectation.
//
// NOTE: the arguments are recorded in the order (u, t, t2), i.e. user first,
// which differs from the parameter order of this method. Expectations set up
// with On("GetAllWithin", ...) have to use that order.
func (m *DurationRepositoryMock) GetAllWithin(t time.Time, t2 time.Time, u *models.User) ([]*models.Duration, error) {
	args := m.Called(u, t, t2)

	// A test may configure Return(nil, err). Asserting an untyped nil to a
	// slice type would panic, so only assert when a value is actually present.
	var durations []*models.Duration
	if v := args.Get(0); v != nil {
		durations = v.([]*models.Duration)
	}
	return durations, args.Error(1)
}

// DeleteByUser records the call with the given user and returns the error
// configured as the first return value of the expectation.
func (m *DurationRepositoryMock) DeleteByUser(u *models.User) error {
	return m.Called(u).Error(0)
}

// DeleteByUserBefore records the call with the given user and time and returns
// the error configured as the first return value of the expectation.
func (m *DurationRepositoryMock) DeleteByUserBefore(u *models.User, t time.Time) error {
	return m.Called(u, t).Error(0)
}
4 changes: 2 additions & 2 deletions mocks/duration_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type DurationServiceMock struct {
mock.Mock
}

// Get records the call with all received arguments on the embedded mock and
// returns the configured results: the first asserted to models.Durations, the
// second as an error.
// NOTE(review): this span comes from a rendered diff and shows two signature
// lines; the four-parameter form appears to be the superseded one — confirm
// against the repository.
func (m *DurationServiceMock) Get(time time.Time, time2 time.Time, user *models.User, f *models.Filters) (models.Durations, error) {
args := m.Called(time, time2, user, f)
func (m *DurationServiceMock) Get(time time.Time, time2 time.Time, user *models.User, f *models.Filters, b bool) (models.Durations, error) {
args := m.Called(time, time2, user, f, b)
return args.Get(0).(models.Durations), args.Error(1)
}
11 changes: 7 additions & 4 deletions models/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"unicode"
)

// TODO: support multiple durations per time per user for different heartbeat timeouts
// see discussion at https://github.com/muety/wakapi/issues/675
type Duration struct {
UserID string `json:"user_id"`
Time CustomTime `json:"time" hash:"ignore"`
Duration time.Duration `json:"duration" hash:"ignore"`
UserID string `json:"user_id" gorm:"not null; index:idx_time_duration_user"`
Time CustomTime `json:"time" hash:"ignore" gorm:"not null; index:idx_time_duration_user"`
Duration time.Duration `json:"duration" hash:"ignore" gorm:"not null"`
Project string `json:"project"`
Language string `json:"language"`
Editor string `json:"editor"`
Expand All @@ -22,7 +24,7 @@ type Duration struct {
Branch string `json:"branch"`
Entity string `json:"Entity"`
NumHeartbeats int `json:"-" hash:"ignore"`
GroupHash string `json:"-" hash:"ignore"`
GroupHash string `json:"-" hash:"ignore" gorm:"type:varchar(17)"`
excludeEntity bool `json:"-" hash:"ignore"`
}

Expand All @@ -34,6 +36,7 @@ func (d *Duration) HashInclude(field string, v interface{}) (bool, error) {
field == "Duration" ||
field == "NumHeartbeats" ||
field == "GroupHash" ||
field == "ID" ||
unicode.IsLower(rune(field[0])) {
return false, nil
}
Expand Down
47 changes: 47 additions & 0 deletions repositories/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ package repositories
import (
"database/sql"
"errors"
"github.com/duke-git/lancet/v2/slice"
"gorm.io/driver/sqlserver"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"strings"
)

// chunkSize is the maximum number of records that InsertBatchChunked hands to
// a single insertBatch call.
const chunkSize = 4096

type BaseRepository struct {
db *gorm.DB
}
Expand Down Expand Up @@ -40,6 +46,47 @@ func (r *BaseRepository) GetTableDDLSqlite(tableName string) (result string, err
return result, err
}

// InsertBatchChunked inserts data in chunks of at most chunkSize elements,
// all inside one database transaction started on db. model is passed through
// to insertBatch. The first failing chunk aborts the loop and its error is
// returned from the transaction callback, so that gorm rolls the transaction
// back; nil is returned when every chunk was inserted.
func InsertBatchChunked[T any](data []T, model T, db *gorm.DB) error {
	// insert in chunks, because otherwise mysql will complain about too many placeholders in prepared query
	return db.Transaction(func(tx *gorm.DB) error {
		for _, chunk := range slice.Chunk(data, chunkSize) {
			// Run every insert on the transaction handle, not on the outer db.
			// Otherwise the statements execute outside the transaction and
			// are not rolled back when a later chunk fails.
			if err := insertBatch(chunk, model, tx); err != nil {
				return err
			}
		}
		return nil
	})
}

// insertBatch writes all elements of data using db. On every dialect except
// SQL Server this is a single create statement with an "on conflict do
// nothing" clause, using model as the statement's model. On SQL Server the
// elements are created one at a time and duplicate-key errors are ignored.
func insertBatch[T any](data []T, model T, db *gorm.DB) error {
	if db.Dialector.Name() != (sqlserver.Dialector{}).Name() {
		return db.
			Clauses(clause.OnConflict{DoNothing: true}).
			Model(model).
			Create(&data).Error
	}

	// sqlserver on conflict has bug https://github.com/go-gorm/sqlserver/issues/100
	// As a workaround, insert one by one, and ignore duplicate key error
	for _, item := range data {
		err := db.Create(item).Error
		if err == nil {
			continue
		}
		if !strings.Contains(err.Error(), "Cannot insert duplicate key row in object") {
			return err
		}
	}
	return nil
}

func streamRows[T any](rows *sql.Rows, channel chan *T, db *gorm.DB, onErr func(error)) {
defer close(channel)
defer rows.Close()
Expand Down
56 changes: 56 additions & 0 deletions repositories/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package repositories

import (
"time"

conf "github.com/muety/wakapi/config"
"github.com/muety/wakapi/models"
"gorm.io/gorm"
)

// DurationRepository provides database access for models.Duration records.
// It embeds BaseRepository, whose gorm handle the methods use for queries.
type DurationRepository struct {
BaseRepository
config *conf.Config // set from conf.Get() by NewDurationRepository
}

// NewDurationRepository builds a DurationRepository whose base repository
// wraps db and whose config is the one returned by conf.Get().
func NewDurationRepository(db *gorm.DB) *DurationRepository {
	repo := new(DurationRepository)
	repo.BaseRepository = NewBaseRepository(db)
	repo.config = conf.Get()
	return repo
}

// GetAllWithin returns the durations of the given user whose time lies in the
// half-open interval [from, to), ordered by time ascending. Both bounds are
// converted to local time before being used as query arguments. On a query
// error, nil and the error are returned.
//
// TODO: refactor to streaming these instead of fetching as a big batch
func (r *DurationRepository) GetAllWithin(from, to time.Time, user *models.User) ([]*models.Duration, error) {
	// https://stackoverflow.com/a/20765152/3112139
	var durations []*models.Duration

	// Filter by user with an explicit column condition rather than a struct
	// condition. gorm ignores zero-valued struct fields, so an empty user id
	// would silently drop the filter and return rows of every user.
	err := r.db.
		Where("user_id = ?", user.ID).
		Where("time >= ?", from.Local()).
		Where("time < ?", to.Local()).
		Order("time asc").
		Find(&durations).Error
	if err != nil {
		return nil, err
	}
	return durations, nil
}

// InsertBatch stores the given durations by handing them to
// InsertBatchChunked together with the repository's database handle.
func (r *DurationRepository) InsertBatch(durations []*models.Duration) error {
	model := &models.Duration{}
	return InsertBatchChunked(durations, model, r.db)
}

// DeleteByUser deletes all durations whose user_id equals the id of the given
// user and returns the error reported by the delete statement, if any.
func (r *DurationRepository) DeleteByUser(user *models.User) error {
	query := r.db.Where("user_id = ?", user.ID)
	return query.Delete(models.Duration{}).Error
}

// DeleteByUserBefore deletes the given user's durations whose time is at or
// before t (the comparison is "<=", with t converted to local time) and
// returns the error reported by the delete statement, if any.
func (r *DurationRepository) DeleteByUserBefore(user *models.User, t time.Time) error {
	query := r.db.
		Where("user_id = ?", user.ID).
		Where("time <= ?", t.Local())
	return query.Delete(models.Duration{}).Error
}
28 changes: 1 addition & 27 deletions repositories/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package repositories

import (
"database/sql"
"strings"
"time"

"github.com/duke-git/lancet/v2/slice"
conf "github.com/muety/wakapi/config"
"github.com/muety/wakapi/models"
"github.com/muety/wakapi/utils"
"gorm.io/driver/sqlserver"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type HeartbeatRepository struct {
Expand All @@ -33,30 +30,7 @@ func (r *HeartbeatRepository) GetAll() ([]*models.Heartbeat, error) {
}

// InsertBatch stores the given heartbeats by delegating to the shared
// InsertBatchChunked helper, which inserts in chunks inside a transaction so
// that large batches do not exceed the database's placeholder limit. The
// helper skips conflicting rows (on conflict do nothing) and, on SQL Server,
// inserts one by one while ignoring duplicate-key errors.
func (r *HeartbeatRepository) InsertBatch(heartbeats []*models.Heartbeat) error {
	return InsertBatchChunked[*models.Heartbeat](heartbeats, &models.Heartbeat{}, r.db)
}

func (r *HeartbeatRepository) GetLatestByUser(user *models.User) (*models.Heartbeat, error) {
Expand Down
8 changes: 8 additions & 0 deletions repositories/repositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ type IHeartbeatRepository interface {
GetUserProjectStats(*models.User, time.Time, time.Time, int, int) ([]*models.ProjectStats, error)
}

// IDurationRepository describes persistence operations for models.Duration
// records, in addition to everything IBaseRepository offers.
type IDurationRepository interface {
	IBaseRepository

	// InsertBatch stores all given durations.
	InsertBatch(durations []*models.Duration) error
	// GetAllWithin returns the user's durations between from and to.
	GetAllWithin(from time.Time, to time.Time, user *models.User) ([]*models.Duration, error)
	// DeleteByUser removes all durations of the given user.
	DeleteByUser(user *models.User) error
	// DeleteByUserBefore removes the given user's durations relative to the cutoff time t.
	DeleteByUserBefore(user *models.User, t time.Time) error
}

type IDiagnosticsRepository interface {
IBaseRepository
Insert(diagnostics *models.Diagnostics) (*models.Diagnostics, error)
Expand Down
Loading

0 comments on commit 924f561

Please sign in to comment.