diff --git a/internal/streams/dot.go b/internal/streams/dot.go index c54a733a2..870122dcc 100644 --- a/internal/streams/dot.go +++ b/internal/streams/dot.go @@ -58,7 +58,7 @@ func humanBytes(i int) string { type node struct { ID uint32 `json:"id"` Codec map[string]any `json:"codec"` - Parent uint32 `json:"parent"` + Parents []uint32 `json:"parents"` Childs []uint32 `json:"childs"` Bytes int `json:"bytes"` //Packets uint32 `json:"packets"` @@ -124,9 +124,11 @@ func (c *conn) appendDOT(dot []byte, group string) []byte { dot = recv.appendDOT(dot, "node") } for _, send := range c.Senders { - dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.Parent, c.ID, humanBytes(send.Bytes)) - //dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.ID, c.ID, humanBytes(send.Bytes)) - //dot = send.appendDOT(dot, "node") + for _, parentID := range send.Parents { + dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", parentID, c.ID, humanBytes(send.Bytes)) + //dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.ID, c.ID, humanBytes(send.Bytes)) + //dot = send.appendDOT(dot, "node") + } } return dot } diff --git a/pkg/core/node.go b/pkg/core/node.go index a9959c3de..e091f5b37 100644 --- a/pkg/core/node.go +++ b/pkg/core/node.go @@ -29,7 +29,7 @@ type Node struct { id uint32 childs []*Node - parent *Node + parents []*Node mu sync.Mutex } @@ -44,45 +44,87 @@ func (n *Node) AppendChild(child *Node) { n.childs = append(n.childs, child) n.mu.Unlock() - child.parent = n + child.mu.Lock() + if child.parents == nil { + child.parents = []*Node{n} + } else { + child.parents = append(child.parents, n) + } + child.mu.Unlock() } func (n *Node) RemoveChild(child *Node) { - n.mu.Lock() - for i, ch := range n.childs { - if ch == child { - n.childs = append(n.childs[:i], n.childs[i+1:]...) - break - } - } - n.mu.Unlock() + if child == nil { + return + } + + n.mu.Lock() + for i, ch := range n.childs { + if ch == child { + n.childs = append(n.childs[:i], n.childs[i+1:]...) + break + } + } + n.mu.Unlock() + + child.mu.Lock() + for i, p := range child.parents { + if p == n { + child.parents = append(child.parents[:i], child.parents[i+1:]...) + break + } + } + child.mu.Unlock() } func (n *Node) Close() { - if parent := n.parent; parent != nil { - parent.RemoveChild(n) - - if len(parent.childs) == 0 { - parent.Close() - } - } else { - for _, childs := range n.childs { - childs.Close() - } - } + n.mu.Lock() + if n.parents != nil && len(n.parents) > 0 { + parents := n.parents + n.mu.Unlock() + + for _, parent := range parents { + if parent != nil { + parent.RemoveChild(n) + } + } + } else { + children := n.childs + n.childs = nil + n.mu.Unlock() + + for _, child := range children { + if child != nil { + child.Close() + } + } + } } func MoveNode(dst, src *Node) { - src.mu.Lock() - childs := src.childs - src.childs = nil - src.mu.Unlock() - - dst.mu.Lock() - dst.childs = childs - dst.mu.Unlock() - - for _, child := range childs { - child.parent = dst - } + src.mu.Lock() + childs := make([]*Node, len(src.childs)) + copy(childs, src.childs) + src.childs = nil + src.mu.Unlock() + + dst.mu.Lock() + dst.childs = childs + dst.mu.Unlock() + + for _, child := range childs { + child.mu.Lock() + for i, p := range child.parents { + if p == src { + child.parents = append(child.parents[:i], child.parents[i+1:]...) + break + } + } + if child.parents == nil { + child.parents = []*Node{dst} + } else { + child.parents = append(child.parents, dst) + } + child.mu.Unlock() + } } diff --git a/pkg/core/track.go b/pkg/core/track.go index 8bc653749..e65240a62 100644 --- a/pkg/core/track.go +++ b/pkg/core/track.go @@ -131,13 +131,13 @@ func (s *Sender) WithParent(parent *Receiver) *Sender { } func (s *Sender) Start() { - s.mu.Lock() - defer s.mu.Unlock() - - if s.buf == nil || s.done != nil { - return - } - s.done = make(chan struct{}) + s.mu.Lock() + if s.buf == nil || s.done != nil { + s.mu.Unlock() + return + } + s.done = make(chan struct{}) + s.mu.Unlock() go func() { for packet := range s.buf { @@ -196,7 +196,7 @@ func (s *Sender) MarshalJSON() ([]byte, error) { v := struct { ID uint32 `json:"id"` Codec *Codec `json:"codec"` - Parent uint32 `json:"parent,omitempty"` + Parents []*uint32 `json:"parents,omitempty"` Bytes int `json:"bytes,omitempty"` Packets int `json:"packets,omitempty"` Drops int `json:"drops,omitempty"` @@ -207,8 +207,10 @@ func (s *Sender) MarshalJSON() ([]byte, error) { Packets: s.Packets, Drops: s.Drops, } - if s.parent != nil { - v.Parent = s.parent.id + if s.parents != nil { + for _, parent := range s.parents { + v.Parents = append(v.Parents, &parent.id) + } } return json.Marshal(v) }