internal: ensure consistency of proto traversals
zephyrtronium committed Dec 17, 2020
1 parent 019984e commit ea89a3c
Showing 2 changed files with 78 additions and 65 deletions.
138 changes: 75 additions & 63 deletions internal/slots.go
@@ -59,47 +59,56 @@ type protoLink struct {
// equal to this value, and therefore writers should avoid using it.
var logicalDeleted = new(Object)

// protoHead returns the object's first proto.
func (o *Object) protoHead() *Object {
p := (*Object)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p))))
if p == logicalDeleted {
// If the head is logically deleted, then someone holds its lock.
// Rather than simply spinning while waiting for it to become valid
// again, we can try to acquire the lock so the runtime can park this
// goroutine. Then, since we hold the lock, we can be certain the node
// is in a valid state and there are no concurrent writers, so we can
// read it normally once.
o.protos.mu.Lock()
p = o.protos.p
o.protos.mu.Unlock()
// protoHead returns the object's first proto and the link to the next.
func (o *Object) protoHead() (p *Object, n *protoLink) {
for {
p = (*Object)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p))))
if p == logicalDeleted {
// If the head is logically deleted, then someone holds its lock.
// Rather than simply spinning while waiting for it to become valid
// again, we can try to acquire the lock so the runtime can park
// this goroutine. Then, since we hold the lock, we can be certain
// the node is in a valid state and there are no concurrent
// writers, so we can read it non-atomically.
o.protos.mu.Lock()
p = o.protos.p
o.protos.mu.Unlock()
}
n = (*protoLink)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.n))))
// In order to ensure consistency, we require the current proto to
// match the one we got at the start; otherwise, we might return the
// first proto corresponding to one list and the link of another.
if p == (*Object)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)))) {
return p, n
}
// If we don't have such a match, then it's fine to just try again,
// because protos are rarely modified.
}
return p
}
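
The retry loop above is the core of the commit: an optimistic, seqlock-style read of the (proto, link) pair. As a standalone illustration, here is the same pattern reduced to a minimal sketch; the consistentPair type and its fields are hypothetical, not part of this package. The re-check only proves consistency because writers always disturb the first field (here, via a sentinel like logicalDeleted) whenever they change the second.

// Sketch only; assumes "sync/atomic" and "unsafe" are imported.
// A writer that changes b also changes a (e.g. by parking a sentinel
// there), so a reader that sees the same a before and after loading b
// knows it observed a consistent pair.
type consistentPair struct {
	a unsafe.Pointer
	b unsafe.Pointer
}

func (c *consistentPair) load() (a, b unsafe.Pointer) {
	for {
		a = atomic.LoadPointer(&c.a)
		b = atomic.LoadPointer(&c.b)
		if atomic.LoadPointer(&c.a) == a {
			return a, b
		}
		// A writer moved a while we were loading b; retry.
	}
}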

// nextR returns the next link and its data. If the link is nil, then this was
// the last element in the list. This method is suitable only when not
// modifying the list, as it may follow links that are being modified.
func (l *protoLink) nextR() (*Object, *protoLink) {
n := (*protoLink)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&l.n))))
if n != nil {
// As a reader, whenever the link is not nil, we can use it.
p := (*Object)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&n.p))))
return p, n
// iterR returns the node's data and its next link. p is nil when the iteration
// reaches the end of the list. This method is suitable only when not modifying
// the list, as it may follow links that are being modified. This method must
// not be called on the list head rooted on an object; use protoHead instead.
func (l *protoLink) iterR() (p *Object, n *protoLink) {
if l == nil {
return nil, nil
}
return nil, nil
p = (*Object)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&l.p))))
n = (*protoLink)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&l.n))))
return p, n
}
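
Together, protoHead and iterR form the read-only traversal idiom used throughout the rest of the diff. A hypothetical helper (not in this commit) shows the shape; note that iterR's nil-receiver check is what lets a single-proto list, whose head link is nil, fall out of the loop immediately.

// countProtos counts o's protos without taking any lock; the view may be
// stale but is internally consistent. (Hypothetical example.)
func countProtos(o *Object) int {
	p, n := o.protoHead()
	if p == nil {
		return 0
	}
	count := 1
	for p, n = n.iterR(); p != nil; p, n = n.iterR() {
		count++
	}
	return count
}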

// Protos returns a snapshot of the object's protos. The result is nil if o has
// no protos. This is more efficient than using ForeachProto to construct a
// slice.
func (o *Object) Protos() []*Object {
var r []*Object
p := o.protoHead()
p, n := o.protoHead()
if p == nil {
return nil
}
r = append(r, p)
for p, n := o.protos.nextR(); n != nil; p, n = n.nextR() {
r := []*Object{p} // vast majority of objects have one proto
for p, n = n.iterR(); p != nil; p, n = n.iterR() {
r = append(r, p)
}
return r
@@ -108,10 +117,11 @@ func (o *Object) Protos() []*Object {
// ForeachProto calls exec on each of the object's protos. exec must not modify
// o's protos list. If exec returns false, then the iteration ceases.
func (o *Object) ForeachProto(exec func(p *Object) bool) {
if p := o.protoHead(); p == nil || !exec(p) {
p, n := o.protoHead()
if p == nil || !exec(p) {
return
}
for p, n := o.protos.nextR(); n != nil; p, n = n.nextR() {
for p, n := n.iterR(); p != nil; p, n = n.iterR() {
if !exec(p) {
return
}
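
For contrast with Protos, here is a hypothetical use of ForeachProto with early termination; when the whole list is wanted as a slice, Protos above is the cheaper route, as its doc comment notes.

// hasProto reports whether target occurs anywhere in obj's protos list,
// stopping at the first match. (Hypothetical example, not in this change.)
func hasProto(obj, target *Object) bool {
	var found bool
	obj.ForeachProto(func(p *Object) bool {
		if p == target {
			found = true
			return false // stop iterating
		}
		return true
	})
	return found
}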
@@ -185,39 +195,41 @@ func (o *Object) PrependProto(proto *Object) {
// RemoveProto removes all instances of a proto from the object's protos list.
// Comparison is done by identity only.
func (o *Object) RemoveProto(proto *Object) {
// This could be more optimized, but it's a bit too subtle to ensure
// atomicity – and too rare of a call – to justify the effort.
o.protos.mu.Lock()
for atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), unsafe.Pointer(proto), unsafe.Pointer(logicalDeleted)) {
if o.protos.n == nil {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), nil)
o.protos.mu.Unlock()
return
var r []*Object
// Note that we can call ForeachProto while the head's mutex is held
// because ForeachProto only locks via protoHead, which in turn only locks
// when o.protos.p is logicalDeleted, which only happens while there's
// another active writer, which requires the lock itself.
o.ForeachProto(func(p *Object) bool {
if p != proto {
r = append(r, p)
}
n := o.protos.n
n.mu.Lock()
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.n)), unsafe.Pointer(n.n))
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), unsafe.Pointer(n.p))
n.mu.Unlock()
}
if atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p))) == nil {
o.protos.mu.Unlock()
return
}
prev := &o.protos
cur := o.protos.n
for cur != nil {
cur.mu.Lock()
p := (*Object)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&cur.p))))
if p == proto {
next := (*protoLink)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&cur.n))))
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&prev.n)), unsafe.Pointer(next))
cur.mu.Unlock()
cur = next
} else {
prev.mu.Unlock()
prev, cur = cur, cur.n
return true
})
// However, we can't call SetProtos while the mutex is held.
switch len(r) {
case 0:
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), nil)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.n)), nil)
case 1:
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), unsafe.Pointer(logicalDeleted))
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.n)), nil)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), unsafe.Pointer(r[0]))
default:
n := &protoLink{p: r[1]}
m := n
for i := 2; i < len(r); i++ {
n.n = &protoLink{p: r[i]}
n = n.n
}
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), unsafe.Pointer(logicalDeleted))
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.n)), unsafe.Pointer(m))
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), unsafe.Pointer(r[0]))
}
prev.mu.Unlock()
o.protos.mu.Unlock()
}
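
Note the ordering in the switch above: logicalDeleted is parked in o.protos.p before o.protos.n is touched, and the real head is stored last. Factored into a sketch (a hypothetical helper; the commit inlines these stores), the publication protocol is:

// replaceHead republishes the head (p, n) pair; the caller must hold
// o.protos.mu. A reader that loads the sentinel falls back to that same
// mutex, and one that loads the old p fails protoHead's re-check, so no
// reader can pair the old p with the new n.
func replaceHead(o *Object, newP *Object, newN *protoLink) {
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), unsafe.Pointer(logicalDeleted))
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.n)), unsafe.Pointer(newN))
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&o.protos.p)), unsafe.Pointer(newP))
}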

// syncSlot is a synchronized slot. Once a particular VM accesses this slot,
@@ -726,7 +738,7 @@ func (vm *VM) GetSlotSync(obj *Object, slot string) (s SyncSlot, proto *Object)
func (vm *VM) getSlotAncestor(obj *Object, slot string) (sy *syncSlot, proto *Object) {
vm.protoSet.Reset()
vm.protoSet.Add(obj.UniqueID())
obj = obj.protoHead()
obj, link := obj.protoHead()
if obj == nil {
return nil, nil
}
@@ -736,7 +748,7 @@ func (vm *VM) getSlotAncestor(obj *Object, slot string) (sy *syncSlot, proto *Object)
if vm.protoSet.Add(obj.UniqueID()) {
vm.protoStack = append(vm.protoStack, obj)
}
for p, link := obj.protos.nextR(); link != nil; p, link = link.nextR() {
for p, link := link.iterR(); p != nil; p, link = link.iterR() {
if vm.protoSet.Add(p.UniqueID()) {
vm.protoStack = append(vm.protoStack, p)
}
@@ -753,14 +765,14 @@
}
vm.protoStack = vm.protoStack[:len(vm.protoStack)-1] // actually pop
start = len(vm.protoStack)
p := obj.protoHead()
p, link := obj.protoHead()
if p == nil {
continue
}
if vm.protoSet.Add(p.UniqueID()) {
vm.protoStack = append(vm.protoStack, p)
}
for p, link := obj.protos.nextR(); link != nil; p, link = link.nextR() {
for p, link := link.iterR(); p != nil; p, link = link.iterR() {
if vm.protoSet.Add(p.UniqueID()) {
vm.protoStack = append(vm.protoStack, p)
}
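
getSlotAncestor drives these primitives with a visited set (vm.protoSet) and an explicit stack so that cyclic proto graphs terminate. A self-contained sketch of that traversal shape, substituting a plain map for the VM's set (hypothetical, not the commit's code):

// walkProtos visits obj's ancestors, skipping objects already seen so a
// cycle in the proto graph cannot loop forever. visit returning false
// stops the walk, mirroring ForeachProto.
func walkProtos(obj *Object, visit func(*Object) bool) {
	seen := map[*Object]bool{obj: true}
	var stack []*Object
	push := func(o *Object) {
		p, n := o.protoHead()
		for p != nil {
			if !seen[p] {
				seen[p] = true
				stack = append(stack, p)
			}
			p, n = n.iterR()
		}
	}
	push(obj)
	for len(stack) > 0 {
		o := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if !visit(o) {
			return
		}
		push(o)
	}
}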
5 changes: 3 additions & 2 deletions internal/slots_internal_test.go
@@ -14,7 +14,7 @@ func TestProtoHead(t *testing.T) {
cases := map[string]func(t *testing.T){
"uncontested": func(t *testing.T) {
obj := vm.ObjectWith(nil, []*Object{vm.BaseObject}, nil, nil)
r := obj.protoHead()
r, _ := obj.protoHead()
if r != vm.BaseObject {
t.Errorf("wrong protoHead: wanted %v, got %v", vm.BaseObject, r)
}
@@ -28,7 +28,8 @@
sig := new(Object)
go func() {
ch <- sig // Signal that this goroutine is running.
ch <- obj.protoHead()
r, _ := obj.protoHead()
ch <- r
close(ch) // Avoid hanging if protoHead succeeds immediately.
}()
<-ch // Wait for the new goroutine to start.
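
The contested case exercises a reader blocked behind a writer. A further hypothetical regression test for the consistency loop itself (not part of this commit) would hammer Protos against concurrent removal and re-insertion under the race detector; it assumes a vm in scope as in the cases above.

func testProtosRace(t *testing.T, vm *VM) {
	a, b := new(Object), new(Object)
	obj := vm.ObjectWith(nil, []*Object{a, b}, nil, nil)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 1000; i++ {
			obj.RemoveProto(b)
			obj.PrependProto(b)
		}
	}()
	for {
		select {
		case <-done:
			return
		default:
		}
		// Every snapshot must be internally consistent: no proto from one
		// version of the list paired with the link of another, and hence
		// no nil entries.
		for _, p := range obj.Protos() {
			if p == nil {
				t.Fatal("inconsistent snapshot: nil proto")
			}
		}
	}
}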
