Skip to content

Commit

Permalink
Update UDP: Ensure propagation of control message to pool.Message
Browse files Browse the repository at this point in the history
This commit enhances the UDP functionality, ensuring proper dissemination
of control messages to pool.Message for improved network coordination
and responsiveness
  • Loading branch information
jkralik committed Nov 15, 2023
1 parent c93120b commit ce37a93
Show file tree
Hide file tree
Showing 10 changed files with 923 additions and 319 deletions.
2 changes: 1 addition & 1 deletion dtls/server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *Session) Run(cc *client.Conn) (err error) {
return fmt.Errorf("cannot read from connection: %w", err)
}
readBuf = readBuf[:readLen]
err = cc.Process(readBuf)
err = cc.Process(nil, readBuf)
if err != nil {
return err
}
Expand Down
20 changes: 20 additions & 0 deletions message/pool/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/net"
"go.uber.org/atomic"
)

Expand All @@ -26,6 +27,7 @@ type Message struct {
// Context context of request.
ctx context.Context
msg message.Message
controlMessage *net.ControlMessage // control message for UDP
hijacked atomic.Bool
isModified bool
valueBuffer []byte
Expand Down Expand Up @@ -73,6 +75,22 @@ func (r *Message) SetMessage(message message.Message) {
r.isModified = true
}

func (r *Message) SetControlMessage(cm *net.ControlMessage) {
r.controlMessage = cm
}

func (r *Message) ControlMessage() *net.ControlMessage {
return r.controlMessage
}

// UpsertControlMessage set value only when origin value is not set.
func (r *Message) UpsertControlMessage(cm *net.ControlMessage) {
if r.controlMessage != nil {
return
}
r.SetControlMessage(cm)
}

// SetMessageID only 0 to 2^16-1 are valid.
func (r *Message) SetMessageID(mid int32) {
r.msg.MessageID = mid
Expand Down Expand Up @@ -120,6 +138,7 @@ func (r *Message) Reset() {
r.valueBuffer = r.origValueBuffer
r.body = nil
r.isModified = false
r.controlMessage = nil
if cap(r.bufferMarshal) > 1024 {
r.bufferMarshal = make([]byte, 256)
}
Expand Down Expand Up @@ -568,6 +587,7 @@ func (r *Message) Clone(msg *Message) error {
msg.ResetOptionsTo(r.Options())
msg.SetType(r.Type())
msg.SetMessageID(r.MessageID())
msg.SetControlMessage(r.ControlMessage())

if r.Body() != nil {
buf := bytes.NewBuffer(nil)
Expand Down
Loading

0 comments on commit ce37a93

Please sign in to comment.