fix: removed potential race conditions (#26)
moul authored Dec 28, 2019
2 parents 9b364ce + 2922be9 commit 216dc9a
Showing 2 changed files with 42 additions and 24 deletions.
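
The title covers two related fixes visible in the diffs below: events/events.go stops iterating over a bare header copy of the subscriber slice, and stores/basestore/base_store.go routes reads of shared fields through lock-taking accessors. As background, the following stand-alone sketch (invented names, not code from this repository) reproduces the first kind of race in isolation: copying a slice header under RLock does not copy the elements, so a concurrent in-place removal still races with the iteration. Running it with `go run -race` should report the conflict.

```go
package main

import "sync"

// subscriber stands in for the emitter's *eventSubscription entries.
type subscriber struct{ hits int }

type emitter struct {
	mu   sync.RWMutex
	subs []*subscriber
}

func (e *emitter) subscribe() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.subs = append(e.subs, &subscriber{})
}

// emit copies only the slice header under RLock, so the loop below still
// reads the backing array that unsubscribeLast rewrites after RUnlock.
func (e *emitter) emit() {
	e.mu.RLock()
	subs := e.subs // header copy: same backing array
	e.mu.RUnlock()

	for _, s := range subs {
		if s != nil {
			s.hits++
		}
	}
}

// unsubscribeLast removes in place, writing into that shared backing array.
func (e *emitter) unsubscribeLast() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if n := len(e.subs); n > 0 {
		e.subs[n-1] = nil
		e.subs = e.subs[:n-1]
	}
}

func main() {
	e := &emitter{}
	done := make(chan struct{})

	go func() {
		defer close(done)
		for i := 0; i < 100000; i++ {
			e.emit()
		}
	}()
	for i := 0; i < 100000; i++ {
		e.subscribe()
		e.unsubscribeLast()
	}
	<-done
}
```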
13 changes: 8 additions & 5 deletions events/events.go
@@ -33,7 +33,7 @@ type EventEmitter struct {

func (e *EventEmitter) UnsubscribeAll() {
e.lock.RLock()
subs := e.Subscribers
subs := append([]*eventSubscription(nil), e.Subscribers...)
e.lock.RUnlock()

for _, c := range subs {
@@ -43,7 +43,7 @@ func (e *EventEmitter) UnsubscribeAll() {

func (e *EventEmitter) Emit(evt Event) {
e.lock.RLock()
subs := e.Subscribers
subs := append([]*eventSubscription(nil), e.Subscribers...)
e.lock.RUnlock()

for _, s := range subs {
@@ -84,13 +84,16 @@ func (e *EventEmitter) Subscribe(ctx context.Context, handler func(Event)) {

func (e *EventEmitter) unsubscribe(c *eventSubscription) {
e.lock.Lock()
subs := append([]*eventSubscription(nil), e.Subscribers...)
defer e.lock.Unlock()

for i, s := range e.Subscribers {
for i, s := range subs {
if s == c {
c.Cancel()
e.Subscribers[len(e.Subscribers)-1], e.Subscribers[i] = e.Subscribers[i], e.Subscribers[len(e.Subscribers)-1]
e.Subscribers = e.Subscribers[:len(e.Subscribers)-1]

subs[i] = subs[len(subs)-1]
e.Subscribers = subs[:len(subs)-1]

return
}
}
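The `append([]*eventSubscription(nil), e.Subscribers...)` form used above is the standard Go idiom for taking an element-wise snapshot of a slice: it allocates a fresh backing array, so iterating the snapshot after RUnlock no longer touches memory that unsubscribe rewrites in place. A minimal stand-alone illustration of the difference between a header copy and a snapshot (hypothetical values, not from the repository):

```go
package main

import "fmt"

func main() {
	src := []int{1, 2, 3}

	header := src                          // same backing array as src
	snapshot := append([]int(nil), src...) // fresh backing array, elements copied

	src[0] = 99 // stands in for unsubscribe rewriting an element in place

	fmt.Println(header[0])   // 99: the header copy observes the mutation
	fmt.Println(snapshot[0]) // 1: the snapshot is isolated
}
```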
53 changes: 34 additions & 19 deletions stores/basestore/base_store.go
@@ -86,6 +86,20 @@ func (b *BaseStore) AccessController() accesscontroller.Interface {
return b.access
}

func (b *BaseStore) Replicator() replicator.Replicator {
b.lock.RLock()
defer b.lock.RUnlock()

return b.replicator
}

func (b *BaseStore) Cache() datastore.Datastore {
b.lock.RLock()
defer b.lock.RUnlock()

return b.cache
}

// InitBaseStore Initializes the store base
func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, identity *identityprovider.Identity, addr address.Address, options *iface.NewStoreOptions) error {
var err error
@@ -94,6 +108,7 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide
return errors.New("identity required")
}

b.lock.Lock()
b.storeType = "store"
b.id = addr.String()
b.identity = identity
@@ -114,7 +129,6 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide
}
}

b.lock.Lock()
b.oplog, err = ipfslog.NewLog(ipfs, identity, &ipfslog.LogOptions{
ID: b.id,
AccessController: b.access,
@@ -125,14 +139,14 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide
return errors.New("unable to instantiate an IPFS log")
}

b.lock.Lock()
if options.Index == nil {
options.Index = NewBaseIndex
}

b.index = options.Index(b.identity.PublicKey)
b.replicationStatus = replicator.NewReplicationInfo()

b.lock.Lock()
b.stats.snapshot.bytesLoaded = -1

b.replicator = replicator.NewReplicator(ctx, b, options.ReplicationConcurrency)
@@ -156,7 +170,7 @@ func (b *BaseStore) InitBaseStore(ctx context.Context, ipfs coreapi.CoreAPI, ide
b.options = options
b.lock.Unlock()

go b.replicator.Subscribe(ctx, func(e events.Event) {
go b.Replicator().Subscribe(ctx, func(e events.Event) {
switch e.(type) {
case *replicator.EventLoadAdded:
evt := e.(*replicator.EventLoadAdded)
@@ -204,7 +218,7 @@ func (b *BaseStore) Close() error {
}

// Replicator teardown logic
b.replicator.Stop()
b.Replicator().Stop()

// Reset replication statistics
b.replicationStatus.Reset()
@@ -219,7 +233,7 @@ func (b *BaseStore) Close() error {

b.UnsubscribeAll()

err := b.cache.Close()
err := b.Cache().Close()
if err != nil {
return errors.Wrap(err, "unable to close cache")
}
@@ -257,13 +271,14 @@ func (b *BaseStore) Drop() error {
// TODO: Destroy cache? b.cache.Delete()

// Reset
b.index = b.options.Index(b.identity.PublicKey)
b.lock.Lock()
defer b.lock.Unlock()

b.index = b.options.Index(b.identity.PublicKey)
b.oplog, err = ipfslog.NewLog(b.ipfs, b.identity, &ipfslog.LogOptions{
ID: b.id,
AccessController: b.access,
})
b.lock.Unlock()

if err != nil {
return errors.Wrap(err, "unable to create log")
@@ -280,7 +295,7 @@ func (b *BaseStore) Load(ctx context.Context, amount int) error {
}

var localHeads, remoteHeads []*entry.Entry
localHeadsBytes, err := b.cache.Get(datastore.NewKey("_localHeads"))
localHeadsBytes, err := b.Cache().Get(datastore.NewKey("_localHeads"))
if err != nil {
return errors.Wrap(err, "unable to get local heads from cache")
}
@@ -290,7 +305,7 @@ func (b *BaseStore) Load(ctx context.Context, amount int) error {
return errors.Wrap(err, "unable to unmarshal cached local heads")
}

remoteHeadsBytes, err := b.cache.Get(datastore.NewKey("_remoteHeads"))
remoteHeadsBytes, err := b.Cache().Get(datastore.NewKey("_remoteHeads"))
if err != nil && err != datastore.ErrNotFound {
return errors.Wrap(err, "unable to get data from cache")
}
@@ -408,13 +423,13 @@ func (b *BaseStore) Sync(ctx context.Context, heads []ipfslog.Entry) error {
savedEntriesCIDs = append(savedEntriesCIDs, hash)
}

b.replicator.Load(ctx, savedEntriesCIDs)
b.Replicator().Load(ctx, savedEntriesCIDs)

return nil
}

func (b *BaseStore) LoadMoreFrom(ctx context.Context, amount uint, cids []cid.Cid) {
b.replicator.Load(ctx, cids)
b.Replicator().Load(ctx, cids)
// TODO: can this return an error?
}

@@ -430,7 +445,7 @@ func (b *BaseStore) SaveSnapshot(ctx context.Context) (cid.Cid, error) {
// JS behavior for the sake of compatibility across implementations
// TODO: avoid using `*entry.Entry`?

unfinished := b.replicator.GetQueue()
unfinished := b.Replicator().GetQueue()

b.lock.RLock()
oplog := b.oplog
@@ -491,7 +506,7 @@ func (b *BaseStore) SaveSnapshot(ctx context.Context) (cid.Cid, error) {
return cid.Cid{}, errors.Wrap(err, "unable to save log data on store")
}

err = b.cache.Put(datastore.NewKey("snapshot"), []byte(snapshotPath.Cid().String()))
err = b.Cache().Put(datastore.NewKey("snapshot"), []byte(snapshotPath.Cid().String()))
if err != nil {
return cid.Cid{}, errors.Wrap(err, "unable to add snapshot data to cache")
}
@@ -501,7 +516,7 @@ func (b *BaseStore) SaveSnapshot(ctx context.Context) (cid.Cid, error) {
return cid.Cid{}, errors.Wrap(err, "unable to marshal unfinished cids")
}

err = b.cache.Put(datastore.NewKey("queue"), unfinishedJSON)
err = b.Cache().Put(datastore.NewKey("queue"), unfinishedJSON)
if err != nil {
return cid.Cid{}, errors.Wrap(err, "unable to add unfinished data to cache")
}
@@ -514,7 +529,7 @@ func (b *BaseStore) SaveSnapshot(ctx context.Context) (cid.Cid, error) {
func (b *BaseStore) LoadFromSnapshot(ctx context.Context) error {
b.Emit(stores.NewEventLoad(b.address, nil))

queueJSON, err := b.cache.Get(datastore.NewKey("queue"))
queueJSON, err := b.Cache().Get(datastore.NewKey("queue"))
if err != nil && err != datastore.ErrNotFound {
return errors.Wrap(err, "unable to get value from cache")
}
@@ -538,7 +553,7 @@ func (b *BaseStore) LoadFromSnapshot(ctx context.Context) error {
}
}

snapshot, err := b.cache.Get(datastore.NewKey("snapshot"))
snapshot, err := b.Cache().Get(datastore.NewKey("snapshot"))
if err == datastore.ErrNotFound {
return errors.Wrap(err, "not found")
}
@@ -667,7 +682,7 @@ func (b *BaseStore) AddOperation(ctx context.Context, op operation.Operation, on
return nil, errors.Wrap(err, "unable to marshal entry")
}

err = b.cache.Put(datastore.NewKey("_localHeads"), marshaledEntry)
err = b.Cache().Put(datastore.NewKey("_localHeads"), marshaledEntry)
if err != nil {
return nil, errors.Wrap(err, "unable to add data to cache")
}
@@ -746,7 +761,7 @@ func (b *BaseStore) replicationLoadComplete(logs []ipfslog.Log) {
}
}
b.replicationStatus.DecreaseQueued(len(logs))
b.replicationStatus.SetBuffered(b.replicator.GetBufferLen())
b.replicationStatus.SetBuffered(b.Replicator().GetBufferLen())
err := b.updateIndex()
if err != nil {
logger().Error("unable to update index", zap.Error(err))
@@ -762,7 +777,7 @@ func (b *BaseStore) replicationLoadComplete(logs []ipfslog.Log) {
return
}

err = b.cache.Put(datastore.NewKey("_remoteHeads"), headsBytes)
err = b.Cache().Put(datastore.NewKey("_remoteHeads"), headsBytes)
if err != nil {
logger().Error("unable to update heads cache", zap.Error(err))
return
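The base_store.go half of the change wraps the `replicator` and `cache` fields behind the new `Replicator()` and `Cache()` accessors, which take the read lock, and switches internal call sites from direct field reads to those accessors so that every read and every write of those fields goes through `b.lock`. A reduced sketch of that shape, with placeholder types and names rather than the repository's actual `BaseStore`:

```go
package main

import "sync"

// Replicator and Cache stand in for the real replicator.Replicator and
// datastore.Datastore interfaces used by BaseStore.
type Replicator interface{ Stop() }

type Cache interface{ Close() error }

type store struct {
	lock       sync.RWMutex
	replicator Replicator
	cache      Cache
}

// The accessors take the read lock, so a reader never observes a field
// while a writer holding the write lock is in the middle of setting it.
func (s *store) Replicator() Replicator {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.replicator
}

func (s *store) Cache() Cache {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.cache
}

// init assigns the fields under the write lock.
func (s *store) init(r Replicator, c Cache) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.replicator = r
	s.cache = c
}

// closeStore reads through the accessors rather than touching the fields
// directly, mirroring the b.Replicator().Stop() and b.Cache().Close() call
// sites in the diff above.
func (s *store) closeStore() error {
	s.Replicator().Stop()
	return s.Cache().Close()
}

type noopReplicator struct{}

func (noopReplicator) Stop() {}

type noopCache struct{}

func (noopCache) Close() error { return nil }

func main() {
	s := &store{}
	s.init(noopReplicator{}, noopCache{})
	_ = s.closeStore()
}
```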
