Skip to content

Commit 5f11a59

Browse files
committed
Some database-related changes
1 parent da4a681 commit 5f11a59

File tree

7 files changed

+38
-38
lines changed

7 files changed

+38
-38
lines changed

DAQController.cc

+1
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ int DAQController::Stop(){
184184
fStatus = DAXHelpers::Idle;
185185

186186
fLog->SetRunId(-1);
187+
fOptions.reset();
187188
std::cout<<"Finished end"<<std::endl;
188189
fStatus = DAXHelpers::Idle;
189190
return 0;

Options.cc

+11-15
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,18 @@
44

55
#include <cmath>
66

7-
#include <mongocxx/uri.hpp>
8-
#include <mongocxx/database.hpp>
97
#include <bsoncxx/array/view.hpp>
108
#include <bsoncxx/types.hpp>
119
#include <bsoncxx/json.hpp>
1210
#include <bsoncxx/builder/stream/document.hpp>
1311
#include <bsoncxx/exception/exception.hpp>
1412

1513
Options::Options(std::shared_ptr<MongoLog>& log, std::string options_name, std::string hostname,
16-
std::string suri, std::string dbname, std::string override_opts) :
17-
fLog(log), fDBname(dbname), fHostname(hostname) {
14+
mongocxx::collection* opts_collection, mongocxx::collection* dac_collection,
15+
mongocxx::collection* bm_collection, std::string override_opts) :
16+
fLog(log), fHostname(hostname), fDAC_collection(dac_collection),
17+
fBM_collection(bm_collection) {
1818
bson_value = NULL;
19-
mongocxx::uri uri{suri};
20-
fClient = mongocxx::client{uri};
21-
fDAC_collection = fClient[dbname]["dac_calibration"];
22-
mongocxx::collection opts_collection = fClient[dbname]["options"];
2319
if(Load(options_name, opts_collection, override_opts)!=0)
2420
throw std::runtime_error("Can't initialize options class");
2521
}
@@ -31,11 +27,11 @@ Options::~Options(){
3127
}
3228
}
3329

34-
int Options::Load(std::string name, mongocxx::collection& opts_collection,
30+
int Options::Load(std::string name, mongocxx::collection* opts_collection,
3531
std::string override_opts){
3632
// Try to pull doc from DB
3733
bsoncxx::stdx::optional<bsoncxx::document::value> trydoc;
38-
trydoc = opts_collection.find_one(bsoncxx::builder::stream::document{}<<
34+
trydoc = opts_collection->find_one(bsoncxx::builder::stream::document{}<<
3935
"name" << name.c_str() << bsoncxx::builder::stream::finalize);
4036
if(!trydoc){
4137
fLog->Entry(MongoLog::Warning, "Failed to find your options file '%s' in DB", name.c_str());
@@ -53,7 +49,7 @@ int Options::Load(std::string name, mongocxx::collection& opts_collection,
5349
try{
5450
bsoncxx::array::view include_array = (*trydoc).view()["includes"].get_array().value;
5551
for(bsoncxx::array::element ele : include_array){
56-
auto sd = opts_collection.find_one(bsoncxx::builder::stream::document{} <<
52+
auto sd = opts_collection->find_one(bsoncxx::builder::stream::document{} <<
5753
"name" << ele.get_utf8().value.to_string() <<
5854
bsoncxx::builder::stream::finalize);
5955
if(sd)
@@ -340,7 +336,7 @@ std::vector<uint16_t> Options::GetDAC(int bid, int num_chan, uint16_t default_va
340336
auto q = document{} << std::to_string(bid) << open_document << "$exists" << 1 << close_document << finalize;
341337
auto opts = mongocxx::options::find{};
342338
opts.sort(sort_order.view());
343-
auto cursor = fDAC_collection.find(std::move(q), opts);
339+
auto cursor = fDAC_collection->find(std::move(q), opts);
344340
auto doc = cursor.begin();
345341
if (doc == cursor.end() || doc->find(std::to_string(bid)) == doc->end()) {
346342
fLog->Entry(MongoLog::Local, "No baseline calibrations? You must be new");
@@ -374,14 +370,14 @@ void Options::UpdateDAC(std::map<int, std::vector<uint16_t>>& all_dacs){
374370
auto write_doc = update_doc<<finalize;
375371
mongocxx::options::update options;
376372
options.upsert(true);
377-
fDAC_collection.update_one(std::move(search_doc), std::move(write_doc), options);
373+
fDAC_collection->update_one(std::move(search_doc), std::move(write_doc), options);
378374
return;
379375
}
380376

381377
void Options::SaveBenchmarks(std::map<std::string, std::map<int, long>>& counters,
382378
long bytes, std::string sid, std::map<std::string, double>& times) {
383379
using namespace bsoncxx::builder::stream;
384-
int level = GetInt("benchmark_level", 1);
380+
int level = GetInt("benchmark_level", 0);
385381
if (level == 0) return;
386382
int run_id = GetInt("number", -1);
387383
std::map<std::string, std::map<int, long>> _counters;
@@ -419,6 +415,6 @@ void Options::SaveBenchmarks(std::map<std::string, std::map<int, long>>& counter
419415
auto write_doc = update_doc << finalize;
420416
mongocxx::options::update options;
421417
options.upsert(true);
422-
fClient[fDBname]["redax_benchmarks"].update_one(search_doc.view(), write_doc.view(), options);
418+
fBM_collection->update_one(search_doc.view(), write_doc.view(), options);
423419
return;
424420
}

Options.hh

+3-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include <bsoncxx/document/view.hpp>
1111
#include <bsoncxx/document/value.hpp>
1212
#include <mongocxx/collection.hpp>
13-
#include <mongocxx/client.hpp>
1413

1514
struct BoardType{
1615
int link;
@@ -67,7 +66,7 @@ class MongoLog;
6766
class Options{
6867

6968
public:
70-
Options(std::shared_ptr<MongoLog>&, std::string, std::string, std::string, std::string, std::string);
69+
Options(std::shared_ptr<MongoLog>&, std::string, std::string, mongocxx::collection*, mongocxx::collection*, mongocxx::collection*, std::string);
7170
~Options();
7271

7372
int GetInt(std::string, int=-1);
@@ -91,14 +90,13 @@ public:
9190
std::map<std::string, double>&);
9291

9392
private:
94-
int Load(std::string, mongocxx::collection&, std::string);
93+
int Load(std::string, mongocxx::collection*, std::string);
9594
int Override(bsoncxx::document::view);
96-
mongocxx::client fClient;
9795
bsoncxx::document::view bson_options;
9896
bsoncxx::document::value *bson_value;
9997
std::shared_ptr<MongoLog> fLog;
10098
mongocxx::collection fDAC_collection;
101-
std::string fDBname;
99+
mongocxx::collection fBM_collection;
102100
std::string fHostname;
103101
std::string fDetector;
104102
};

dispatcher/MongoConnect.py

+3
Original file line numberDiff line numberDiff line change
@@ -421,9 +421,12 @@ def SendCommand(self, command, hosts, user, detector, mode="", delay=0):
421421
if delay == 0:
422422
docs = doc_base
423423
docs['host'] = hosts[0]+hosts[1] if isinstance(hosts, tuple) else hosts
424+
docs['acknowledged'] = {h:0 for h in docs['host']}
424425
else:
425426
docs = [dict(doc_base.items()), dict(doc_base.items())]
426427
docs[0]['host'], docs[1]['host'] = hosts
428+
docs[0]['acknowledged'] = {h:0 for h in docs[0]['host']}
429+
docs[1]['acknowledged'] = {h:0 for h in docs[1]['host']}
427430
docs[1]['createdAt'] += datetime.timedelta(seconds=delay)
428431
self.collections['command_queue'].insert(docs)
429432
except Exception as e:

docs/databases.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ The control database is used to propagate commands from the dispatcher to the re
122122
"user" : "web",
123123
"host" : ["fdaq00_reader_0"],
124124
"acknowledged" : {
125-
"fdaq00_reader_0" : 1601469970934
125+
"fdaq00_reader_0" : 0
126126
},
127127
"command" : "arm",
128128
"createdAt": <date object>
@@ -135,7 +135,7 @@ The control database is used to propagate commands from the dispatcher to the re
135135
|mode |Options file to use for this run. Corresponds to the 'name' field of the options doc. |
136136
|user |Who started the run? Corresponds to the last person to change the detector_status doc during normal operation. Exceptional stop commands can be automatically issued by various subsystems as well in case of errors.
137137
|host |List of all hosts to which this command is directed. Readers and crate controllers will only process commands addressed to them. |
138-
|acknowledged |Before attempting to process a command all reader and crate controller processes will first acknowledge the command as received. This does not indicate that processing the command was successful! It just indicates the thing tried. The dispatcher has to watch for the appropriate state change of the slave nodes in order to determine if the command achieved its goal. This is a dictionary, with values set to the timestamp (in ms) of when the acknowledgement happened. |
138+
|acknowledged |Before attempting to process a command all reader and crate controller processes will first acknowledge the command as received. This does not indicate that processing the command was successful! It just indicates the thing tried. The dispatcher has to watch for the appropriate state change of the slave nodes in order to determine if the command achieved its goal. This is a dictionary, with values set to the timestamp of when the acknowledgement happened. This dictionary must be populated with 0s before insertion. |
139139
|command |This is the actual command. 'arm' gets the DAQ ready to start. 'start' and 'stop' do what they say on the tin. 'stop' can also be used as a general reset command for a given instance. |
140140

141141
### db.options

helpers/runcommand.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ def main(coll):
1212
parser.add_argument('--host', nargs='+', default=[os.uname()[1]], help="Hosts to issue to")
1313

1414
args = parser.parse_args()
15+
if not isinstance(args.host, (list, tuple)):
16+
args.host = [args.host]
1517

1618
doc = {
1719
"command": args.command,
@@ -20,13 +22,14 @@ def main(coll):
2022
"host": args.host,
2123
"user": os.getlogin(),
2224
"run_identifier": '%06i' % args.number,
23-
"createdAt": datetime.datetime.utcnow()
25+
"createdAt": datetime.datetime.utcnow(),
26+
"acknowledged": {h:0 for h in args.host}
2427
}
2528
coll.insert_one(doc)
2629
return
2730

2831
if __name__ == '__main__':
29-
with MongoClient("mongodb://daq:%s@xenon1t-daq:27017/admin" % os.environ['MONGO_PASSWORD_DAQ']) as client:
32+
with MongoClient("mongodb://daq:%s@gw:27020/admin" % os.environ['MONGO_PASSWORD_DAQ']) as client:
3033
try:
3134
main(client['daq']['control'])
3235
except Exception as e:

main.cc

+13-14
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,11 @@ void SignalHandler(int signum) {
2525
return;
2626
}
2727

28-
void UpdateStatus(std::string suri, std::string dbname, std::unique_ptr<DAQController>& controller) {
29-
mongocxx::uri uri(suri);
30-
mongocxx::client c(uri);
31-
mongocxx::collection status = c[dbname]["status"];
28+
void UpdateStatus(std::string suri, mongocxx::collection* collection, dbname, std::unique_ptr<DAQController>& controller) {
3229
using namespace std::chrono;
3330
while (b_run == true) {
3431
try{
35-
controller->StatusUpdate(&status);
32+
controller->StatusUpdate(collection);
3633
}catch(const std::exception &e){
3734
std::cout<<"Can't connect to DB to update."<<std::endl;
3835
std::cout<<e.what()<<std::endl;
@@ -43,14 +40,14 @@ void UpdateStatus(std::string suri, std::string dbname, std::unique_ptr<DAQContr
4340
}
4441

4542
int PrintUsage() {
46-
std::cout<<"Welcome to REDAX readout\nAccepted command-line arguments:\n"
43+
std::cout<<"Welcome to REDAX\nAccepted command-line arguments:\n"
4744
<< "--id <id number>: id number of this readout instance, required\n"
4845
<< "--uri <mongo uri>: full MongoDB URI, required\n"
4946
<< "--db <database name>: name of the database to use, default \"daq\"\n"
5047
<< "--logdir <directory>: where to write the logs, default pwd\n"
5148
<< "--reader: this instance is a reader\n"
5249
<< "--cc: this instance is a crate controller\n"
53-
<< "--arm-delay <delay>: ms to wait between the ARM command and the arming sequence, default 15000\n"
50+
<< "--arm-delay <delay>: ms to wait between the ARM command and the arming sequence, default 5000\n"
5451
<< "--log-retention <value>: how many days to keep logfiles, default 7\n"
5552
<< "--help: print this message\n"
5653
<< "\n";
@@ -126,8 +123,9 @@ int main(int argc, char** argv){
126123
mongocxx::database db = client[dbname];
127124
mongocxx::collection control = db["control"];
128125
mongocxx::collection status = db["status"];
129-
mongocxx::collection options_collection = db["options"];
126+
mongocxx::collection opts_collection = db["options"];
130127
mongocxx::collection dac_collection = db["dac_calibration"];
128+
mongocxx::collection benchmark_collection = db["redax_benchmarks"];
131129

132130
// Logging
133131
auto fLog = std::make_shared<MongoLog>(log_retention, log_dir, suri, dbname, "log", hostname);
@@ -142,16 +140,16 @@ int main(int argc, char** argv){
142140
controller = std::make_unique<CControl_Handler>(fLog, hostname);
143141
else
144142
controller = std::make_unique<DAQController>(fLog, hostname);
145-
std::thread status_update(&UpdateStatus, suri, dbname, std::ref(controller));
143+
std::thread status_update(&UpdateStatus, &status, std::ref(controller));
146144

147145
using namespace bsoncxx::builder::stream;
148146
// Sort oldest to newest
149147
auto opts = mongocxx::options::find_one_and_update{};
150148
opts.sort(document{} << "_id" << 1 << finalize);
151-
auto query = document{} << "host" << hostname << "acknowledged." + hostname <<
152-
open_document << "$exists" << 0 << close_document << finalize;
149+
std::string ack_host = "acknowledged." + hostname;
150+
auto query = document{} << "host" << hostname << ack_host << 0 << finalize;
153151
auto update = document{} << "$currentDate" << open_document <<
154-
"acknowledged."+hostname << true << close_document << finalize;
152+
ack_host << true << close_document << finalize;
155153
using namespace std::chrono;
156154
// Main program loop. Scan the database and look for commands addressed
157155
// to this hostname.
@@ -196,6 +194,7 @@ int main(int argc, char** argv){
196194
fLog->Entry(MongoLog::Local, "Ack to stop took %i us",
197195
duration_cast<microseconds>(now-ack_time).count());
198196
fLog->SetRunId(-1);
197+
fOptions.reset();
199198
} else if(command == "arm"){
200199
// Can only arm if we're idle
201200
if(controller->status() == 0){
@@ -212,8 +211,8 @@ int main(int argc, char** argv){
212211
// Mongocxx types confusing so passing json strings around
213212
std::string mode = doc["mode"].get_utf8().value.to_string();
214213
fLog->Entry(MongoLog::Local, "Getting options doc for mode %s", mode.c_str());
215-
fOptions = std::make_shared<Options>(fLog, mode,
216-
hostname, suri, dbname, override_json);
214+
fOptions = std::make_shared<Options>(fLog, mode, &opts_collection,
215+
&dac_collection, &benchmark_collection, override_json);
217216
int dt = duration_cast<milliseconds>(system_clock::now()-ack_time).count();
218217
fLog->SetRunId(fOptions->GetInt("number", -1));
219218
fLog->Entry(MongoLog::Local, "Took %i ms to load config", dt);

0 commit comments

Comments
 (0)