Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
gov-dmitry committed Oct 18, 2024
1 parent 07f5d46 commit d252561
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 194 deletions.
28 changes: 15 additions & 13 deletions internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"gorm.io/gorm"
"gorm.io/gorm/logger"

"github.com/goverland-labs/goverland-core-storage/internal/delegate"
"github.com/goverland-labs/goverland-core-storage/protocol/storagepb"

"github.com/goverland-labs/goverland-core-storage/internal/config"
"github.com/goverland-labs/goverland-core-storage/internal/dao"
"github.com/goverland-labs/goverland-core-storage/internal/delegates"
"github.com/goverland-labs/goverland-core-storage/internal/delegate"
"github.com/goverland-labs/goverland-core-storage/internal/ensresolver"
"github.com/goverland-labs/goverland-core-storage/internal/events"
"github.com/goverland-labs/goverland-core-storage/internal/proposal"
Expand Down Expand Up @@ -55,14 +54,12 @@ type Application struct {
voteRepo *vote.Repo
voteService *vote.Service

delegateRepo *delegate.Repo
delegateService *delegate.Service

ensRepo *ensresolver.Repo
ensService *ensresolver.Service

delegatesRepo *delegates.Repo
delegatesService *delegates.Service

eventsRepo *events.Repo

statsService *stats.Service
Expand Down Expand Up @@ -143,7 +140,7 @@ func (a *Application) initDB() error {
a.voteRepo = vote.NewRepo(a.db)
a.eventsRepo = events.NewRepo(a.db)
a.ensRepo = ensresolver.NewRepo(a.db)
a.delegatesRepo = delegates.NewRepo(a.db)
a.delegateRepo = delegate.NewRepo(a.db)

return err
}
Expand Down Expand Up @@ -279,10 +276,19 @@ func (a *Application) initProposal(nc *nats.Conn, pb *natsclient.Publisher) erro
}

func (a *Application) initDelegates(nc *nats.Conn, pb *natsclient.Publisher) error {
service := delegates.NewService(a.delegatesRepo, a.daoService, pb)
a.delegatesService = service
dsConn, err := grpc.NewClient(
a.cfg.InternalAPI.DatasourceSnapshotAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return fmt.Errorf("create connection with datasource snapshot server: %v", err)
}

cs, err := delegates.NewConsumer(nc, service)
delegateClient := delegatepb.NewDelegateClient(dsConn)
service := delegate.NewService(a.delegateRepo, delegateClient, a.daoService, a.ensService, pb)
a.delegateService = service

cs, err := delegate.NewConsumer(nc, service)
if err != nil {
return fmt.Errorf("delegates consumer: %w", err)
}
Expand Down Expand Up @@ -315,10 +321,6 @@ func (a *Application) initVote(nc *nats.Conn, pb *natsclient.Publisher) error {
}
a.manager.AddWorker(process.NewCallbackWorker("vote-consumer", cs.Start))

delegateClient := delegatepb.NewDelegateClient(dsConn)
delegateService := delegate.NewService(delegateClient, a.daoService, a.ensService)
a.delegateService = delegateService

return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package delegates
package delegate

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package delegates
package delegate

import (
"encoding/json"
Expand Down
2 changes: 1 addition & 1 deletion internal/delegates/repo.go → internal/delegate/repo.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package delegates
package delegate

import (
"fmt"
Expand Down
156 changes: 155 additions & 1 deletion internal/delegate/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
protoany "github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
"github.com/goverland-labs/goverland-datasource-snapshot/protocol/delegatepb"
events "github.com/goverland-labs/goverland-platform-events/events/core"
"github.com/rs/zerolog/log"
"gorm.io/gorm"

"github.com/goverland-labs/goverland-core-storage/internal/dao"
"github.com/goverland-labs/goverland-core-storage/internal/ensresolver"
Expand All @@ -20,6 +24,11 @@ var errNoResolved = errors.New("no addresses resolved")

type DaoProvider interface {
GetByID(id uuid.UUID) (*dao.Dao, error)
GetIDByOriginalID(string) (uuid.UUID, error)
}

type Publisher interface {
PublishJSON(ctx context.Context, subject string, obj any) error
}

type EnsResolver interface {
Expand All @@ -32,13 +41,17 @@ type Service struct {
delegateClient delegatepb.DelegateClient
daoProvider DaoProvider
ensResolver EnsResolver
publisher Publisher
repo *Repo
}

func NewService(dc delegatepb.DelegateClient, daoProvider DaoProvider, ensResolver EnsResolver) *Service {
func NewService(repo *Repo, dc delegatepb.DelegateClient, daoProvider DaoProvider, ensResolver EnsResolver, ep Publisher) *Service {
return &Service{
delegateClient: dc,
daoProvider: daoProvider,
ensResolver: ensResolver,
publisher: ep,
repo: repo,
}
}

Expand Down Expand Up @@ -240,3 +253,144 @@ func (s *Service) resolveAddressesName(addresses []string) (map[string]string, e

return res, nil
}

func (s *Service) handleDelegate(_ context.Context, hr History) error {
if err := s.repo.CallInTx(func(tx *gorm.DB) error {
if hr.OriginalSpaceID == "" {
log.Warn().Msgf("skip processing block %d from %s cause dao id is empty", hr.BlockNumber, hr.ChainID)

return nil
}

// store to history
if err := s.repo.CreateHistory(tx, hr); err != nil {
return fmt.Errorf("repo.CreateHistory: %w", err)
}

// get space id by provided original_space_id
daoID, err := s.daoProvider.GetIDByOriginalID(hr.OriginalSpaceID)
if err != nil {
return fmt.Errorf("dp.GetIDByOriginalID: %w", err)
}

bts, err := s.repo.GetSummaryBlockTimestamp(tx, strings.ToLower(hr.AddressFrom), daoID.String())
if err != nil {
return fmt.Errorf("s.repo.GetSummaryBlockTimestamp: %w", err)
}

// skip this block due to already processed
if bts != 0 && bts >= hr.BlockTimestamp {
log.Warn().Msgf("skip processing block %d from %s due to invalid timestamp", hr.BlockNumber, hr.ChainID)

return nil
}

if hr.Action == actionExpire {
if err := s.repo.UpdateSummaryExpiration(tx, strings.ToLower(hr.AddressFrom), daoID.String(), hr.Delegations.Expiration, hr.BlockTimestamp); err != nil {
return fmt.Errorf("UpdateSummaryExpiration: %w", err)
}

return nil
}

if err := s.repo.RemoveSummary(tx, strings.ToLower(hr.AddressFrom), daoID.String()); err != nil {
return fmt.Errorf("UpdateSummaryExpiration: %w", err)
}

if hr.Action == actionClear {
return nil
}

for _, info := range hr.Delegations.Details {
if err = s.repo.CreateSummary(Summary{
AddressFrom: strings.ToLower(hr.AddressFrom),
AddressTo: strings.ToLower(info.Address),
DaoID: daoID.String(),
Weight: info.Weight,
LastBlockTimestamp: hr.BlockTimestamp,
ExpiresAt: int64(hr.Delegations.Expiration),
}); err != nil {
return fmt.Errorf("createSummary [%s/%s/%s]: %w", hr.AddressFrom, info.Address, daoID.String(), err)
}
}

return nil
}); err != nil {
return fmt.Errorf("repo.CallInTx: %w", err)
}

return nil
}

func (s *Service) handleProposalCreated(ctx context.Context, pr Proposal) error {
// get space id by provided original_space_id
daoID, err := s.daoProvider.GetIDByOriginalID(pr.OriginalDaoID)
if err != nil {
return fmt.Errorf("dp.GetIDByOriginalID: %w", err)
}

// find delegator by author in specific space id
summary, err := s.repo.FindDelegator(daoID.String(), strings.ToLower(pr.Author))
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("repo.FindDelegator: %w", err)
}

// author doesn't have any delegation relations
if summary == nil {
return nil
}

if summary.SelfDelegation() {
return nil
}

// delegation is expired
if summary.Expired() {
return nil
}

// make an event
event := events.DelegatePayload{
Initiator: strings.ToLower(pr.Author),
Delegator: summary.AddressFrom,
DaoID: daoID,
ProposalID: pr.ID,
}
if err = s.publisher.PublishJSON(ctx, events.SubjectDelegateCreateProposal, event); err != nil {
return fmt.Errorf("s.publisher.PublishJSON: %w", err)
}

return nil
}

func (s *Service) handleVotesCreated(ctx context.Context, batch []Vote) error {
summary, err := s.repo.FindDelegatorsByVotes(batch)
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("repo.FindDelegatorsByVotes: %w", err)
}

for _, info := range summary {
if info.SelfDelegation() {
continue
}

// delegation is expired
if info.Expired() {
continue
}

// make an event
event := events.DelegatePayload{
Initiator: strings.ToLower(info.AddressTo),
Delegator: info.AddressFrom,
DaoID: uuid.MustParse(info.DaoID),
ProposalID: info.ProposalID,
}

if err = s.publisher.PublishJSON(ctx, events.SubjectDelegateVotingVoted, event); err != nil {
log.Err(err).Msgf("publish delegate voted: %s %s", info.AddressTo, info.ProposalID)
}
}

return nil
}
Loading

0 comments on commit d252561

Please sign in to comment.