Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #46 from itzmeanjan/concurrent-write-on-ws-fix
Browse files Browse the repository at this point in the history
Concurrent safe write on shared network resource
  • Loading branch information
itzmeanjan authored Jan 12, 2021
2 parents 8ee7f15 + d896591 commit 8da500e
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 110 deletions.
105 changes: 72 additions & 33 deletions app/pubsub/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package pubsub
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -24,6 +26,7 @@ type BlockConsumer struct {
Connection *websocket.Conn
PubSub *redis.PubSub
DB *gorm.DB
Lock *sync.Mutex
}

// Subscribe - Subscribe to `block` channel
Expand All @@ -35,26 +38,19 @@ func (b *BlockConsumer) Subscribe() {
// and reads data from subcribed channel, which also gets delivered to client application
func (b *BlockConsumer) Listen() {

// When ever returning from this function's
// execution context, client will be unsubscribed from
// pubsub topic i.e. `block` topic in this case
defer b.Unsubscribe()

for {

// Checking if client is still subscribed to this topic
// or not
//
// If not, we're cancelling this subscription
if b.Request.Type == "unsubscribe" {

if err := b.Connection.WriteJSON(&SubscriptionResponse{
Code: 1,
Message: "Unsubscribed from `block`",
}); err != nil {
log.Printf("[!] Failed to deliver block unsubscription confirmation to client : %s\n", err.Error())
}

if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil {
log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error())
}
break

}

msg, err := b.PubSub.ReceiveTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -89,34 +85,42 @@ func (b *BlockConsumer) Send(msg string) bool {
user := db.GetUserFromAPIKey(b.DB, b.Request.APIKey)
if user == nil {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
b.Lock.Lock()

if err := b.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Bad API Key",
}); err != nil {
log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error())
}

if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil {
log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error())
}

b.Lock.Unlock()
// -- ends here
return false

}

if !user.Enabled {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
b.Lock.Lock()

if err := b.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Bad API Key",
}); err != nil {
log.Printf("[!] Failed to deliver bad API key message to client : %s\n", err.Error())
}

if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil {
log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error())
}

b.Lock.Unlock()
// -- ends here
return false

}
Expand All @@ -125,17 +129,21 @@ func (b *BlockConsumer) Send(msg string) bool {
// if client has crossed it's allowed data delivery limit
if !db.IsUnderRateLimit(b.DB, b.UserAddress.Hex()) {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
b.Lock.Lock()

if err := b.Connection.WriteJSON(&SubscriptionResponse{
Code: 0,
Message: "Crossed Allowed Rate Limit",
}); err != nil {
log.Printf("[!] Failed to deliver rate limit crossed message to client : %s\n", err.Error())
}

if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil {
log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error())
}

b.Lock.Unlock()
// -- ends here
return false

}
Expand All @@ -156,6 +164,7 @@ func (b *BlockConsumer) Send(msg string) bool {
}

return false

}

// SendData - Sending message to client application, connected over websocket
Expand All @@ -164,20 +173,50 @@ func (b *BlockConsumer) Send(msg string) bool {
// connection ( connection might be already closed though )
func (b *BlockConsumer) SendData(data interface{}) bool {

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
b.Lock.Lock()
defer b.Lock.Unlock()

if err := b.Connection.WriteJSON(data); err != nil {
log.Printf("[!] Failed to deliver `block` data to client : %s\n", err.Error())

if err = b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil {
log.Printf("[!] Failed to unsubscribe from `block` topic : %s\n", err.Error())
}

if err = b.Connection.Close(); err != nil {
log.Printf("[!] Failed to close websocket connection : %s\n", err.Error())
}

return false
}

log.Printf("[+] Delivered `block` data to client\n")
return true

}

// Unsubscribe - Unsubscribe from block data publishing event this client has subscribed to
func (b *BlockConsumer) Unsubscribe() {

if b.PubSub == nil {
log.Printf("[!] Bad attempt to unsubscribe from `%s` topic\n", b.Request.Topic())
return
}

if err := b.PubSub.Unsubscribe(context.Background(), b.Request.Topic()); err != nil {
log.Printf("[!] Failed to unsubscribe from `%s` topic : %s\n", b.Request.Topic(), err.Error())
return
}

resp := &SubscriptionResponse{
Code: 1,
Message: fmt.Sprintf("Unsubscribed from `%s`", b.Request.Topic()),
}

// -- Critical section of code begins
//
// Attempting to write to a network resource,
// shared among multiple go routines
b.Lock.Lock()
defer b.Lock.Unlock()

if err := b.Connection.WriteJSON(resp); err != nil {
log.Printf("[!] Failed to deliver `%s` unsubscription confirmation to client : %s\n", b.Request.Topic(), err.Error())
}

}
12 changes: 9 additions & 3 deletions app/pubsub/consumption.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pubsub

import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/go-redis/redis/v8"
"github.com/gorilla/websocket"
Expand All @@ -13,18 +15,20 @@ type Consumer interface {
Listen()
Send(msg string) bool
SendData(data interface{}) bool
Unsubscribe()
}

// NewBlockConsumer - Creating one new block data consumer, which will subscribe to block
// topic & listen for data being published on this channel, which will eventually be
// delivered to client application over websocket connection
func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *BlockConsumer {
func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *BlockConsumer {
consumer := BlockConsumer{
Client: client,
Request: req,
UserAddress: address,
Connection: conn,
DB: db,
Lock: lock,
}

consumer.Subscribe()
Expand All @@ -37,13 +41,14 @@ func NewBlockConsumer(client *redis.Client, conn *websocket.Conn, req *Subscript
// topic & listen for data being published on this channel & check whether received data
// is what, client is interested in or not, which will eventually be
// delivered to client application over websocket connection
func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *TransactionConsumer {
func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *TransactionConsumer {
consumer := TransactionConsumer{
Client: client,
Request: req,
UserAddress: address,
Connection: conn,
DB: db,
Lock: lock,
}

consumer.Subscribe()
Expand All @@ -56,13 +61,14 @@ func NewTransactionConsumer(client *redis.Client, conn *websocket.Conn, req *Sub
// topic & listen for data being published on this channel & check whether received data
// is what, client is interested in or not, which will eventually be
// delivered to client application over websocket connection
func NewEventConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address) *EventConsumer {
func NewEventConsumer(client *redis.Client, conn *websocket.Conn, req *SubscriptionRequest, db *gorm.DB, address common.Address, lock *sync.Mutex) *EventConsumer {
consumer := EventConsumer{
Client: client,
Request: req,
UserAddress: address,
Connection: conn,
DB: db,
Lock: lock,
}

consumer.Subscribe()
Expand Down
Loading

0 comments on commit 8da500e

Please sign in to comment.