Skip to content

Commit

Permalink
Redis: Block forever (#130)
Browse files Browse the repository at this point in the history
Before this commit redis only blocked for
an hour. After an hour the xread command
returned with empty data. This was unexpected
for the calling functions.

Now, redis blocks forever.

On Shutdown the redis-connection gets killed.
  • Loading branch information
ostcar authored Nov 28, 2020
1 parent c5fd275 commit 9c31f7f
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 11 deletions.
6 changes: 3 additions & 3 deletions internal/redis/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ func (s *Pool) TestConn() error {
}

// XREAD reads new messages from one stream.
func (s *Pool) XREAD(count, block, stream, id string) (interface{}, error) {
func (s *Pool) XREAD(count, stream, id string) (interface{}, error) {
conn := s.pool.Get()
defer conn.Close()
return conn.Do("XREAD", "COUNT", count, "BLOCK", block, "STREAMS", stream, id)
return conn.Do("XREAD", "COUNT", count, "BLOCK", "0", "STREAMS", stream, id)
}

// BlockingConn implements the redis.Conn interface but does nothing.
type BlockingConn struct{}

// XREAD blocks forever.
func (BlockingConn) XREAD(count, block, stream, id string) (interface{}, error) {
func (BlockingConn) XREAD(count, stream, id string) (interface{}, error) {
select {}
}
2 changes: 1 addition & 1 deletion internal/redis/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package redis

// Connection is the raw connection to a redis server.
type Connection interface {
XREAD(count, block, stream, lastID string) (interface{}, error)
XREAD(count, stream, lastID string) (interface{}, error)
}
2 changes: 1 addition & 1 deletion internal/redis/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var testData = map[string]string{
]`,
}

func (c mockConn) XREAD(count, block, stream, lastID string) (interface{}, error) {
func (c mockConn) XREAD(count, stream, lastID string) (interface{}, error) {
if c.err != nil {
return nil, c.err
}
Expand Down
8 changes: 2 additions & 6 deletions internal/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ const (
// maxMessages desides how many messages are read at once from the stream.
maxMessages = "10"

// blockTimeout is the time in miliseconds, how long the xread command will
// block.
blockTimeout = "3600000" // One Hour

// fieldChangedTopic is the redis key name of the autoupdate stream.
fieldChangedTopic = "ModifiedFields"

Expand All @@ -43,7 +39,7 @@ func (s *Service) Update(closing <-chan struct{}) (map[string]json.RawMessage, e

var data map[string]json.RawMessage
err := closingFunc(closing, func() error {
newID, d, err := autoupdateStream(s.Conn.XREAD(maxMessages, blockTimeout, fieldChangedTopic, id))
newID, d, err := autoupdateStream(s.Conn.XREAD(maxMessages, fieldChangedTopic, id))
if err != nil {
return err
}
Expand Down Expand Up @@ -75,7 +71,7 @@ func (s *Service) LogoutEvent(closing <-chan struct{}) ([]string, error) {

var sessionIDs []string
err := closingFunc(closing, func() error {
newID, sIDs, err := logoutStream(s.Conn.XREAD(maxMessages, blockTimeout, logoutTopic, id))
newID, sIDs, err := logoutStream(s.Conn.XREAD(maxMessages, logoutTopic, id))
if err != nil {
return err
}
Expand Down

0 comments on commit 9c31f7f

Please sign in to comment.