Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadtest: use slog instead of log #38

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions loadtest/cmd/attacker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"flag"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/kitagry/slogdriver"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -75,12 +76,15 @@ func main() {
flag.DurationVar(&connectTimeout, "connect_timeout", 5*time.Second, "Connecting timeout")
flag.Parse()

log.Printf("minimatch load-testing (rps: %.2f, frontend: %s, match_timeout: %s, redis: %s, connect_timeout: %s)",
rps, frontendAddr, matchTimeout, redisAddr, connectTimeout)
logger := initLogger()

logger.Info(fmt.Sprintf("minimatch load-testing (rps: %.2f, frontend: %s, match_timeout: %s, redis: %s, connect_timeout: %s)",
rps, frontendAddr, matchTimeout, redisAddr, connectTimeout))

redis, err := newRedisClient(redisAddr)
if err != nil {
log.Fatalf("failed to create redis client: %+v", err)
logger.Error(fmt.Sprintf("failed to create redis client: %+v", err), "error", err)
os.Exit(1)
}

ctx, shutdown := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
Expand All @@ -90,13 +94,13 @@ func main() {
http.Handle("/metrics", promhttp.Handler())
go func() {
addr := ":2112"
log.Printf("prometheus endpoint (/metrics) is listening on %s...", addr)
logger.Info(fmt.Sprintf("prometheus endpoint (/metrics) is listening on %s...", addr))
server := &http.Server{
Addr: addr,
ReadHeaderTimeout: 10 * time.Second, // https://app.deepsource.com/directory/analyzers/go/issues/GO-S2114
}
if err := server.ListenAndServe(); err != nil {
log.Printf("failed to serve prometheus endpoint: %+v", err)
logger.Error(fmt.Sprintf("failed to serve prometheus endpoint: %+v", err), "error", err)
}
}()

Expand All @@ -106,18 +110,18 @@ func main() {
for {
select {
case <-ctx.Done():
log.Printf("shutting down attacker...")
logger.Info("shutting down attacker...")
return
case <-ticker.C:
go createAndWatchTicket(context.Background(), frontendAddr, redis, matchTimeout, connectTimeout)
go createAndWatchTicket(context.Background(), frontendAddr, redis, matchTimeout, connectTimeout, logger)
}
}
}

func createAndWatchTicket(ctx context.Context, omFrontendAddr string, redis rueidis.Client, matchTimeout, connectTimeout time.Duration) {
func createAndWatchTicket(ctx context.Context, omFrontendAddr string, redis rueidis.Client, matchTimeout, connectTimeout time.Duration, logger *slog.Logger) {
frontendClient, closer, err := newOMFrontendClient(omFrontendAddr)
if err != nil {
log.Printf("failed to create frontend client: %+v", err)
logger.Error(fmt.Sprintf("failed to create frontend client: %+v", err), "error", err)
return
}
defer closer.Close()
Expand All @@ -126,15 +130,21 @@ func createAndWatchTicket(ctx context.Context, omFrontendAddr string, redis ruei
SearchFields: &pb.SearchFields{},
}})
if err != nil {
log.Printf("failed to create ticket: %+v", err)
logger.Error(fmt.Sprintf("failed to create ticket: %+v", err), "error", err)
return
}
as, err := watchTickets(ctx, frontendClient, ticket, matchTimeout)
if err != nil && !errors.Is(err, errWatchTicketTimeout) {
log.Printf("failed to watch tickets: %+v", err)
if err != nil {
if !errors.Is(err, errWatchTicketTimeout) {
logger.Error(fmt.Sprintf("failed to watch tickets: %+v", err), "error", err)
}
return
}
if err := waitConnection(ctx, redis, as, ticket, connectTimeout); err != nil && !errors.Is(err, errWatchAssignmentTimeout) {
log.Printf("failed to wait connection: %+v", err)
if err := waitConnection(ctx, redis, as, ticket, connectTimeout); err != nil {
if !errors.Is(err, errWatchAssignmentTimeout) {
logger.Error(fmt.Sprintf("failed to wait connection: %+v", err))
}
return
}
}

Expand All @@ -145,8 +155,7 @@ func watchTickets(ctx context.Context, omFrontend pb.FrontendServiceClient, tick

stream, err := omFrontend.WatchAssignments(ctx, &pb.WatchAssignmentsRequest{TicketId: ticket.Id})
if err != nil {
log.Printf("failed to open watch assignments stream: %+v", err)
return nil, err
return nil, fmt.Errorf("failed to watch assignments: %w", err)
}

respCh := make(chan *pb.Assignment)
Expand All @@ -159,6 +168,9 @@ func watchTickets(ctx context.Context, omFrontend pb.FrontendServiceClient, tick
default:
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return
}
if errors.Is(err, context.Canceled) && ctx.Err() != nil {
return
}
Expand Down Expand Up @@ -264,3 +276,14 @@ func newRedisClient(addr string) (rueidis.Client, error) {
func redisKeyAssignment(as *pb.Assignment) string {
return fmt.Sprintf("attacker:as:%s", as.Connection)
}

func initLogger() *slog.Logger {
_, onK8s := os.LookupEnv("KUBERNETES_SERVICE_HOST")
_, onCloudRun := os.LookupEnv("K_CONFIGURATION")
if onK8s || onCloudRun {
return slogdriver.New(os.Stdout, slogdriver.HandlerOptions{
Level: slogdriver.LevelDefault,
})
}
return slog.Default()
}
63 changes: 43 additions & 20 deletions loadtest/cmd/backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
Expand All @@ -14,6 +14,7 @@ import (
cache "github.com/Code-Hex/go-generics-cache"
"github.com/bojand/hri"
"github.com/kelseyhightower/envconfig"
"github.com/kitagry/slogdriver"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/rueidis"
"github.com/redis/rueidis/rueidislock"
Expand Down Expand Up @@ -54,46 +55,52 @@ var matchProfile = &pb.MatchProfile{
func main() {
var conf config
envconfig.MustProcess("", &conf)
logger := initLogger()

meterProvider, err := newMeterProvider()
if err != nil {
log.Fatalf("failed to create meter provider: %+v", err)
logger.Error(fmt.Sprintf("failed to create meter provider: %+v", err), "error", err)
os.Exit(1)
}
otel.SetMeterProvider(meterProvider)
startPrometheus()
startPrometheus(logger)

redisStore, err := newRedisStateStore(&conf)
if err != nil {
log.Fatalf("failed to create redis store: %+v", err)
logger.Error(fmt.Sprintf("failed to create redis store: %+v", err), "error", err)
os.Exit(1)
}
ticketCache := cache.New[string, *pb.Ticket]()
store := statestore.NewBackendStoreWithTicketCache(redisStore, ticketCache,
statestore.WithTicketCacheTTL(conf.TicketCacheTTL))
assigner, err := newAssigner(&conf, meterProvider)
backend, err := minimatch.NewBackend(store, assigner, minimatch.WithTicketValidationBeforeAssign(conf.TicketValidationEnabled))
assigner, err := newAssigner(&conf, meterProvider, logger)
backend, err := minimatch.NewBackend(store, assigner,
minimatch.WithTicketValidationBeforeAssign(conf.TicketValidationEnabled),
minimatch.WithBackendLogger(logger))
if err != nil {
log.Fatalf("failed to create backend: %+v", err)
logger.Error(fmt.Sprintf("failed to create backend: %+v", err), "error", err)
os.Exit(1)
}
backend.AddMatchFunction(matchProfile, minimatch.MatchFunctionSimple1vs1)

ctx, shutdown := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer shutdown()
if err := backend.Start(ctx, conf.TickRate); err != nil {
if !errors.Is(err, context.Canceled) {
log.Fatalf("failed to start backend: %+v", err)
logger.Error(fmt.Sprintf("failed to start backend: %+v", err), "error", err)
}
}
}

func newAssigner(conf *config, provider metric.MeterProvider) (minimatch.Assigner, error) {
var assigner minimatch.Assigner = minimatch.AssignerFunc(dummyAssign)
func newAssigner(conf *config, provider metric.MeterProvider, logger *slog.Logger) (minimatch.Assigner, error) {
var assigner minimatch.Assigner = &dummyAssigner{logger: logger}
if conf.OverlappingCheckRedisAddr != "" {
log.Printf("with overlapping match checker (redis: %s)", conf.OverlappingCheckRedisAddr)
logger.Info(fmt.Sprintf("with overlapping match checker (redis: %s)", conf.OverlappingCheckRedisAddr))
rc, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{conf.OverlappingCheckRedisAddr}, DisableCache: true})
if err != nil {
return nil, fmt.Errorf("failed to create redis client: %w", err)
}
as, err := newAssignerWithOverlappingChecker("loadtest:", rc, assigner, provider)
as, err := newAssignerWithOverlappingChecker("loadtest:", rc, assigner, provider, logger)
if err != nil {
return nil, fmt.Errorf("failed to create assigner with overlapping checker: %w", err)
}
Expand Down Expand Up @@ -145,12 +152,16 @@ func newRedisStateStore(conf *config) (statestore.BackendStore, error) {
// Assigner assigns a GameServer to a match.
// For example, it could call Agones' Allocate service.
// For the sake of simplicity, a dummy GameServer name is assigned here.
func dummyAssign(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) {
type dummyAssigner struct {
logger *slog.Logger
}

func (a *dummyAssigner) Assign(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) {
var asgs []*pb.AssignmentGroup
for _, match := range matches {
tids := ticketIDs(match)
conn := hri.Random()
log.Printf("assign '%s' to tickets: %v", conn, tids)
a.logger.Debug(fmt.Sprintf("assign '%s' to tickets: %v", conn, tids))
asgs = append(asgs, &pb.AssignmentGroup{
TicketIds: tids,
Assignment: &pb.Assignment{Connection: conn},
Expand Down Expand Up @@ -178,17 +189,17 @@ func newMeterProvider() (metric.MeterProvider, error) {
return provider, nil
}

func startPrometheus() {
func startPrometheus(logger *slog.Logger) {
http.Handle("/metrics", promhttp.Handler())
go func() {
addr := ":2112"
log.Printf("prometheus endpoint (/metrics) is listening on %s...", addr)
logger.Info(fmt.Sprintf("prometheus endpoint (/metrics) is listening on %s...", addr))
server := &http.Server{
Addr: addr,
ReadHeaderTimeout: 10 * time.Second, // https://app.deepsource.com/directory/analyzers/go/issues/GO-S2114
}
if err := server.ListenAndServe(); err != nil {
log.Printf("failed to serve prometheus endpoint: %+v", err)
logger.Error(fmt.Sprintf("failed to serve prometheus endpoint: %+v", err), "error", err)
}
}()
}
Expand All @@ -198,13 +209,13 @@ type assignerWithOverlappingChecker struct {
redisKeyPrefix string
redisClient rueidis.Client
assigner minimatch.Assigner
logger *slog.Logger

validAssigns metric.Int64Counter
overlappingWithin metric.Int64Counter
overlappingInter metric.Int64Counter
}

func newAssignerWithOverlappingChecker(redisKeyPrefix string, redisClient rueidis.Client, assigner minimatch.Assigner, provider metric.MeterProvider) (*assignerWithOverlappingChecker, error) {
func newAssignerWithOverlappingChecker(redisKeyPrefix string, redisClient rueidis.Client, assigner minimatch.Assigner, provider metric.MeterProvider, logger *slog.Logger) (*assignerWithOverlappingChecker, error) {
meter := provider.Meter("github.com/castaneai/minimatch/loadtest")
overlappingWithIn, err := meter.Int64Counter("minimatch.loadtest.overlapping_tickets_within_backend",
metric.WithDescription("Number of times the same Ticket is assigned to multiple Matches within a single backend"))
Expand All @@ -220,6 +231,7 @@ func newAssignerWithOverlappingChecker(redisKeyPrefix string, redisClient rueidi
redisKeyPrefix: redisKeyPrefix,
redisClient: redisClient,
assigner: assigner,
logger: logger,
overlappingWithin: overlappingWithIn,
overlappingInter: overlappingInter,
}, nil
Expand All @@ -245,8 +257,19 @@ func (a *assignerWithOverlappingChecker) Assign(ctx context.Context, matches []*
a.overlappingInter.Add(ctx, 1)
continue
}
log.Printf("failed to check overlapping with redis: %+v", err)
a.logger.Error(fmt.Sprintf("failed to check overlapping with redis: %+v", err), "error", err)
}
}
return a.assigner.Assign(ctx, matches)
}

func initLogger() *slog.Logger {
_, onK8s := os.LookupEnv("KUBERNETES_SERVICE_HOST")
_, onCloudRun := os.LookupEnv("K_CONFIGURATION")
if onK8s || onCloudRun {
return slogdriver.New(os.Stdout, slogdriver.HandlerOptions{
Level: slogdriver.LevelDefault,
})
}
return slog.Default()
}
Loading
Loading