Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved TaskCollaboration #14

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ API

- Sampler : Added `populate()` method, which populates the internal tile cache in parallel, and subsequently allows `sample()` to be called concurrently.

Fixes
-----

- ValuePlug : Fixed hangs and poor performance caused by plugs depending on upstream plugs with an identical hash (#4978).

API
---

- ValuePlug : Added `Default` CachePolicy and deprecated `Standard`, `TaskIsolation` and `Legacy` policies.
- Process : Added `acquireCollaborativeResult()` method, providing an improved mechanism for multiple threads to collaborate on TBB tasks spawned by a single process they all depend on.

1.3.4.0 (relative to 1.3.3.0)
=======

Expand Down
67 changes: 21 additions & 46 deletions include/Gaffer/Private/IECorePreview/LRUCache.inl
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,12 @@ class Serial
return m_it->cacheEntry;
}

// Returns true if it is OK to call `writable()`.
// This is typically determined by the AcquireMode
// passed to `acquire()`, with special cases for
// recursion.
// Returns true if it is OK to call `writable()`. This is determined
// by the AcquireMode passed to `acquire()`.
bool isWritable() const
{
// Because this policy is serial, it would technically
// always be OK to write. But we return false for recursive
// calls to avoid unnecessary overhead updating the LRU list
// for inner calls.
return m_it->handleCount == 1;
// Because this policy is serial, it is always OK to write
return true;
}

// Executes the functor F. This is used to
Expand Down Expand Up @@ -733,20 +728,19 @@ class TaskParallel

CacheEntry &writable()
{
assert( m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write );
assert( m_itemLock.isWriter() );
return m_item->cacheEntry;
}

// May return false for AcquireMode::Insert if a GetterFunction recurses.
bool isWritable() const
{
return m_itemLock.lockType() == Item::Mutex::ScopedLock::LockType::Write;
return m_itemLock.isWriter();
}

template<typename F>
void execute( F &&f )
{
if( m_spawnsTasks && m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write )
if( m_spawnsTasks )
{
// The getter function will spawn tasks. Execute
// it via the TaskMutex, so that other threads trying
Expand Down Expand Up @@ -814,16 +808,9 @@ class TaskParallel
// found a pre-existing item we optimistically take
// just a read lock, because it is faster when
// many threads just need to read from the same
// cached item. We accept WorkerRead locks when necessary,
// to support Getter recursion.
TaskMutex::ScopedLock::LockType lockType = TaskMutex::ScopedLock::LockType::WorkerRead;
if( inserted || mode == FindWritable || mode == InsertWritable )
{
lockType = TaskMutex::ScopedLock::LockType::Write;
}

// cached item.
const bool acquired = m_itemLock.acquireOr(
it->mutex, lockType,
it->mutex, /* write = */ inserted || mode == FindWritable || mode == InsertWritable,
// Work accepter
[&binLock, canceller] ( bool workAvailable ) {
// Release the bin lock prior to accepting work, because
Expand All @@ -836,26 +823,15 @@ class TaskParallel
// The only canceller being checked at that
// point will be the one passed to the
// `LRUCache::get()` call that we work in
// service of. This isn't ideal, as it can cause
// UI stalls if one UI element is waiting to
// cancel an operation, but it's tasks have been
// "captured" by collaboration on a compute
// started by another UI element (which hasn't
// requested cancellation). One alternative is
// that we would only accept work if our
// canceller matches the one in use by the
// original caller. This would rule out
// collaboration between UI elements, but would
// still allow diamond dependencies in graph
// evaluation to use collaboration.
// service of.
return (!canceller || !canceller->cancelled());
}
);

if( acquired )
{
if(
m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Read &&
!m_itemLock.isWriter() &&
mode == Insert && it->cacheEntry.status() == LRUCache::Uncached
)
{
Expand Down Expand Up @@ -1108,6 +1084,7 @@ Value LRUCache<Key, Value, Policy, GetterKey>::get( const GetterKey &key, const

if( status==Uncached )
{
assert( handle.isWritable() );
Value value = Value();
Cost cost = 0;
try
Expand All @@ -1120,24 +1097,21 @@ Value LRUCache<Key, Value, Policy, GetterKey>::get( const GetterKey &key, const
}
catch( ... )
{
if( handle.isWritable() && m_cacheErrors )
if( m_cacheErrors )
{
handle.writable().state = std::current_exception();
}
throw;
}

if( handle.isWritable() )
{
assert( cacheEntry.status() != Cached ); // this would indicate that another thread somehow
assert( cacheEntry.status() != Failed ); // loaded the same thing as us, which is not the intention.
assert( cacheEntry.status() != Cached ); // this would indicate that another thread somehow
assert( cacheEntry.status() != Failed ); // loaded the same thing as us, which is not the intention.

setInternal( key, handle.writable(), value, cost );
m_policy.push( handle );
setInternal( key, handle.writable(), value, cost );
m_policy.push( handle );

handle.release();
limitCost( m_maxCost );
}
handle.release();
limitCost( m_maxCost );

return value;
}
Expand Down Expand Up @@ -1202,8 +1176,9 @@ bool LRUCache<Key, Value, Policy, GetterKey>::setIfUncached( const Key &key, con
const Status status = cacheEntry.status();

bool result = false;
if( status == Uncached && handle.isWritable() )
if( status == Uncached )
{
assert( handle.isWritable() );
result = setInternal( key, handle.writable(), value, costFunction( value ) );
m_policy.push( handle );

Expand Down
Loading
Loading