Skip to content

Commit

Permalink
Created seprate queue for game handing and db requests
Browse files Browse the repository at this point in the history
  • Loading branch information
atul24112001 committed Nov 8, 2024
1 parent aaf84c0 commit 140ebbb
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 131 deletions.
152 changes: 44 additions & 108 deletions server/game-manager/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron"
// "github.com/robfig/cron/v3"
)

type NewGame struct {
Expand All @@ -32,7 +34,8 @@ type GameManager struct {
Users map[string]User
NewGame map[string]NewGame
StartedGames map[string]Game
Queue Queue
DbQueue Queue
GameQueue Queue
}

var instance *GameManager
Expand All @@ -42,9 +45,16 @@ func GetInstance() *GameManager {
once.Do(func() {
redisAddr := os.Getenv("REDIS_URL")
opt, _ := redis.ParseURL(redisAddr)
client := redis.NewClient(opt)
queue := Queue{
client: client,
dbClient := redis.NewClient(opt)
gameClient := redis.NewClient(opt)
dbQueue := Queue{
client: dbClient,
queueName: "mari-arena-db-queue",
processingKey: "mari-arena-db-queue:processing",
timeout: 10 * time.Second,
}
gameQueue := Queue{
client: gameClient,
queueName: "mari-arena-queue",
processingKey: "mari-arena-queue:processing",
timeout: 10 * time.Second,
Expand All @@ -53,23 +63,20 @@ func GetInstance() *GameManager {
Users: make(map[string]User),
NewGame: map[string]NewGame{},
StartedGames: map[string]Game{},
Queue: queue,
DbQueue: dbQueue,
GameQueue: gameQueue,
}
ctx := context.Background()

c := cron.New()
c.AddFunc("@hourly", func() {
dbQueue.RetryFailedTasks(ctx)
gameQueue.RetryFailedTasks(ctx)
})

go dbQueue.ProcessQueue(ctx)
go gameQueue.ProcessQueue(ctx)

log.Println("Created redis client")
if err := client.Ping(context.Background()).Err(); err != nil {
for {
_, err := client.Ping(context.Background()).Result()
if err == nil {
break
}
log.Println("Retrying Redis connection...")
time.Sleep(2 * time.Second)
}
}
go func() {
queue.ProcessQueue(context.Background())
}()
})
return instance
}
Expand Down Expand Up @@ -122,7 +129,7 @@ func (gameManger *GameManager) CreateGame(maxUserCount int, winnerPrice int, ent
},
}

err = gameManger.Queue.Enqueue(context.Background(), item)
err = gameManger.DbQueue.Enqueue(context.Background(), item)

if err != nil {
log.Println(err.Error())
Expand All @@ -138,7 +145,6 @@ func (gameManger *GameManager) JoinGame(userId string, gameTypeId string) {
return
}
newGameMap, newGameMapExist := gameManger.NewGame[gameTypeId]
log.Println(newGameMap.Game.Users)
if !newGameMapExist {
cacheGameTypeMap, cacheGameTypeMapExist := gameTypeMap[gameTypeId]

Expand Down Expand Up @@ -203,7 +209,7 @@ func (gameManger *GameManager) JoinGame(userId string, gameTypeId string) {
}

if !newGame.Users[userId] {
err = gameManger.Queue.Enqueue(context.Background(), map[string]interface{}{
err = gameManger.DbQueue.Enqueue(context.Background(), map[string]interface{}{
"type": "add-participant",
"data": map[string]interface{}{
"userId": userId,
Expand Down Expand Up @@ -263,7 +269,7 @@ func (gameManger *GameManager) JoinGame(userId string, gameTypeId string) {
}
}

err = gameManger.Queue.Enqueue(context.Background(), map[string]interface{}{
err = gameManger.DbQueue.Enqueue(context.Background(), map[string]interface{}{
"type": "start-game",
"data": map[string]interface{}{
"gameId": newGame.Id,
Expand All @@ -281,7 +287,7 @@ func (gameManger *GameManager) JoinGame(userId string, gameTypeId string) {
}
}

err := gameManger.Queue.Enqueue(context.Background(), map[string]interface{}{
err := gameManger.DbQueue.Enqueue(context.Background(), map[string]interface{}{
"type": "collect-entry",
"data": map[string]interface{}{
"ids": ids,
Expand Down Expand Up @@ -390,18 +396,27 @@ func (gameManager *GameManager) GameOver(gameId string, userId string) {

if alivePlayers == 0 {
if winnerId != "" {
log.Println("346 gameId", targetGame)
_, err := lib.Pool.Exec(context.Background(), `UPDATE public.games SET status = $2, "winnerId" = $3 WHERE id = $1`, gameId, "completed", winnerId)
err := gameManager.DbQueue.Enqueue(context.Background(), map[string]interface{}{
"type": "end-game",
"data": map[string]string{
"gameId": gameId,
"winnerId": winnerId,
},
})

if err != nil {
log.Println(err.Error())
newLine := fmt.Sprintf("ERROR_UPDATING_GAME-gameId_%s-status_%s-userId_%s-amount-%d\n", gameId, "completed", userId, targetGame.WinnerPrice)
lib.ErrorLogger(newLine, "errors.txt")
return
}

log.Println("353")
_, err = lib.Pool.Exec(context.Background(), `UPDATE public.users SET "solanaBalance" = "solanaBalance" + $2 WHERE id = $1`, winnerId, targetGame.WinnerPrice)
err = gameManager.DbQueue.Enqueue(context.Background(), map[string]interface{}{
"type": "update-balance",
"data": map[string]interface{}{
"winnerId": winnerId,
"amount": targetGame.WinnerPrice,
},
})
if err != nil {
log.Println(err.Error())
newLine := fmt.Sprintf("ERROR_UPDATING_USER_BALANCE-userId_%s-amount_%d\n", winnerId, targetGame.WinnerPrice)
Expand All @@ -410,7 +425,6 @@ func (gameManager *GameManager) GameOver(gameId string, userId string) {
}
}

log.Println("367", alivePlayers, winnerId)
for k, _ := range targetGame.Users {
log.Println("370", k)
participant, exist := gameManager.GetUser(k)
Expand All @@ -435,81 +449,3 @@ func (gameManager *GameManager) GameOver(gameId string, userId string) {
gameManager.DeleteGame(gameId)
}
}

func (gameManager *GameManager) GameOver2(gameId string, userId string) {
targetGame := gameManager.GetGame(gameId)
targetGame.GameOver(userId)

var alivePlayers = 0
var winnerId = ""

// Count alive players and track the player with the highest score among those alive
var highestPoints = 0
for k, user := range targetGame.ScoreBoard {
if user.IsAlive {
alivePlayers += 1
// Only update winnerId if the player is alive and has the highest points
if user.Points > highestPoints {
highestPoints = user.Points
winnerId = k
}
}
}

log.Println("Alive players:", alivePlayers, "Winner ID:", winnerId)

// Proceed only if no players are alive, meaning the game is truly over
if alivePlayers == 0 {
if winnerId != "" {
newBalance := 0

// Update game status and winner
_, err := lib.Pool.Exec(
context.Background(),
"UPDATE public.games SET status = $2, winnerEmail = $3 WHERE id = $1",
gameId, "completed", winnerId,
)
if err != nil {
newLine := fmt.Sprintf("ERROR_UPDATING_GAME-gameId_%s-status_%s\n", gameId, "completed")
lib.ErrorLogger(newLine, "errors.txt")
return
}

// Update winner's balance
err = lib.Pool.QueryRow(
context.Background(),
`UPDATE public.users SET "solanaBalance" = "solanaBalance" + $2 WHERE id = $1 RETURNING "solanaBalance"`,
winnerId, targetGame.WinnerPrice,
).Scan(&newBalance)
if err != nil {
newLine := fmt.Sprintf("ERROR_UPDATING_USER_BALANCE-userId_%s-amount_%d\n", winnerId, targetGame.WinnerPrice)
lib.ErrorLogger(newLine, "errors.txt")
} else {
gameManager.DeleteGame(gameId)
}
}

// Notify all players about the game result
for k := range targetGame.Users {
participant, exist := gameManager.GetUser(k)
if exist {
gameManager.Users[k] = User{
Id: participant.Id,
CurrentGameId: "",
PublicKey: participant.PublicKey,
Ws: participant.Ws,
}
if k == winnerId && winnerId != "" {
participant.SendMessage("winner", map[string]interface{}{
"amount": targetGame.WinnerPrice,
})
} else {
participant.SendMessage("loser", map[string]interface{}{
"amount": targetGame.Entry,
})
}
}
}
}
log.Println("Game over complete. Alive players:", alivePlayers, "Winner ID:", winnerId)
}
69 changes: 47 additions & 22 deletions server/game-manager/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type Queue struct {
timeout time.Duration
}

// Enqueue adds a task to the queue
func (q *Queue) Enqueue(ctx context.Context, item map[string]interface{}) error {

jsonData, err := json.Marshal(item)
Expand All @@ -35,7 +34,6 @@ func (q *Queue) Enqueue(ctx context.Context, item map[string]interface{}) error

}

// Dequeue removes a task from the queue and moves it to the processing list
func (q *Queue) Dequeue(ctx context.Context) (string, error) {
result, err := q.client.BRPopLPush(ctx, q.queueName, q.processingKey, 0).Result()
if err == redis.Nil {
Expand All @@ -44,21 +42,17 @@ func (q *Queue) Dequeue(ctx context.Context) (string, error) {
return result, err
}

// Acknowledge removes a task from the processing list upon successful processing
func (q *Queue) Acknowledge(ctx context.Context, item string) error {
return q.client.LRem(ctx, q.processingKey, 0, item).Err()
}

// RetryFailedTasks moves items back to the main queue if they exceed the processing timeout
func (q *Queue) RetryFailedTasks(ctx context.Context) error {
items, err := q.client.LRange(ctx, q.processingKey, 0, -1).Result()
if err != nil {
return err
}

for _, item := range items {
// Check the item's "processing time" with an arbitrary check using ZSET for better visibility if needed.
// For now we simply dequeue to retry after processing time exceeded
if time.Since(time.Now()) > q.timeout {
if err := q.client.LPush(ctx, q.queueName, item).Err(); err != nil {
return err
Expand All @@ -72,24 +66,19 @@ func (q *Queue) RetryFailedTasks(ctx context.Context) error {
return nil
}

// ProcessQueue continuously processes items and retries failed tasks periodically
func (q *Queue) ProcessQueue(ctx context.Context) {
for {
// Attempt to dequeue a task
item, err := q.Dequeue(ctx)
if err != nil {
// Check for EOF (queue empty) and other errors
if err == io.EOF {
time.Sleep(2 * time.Second)
continue
} else {
// Log and attempt reconnection if necessary
log.Printf("Error dequeuing task: %v", err)
time.Sleep(2 * time.Second)
continue
}
} else if item == "" {
// If no items are in the queue, we wait and continue
time.Sleep(2 * time.Second)
continue
}
Expand All @@ -106,29 +95,27 @@ func (q *Queue) ProcessQueue(ctx context.Context) {
log.Printf("Processing ====== %s", taskType)
switch taskType {
case "create-game":
_, err = lib.Pool.Exec(context.Background(), `INSERT INTO public.games (id, "entryFee", "winningAmount", "gameTypeId", "maxPlayer")
VALUES ($1, $2, $3, $4, $5)
RETURNING id, status, "entryFee", "winningAmount", "maxPlayer"`, taskPayload["id"], taskPayload["entry"], taskPayload["winnerPrice"], taskPayload["gameTypeId"], taskPayload["maxUserCount"])
err = CreateGame(ctx, taskPayload)
case "add-participant":
_, err = lib.Pool.Exec(context.Background(), `INSERT INTO public.participants ("userId", "gameId") VALUES ($1, $2)`, taskPayload["userId"], taskPayload["gameId"])
err = AddParticipant(ctx, taskPayload)
case "start-game":
_, err = lib.Pool.Exec(context.Background(), `UPDATE public.games SET status = $2 WHERE id = $1`, taskPayload["gameId"], "ongoing")
err = StartGame(ctx, taskPayload)
case "collect-entry":
query := fmt.Sprintf(`UPDATE public.users SET "solanaBalance" = "solanaBalance" - $1 WHERE id IN (%s) AND "solanaBalance" >= $1`, taskPayload["ids"])
_, err = lib.Pool.Exec(context.Background(), query, taskPayload["entry"])
err = CollectEntry(ctx, taskPayload)
case "join-game":
GetInstance().JoinGame(taskPayload["userId"].(string), taskPayload["gameTypeId"].(string))
JoinGame(ctx, taskPayload)
case "end-game":
err = EndGame(ctx, taskPayload)
case "update-balance":
err = UpdateBalance(ctx, taskPayload)
}

if err != nil {
// if err := q.RetryFailedTasks(ctx); err != nil {
log.Printf("Failed to retry tasks: %s", err.Error())
// }
} else {
if err := q.Acknowledge(ctx, item); err != nil {
log.Fatalf("Failed to acknowledge task: %v", err)
}

}

}
Expand All @@ -137,3 +124,41 @@ func (q *Queue) ProcessQueue(ctx context.Context) {
func Parse(jsonStr string, result interface{}) error {
return json.Unmarshal([]byte(jsonStr), result)
}

func CreateGame(ctx context.Context, taskPayload map[string]interface{}) error {
_, err := lib.Pool.Exec(ctx, `INSERT INTO public.games (id, "entryFee", "winningAmount", "gameTypeId", "maxPlayer")
VALUES ($1, $2, $3, $4, $5)
RETURNING id, status, "entryFee", "winningAmount", "maxPlayer"`, taskPayload["id"], taskPayload["entry"], taskPayload["winnerPrice"], taskPayload["gameTypeId"], taskPayload["maxUserCount"])
return err
}

func AddParticipant(ctx context.Context, taskPayload map[string]interface{}) error {
_, err := lib.Pool.Exec(ctx, `INSERT INTO public.participants ("userId", "gameId") VALUES ($1, $2)`, taskPayload["userId"], taskPayload["gameId"])
return err
}

func StartGame(ctx context.Context, taskPayload map[string]interface{}) error {
_, err := lib.Pool.Exec(ctx, `UPDATE public.games SET status = $2 WHERE id = $1`, taskPayload["gameId"], "ongoing")
return err
}

func JoinGame(ctx context.Context, taskPayload map[string]interface{}) error {
GetInstance().JoinGame(taskPayload["userId"].(string), taskPayload["gameTypeId"].(string))
return nil
}

func EndGame(ctx context.Context, taskPayload map[string]interface{}) error {
_, err := lib.Pool.Exec(ctx, `UPDATE public.games SET status = $2, "winnerId" = $3 WHERE id = $1`, taskPayload["gameId"], "completed", taskPayload["winnerId"])
return err
}

func CollectEntry(ctx context.Context, taskPayload map[string]interface{}) error {
query := fmt.Sprintf(`UPDATE public.users SET "solanaBalance" = "solanaBalance" - $1 WHERE id IN (%s) AND "solanaBalance" >= $1`, taskPayload["ids"])
_, err := lib.Pool.Exec(ctx, query, taskPayload["entry"])
return err
}

func UpdateBalance(ctx context.Context, taskPayload map[string]interface{}) error {
_, err := lib.Pool.Exec(ctx, `UPDATE public.users SET "solanaBalance" = "solanaBalance" + $2 WHERE id = $1`, taskPayload["winnerId"], taskPayload["amount"])
return err
}
1 change: 1 addition & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/redis/go-redis/v9 v9.7.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
Loading

0 comments on commit 140ebbb

Please sign in to comment.