diff --git a/internal/redis/conn.go b/internal/redis/conn.go index 54931c6a..196d29d3 100644 --- a/internal/redis/conn.go +++ b/internal/redis/conn.go @@ -38,16 +38,16 @@ func (s *Pool) TestConn() error { } // XREAD reads new messages from one stream. -func (s *Pool) XREAD(count, block, stream, id string) (interface{}, error) { +func (s *Pool) XREAD(count, stream, id string) (interface{}, error) { conn := s.pool.Get() defer conn.Close() - return conn.Do("XREAD", "COUNT", count, "BLOCK", block, "STREAMS", stream, id) + return conn.Do("XREAD", "COUNT", count, "BLOCK", "0", "STREAMS", stream, id) } // BlockingConn implements the redis.Conn interface but does nothing. type BlockingConn struct{} // XREAD blocks forever. -func (BlockingConn) XREAD(count, block, stream, id string) (interface{}, error) { +func (BlockingConn) XREAD(count, stream, id string) (interface{}, error) { select {} } diff --git a/internal/redis/interfaces.go b/internal/redis/interfaces.go index 0bb21f80..4bee290e 100644 --- a/internal/redis/interfaces.go +++ b/internal/redis/interfaces.go @@ -2,5 +2,5 @@ package redis // Connection is the raw connection to a redis server. type Connection interface { - XREAD(count, block, stream, lastID string) (interface{}, error) + XREAD(count, stream, lastID string) (interface{}, error) } diff --git a/internal/redis/mock_test.go b/internal/redis/mock_test.go index 380c5611..39cedfd4 100644 --- a/internal/redis/mock_test.go +++ b/internal/redis/mock_test.go @@ -49,7 +49,7 @@ var testData = map[string]string{ ]`, } -func (c mockConn) XREAD(count, block, stream, lastID string) (interface{}, error) { +func (c mockConn) XREAD(count, stream, lastID string) (interface{}, error) { if c.err != nil { return nil, c.err } diff --git a/internal/redis/redis.go b/internal/redis/redis.go index 1f42692f..37b1fd79 100644 --- a/internal/redis/redis.go +++ b/internal/redis/redis.go @@ -13,10 +13,6 @@ const ( // maxMessages desides how many messages are read at once from the stream. maxMessages = "10" - // blockTimeout is the time in miliseconds, how long the xread command will - // block. - blockTimeout = "3600000" // One Hour - // fieldChangedTopic is the redis key name of the autoupdate stream. fieldChangedTopic = "ModifiedFields" @@ -43,7 +39,7 @@ func (s *Service) Update(closing <-chan struct{}) (map[string]json.RawMessage, e var data map[string]json.RawMessage err := closingFunc(closing, func() error { - newID, d, err := autoupdateStream(s.Conn.XREAD(maxMessages, blockTimeout, fieldChangedTopic, id)) + newID, d, err := autoupdateStream(s.Conn.XREAD(maxMessages, fieldChangedTopic, id)) if err != nil { return err } @@ -75,7 +71,7 @@ func (s *Service) LogoutEvent(closing <-chan struct{}) ([]string, error) { var sessionIDs []string err := closingFunc(closing, func() error { - newID, sIDs, err := logoutStream(s.Conn.XREAD(maxMessages, blockTimeout, logoutTopic, id)) + newID, sIDs, err := logoutStream(s.Conn.XREAD(maxMessages, logoutTopic, id)) if err != nil { return err }