Skip to content

Commit

Permalink
restructure controller-replica communication
Browse files Browse the repository at this point in the history
  • Loading branch information
Kampadais committed Oct 3, 2024
1 parent 9a8f5b4 commit f3cb007
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 147 deletions.
163 changes: 27 additions & 136 deletions pkg/dataconn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@ package dataconn

import (
"errors"
"github.com/longhorn/longhorn-engine/pkg/types"
"github.com/sirupsen/logrus"
"io"
"net"
"time"

"github.com/sirupsen/logrus"

journal "github.com/longhorn/sparse-tools/stats"

"github.com/longhorn/longhorn-engine/pkg/types"
)

var (
//ErrRWTimeout r/w operation timeout
ErrRWTimeout = errors.New("r/w timeout")
const (
queueLength = 4196
)

// Client replica client
Expand All @@ -25,7 +19,8 @@ type Client struct {
send chan *Message
responses chan *Message
seq uint32
messages map[uint32]*Message
messages [queueLength]*Message
SeqChan chan uint32
wires []*Wire
peerAddr string
sharedTimeouts types.SharedTimeouts
Expand All @@ -37,17 +32,21 @@ func NewClient(conns []net.Conn, sharedTimeouts types.SharedTimeouts) *Client {
for _, conn := range conns {
wires = append(wires, NewWire(conn))
}

c := &Client{
wires: wires,
peerAddr: conns[0].RemoteAddr().String(),
end: make(chan struct{}, 1024),
requests: make(chan *Message, 1024),
send: make(chan *Message, 1024),
responses: make(chan *Message, 1024),
messages: map[uint32]*Message{},
messages: [queueLength]*Message{},
SeqChan: make(chan uint32, queueLength),
sharedTimeouts: sharedTimeouts,
}
go c.loop()
for i := uint32(0); i < queueLength; i++ {
c.SeqChan <- i
}
c.write()
c.read()
return c
Expand Down Expand Up @@ -99,7 +98,7 @@ func (c *Client) operation(op uint32, buf []byte, length uint32, offset int64) (
msg.Data = buf
}

c.requests <- &msg
c.handleRequest(&msg)

<-msg.Complete
// Only copy the message if a read is requested
Expand All @@ -112,6 +111,9 @@ func (c *Client) operation(op uint32, buf []byte, length uint32, offset int64) (
if msg.Type == TypeEOF {
return int(msg.Size), io.EOF
}

c.SeqChan <- msg.Seq

return int(msg.Size), nil
}

Expand All @@ -123,145 +125,34 @@ func (c *Client) Close() {
c.end <- struct{}{}
}

func (c *Client) loop() {
defer close(c.send)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

var clientError error
var ioInflight int
var timeOfLastActivity time.Time

decremented := false
c.sharedTimeouts.Increment()
// Ensure we always decrement the sharedTimeouts counter regardless of how we leave this loop.
defer func() {
if !decremented {
c.sharedTimeouts.Decrement()
}
}()

// handleClientError cleans up all in flight messages
// also stores the error so that future requests/responses get errored immediately.
handleClientError := func(err error) {
clientError = err
for _, msg := range c.messages {
c.replyError(msg, err)
}

ioInflight = 0
timeOfLastActivity = time.Time{}
}

for {
select {
case <-c.end:
return
case <-ticker.C:
if timeOfLastActivity.IsZero() || ioInflight == 0 {
continue
}

exceededTimeout := c.sharedTimeouts.CheckAndDecrement(time.Since(timeOfLastActivity))
if exceededTimeout > 0 {
decremented = true
logrus.Errorf("R/W Timeout. No response received in %v", exceededTimeout)
handleClientError(ErrRWTimeout)
journal.PrintLimited(1000)
}
case req := <-c.requests:
if clientError != nil {
c.replyError(req, clientError)
continue
}

if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap {
if ioInflight == 0 {
// If nothing is in-flight, we should get a fresh timeout.
timeOfLastActivity = time.Now()
}
ioInflight++
}

c.handleRequest(req)
case resp := <-c.responses:
if resp.transportErr != nil {
handleClientError(resp.transportErr)
continue
}

req, pending := c.messages[resp.Seq]
if !pending {
logrus.Warnf("Received response message id %v seq %v type %v for non pending request", resp.ID, resp.Seq, resp.Type)
continue
}

if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap {
ioInflight--
timeOfLastActivity = time.Now()
}

if clientError != nil {
c.replyError(req, clientError)
continue
}

c.handleResponse(resp)
}
}
}

func (c *Client) nextSeq() uint32 {
c.seq++
return c.seq
}

func (c *Client) replyError(req *Message, err error) {
if opErr := journal.RemovePendingOp(req.ID, false); opErr != nil {
logrus.WithError(opErr).WithFields(logrus.Fields{
"seq": req.Seq,
"id": req.ID,
}).Warn("Error removing pending operation")
}
delete(c.messages, req.Seq)
req.Type = TypeError
req.Data = []byte(err.Error())
req.Complete <- struct{}{}
}

func (c *Client) handleRequest(req *Message) {
switch req.Type {
case TypeRead:
req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpRead, int(req.Size))
case TypeWrite:
req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpWrite, int(req.Size))
case TypeUnmap:
req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpUnmap, int(req.Size))
case TypePing:
req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpPing, 0)
}

req.MagicVersion = MagicVersion
req.Seq = c.nextSeq()

req.Seq = <-c.SeqChan

c.messages[req.Seq] = req
c.send <- req
}

func (c *Client) handleResponse(resp *Message) {
if req, ok := c.messages[resp.Seq]; ok {
err := journal.RemovePendingOp(req.ID, true)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"seq": resp.Seq,
"id": req.ID,
}).Warn("Error removing pending operation")
}
delete(c.messages, resp.Seq)
req.Type = resp.Type
req.Size = resp.Size
req.Data = resp.Data
req.Complete <- struct{}{}
}
req := c.messages[resp.Seq]

req.Type = resp.Type
req.Size = resp.Size
req.Data = resp.Data
req.Complete <- struct{}{}

}

func (c *Client) write() {
Expand Down Expand Up @@ -290,7 +181,7 @@ func (c *Client) read() {
}
break
}
c.responses <- msg
c.handleResponse(msg)
}
}(wire)
}
Expand Down
38 changes: 27 additions & 11 deletions pkg/dataconn/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,45 @@ import (
"github.com/longhorn/longhorn-engine/pkg/types"
)

const (
threadCount = 256
)

type Server struct {
wire *Wire
requests chan *Message
responses chan *Message
done chan struct{}
data types.DataProcessor
}

func NewServer(conn net.Conn, data types.DataProcessor) *Server {
return &Server{
//init theads
server := &Server{
wire: NewWire(conn),
requests: make(chan *Message, 1024),
responses: make(chan *Message, 1024),
done: make(chan struct{}, 5),
data: data,
}
for i := 0; i < threadCount; i++ {
go func(s *Server) {
for {
msg := <-s.requests
switch msg.Type {
case TypeRead:
s.handleRead(msg)
case TypeWrite:
s.handleWrite(msg)
case TypeUnmap:
s.handleUnmap(msg)
case TypePing:
s.handlePing(msg)
}
}
}(server)
}
return server
}

func (s *Server) Handle() error {
Expand All @@ -43,16 +68,7 @@ func (s *Server) readFromWire(ret chan<- error) {
ret <- err
return
}
switch msg.Type {
case TypeRead:
go s.handleRead(msg)
case TypeWrite:
go s.handleWrite(msg)
case TypeUnmap:
go s.handleUnmap(msg)
case TypePing:
go s.handlePing(msg)
}
s.requests <- msg
ret <- nil
}

Expand Down

0 comments on commit f3cb007

Please sign in to comment.