Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add templated utility class to centralize common pattern #1840

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ cc_library(
"src/lib/IlmThread/IlmThreadMutex.h",
"src/lib/IlmThread/IlmThreadNamespace.h",
"src/lib/IlmThread/IlmThreadPool.h",
"src/lib/IlmThread/IlmThreadProcessGroup.h",
"src/lib/IlmThread/IlmThreadSemaphore.h",
],
copts = select({
Expand Down
1 change: 1 addition & 0 deletions src/lib/IlmThread/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ openexr_define_library(IlmThread
IlmThreadMutex.h
IlmThreadNamespace.h
IlmThreadPool.h
IlmThreadProcessGroup.h
IlmThreadSemaphore.h
DEPENDENCIES
OpenEXR::Config
Expand Down
154 changes: 154 additions & 0 deletions src/lib/IlmThread/IlmThreadProcessGroup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
//
// SPDX-License-Identifier: BSD-3-Clause
// Copyright (c) Contributors to the OpenEXR Project.
//

#ifndef INCLUDED_ILM_THREAD_PROCESS_GROUP_H
#define INCLUDED_ILM_THREAD_PROCESS_GROUP_H

//-----------------------------------------------------------------------------
//
// Class ProcessGroup is a templated inline helper for constraining
// task contexts to a number of threads. It maintains a list of
// contexts and then can hand them out one at a time, waiting for a
// previous thread request to finish before handing out more,
// preventing over-subscription / allocation of contexts.
//
//-----------------------------------------------------------------------------

#include "IlmThreadConfig.h"
#include "IlmThreadExport.h"
#include "IlmThreadNamespace.h"
#include "IlmThreadSemaphore.h"

#include "Iex.h"

#include <atomic>
#include <string>
#include <type_traits>
#include <vector>

ILMTHREAD_INTERNAL_NAMESPACE_HEADER_ENTER

template <typename P,
std::enable_if_t <
std::is_default_constructible <P>::value &&
std::is_same <decltype (P {}.next), P *>::value, bool> = true>
class ProcessGroup
{
public:
using Process = P;

ProcessGroup (unsigned int numThreads)
: _sem (numThreads)
, _avail_head (nullptr)
, _first_failure (nullptr)
{
_fixed_pool.resize (numThreads);
for ( unsigned int i = 0; i < numThreads; ++i )
{
if (i == (numThreads - 1))
_fixed_pool[i].next = nullptr;
else
_fixed_pool[i].next = &(_fixed_pool[i+1]);
}
_avail_head = &(_fixed_pool[0]);
}

ProcessGroup (const ProcessGroup&) = delete;
ProcessGroup& operator= (const ProcessGroup&) = delete;
ProcessGroup (ProcessGroup&&) = default;
ProcessGroup& operator= (ProcessGroup&&) = delete;
~ProcessGroup()
{
std::string *cur = _first_failure.load ();
delete cur;
}

void push (Process *p)
{
Process* oldhead = _avail_head.load (std::memory_order_relaxed);

do
{
p->next = oldhead;
} while (!_avail_head.compare_exchange_weak (
oldhead, p,
std::memory_order_release,
std::memory_order_relaxed));

// notify someone else there's one available
_sem.post ();
}

// called by the thread dispatching work units, may block
Process* pop ()
{
Process* ret = nullptr;

// we do not have to worry about ABA problems as
// we have a static pool of items we own, we're just
// putting them here and popping them off.

// used for honoring the numThreads, as pop
// should only be called by the one thread
// waiting to submit thread calls
_sem.wait ();

ret = _avail_head.load (std::memory_order_acquire);

Process* newhead;
do
{
if (!ret)
std::cerr << "GACK: serious failure case???" << std::endl;

newhead = ret->next;
} while ( !_avail_head.compare_exchange_weak(
ret, newhead, std::memory_order_acquire));

return ret;
}

void record_failure (const char *e)
{
// should we construct a list of failures if there are
// more than one? seems less confusing to just report
// the first we happened to record

std::string *cur = _first_failure.load ();
if (!cur)
{
std::string *msg = new std::string (e);
if (! _first_failure.compare_exchange_strong (cur, msg))
delete msg;
}
}

void throw_on_failure ()
{
std::string *cur = _first_failure.load ();
_first_failure.store (nullptr);

if (cur)
{
std::string msg (*cur);
delete cur;

throw IEX_NAMESPACE::IoExc (msg);
}
}
private:
Semaphore _sem;

std::vector<Process> _fixed_pool;

std::atomic<Process *> _avail_head;

std::atomic<std::string *> _first_failure;
};


ILMTHREAD_INTERNAL_NAMESPACE_HEADER_EXIT

#endif // INCLUDED_ILM_THREAD_POOL_H
Loading
Loading