Skip to content

Commit

Permalink
Merge pull request #54 from goverland-labs/feature/top-dao-cache
Browse files Browse the repository at this point in the history
Feature/top dao cache
  • Loading branch information
WertND authored Apr 11, 2024
2 parents 53d0ab1 + 026eacf commit ccdc262
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 74 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- Added top dao cache worker

## [0.1.9] - 2024-04-10

### Added
Expand Down
5 changes: 4 additions & 1 deletion internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ func (a *Application) initEnsResolver(pb *natsclient.Publisher) error {
func (a *Application) initDao(nc *nats.Conn, pb *natsclient.Publisher) error {
a.daoIDService = dao.NewDaoIDService(a.daoIDRepo)

service, err := dao.NewService(a.daoRepo, a.daoUniqueRepo, a.daoIDService, pb, a.proposalRepo)
topDAOCache := dao.NewTopDAOCache(a.daoRepo)

service, err := dao.NewService(a.daoRepo, a.daoUniqueRepo, a.daoIDService, pb, a.proposalRepo, topDAOCache)
if err != nil {
return fmt.Errorf("dao service: %w", err)
}
Expand All @@ -229,6 +231,7 @@ func (a *Application) initDao(nc *nats.Conn, pb *natsclient.Publisher) error {
a.manager.AddWorker(process.NewCallbackWorker("dao-new-voters-worker", mc.ProcessNew))
a.manager.AddWorker(process.NewCallbackWorker("dao-popular-category-process-worker", pcw.Process))
a.manager.AddWorker(process.NewCallbackWorker("dao-active-votes-worker", avw.Process))
a.manager.AddWorker(process.NewCallbackWorker("top-dao-cache-worker", topDAOCache.Start))

return nil
}
Expand Down
85 changes: 12 additions & 73 deletions internal/dao/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,25 @@ type Service struct {
daoIds map[string]uuid.UUID
daoMu sync.RWMutex

topMu sync.RWMutex
topCache map[int]map[string]topList

repo DataProvider
uniqueRepo UniqueVoterProvider
events Publisher
idProvider DaoIDProvider
proposals ProposalProvider

topDAOCache *TopDAOCache
}

func NewService(r DataProvider, ur UniqueVoterProvider, ip DaoIDProvider, p Publisher, pp ProposalProvider) (*Service, error) {
func NewService(r DataProvider, ur UniqueVoterProvider, ip DaoIDProvider, p Publisher, pp ProposalProvider, topDAOCache *TopDAOCache) (*Service, error) {
return &Service{
repo: r,
uniqueRepo: ur,
events: p,
idProvider: ip,
proposals: pp,
daoIds: make(map[string]uuid.UUID),
daoMu: sync.RWMutex{},
topMu: sync.RWMutex{},
topCache: map[int]map[string]topList{},
repo: r,
uniqueRepo: ur,
events: p,
idProvider: ip,
proposals: pp,
daoIds: make(map[string]uuid.UUID),
daoMu: sync.RWMutex{},
topDAOCache: topDAOCache,
}, nil
}

Expand Down Expand Up @@ -251,66 +249,7 @@ type topList struct {
}

func (s *Service) GetTopByCategories(_ context.Context, limit int) (map[string]topList, error) {
s.topMu.RLock()
cached, ok := s.topCache[limit]
s.topMu.RUnlock()
if ok && len(cached) != 0 {
return makeCopy(cached), nil
}

categories, err := s.repo.GetCategories()
if err != nil {
return nil, fmt.Errorf("get categories: %w", err)
}

list := make(map[string]topList)
for _, category := range categories {
filters := []Filter{
CategoryFilter{Category: category},
PageFilter{Limit: limit, Offset: 0},
OrderByPopularityIndexFilter{},
}

data, err := s.repo.GetByFilters(filters, true)
if err != nil {
return nil, fmt.Errorf("get by category %s: %w", category, err)
}

list[category] = topList{
List: data.Daos,
Total: data.TotalCount,
}
}

s.topMu.Lock()
s.topCache[limit] = list
s.topMu.Unlock()

go func() {
<-time.After(topCachingTTL)

s.topMu.Lock()
delete(s.topCache, limit)
s.topMu.Unlock()
}()

return list, nil
}

func makeCopy(src map[string]topList) map[string]topList {
copied := map[string]topList{}
for k, v := range src {
copied[k] = topList{
List: make([]Dao, len(v.List)),
Total: v.Total,
}

for i := range v.List { // nolint:gosimple
copied[k].List[i] = v.List[i]
}
}

return copied
return s.topDAOCache.GetTopList(uint(limit)), nil
}

func (s *Service) HandleActivitySince(_ context.Context, id uuid.UUID) (*Dao, error) {
Expand Down
103 changes: 103 additions & 0 deletions internal/dao/top_dao_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package dao

import (
"context"
"fmt"
"sync"
"time"

"github.com/rs/zerolog/log"
)

const (
topDAOCacheReloadDelay = 5 * time.Minute
topDaoCategoryLimit = 20
)

type TopDAOCache struct {
repo DataProvider

cacheLock sync.RWMutex
cache map[string]topList
}

func NewTopDAOCache(repo DataProvider) *TopDAOCache {
return &TopDAOCache{
repo: repo,
cache: make(map[string]topList),
}
}

func (w *TopDAOCache) Start(ctx context.Context) error {
for {
err := w.reload()
if err != nil {
log.Error().Err(err).Msg("failed to reload top DAO cache")
}

select {
case <-ctx.Done():
return nil
case <-time.After(topDAOCacheReloadDelay):
}
}
}

func (w *TopDAOCache) GetTopList(limit uint) map[string]topList {
w.cacheLock.RLock()
defer w.cacheLock.RUnlock()

return makeCopy(w.cache, limit)
}

func (w *TopDAOCache) reload() error {
categories, err := w.repo.GetCategories()
if err != nil {
return fmt.Errorf("get categories: %w", err)
}

list := make(map[string]topList)
for _, category := range categories {
filters := []Filter{
CategoryFilter{Category: category},
PageFilter{Limit: topDaoCategoryLimit, Offset: 0},
OrderByPopularityIndexFilter{},
}

data, err := w.repo.GetByFilters(filters, true)
if err != nil {
return fmt.Errorf("get by category %s: %w", category, err)
}

list[category] = topList{
List: data.Daos,
Total: data.TotalCount,
}
}

w.cacheLock.Lock()
defer w.cacheLock.Unlock()

w.cache = list

log.Info().Int("category_count", len(w.cache)).Msg("Top DAO cache reloaded")

return nil
}

func makeCopy(src map[string]topList, limit uint) map[string]topList {
copied := map[string]topList{}
for k, v := range src {
newLen := min(limit, uint(len(v.List)))
copied[k] = topList{
List: make([]Dao, newLen),
Total: v.Total,
}

for i := range limit { // nolint:gosimple

Check failure on line 97 in internal/dao/top_dao_cache.go

View workflow job for this annotation

GitHub Actions / lint

cannot range over limit (variable of type uint) (typecheck)

Check failure on line 97 in internal/dao/top_dao_cache.go

View workflow job for this annotation

GitHub Actions / lint

cannot range over limit (variable of type uint) (typecheck)

Check failure on line 97 in internal/dao/top_dao_cache.go

View workflow job for this annotation

GitHub Actions / lint

cannot range over limit (variable of type uint) (typecheck)
copied[k].List[i] = v.List[i]
}
}

return copied
}

0 comments on commit ccdc262

Please sign in to comment.