Skip to content

TCP: simplify error handling #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/modbus-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ func (w *writer) ToInt32(v int32) []byte {
b, _ := io.ReadAll(&buf)
return b
}

func (w *writer) ToUint16(v uint16) []byte {
var buf bytes.Buffer
w.to(&buf, v)
Expand Down
138 changes: 42 additions & 96 deletions tcpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
tcpHeaderSize = 7
tcpMaxLength = 260
// Default TCP timeout is not set
tcpTimeout = 10 * time.Second
tcpTimeout = 3 * time.Second
tcpIdleTimeout = 60 * time.Second
)

Expand Down Expand Up @@ -169,9 +169,6 @@ type tcpTransporter struct {
closeTimer *time.Timer
lastActivity time.Time

lastAttemptedTransactionID uint16
lastSuccessfulTransactionID uint16

// For synchronization between messages of server & client
transactionID uint32

Expand Down Expand Up @@ -202,111 +199,60 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error
defer mb.mu.Unlock()

var data [tcpMaxLength]byte
recoveryDeadline := time.Now().Add(mb.IdleTimeout)

for {
// Establish a new connection if not connected
if err = mb.connect(); err != nil {
return
}

// Set timer to close when idle
mb.lastActivity = time.Now()
mb.startCloseTimer()
// Establish a new connection if not connected
if err := mb.connect(); err != nil {
return nil, err
}

// Set write and read timeout
if mb.Timeout > 0 {
if err = mb.conn.SetDeadline(mb.lastActivity.Add(mb.Timeout)); err != nil {
return
}
}
// Set timer to close when idle
mb.lastActivity = time.Now()
mb.startCloseTimer()

// Send data
mb.logf("modbus: send % x", aduRequest)
if _, err = mb.conn.Write(aduRequest); err != nil {
return
// Set write and read timeout
if mb.Timeout > 0 {
if err := mb.conn.SetDeadline(mb.lastActivity.Add(mb.Timeout)); err != nil {
return nil, err
}
}

mb.lastAttemptedTransactionID = binary.BigEndian.Uint16(aduRequest)
var res readResult
aduResponse, res, err = mb.readResponse(aduRequest, data[:], recoveryDeadline)
switch res {
case readResultDone:
if err == nil {
mb.lastSuccessfulTransactionID = binary.BigEndian.Uint16(aduResponse)
}
return
case readResultRetry:
continue
}
close := func(err error) {
mb.logf("modbus: close connection: %v", err)
mb.close()
}

mb.logf("modbus: close connection and retry, because of %v", err)
// Send data
mb.logf("modbus: send % x", aduRequest)
if _, err := mb.conn.Write(aduRequest); err != nil {
close(err)
return nil, err
}

mb.close()
time.Sleep(mb.LinkRecoveryTimeout)
res, err := mb.readResponse(aduRequest, data[:])
if err != nil {
close(err)
}

return res, err
}

func (mb *tcpTransporter) readResponse(aduRequest []byte, data []byte, recoveryDeadline time.Time) (aduResponse []byte, res readResult, err error) {
// res is readResultDone by default, which either means we succeeded or err contains the fatal error
for {
if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err == nil {
aduResponse, err = mb.processResponse(data[:])
if err == nil {
err = verify(aduRequest, aduResponse)
if err == nil {
mb.logf("modbus: recv % x\n", aduResponse)
return // everything is OK
}
}
func (mb *tcpTransporter) readResponse(aduRequest []byte, data []byte) ([]byte, error) {
_, err := io.ReadFull(mb.conn, data[:tcpHeaderSize])
if err != nil {
return nil, err
}

// no time left, report error
if time.Since(recoveryDeadline) >= 0 {
return
}
aduResponse, err := mb.processResponse(data[:])
if err != nil {
return nil, err
}

switch v := err.(type) {
case ErrTCPHeaderLength:
if mb.LinkRecoveryTimeout > 0 {
// TCP header not OK - retry with another query
res = readResultRetry
return
}
// no time left, report error
return
case errTransactionIDMismatch:
// the first condition check for a normal transaction id mismatch. The second part of the condition check for a wrap-around. If a wraparound is
// detected (last attempt is smaller than last success), the id can be higher than the last success or lower than the last attempt, but not both
if (v.got > mb.lastSuccessfulTransactionID && v.got < mb.lastAttemptedTransactionID) ||
(mb.lastAttemptedTransactionID < mb.lastSuccessfulTransactionID && (v.got > mb.lastSuccessfulTransactionID || v.got < mb.lastAttemptedTransactionID)) {
// most likely, we simply had a timeout for the earlier query and now read the (late) response. Ignore it
// and assume that the response will come *without* sending another query. (If we send another query
// with transactionId X+1 here, we would again get a transactionMismatchError if the response to
// transactionId X is already in the buffer).
continue
}
if mb.ProtocolRecoveryTimeout > 0 {
// some other mismatch, still in time and protocol may recover - retry with another query
res = readResultRetry
return
}
return // no time left, report error
default:
if mb.ProtocolRecoveryTimeout > 0 {
// TCP header OK but modbus frame not - retry with another query
res = readResultRetry
return
}
return // no time left, report error
}
} else if (err != io.EOF && err != io.ErrUnexpectedEOF) ||
mb.LinkRecoveryTimeout == 0 || time.Until(recoveryDeadline) < 0 {
return
}
// any other error, but recovery deadline isn't reached yet - close and retry
res = readResultCloseRetry
return
err = verify(aduRequest, aduResponse)
if err == nil {
mb.logf("modbus: recv % x\n", aduResponse)
}

return aduResponse, err
}

func (mb *tcpTransporter) processResponse(data []byte) (aduResponse []byte, err error) {
Expand Down
1 change: 1 addition & 0 deletions tcpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestErrTCPHeaderLength_Error(t *testing.T) {
_ = ErrTCPHeaderLength(1000).Error()
}

// TODO broken
func TestTCPTransactionMismatchRetry(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand Down