-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathping.C
147 lines (120 loc) · 4.02 KB
/
ping.C
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// This test creates two array chares: the Pingers and the Pingees
// Each Pinger performs a number of iterations in which it sends to each Pingee
// the array index of the Pinger and the current iteration.
// Each Pingee starts with an empty
// std::unordered_map<int, std::unordered_multiset<int>>.
// Each time the Pingee is called, it inserts the received array index of the
// Pinger into the std::unordered_multiset<int> retrieved from the
// std::unordered_map at the received iteration.
// At the end, it is checked that the std::unordered_map has an entry at each
// iteration, and that for each iteration, the std::unordered_multiset contains
// one entry for each Pinger.
#include "ping.decl.h"
#include <chrono>
#include <unordered_map>
#include <unordered_set>
// Proxy to the Main chare; used in this file as the target of the CkStartQD
// callbacks and of the reductions issued by the Pingees.
/*readonly*/ CProxy_Main mainProxy;
// Proxy to the Pingers chare array; assigned in Main's constructor.
/*readonly*/ CProxy_Pingers pingersProxy;
// Proxy to the Pingees chare array; assigned in Main's constructor.
/*readonly*/ CProxy_Pingees pingeesProxy;
namespace {
/// User-defined literal that converts an integer literal into
/// `std::chrono::milliseconds`, e.g. `10_ms`.
///
/// Spelled without whitespace between `""` and the suffix: the spaced form
/// `operator "" _ms` is deprecated and produces warnings on recent compilers.
constexpr std::chrono::milliseconds operator""_ms(unsigned long long ms) {
  // Make the unsigned -> signed tick-count conversion explicit.
  return std::chrono::milliseconds(
      static_cast<std::chrono::milliseconds::rep>(ms));
}

/// Time each Pinger busy-waits before sending a ping.
constexpr std::chrono::milliseconds spin_time = 10_ms;
/// Number of elements in the Pingers array.
constexpr int number_of_pingers = 10;
/// Number of elements in the Pingees array.
constexpr int number_of_pingees = 1;
/// Number of pings each Pinger sends (iterations are numbered from 1).
constexpr int number_of_iterations = 4;

/// Busy-waits until at least `time_to_spin` has elapsed on
/// `std::chrono::steady_clock`. Does not yield or sleep.
void spin(const std::chrono::milliseconds& time_to_spin) {
  const auto start = std::chrono::steady_clock::now();
  while (std::chrono::steady_clock::now() - start < time_to_spin) {
    // Intentionally empty: keep polling the clock.
  }
}
}  // namespace
/// Main chare: creates the Pingers and Pingees arrays and drives the test
/// through the phases execute -> check -> exit. Each phase is started by a
/// CkStartQD callback registered by the previous phase.
class Main : public CBase_Main {
 private:
  /// Number of times `migrated()` has been invoked.
  int migrations{0};

 public:
  /// Sets up the readonly proxies, creates both chare arrays and registers
  /// `execute()` as the next phase.
  ///
  /// @param msg command-line argument message; unused and deleted here.
  Main(CkArgMsg* msg) {
    delete msg;
    // The readonly mainProxy is the target of every callback and reduction in
    // this file, so it must refer to this chare before any of them is
    // created. Without this assignment it stays default-constructed.
    mainProxy = thisProxy;
    pingersProxy = CProxy_Pingers::ckNew(number_of_pingers);
    pingeesProxy = CProxy_Pingees::ckNew(number_of_pingees);
    CkStartQD(CkCallback(CkIndex_Main::execute(), mainProxy));
  }

  /// Phase "execute": asks every Pinger to send its pings, then registers
  /// `check()` as the next phase.
  void execute() {
    CkPrintf("Main is in phase execute\n");
    pingersProxy.send_pings();
    CkStartQD(CkCallback(CkIndex_Main::check(), mainProxy));
  }

  /// Phase "check": asks every Pingee to verify what it received, passing
  /// the migration count recorded here, then registers `exit()` as the next
  /// phase.
  void check() {
    CkPrintf("Main is in phase check\n");
    pingeesProxy.check(migrations);
    CkStartQD(CkCallback(CkIndex_Main::exit(), mainProxy));
  }

  /// Phase "exit": terminates the program.
  void exit() {
    CkPrintf("Main is in phase exit\n");
    CkExit();
  }

  /// Reduction target contributed to by the Pingees from their `pup`
  /// routine when unpacking; counts and reports the invocations.
  void migrated() {
    ++migrations;
    CkPrintf("Migrations done: %i\n", migrations);
  }

  /// Reduction target receiving the summed error count of all Pingees.
  /// Aborts the run if any error was reported.
  ///
  /// @param errors total number of errors found by the Pingees.
  void count_errors(const int errors) {
    if (errors > 0) {
      CkPrintf("Errors: %i\n", errors);
      CkAbort("Test failed!\n");
    }
  }
};
/// Array chare whose elements each send `number_of_iterations` pings to the
/// Pingees array.
class Pingers : public CBase_Pingers {
 public:
  Pingers() = default;
  /// Migration constructor; nothing to initialise.
  Pingers(CkMigrateMessage* /*msg*/) {}

  /// For iterations 1..number_of_iterations, in order: busy-wait for
  /// `spin_time`, then send the iteration number and this element's index
  /// through `pingeesProxy`.
  void send_pings() {
    int iteration = 0;
    while (iteration < number_of_iterations) {
      ++iteration;
      spin(spin_time);
      pingeesProxy.receive_ping(iteration, thisIndex);
    }
  }
};
/// Array chare that records every ping it receives and, when asked, verifies
/// that it got exactly one ping from each Pinger in each iteration.
class Pingees : public CBase_Pingees {
 private:
  /// iteration -> indices of the Pingers whose ping for that iteration
  /// arrived (a multiset, so duplicate deliveries are detectable).
  std::unordered_map<int, std::unordered_multiset<int>> pings{};
  /// Number of times this element has been unpacked in `pup`.
  int migrations{0};
  /// PE on which this element was originally constructed (-1 until set).
  int initial_proc{-1};

 public:
  Pingees() : initial_proc(CkMyPe()) {}
  /// Migration constructor; the state is restored afterwards by `pup`.
  Pingees(CkMigrateMessage* /*msg*/) {}

  /// Records that Pinger `index_of_pinger` pinged during `iteration`.
  void receive_ping(const int iteration, const int index_of_pinger) {
    pings[iteration].insert(index_of_pinger);
  }

  /// Verifies the recorded pings and contributes the number of errors found
  /// to a sum reduction targeting `Main::count_errors`.
  ///
  /// @param migrations_recorded_by_main migration count seen by Main; it is
  ///        asserted to equal this element's own count.
  void check(const int migrations_recorded_by_main) {
    CkAssert(migrations == migrations_recorded_by_main);
    // RotateLB should increase proc at each migration
    CkAssert(CkMyPe() == ((initial_proc + migrations) % CkNumPes()));
    int errors = 0;
    for (int i = 1; i <= number_of_iterations; ++i) {
      const auto received = pings.find(i);
      CkAssert(received != pings.end());
      if (received == pings.end()) {
        // Reached only when CkAssert is compiled out. Report the missing
        // iteration as one error per Pinger rather than letting a lookup
        // throw std::out_of_range.
        errors += number_of_pingers;
        CkPrintf("Pingee %i received no pings on iteration %i\n", thisIndex,
                 i);
        continue;
      }
      const auto& pingers_seen = received->second;
      for (int p = 0; p < number_of_pingers; ++p) {
        const auto times_seen = pingers_seen.count(p);
        if (times_seen != 1) {
          ++errors;
          CkPrintf("Pingee %i unexpected count %zu on iteration %i for pinger"
                   " %i\n", thisIndex, times_seen, i, p);
        }
      }
    }
    contribute(sizeof(int), &errors, CkReduction::sum_int,
               CkCallback(CkReductionTarget(Main, count_errors), mainProxy));
  }

  /// Serialises the recorded pings and the migration bookkeeping. When
  /// unpacking, it also bumps the migration counter and contributes to the
  /// reduction targeting `Main::migrated`.
  void pup(PUP::er& p) {
    p | pings;
    p | migrations;
    p | initial_proc;
    if (p.isUnpacking()) {
      // NOTE(review): this counts every unpacking as a migration; confirm
      // that no other unpacking path (e.g. restart) is exercised by the test.
      ++migrations;
      contribute(CkCallback(CkReductionTarget(Main, migrated), mainProxy));
    }
  }
};
#include "ping.def.h"