Skip to content

Commit

Permalink
Merge pull request #6 from vindosVP/feat/processing
Browse files Browse the repository at this point in the history
feat: processor
  • Loading branch information
vindosVP authored Mar 28, 2024
2 parents de4c837 + 86013e9 commit 334f00a
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 11 deletions.
21 changes: 14 additions & 7 deletions cmd/gophermart/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ import (
"flag"
"github.com/caarlos0/env/v10"
"log"
"time"
)

type Config struct {
RunAddr string `env:"RUN_ADDRESS"`
LogLevel string `env:"LOG_LEVEL"`
AccrualSysAddr string `env:"ACCRUAL_SYSTEM_ADDRESS"`
DBURI string `env:"DATABASE_URI"`
JWTSecret string `env:"JWT_SECRET"`
RunAddr string `env:"RUN_ADDRESS"`
LogLevel string `env:"LOG_LEVEL"`
AccrualSysAddr string `env:"ACCRUAL_SYSTEM_ADDRESS"`
DBURI string `env:"DATABASE_URI"`
JWTSecret string `env:"JWT_SECRET"`
RequestInterval time.Duration `env:"REQUEST_INTERVAL"`
}

func New() *Config {

flagCfg := &Config{}
flag.StringVar(&flagCfg.RunAddr, "a", ":8080", "run address")
var reqInterval int
flag.StringVar(&flagCfg.RunAddr, "a", ":8081", "run address")
flag.StringVar(&flagCfg.LogLevel, "l", "debug", "log level")
flag.StringVar(&flagCfg.AccrualSysAddr, "r", ":8081", "accrual system address")
flag.StringVar(&flagCfg.AccrualSysAddr, "r", "http://localhost:8080", "accrual system address")
flag.StringVar(&flagCfg.DBURI, "d", "postgres://postgres:postgres@localhost:5432/loyalty-system?sslmode=disable", "database uri")
flag.StringVar(&flagCfg.JWTSecret, "s", "super-secret", "jwt secret")
flag.IntVar(&reqInterval, "w", 5, "accrual request interval")
flag.Parse()

envCfg := &Config{}
Expand Down Expand Up @@ -50,6 +54,9 @@ func New() *Config {
if cfg.JWTSecret == "" {
cfg.JWTSecret = flagCfg.JWTSecret
}
if cfg.RequestInterval == 0 {
cfg.RequestInterval = time.Duration(reqInterval)
}

return cfg
}
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ require (
github.com/jackc/pgx/v5 v5.5.3
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.19.0
golang.org/x/crypto v0.21.0
)

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-resty/resty/v2 v2.12.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
Expand All @@ -27,9 +28,9 @@ require (
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
45 changes: 45 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.19.0 h1:ol+5Fu+cSq9JD7SoSqe04GMI92cbn0+wvQ3bZ8b/AU4=
github.com/go-playground/validator/v10 v10.19.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-resty/resty/v2 v2.12.0 h1:rsVL8P90LFvkUYq/V5BTVe203WfRIU4gvcf+yfzJzGA=
github.com/go-resty/resty/v2 v2.12.0/go.mod h1:o0yGPrkS3lOe1+eFajk6kBW8ScXzwU3hD69/gt2yB/0=
github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw=
github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
Expand Down Expand Up @@ -45,22 +47,65 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type WithdrawRequest struct {
}

type WithdrawalOrder struct {
OrderID string `json:"number"`
OrderID string `json:"order"`
Sum float64 `json:"sum"`
ProcessedAt string `json:"processed_at"`
}
Expand Down
161 changes: 161 additions & 0 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package processor

import (
"context"
"errors"
"fmt"
"github.com/go-resty/resty/v2"
"github.com/vindosVP/loyalty-system/internal/models"
"github.com/vindosVP/loyalty-system/pkg/logger"
"go.uber.org/zap"
"strconv"
"sync"
"time"
)

var ErrTooManyRequests = errors.New("too many requests")

type Storage interface {
GetUnprocessedOrders(ctx context.Context) ([]int, error)
UpdateOrder(ctx context.Context, id int, status string, sum float64) (*models.Order, error)
UpdateOrderStatus(ctx context.Context, id int, status string) (*models.Order, error)
}

type Processor struct {
RequestInterval time.Duration
ServerAddress string
Done <-chan struct{}
Storage Storage
Client *resty.Client
}

type job struct {
id int
serverAddress string
client *resty.Client
order int
storage Storage
}

type result struct {
id int
order int
err error
}

type accrualResponse struct {
Order string `json:"order"`
Status string `json:"status"`
Accrual float64 `json:"accrual,omitempty"`
}

func New(RequestInterval time.Duration, ServerAddress string, Storage Storage) *Processor {
return &Processor{
RequestInterval: RequestInterval,
ServerAddress: ServerAddress,
Storage: Storage,
Client: resty.New(),
}
}

func (p *Processor) Run() {
tick := time.NewTicker(p.RequestInterval * time.Second)
defer tick.Stop()

for {
select {
case <-p.Done:
return
case <-tick.C:
p.requestAccruals()
}
}
}

func (p *Processor) requestAccruals() {
ctx := context.Background()
orders, err := p.Storage.GetUnprocessedOrders(ctx)
if err != nil {
logger.Log.Error("Failed to get unprocessed orders", zap.Error(err))
return
}
jobs := p.generateJobs(orders)
results := make(chan result)
go listenResults(results)
p.startWorkers(jobs, results, 10)
}

func listenResults(results <-chan result) {
for res := range results {
if res.err != nil {
logger.Log.Error("worker failed", zap.Error(res.err), zap.Int("id", res.id))
} else {
logger.Log.Info("worker finished", zap.Int("id", res.id))
}
}
}

func (p *Processor) startWorkers(jobs <-chan job, results chan<- result, workers int) {
wg := sync.WaitGroup{}
logger.Log.Info(fmt.Sprintf("Starting %d workers", workers))
for i := 1; i <= workers; i++ {
wg.Add(1)
go worker(jobs, results, &wg)
}
wg.Wait()
close(results)
}

func worker(jobs <-chan job, results chan<- result, wg *sync.WaitGroup) {
for j := range jobs {
err := processOrder(j.client, j.serverAddress, j.order, j.storage)
results <- result{j.id, j.order, err}
}
wg.Done()
}

func (p *Processor) generateJobs(orders []int) chan job {
jobs := make(chan job)
go func() {
id := 0
for _, order := range orders {
jobs <- job{
id: id,
serverAddress: p.ServerAddress,
client: p.Client,
order: order,
storage: p.Storage,
}
id++
}
defer close(jobs)
}()
return jobs
}

func processOrder(client *resty.Client, serverAddress string, order int, storage Storage) error {
var response accrualResponse
url := fmt.Sprintf("%s/api/orders/%s", serverAddress, strconv.Itoa(order))
resp, err := client.R().SetResult(&response).Get(url)
if err != nil {
logger.Log.Error("Failed to get accruals by order", zap.Int("orderId", order), zap.Error(err))
}
if resp.StatusCode() != 200 {
if resp.StatusCode() == 429 {
logger.Log.Error("Failed to get accruals by order", zap.Int("orderId", order), zap.Error(ErrTooManyRequests))
}
logger.Log.Error("Failed to get accruals by order", zap.Int("orderId", order), zap.Int("statusCode", resp.StatusCode()))
return err
}
id, _ := strconv.Atoi(response.Order)
if response.Status == models.OrderStatusProcessed {
_, err = storage.UpdateOrder(context.Background(), id, response.Status, response.Accrual)
} else {
_, err = storage.UpdateOrderStatus(context.Background(), id, response.Status)
}

if err != nil {
return fmt.Errorf("storage.UpdateOrder: %w", err)
}
return nil
}
36 changes: 36 additions & 0 deletions internal/repos/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,39 @@ func (or *OrdersRepo) GetUsersWithdrawals(ctx context.Context, userID int) ([]*m
}
return orders, nil
}

func (or *OrdersRepo) GetUnprocessedOrders(ctx context.Context) ([]int, error) {
query := "select id from orders where status = $1 or status = $2"
rows, err := or.pool.Query(ctx, query, models.OrderStatusNew, models.OrderStatusProcessing)
if err != nil {
return nil, fmt.Errorf("or.pool.Query: %w", err)
}
var ids []int
for rows.Next() {
var id int
err := rows.Scan(&id)
if err != nil {
return nil, fmt.Errorf("rows.Scan: %w", err)
}
ids = append(ids, id)
}
return ids, nil
}

func (or *OrdersRepo) UpdateOrder(ctx context.Context, id int, status string, sum float64) (*models.Order, error) {
query := "update orders set status = $1, sum = $2 where id = $3"
_, err := or.pool.Exec(ctx, query, status, sum, id)
if err != nil {
return nil, fmt.Errorf("or.pool.Exec: %w", err)
}
return or.GetByID(ctx, id)
}

func (or *OrdersRepo) UpdateOrderStatus(ctx context.Context, id int, status string) (*models.Order, error) {
query := "update orders set status = $1 where id = $3"
_, err := or.pool.Exec(ctx, query, status, id)
if err != nil {
return nil, fmt.Errorf("or.pool.Exec: %w", err)
}
return or.GetByID(ctx, id)
}
3 changes: 3 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/vindosVP/loyalty-system/internal/database"
"github.com/vindosVP/loyalty-system/internal/handlers"
"github.com/vindosVP/loyalty-system/internal/middleware"
"github.com/vindosVP/loyalty-system/internal/processor"
"github.com/vindosVP/loyalty-system/internal/repos"
"github.com/vindosVP/loyalty-system/internal/storage"
"github.com/vindosVP/loyalty-system/pkg/logger"
Expand Down Expand Up @@ -45,6 +46,8 @@ func Run(cfg *config.Config) error {
r.Get("/api/user/withdrawals", handlers.GetUsersWithdrawals(s))
})

p := processor.New(cfg.RequestInterval, cfg.AccrualSysAddr, s)
go p.Run()
logger.Log.Info("Server started", zap.String("Address", cfg.RunAddr))
err = http.ListenAndServe(cfg.RunAddr, r)
if err != nil {
Expand Down
Loading

0 comments on commit 334f00a

Please sign in to comment.