Skip to content

Commit

Permalink
Sources now have independent ProductRegistry
Browse files Browse the repository at this point in the history
Sources can now update their ProductRegistry without affecting the rest of the system. The system will pick up the changes when they are needed.
  • Loading branch information
Dr15Jones committed Jan 30, 2025
1 parent 78ffd9a commit a52b578
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 73 deletions.
21 changes: 9 additions & 12 deletions FWCore/Framework/interface/InputSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Some examples of InputSource subclasses are:
#include "DataFormats/Provenance/interface/RunAuxiliary.h"
#include "DataFormats/Provenance/interface/RunID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "DataFormats/Provenance/interface/ProductRegistry.h"
#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/ProcessingController.h"
Expand All @@ -43,7 +44,6 @@ namespace edm {
class ParameterSetDescription;
class ProcessContext;
class ProcessHistoryRegistry;
class ProductRegistry;
class SignallingProductRegistry;
class StreamContext;
class ModuleCallingContext;
Expand Down Expand Up @@ -159,11 +159,11 @@ namespace edm {
/// issue an event report
void issueReports(EventID const& eventID, StreamID streamID);

/// Register any produced products
virtual void registerProducts(SignallingProductRegistry&);
/// Register any produced products into source's registry
virtual void registerProducts();

/// Accessors for product registry
std::shared_ptr<ProductRegistry const> productRegistry() const { return get_underlying_safe(productRegistry_); }
ProductRegistry const& productRegistry() const { return productRegistry_; }

/// Accessors for process history registry.
ProcessHistoryRegistry const& processHistoryRegistry() const { return *processHistoryRegistry_; }
Expand Down Expand Up @@ -198,9 +198,6 @@ namespace edm {
/// Returns nullptr if no resource shared between the Source and a DelayedReader
std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader();

/// switch to a different ProductRegistry.
void switchTo(std::shared_ptr<ProductRegistry> iOther) { productRegistry_ = iOther; }

/// Accessor for maximum number of events to be read.
/// -1 is used for unlimited.
int maxEvents() const { return maxEvents_; }
Expand All @@ -226,8 +223,8 @@ namespace edm {
/// Accessor for global process identifier
std::string const& processGUID() const { return processGUID_; }

/// Called by framework at beginning of job
void doBeginJob();
/// Called by framework at beginning of job. The argument is the full product registry
void doBeginJob(edm::ProductRegistry const&);

/// Called by framework at end of job
void doEndJob();
Expand Down Expand Up @@ -357,7 +354,7 @@ namespace edm {
/// To set the current time, as seen by the input source
void setTimestamp(Timestamp const& theTime) { time_ = theTime; }

ProductRegistry& productRegistryUpdate() { return *productRegistry_; }
ProductRegistry& productRegistryUpdate() { return productRegistry_; }
ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
ItemTypeInfo state() const { return state_; }
void setRunAuxiliary(RunAuxiliary* rp) {
Expand Down Expand Up @@ -397,7 +394,7 @@ namespace edm {
void decreaseRemainingEventsBy(int iSkipped);

///Begin protected makes it easier to do template programming
virtual void beginJob();
virtual void beginJob(edm::ProductRegistry const&);

private:
bool eventLimitReached() const { return remainingEvents_ == 0; }
Expand Down Expand Up @@ -451,7 +448,7 @@ namespace edm {
std::chrono::time_point<std::chrono::steady_clock> processingStart_;
ProcessingMode processingMode_;
ModuleDescription const moduleDescription_;
edm::propagate_const<std::shared_ptr<ProductRegistry>> productRegistry_;
ProductRegistry productRegistry_;
edm::propagate_const<std::unique_ptr<ProcessHistoryRegistry>> processHistoryRegistry_;
edm::propagate_const<std::shared_ptr<BranchIDListHelper>> branchIDListHelper_;
edm::propagate_const<std::shared_ptr<ProcessBlockHelper>> processBlockHelper_;
Expand Down
11 changes: 1 addition & 10 deletions FWCore/Framework/interface/InputSourceDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,9 @@ namespace edm {
class ThinnedAssociationsHelper;

struct InputSourceDescription {
InputSourceDescription()
: moduleDescription_(),
productRegistry_(nullptr),
actReg_(),
maxEvents_(-1),
maxLumis_(-1),
allocations_(nullptr) {}
InputSourceDescription() : moduleDescription_(), actReg_(), maxEvents_(-1), maxLumis_(-1), allocations_(nullptr) {}

InputSourceDescription(ModuleDescription const& md,
std::shared_ptr<ProductRegistry> preg,
std::shared_ptr<BranchIDListHelper> branchIDListHelper,
std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
Expand All @@ -38,7 +31,6 @@ namespace edm {
int maxSecondsUntilRampdown,
PreallocationConfiguration const& allocations)
: moduleDescription_(md),
productRegistry_(preg),
branchIDListHelper_(branchIDListHelper),
processBlockHelper_(processBlockHelper),
thinnedAssociationsHelper_(thinnedAssociationsHelper),
Expand All @@ -49,7 +41,6 @@ namespace edm {
allocations_(&allocations) {}

ModuleDescription moduleDescription_;
std::shared_ptr<ProductRegistry> productRegistry_;
std::shared_ptr<BranchIDListHelper> branchIDListHelper_;
std::shared_ptr<ProcessBlockHelper> processBlockHelper_;
std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper_;
Expand Down
4 changes: 1 addition & 3 deletions FWCore/Framework/interface/maker/InputSourceFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ namespace edm {

static InputSourceFactory const* get();

std::unique_ptr<InputSource> makeInputSource(ParameterSet const&,
SignallingProductRegistry&,
InputSourceDescription const&) const;
std::unique_ptr<InputSource> makeInputSource(ParameterSet const&, InputSourceDescription const&) const;

private:
InputSourceFactory();
Expand Down
19 changes: 7 additions & 12 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ namespace edm {
std::unique_ptr<InputSource> makeInput(unsigned int moduleIndex,
ParameterSet& params,
CommonParams const& common,
std::shared_ptr<SignallingProductRegistry> preg,
std::shared_ptr<BranchIDListHelper> branchIDListHelper,
std::shared_ptr<ProcessBlockHelper> const& processBlockHelper,
std::shared_ptr<ThinnedAssociationsHelper> thinnedAssociationsHelper,
Expand Down Expand Up @@ -166,7 +165,6 @@ namespace edm {
moduleIndex);

InputSourceDescription isdesc(md,
preg,
branchIDListHelper,
processBlockHelper,
thinnedAssociationsHelper,
Expand All @@ -182,8 +180,7 @@ namespace edm {
//even if we have an exception, send the signal
std::shared_ptr<int> sentry(nullptr, [areg, &md](void*) { areg->postSourceConstructionSignal_(md); });
convertException::wrap([&]() {
input = std::unique_ptr<InputSource>(
InputSourceFactory::get()->makeInputSource(*main_input, *preg, isdesc).release());
input = InputSourceFactory::get()->makeInputSource(*main_input, isdesc);
input->preEventReadFromSourceSignal_.connect(std::cref(areg->preEventReadFromSourceSignal_));
input->postEventReadFromSourceSignal_.connect(std::cref(areg->postEventReadFromSourceSignal_));
});
Expand Down Expand Up @@ -489,7 +486,6 @@ namespace edm {
tbb::task_group group;

// initialize the input source
auto tempReg = std::make_shared<SignallingProductRegistry>();
auto sourceID = ModuleDescription::getUniqueID();

group.run([&, this]() {
Expand All @@ -500,12 +496,11 @@ namespace edm {
items.initModules(*parameterSet, tns, preallocations_, &processContext_, moduleTypeResolverMaker_.get());
});

group.run([&, this, tempReg]() {
group.run([&, this]() {
ServiceRegistry::Operate operate(serviceToken_);
input_ = makeInput(sourceID,
*parameterSet,
*common,
/*items.preg(),*/ tempReg,
items.branchIDListHelper(),
get_underlying_safe(processBlockHelper_),
items.thinnedAssociationsHelper(),
Expand All @@ -515,9 +510,7 @@ namespace edm {
});

group.wait();
items.preg()->addFromInput(*tempReg);
input_->switchTo(items.preg());

items.preg()->addFromInput(input_->productRegistry());
{
auto const& tns = ServiceRegistry::instance().get<service::TriggerNamesService>();
schedule_ = items.finishSchedule(std::move(*madeModules),
Expand Down Expand Up @@ -726,7 +719,7 @@ namespace edm {
espController_->finishConfiguration();
actReg_->eventSetupConfigurationSignal_(esp_->recordsToResolverIndices(), processContext_);
try {
convertException::wrap([&]() { input_->doBeginJob(); });
convertException::wrap([&]() { input_->doBeginJob(*preg_); });
} catch (cms::Exception& ex) {
ex.addContext("Calling beginJob for the source");
throw;
Expand Down Expand Up @@ -1014,7 +1007,6 @@ namespace edm {

void EventProcessor::readFile() {
FDEBUG(1) << " \treadFile\n";
size_t size = preg_->size();
SendSourceTerminationSignalIfException sentry(actReg_.get());

if (streamRunActive_ > 0) {
Expand All @@ -1027,6 +1019,9 @@ namespace edm {
}

fb_ = input_->readFile();
//incase the input's registry changed
const size_t size = preg_->size();
preg_->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
if (size < preg_->size()) {
principalCache_.adjustIndexesAfterProductRegistryAddition();
}
Expand Down
7 changes: 3 additions & 4 deletions FWCore/Framework/src/InputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ namespace edm {
maxSecondsUntilRampdown_(desc.maxSecondsUntilRampdown_),
processingMode_(RunsLumisAndEvents),
moduleDescription_(desc.moduleDescription_),
productRegistry_(desc.productRegistry_),
processHistoryRegistry_(new ProcessHistoryRegistry),
branchIDListHelper_(desc.branchIDListHelper_),
processBlockHelper_(desc.processBlockHelper_),
Expand Down Expand Up @@ -205,7 +204,7 @@ namespace edm {
"Calling InputSource::readRunAuxiliary_");
}

void InputSource::doBeginJob() { this->beginJob(); }
void InputSource::doBeginJob(edm::ProductRegistry const& iReg) { this->beginJob(iReg); }

void InputSource::doEndJob() { endJob(); }

Expand All @@ -217,7 +216,7 @@ namespace edm {
return std::pair<SharedResourcesAcquirer*, std::recursive_mutex*>(nullptr, nullptr);
}

void InputSource::registerProducts(SignallingProductRegistry&) {}
void InputSource::registerProducts() {}

// Return a dummy file block.
std::shared_ptr<FileBlock> InputSource::readFile() {
Expand Down Expand Up @@ -436,7 +435,7 @@ namespace edm {
"Calling InputSource::reverseState__");
}

void InputSource::beginJob() {}
void InputSource::beginJob(ProductRegistry const&) {}

void InputSource::endJob() {}

Expand Down
3 changes: 1 addition & 2 deletions FWCore/Framework/src/InputSourceFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ namespace edm {
}

std::unique_ptr<InputSource> InputSourceFactory::makeInputSource(ParameterSet const& conf,
SignallingProductRegistry& reg,
InputSourceDescription const& desc) const

{
Expand All @@ -40,7 +39,7 @@ namespace edm {
<< "Try running EdmPluginDump to obtain a list of available Plugins.";
}

wm->registerProducts(reg);
wm->registerProducts();

FDEBUG(1) << "InputSourceFactory: created input source " << modtype << std::endl;

Expand Down
4 changes: 2 additions & 2 deletions FWCore/Integration/plugins/PutOrMergeTestSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace edmtest {
PutOrMergeTestSource(ParameterSet const&, InputSourceDescription const&);

/// Register any produced products
void registerProducts(SignallingProductRegistry&) final;
void registerProducts() final;

private:
ItemTypeInfo getNextItemType() final;
Expand Down Expand Up @@ -86,7 +86,7 @@ PutOrMergeTestSource::PutOrMergeTestSource(ParameterSet const& iPS, InputSourceD
historyID_ = history.id();
}

void PutOrMergeTestSource::registerProducts(SignallingProductRegistry&) {
void PutOrMergeTestSource::registerProducts() {
edm::ParameterSet dummyPset;
dummyPset.registerIt();

Expand Down
4 changes: 2 additions & 2 deletions FWCore/Integration/plugins/ThrowingSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace edm {
explicit ThrowingSource(ParameterSet const&, InputSourceDescription const&);
~ThrowingSource() noexcept(false) override;

void beginJob() override;
void beginJob(ProductRegistry const&) override;
void endJob() override;
void beginLuminosityBlock(edm::LuminosityBlock&) override;
void beginRun(edm::Run&) override;
Expand Down Expand Up @@ -63,7 +63,7 @@ namespace edm {

void ThrowingSource::produce(edm::Event&) {}

void ThrowingSource::beginJob() {
void ThrowingSource::beginJob(edm::ProductRegistry const&) {
if (whenToThrow_ == kBeginJob)
throw cms::Exception("TestThrow") << "ThrowingSource::beginJob";
}
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Sources/interface/IDGeneratorSourceBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ namespace edm {
virtual bool setRunAndEventInfo(EventID& id, TimeValue_t& time, EventAuxiliary::ExperimentType& etype) = 0;
virtual bool noFiles() const;
virtual size_t fileIndex() const;
void beginJob() override;
void beginJob(ProductRegistry const&) override;

std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Sources/interface/PuttableSourceBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ namespace edm {
// ---------- member functions ---------------------------
using ProducerBase::registerProducts;
using ProducerBase::resolvePutIndicies;
void registerProducts(SignallingProductRegistry&) final;
void registerProducts() final;

bool hasAbilityToProduceInBeginRuns() const final { return true; }

bool hasAbilityToProduceInBeginLumis() const final { return true; }

protected:
//If inheriting class overrides, they need to call this function as well
void beginJob() override;
void beginJob(edm::ProductRegistry const&) override;

private:
void doBeginLumi(LuminosityBlockPrincipal& lbp, ProcessContext const*) override;
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Sources/src/IDGeneratorSourceBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ namespace edm {
}

template <typename BASE>
void IDGeneratorSourceBase<BASE>::beginJob() {
BASE::beginJob();
void IDGeneratorSourceBase<BASE>::beginJob(ProductRegistry const& iReg) {
BASE::beginJob(iReg);
// Initialize cannot be called from the constructor, because it is a virtual function
// that needs to be invoked from a derived class if the derived class overrides it.
initialize(eventID_, presentTime_, timeBetweenEvents_);
Expand Down
16 changes: 9 additions & 7 deletions FWCore/Sources/src/PuttableSourceBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "FWCore/Framework/interface/Run.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/ExceptionHelpers.h"
#include "FWCore/Framework/interface/SignallingProductRegistry.h"

using namespace edm;
//
Expand All @@ -38,15 +39,16 @@ using namespace edm;
PuttableSourceBase::PuttableSourceBase(ParameterSet const& iPSet, InputSourceDescription const& iISD)
: InputSource(iPSet, iISD) {}

void PuttableSourceBase::registerProducts(SignallingProductRegistry& iReg) {
registerProducts(this, &iReg, moduleDescription());
void PuttableSourceBase::registerProducts() {
SignallingProductRegistry reg;
registerProducts(this, &reg, moduleDescription());
productRegistryUpdate().addFromInput(reg);
}

void PuttableSourceBase::beginJob() {
auto r = productRegistry();
auto const runLookup = r->productLookup(InRun);
auto const lumiLookup = r->productLookup(InLumi);
auto const eventLookup = r->productLookup(InEvent);
void PuttableSourceBase::beginJob(edm::ProductRegistry const& r) {
auto const runLookup = r.productLookup(InRun);
auto const lumiLookup = r.productLookup(InLumi);
auto const eventLookup = r.productLookup(InEvent);
auto const& processName = moduleDescription().processName();
auto const& moduleLabel = moduleDescription().moduleLabel();

Expand Down
Loading

0 comments on commit a52b578

Please sign in to comment.