Skip to content

Commit

Permalink
REVERT - context not close
Browse files Browse the repository at this point in the history
  • Loading branch information
luandnh committed Jul 22, 2023
1 parent 27227c1 commit a0e5158
Showing 1 changed file with 59 additions and 43 deletions.
102 changes: 59 additions & 43 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *ESLConnection) Authenticate(ctx context.Context, password string) error
if am.Get("Reply-Text") != "+OK accepted" {
return errors.New("invalid password")
}
go c.HandleMessage()
go c.receiveLoop()
return nil
}

Expand Down Expand Up @@ -147,6 +147,12 @@ func (c *ESLConnection) Send(cmd string) (*ESLResponse, error) {
c.writeLock.Lock()
defer c.writeLock.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), DEFAULT_TIMEOUT)
defer cancel()

if deadline, ok := ctx.Deadline(); ok {
_ = c.conn.SetWriteDeadline(deadline)
}
_, err := c.conn.Write([]byte(cmd + EndOfMessage))
if err != nil {
return nil, err
Expand All @@ -164,6 +170,8 @@ func (c *ESLConnection) Send(cmd string) (*ESLResponse, error) {
return response, nil
case err := <-c.err:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}

Expand All @@ -181,6 +189,12 @@ func (c *ESLConnection) SendEvent(eventHeaders []string) (*ESLResponse, error) {
c.writeLock.Lock()
defer c.writeLock.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), DEFAULT_TIMEOUT)
defer cancel()

if deadline, ok := ctx.Deadline(); ok {
_ = c.conn.SetWriteDeadline(deadline)
}
_, err := c.conn.Write([]byte("sendevent "))
if err != nil {
return nil, err
Expand Down Expand Up @@ -212,6 +226,8 @@ func (c *ESLConnection) SendEvent(eventHeaders []string) (*ESLResponse, error) {
return response, nil
case err := <-c.err:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}

Expand Down Expand Up @@ -304,58 +320,58 @@ func (c *ESLConnection) SendMsg(msg map[string]string, uuid, data string) (*ESLR
}
}

// func (c *ESLConnection) receiveLoop() {
// done := make(chan bool)
// go func() {
// for c.runningContext.Err() == nil {
// err := c.doMessage()
// if err != nil {
// c.logger.Warn("err receiving message: %v", err)
// c.err <- err
// done <- true
// break
// }
// }
// }()
// <-done
// c.Close()
// }

// func (c *ESLConnection) doMessage() error {
// msg, err := c.ParseResponse()
// if err != nil {
// return err
// }

// c.responseChanMutex.RLock()
// defer c.responseChanMutex.RUnlock()
// if c.isClosed {
// return errors.New("connection closed, no response channel")
// }

// select {
// case c.responseMessage <- msg:
// case <-c.runningContext.Done():
// return c.runningContext.Err()
// }
// return nil
// }

// HandleMessage - Handle message from channel
func (c *ESLConnection) HandleMessage() {
func (c *ESLConnection) receiveLoop() {
done := make(chan bool)
go func() {
for {
msg, err := c.ParseResponse()
for c.runningContext.Err() == nil {
err := c.doMessage()
if err != nil {
c.logger.Warn("err receiving message: %v", err)
c.err <- err
done <- true
break
}
c.responseMessage <- msg
}
}()
<-done
c.Close()
}

func (c *ESLConnection) doMessage() error {
msg, err := c.ParseResponse()
if err != nil {
return err
}

c.responseChanMutex.RLock()
defer c.responseChanMutex.RUnlock()
if c.isClosed {
return errors.New("connection closed, no response channel")
}

select {
case c.responseMessage <- msg:
case <-c.runningContext.Done():
return c.runningContext.Err()
}
return nil
}

// // HandleMessage - Handle message from channel
// func (c *ESLConnection) HandleMessage() {
// done := make(chan bool)
// go func() {
// for {
// msg, err := c.ParseResponse()
// if err != nil {
// c.logger.Warn("err receiving message: %v", err)
// c.err <- err
// done <- true
// break
// }
// c.responseMessage <- msg
// }
// }()
// <-done
// c.Close()
// }

0 comments on commit a0e5158

Please sign in to comment.