Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple parents and fix hanging producer #1431

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
10 changes: 6 additions & 4 deletions internal/streams/dot.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func humanBytes(i int) string {
type node struct {
ID uint32 `json:"id"`
Codec map[string]any `json:"codec"`
Parent uint32 `json:"parent"`
Parents []uint32 `json:"parents"`
Childs []uint32 `json:"childs"`
Bytes int `json:"bytes"`
//Packets uint32 `json:"packets"`
Expand Down Expand Up @@ -124,9 +124,11 @@ func (c *conn) appendDOT(dot []byte, group string) []byte {
dot = recv.appendDOT(dot, "node")
}
for _, send := range c.Senders {
dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.Parent, c.ID, humanBytes(send.Bytes))
//dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.ID, c.ID, humanBytes(send.Bytes))
//dot = send.appendDOT(dot, "node")
for _, parentID := range send.Parents {
dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", parentID, c.ID, humanBytes(send.Bytes))
//dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.ID, c.ID, humanBytes(send.Bytes))
//dot = send.appendDOT(dot, "node")
}
}
return dot
}
Expand Down
108 changes: 75 additions & 33 deletions pkg/core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Node struct {

id uint32
childs []*Node
parent *Node
parents []*Node

mu sync.Mutex
}
Expand All @@ -44,45 +44,87 @@ func (n *Node) AppendChild(child *Node) {
n.childs = append(n.childs, child)
n.mu.Unlock()

child.parent = n
child.mu.Lock()
if child.parents == nil {
child.parents = []*Node{n}
} else {
child.parents = append(child.parents, n)
}
child.mu.Unlock()
}

func (n *Node) RemoveChild(child *Node) {
n.mu.Lock()
for i, ch := range n.childs {
if ch == child {
n.childs = append(n.childs[:i], n.childs[i+1:]...)
break
}
}
n.mu.Unlock()
if child == nil {
return
}

n.mu.Lock()
for i, ch := range n.childs {
if ch == child {
n.childs = append(n.childs[:i], n.childs[i+1:]...)
break
}
}
n.mu.Unlock()

child.mu.Lock()
for i, p := range child.parents {
if p == n {
child.parents = append(child.parents[:i], child.parents[i+1:]...)
break
}
}
child.mu.Unlock()
}

func (n *Node) Close() {
if parent := n.parent; parent != nil {
parent.RemoveChild(n)

if len(parent.childs) == 0 {
parent.Close()
}
} else {
for _, childs := range n.childs {
childs.Close()
}
}
n.mu.Lock()
if n.parents != nil && len(n.parents) > 0 {
parents := n.parents
n.mu.Unlock()

for _, parent := range parents {
if parent != nil {
parent.RemoveChild(n)
}
}
} else {
children := n.childs
n.childs = nil
n.mu.Unlock()

for _, child := range children {
if child != nil {
child.Close()
}
}
}
}

func MoveNode(dst, src *Node) {
src.mu.Lock()
childs := src.childs
src.childs = nil
src.mu.Unlock()

dst.mu.Lock()
dst.childs = childs
dst.mu.Unlock()

for _, child := range childs {
child.parent = dst
}
src.mu.Lock()
childs := make([]*Node, len(src.childs))
copy(childs, src.childs)
src.childs = nil
src.mu.Unlock()

dst.mu.Lock()
dst.childs = childs
dst.mu.Unlock()

for _, child := range childs {
child.mu.Lock()
for i, p := range child.parents {
if p == src {
child.parents = append(child.parents[:i], child.parents[i+1:]...)
break
}
}
if child.parents == nil {
child.parents = []*Node{dst}
} else {
child.parents = append(child.parents, dst)
}
child.mu.Unlock()
}
}
22 changes: 12 additions & 10 deletions pkg/core/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ func (s *Sender) WithParent(parent *Receiver) *Sender {
}

func (s *Sender) Start() {
s.mu.Lock()
defer s.mu.Unlock()

if s.buf == nil || s.done != nil {
return
}
s.done = make(chan struct{})
s.mu.Lock()
if s.buf == nil || s.done != nil {
s.mu.Unlock()
return
}
s.done = make(chan struct{})
s.mu.Unlock()

go func() {
for packet := range s.buf {
Expand Down Expand Up @@ -196,7 +196,7 @@ func (s *Sender) MarshalJSON() ([]byte, error) {
v := struct {
ID uint32 `json:"id"`
Codec *Codec `json:"codec"`
Parent uint32 `json:"parent,omitempty"`
Parents []*uint32 `json:"parents,omitempty"`
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
Drops int `json:"drops,omitempty"`
Expand All @@ -207,8 +207,10 @@ func (s *Sender) MarshalJSON() ([]byte, error) {
Packets: s.Packets,
Drops: s.Drops,
}
if s.parent != nil {
v.Parent = s.parent.id
if s.parents != nil {
for _, parent := range s.parents {
v.Parents = append(v.Parents, &parent.id)
}
}
return json.Marshal(v)
}