Skip to content

Commit

Permalink
Fix negative failed collections count
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed Oct 23, 2024
1 parent 6282a77 commit f43bd36
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 5 deletions.
15 changes: 14 additions & 1 deletion odc/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,20 @@ void Controller::extractRequirements(const CommonParams& common, Session& sessio
}

// TODO: should n_current be set to 0 and increased as collections are launched instead?
session.mCollections[c->getName()] = CollectionInfo{c->getName(), zone, agentGroup, topoParent, topoPath, n, n, nmin, nCores, numTasks, numTasksTotal, std::unordered_map<uint64_t, uint64_t>()};
session.mCollections[c->getName()] = CollectionInfo{
c->getName(),
zone,
agentGroup,
topoParent,
topoPath,
n,
n,
nmin,
nCores,
numTasks,
numTasksTotal,
std::unordered_map<DDSCollection::Id, uint64_t>(),
std::unordered_set<DDSCollection::Id>()};

auto agiIt = session.mAgentGroupInfo.find(agentGroup);
if (agiIt == session.mAgentGroupInfo.end()) {
Expand Down
9 changes: 6 additions & 3 deletions odc/Topology.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,13 @@ class BasicTopology : public AsioBase<Executor, Allocator>
if (it != mSession.mCollections.end()) {
CollectionInfo& colInfo = it->second;
// one collection failed
colInfo.nCurrent--;
if (colInfo.mFailedRuntimeCollections.find(device.collectionId) == colInfo.mFailedRuntimeCollections.end()) {
colInfo.mFailedRuntimeCollections.insert(device.collectionId);
colInfo.nCurrent--;
}
// check nMin condition
if (CheckNmin(colInfo.nCurrent, colInfo.nMin, runtimeCollection.m_collectionPath, col->getPath(), device.collectionId)) {
IgnoreCollection(device.collectionId);
IgnoreCollectionDevices(device.collectionId);

// TODO: shutdown agent only if it has no tasks left
uint64_t agentId = colInfo.mRuntimeCollectionAgents.at(device.collectionId);
Expand Down Expand Up @@ -334,7 +337,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
}

// precondition: mMtx is locked.
void IgnoreCollection(odc::core::DDSCollection::Id id)
void IgnoreCollectionDevices(odc::core::DDSCollection::Id id)
{
for (auto& device : mStateData) {
if (device.collectionId == id) {
Expand Down
3 changes: 2 additions & 1 deletion odc/TopologyDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ struct CollectionInfo
int nCores;
int32_t numTasks;
int32_t totalTasks;
std::unordered_map<uint64_t, uint64_t> mRuntimeCollectionAgents; ///< runtime collection ID -> agent ID
std::unordered_map<DDSCollection::Id, uint64_t> mRuntimeCollectionAgents; ///< runtime collection ID -> agent ID
std::unordered_set<DDSCollection::Id> mFailedRuntimeCollections;

friend std::ostream& operator<<(std::ostream& os, const CollectionInfo& ci)
{
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ add_nmin_test(nmin_lt_n "" "Status code: ERROR")
# nmin is less than n - recovery should be possible if conditions satisfied and failing processes are hanging before idle
add_nmin_test(nmin_lt_n_hanging_before_idle "" "Status code: ERROR")

# nmin is less than n - recovery is not possible if all processes are hanging before idle
add_nmin_test(nmin_lt_n_hanging_before_idle_failure "Status code: ERROR" "")

# nmin is less than n - recovery should be possible if conditions satisfied and failing processes are hanging in init
add_nmin_test(nmin_lt_n_hanging_in_init "" "Status code: ERROR")

Expand Down
63 changes: 63 additions & 0 deletions tests/topos/nmin_lt_n_hanging_before_idle_failure.xml.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<topology name="Example">
<!-- Test topology for the nmin failure case: one SamplersSinks collection plus a group of
     4 Processors collections whose Processor tasks are started with the "problem hang" and
     "problem-state pre-idle" options. The problem-paths pattern on the Processor task appears
     to match every Processor in the group, so all of them are expected to hang before idle. -->

<!-- Requirements referenced by the collections declared below. -->
<declrequirement name="onlineReq" type="groupname" value="online"/>
<declrequirement name="calibReq" type="groupname" value="calib"/>
<!-- NOTE(review): presumably sets the minimum number (nmin) of Processors collections to 2,
     i.e. less than the group multiplicity n=4; confirm against the ODC requirement handling. -->
<declrequirement name="odc_nmin_Processors" type="custom" value="2"/>

<!-- Properties exchanged between tasks: fmqchan_data1 is written by Sampler and
     fmqchan_data2 is written by Sink; both are read by Processor. -->
<property name="fmqchan_data1" />
<property name="fmqchan_data2" />

<!-- Sampler: binds the push channel data1 and writes the fmqchan_data1 property. -->
<decltask name="Sampler">
<exe>odc-ex-sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P odc --severity trace</exe>
<env reachable="false">@CMAKE_INSTALL_PREFIX@/@PROJECT_INSTALL_BINDIR@/odc-ex-env.sh</env>
<properties>
<name access="write">fmqchan_data1</name>
</properties>
</decltask>

<!-- Processor: connects to data1 (pull) and data2 (push) and reads both channel properties.
     It is launched with the hang problem in the pre-idle state for all paths matching the
     problem-paths pattern. -->
<decltask name="Processor">
<exe>odc-ex-processor --color false --problem hang --problem-state pre-idle --problem-paths main/ProcessorGroup/Processors_.*/Processor_.* --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P odc --severity trace</exe>
<env reachable="false">@CMAKE_INSTALL_PREFIX@/@PROJECT_INSTALL_BINDIR@/odc-ex-env.sh</env>
<properties>
<name access="read">fmqchan_data1</name>
<name access="read">fmqchan_data2</name>
</properties>
</decltask>

<!-- Sink: binds the pull channel data2 and writes the fmqchan_data2 property. -->
<decltask name="Sink">
<exe>odc-ex-sink --color false --channel-config name=data2,type=pull,method=bind -P odc --severity trace</exe>
<env reachable="false">@CMAKE_INSTALL_PREFIX@/@PROJECT_INSTALL_BINDIR@/odc-ex-env.sh</env>
<properties>
<name access="write">fmqchan_data2</name>
</properties>
</decltask>

<!-- Collection holding one Sampler and one Sink; carries the calibReq requirement. -->
<declcollection name="SamplersSinks">
<requirements>
<name>calibReq</name>
</requirements>
<tasks>
<name>Sampler</name>
<name>Sink</name>
</tasks>
</declcollection>

<!-- Collection holding two Processor tasks; carries the onlineReq and
     odc_nmin_Processors requirements. -->
<declcollection name="Processors">
<requirements>
<name>onlineReq</name>
<name>odc_nmin_Processors</name>
</requirements>
<tasks>
<name>Processor</name>
<name>Processor</name>
</tasks>
</declcollection>

<!-- Layout: a single SamplersSinks collection and the Processors collection
     instantiated 4 times inside ProcessorGroup. -->
<main name="main">
<collection>SamplersSinks</collection>
<group name="ProcessorGroup" n="4">
<collection>Processors</collection>
</group>
</main>

</topology>

0 comments on commit f43bd36

Please sign in to comment.