Skip to content

Commit

Permalink
Support running multithreaded: Add an IOSvc, an algorithm for reading…
Browse files Browse the repository at this point in the history
… and writing for functionals in k4FWCore (key4hep#173)
  • Loading branch information
jmcarcell authored May 21, 2024
1 parent 0c3aebf commit 5dc2a63
Show file tree
Hide file tree
Showing 58 changed files with 3,313 additions and 409 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
mkdir -p build install;\
source /cvmfs/${{ matrix.cvmfs_base }}/${{ matrix.ENVIRONMENT }}/setup.sh;\
cd build;\
cmake -DCMAKE_INSTALL_PREFIX=../install -DCMAKE_CXX_STANDARD=17 -DCMAKE_CXX_FLAGS=" -fdiagnostics-color=always " -G Ninja ..;'
cmake -DCMAKE_INSTALL_PREFIX=../install -DCMAKE_CXX_STANDARD=20 -DCMAKE_CXX_FLAGS=" -fdiagnostics-color=always " -G Ninja ..;'
- name: Compile
run: |
docker exec CI_container /bin/bash -c 'cd ./Package;\
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ endif()

add_subdirectory(k4FWCore)
add_subdirectory(k4Interface)
add_subdirectory(python)
if(BUILD_TESTING)
add_subdirectory(test/k4FWCoreTest)
endif()


option(ENABLE_CPACK "Whether or not to use cpack config" OFF)
if(ENABLE_CPACK)
include(cmake/${PROJECT_NAME}CPack.cmake)
Expand Down
6 changes: 6 additions & 0 deletions k4FWCore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,9 @@ add_custom_command(
COMMAND ${CMAKE_COMMAND} -E copy_directory
${PROJECT_SOURCE_DIR}/k4FWCore/python/k4FWCore
${CMAKE_CURRENT_BINARY_DIR}/genConfDir/k4FWCore)

# This is needed to overwrite the __init__.py, see a long comment in the
# CMakeLists.txt in the test folder
add_custom_command(TARGET k4FWCorePlugins POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy
${PROJECT_SOURCE_DIR}/python/k4FWCore/__init__.py ${PROJECT_BINARY_DIR}/k4FWCore/genConfDir/k4FWCore/__init__.py)
57 changes: 57 additions & 0 deletions k4FWCore/components/IIOSvc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2014-2024 Key4hep-Project.
*
* This file is part of Key4hep.
* See https://key4hep.github.io/key4hep-doc/ for further info.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FWCORE_IIOSVC_H
#define FWCORE_IIOSVC_H

#include "GaudiKernel/IInterface.h"

#include "podio/CollectionBase.h"
#include "podio/ROOTWriter.h"

#include <memory>
#include <vector>

/**
* The interface implemented by any class making IO and reading RawEvent Data
*/
class IIOSvc : virtual public IInterface {
public:
struct EndOfInput : std::logic_error {
EndOfInput() : logic_error("Reached end of input while more data were expected"){};
};

public:
/// InterfaceID
DeclareInterfaceID(IIOSvc, 1, 0);

/**
* @brief Read the next event from the input file
* @return A tuple containing the collections read, the collection names and the frame that owns the collections
*/
virtual std::tuple<std::vector<std::shared_ptr<podio::CollectionBase>>, std::vector<std::string>, podio::Frame>
next() = 0;
virtual std::shared_ptr<std::vector<std::string>> getCollectionNames() const = 0;

virtual std::shared_ptr<podio::ROOTWriter> getWriter() = 0;
virtual void deleteWriter() = 0;
virtual void deleteReader() = 0;
virtual bool checkIfWriteCollection(const std::string& collName) = 0;
};

#endif
159 changes: 159 additions & 0 deletions k4FWCore/components/IOSvc.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright (c) 2014-2024 Key4hep-Project.
*
* This file is part of Key4hep.
* See https://key4hep.github.io/key4hep-doc/ for further info.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "IOSvc.h"

#include "podio/Frame.h"
#include "podio/FrameCategories.h"

#include "k4FWCore/FunctionalUtils.h"
#include "k4FWCore/KeepDropSwitch.h"

#include "GaudiKernel/AnyDataWrapper.h"
#include "GaudiKernel/IEventProcessor.h"

#include <mutex>
#include <tuple>

StatusCode IOSvc::initialize() {
StatusCode sc = Service::initialize();
if (sc.isFailure()) {
error() << "Unable to initialize base class Service." << endmsg;
return sc;
}
if (!m_readingFileNames.empty()) {
m_reader = std::make_unique<podio::ROOTReader>();
try {
m_reader->openFiles(m_readingFileNames);
} catch (std::runtime_error& e) {
error() << "Error when opening files: " << e.what() << endmsg;
return StatusCode::FAILURE;
}
m_entries = m_reader->getEntries(podio::Category::Event);
}

m_switch = KeepDropSwitch(m_outputCommands);

m_incidentSvc = service("IncidentSvc");
if (!m_incidentSvc) {
error() << "Unable to locate IIncidentSvc interface" << endmsg;
return StatusCode::FAILURE;
}
m_incidentSvc->addListener(this, IncidentType::EndEvent);

m_dataSvc = service("EventDataSvc");
if (!m_dataSvc) {
error() << "Unable to locate the EventDataSvc" << endmsg;
return StatusCode::FAILURE;
}

m_hiveWhiteBoard = service("EventDataSvc");

return StatusCode::SUCCESS;
}

StatusCode IOSvc::finalize() { return Service::finalize(); }

std::tuple<std::vector<std::shared_ptr<podio::CollectionBase>>, std::vector<std::string>, podio::Frame> IOSvc::next() {
podio::Frame frame;
{
std::scoped_lock<std::mutex> lock(m_changeBufferLock);
info() << "m_nextEntry = " << m_nextEntry << " m_entries = " << m_entries << endmsg;
if (m_nextEntry < m_entries) {
frame = podio::Frame(std::move(m_reader->readEntry(podio::Category::Event, m_nextEntry)));
} else {
return std::make_tuple(std::vector<std::shared_ptr<podio::CollectionBase>>(), std::vector<std::string>(),
std::move(frame));
}
m_nextEntry++;
if (m_collectionNames.empty()) {
m_collectionNames = frame.getAvailableCollections();
}
}

if (m_nextEntry >= m_entries) {
// if (true) {
auto ep = serviceLocator()->as<IEventProcessor>();
StatusCode sc = ep->stopRun();
if (sc.isFailure()) {
error() << "Error when stopping run" << endmsg;
throw GaudiException("Error when stopping run", name(), StatusCode::FAILURE);
}
info() << "m_nextEntry = " << m_nextEntry << " m_entries = " << m_entries << endmsg;
}

std::vector<std::shared_ptr<podio::CollectionBase>> collections;

for (const auto& name : m_collectionNames) {
auto ptr = const_cast<podio::CollectionBase*>(frame.get(name));
collections.push_back(std::shared_ptr<podio::CollectionBase>(ptr));
}

return std::make_tuple(collections, m_collectionNames, std::move(frame));
}

// After every event if there is still a frame in the TES
// that means it hasn't been written so the collections inside the Frame
// should be removed so that they are deleted when the Frame is deleted
// and not deleted when clearing the store
void IOSvc::handle(const Incident& incident) {
StatusCode code;
if (m_hiveWhiteBoard) {
if (!incident.context().valid()) {
info() << "No context found in IOSvc" << endmsg;
return;
}
debug() << "Setting store to " << incident.context().slot() << endmsg;
code = m_hiveWhiteBoard->selectStore(incident.context().slot());
if (code.isFailure()) {
error() << "Error when setting store" << endmsg;
throw GaudiException("Error when setting store", name(), StatusCode::FAILURE);
}
}
DataObject* p;
code = m_dataSvc->retrieveObject("/Event" + k4FWCore::frameLocation, p);
if (code.isFailure()) {
return;
}

auto frame = dynamic_cast<AnyDataWrapper<podio::Frame>*>(p);
if (!frame) {
error() << "Expected Frame in " << k4FWCore::frameLocation << " but there was something else" << endmsg;
return;
}
for (const auto& coll : frame->getData().getAvailableCollections()) {
DataObject* collPtr;
code = m_dataSvc->retrieveObject("/Event/" + coll, collPtr);
if (code.isSuccess()) {
debug() << "Removing the collection: " << coll << " from the store" << endmsg;
code = m_dataSvc->unregisterObject(collPtr);
}
// else {
// info() << "Collection not found: " << coll << endmsg;
// }
}
}

void IOSvc::setReadingCollectionNames(const std::vector<std::string>& names) { m_collectionNames = names; }

void IOSvc::setReadingFileNames(const std::vector<std::string>& names) { m_readingFileNames = names; }

bool IOSvc::checkIfWriteCollection(const std::string& collName) { return m_switch.isOn(collName); }

DECLARE_COMPONENT(IOSvc)
96 changes: 96 additions & 0 deletions k4FWCore/components/IOSvc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2014-2024 Key4hep-Project.
*
* This file is part of Key4hep.
* See https://key4hep.github.io/key4hep-doc/ for further info.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FWCORE_IOSVC_H
#define FWCORE_IOSVC_H

#include "Gaudi/Property.h"
#include "GaudiKernel/IDataProviderSvc.h"
#include "GaudiKernel/IHiveWhiteBoard.h"
#include "GaudiKernel/IIncidentListener.h"
#include "GaudiKernel/IIncidentSvc.h"
#include "GaudiKernel/Service.h"

#include "podio/ROOTReader.h"
#include "podio/ROOTWriter.h"

#include "IIOSvc.h"
#include "k4FWCore/KeepDropSwitch.h"

#include <string>
#include <vector>

class IOSvc : public extends<Service, IIOSvc, IIncidentListener> {
using extends::extends;

public:
~IOSvc() override = default;

StatusCode initialize() override;
StatusCode finalize() override;

std::tuple<std::vector<std::shared_ptr<podio::CollectionBase>>, std::vector<std::string>, podio::Frame> next()
override;

std::shared_ptr<std::vector<std::string>> getCollectionNames() const override {
return std::make_shared<std::vector<std::string>>(m_collectionNames);
}

void setReadingCollectionNames(const std::vector<std::string>& names);
void setReadingFileNames(const std::vector<std::string>& names);

protected:
Gaudi::Property<std::vector<std::string>> m_collectionNames{
this, "CollectionNames", {}, "List of collections to read"};
Gaudi::Property<std::vector<std::string>> m_readingFileNames{this, "input", {}, "List of files to read"};
Gaudi::Property<std::string> m_writingFileName{this, "output", {}, "List of files to write output to"};
Gaudi::Property<std::vector<std::string>> m_outputCommands{
this, "outputCommands", {"keep *"}, "A set of commands to declare which collections to keep or drop."};
Gaudi::Property<std::string> m_inputType{this, "ioType", "ROOT", "Type of input file (ROOT, RNTuple)"};

std::mutex m_changeBufferLock;

KeepDropSwitch m_switch;

std::unique_ptr<podio::ROOTReader> m_reader{nullptr};
std::shared_ptr<podio::ROOTWriter> m_writer{nullptr};

std::shared_ptr<podio::ROOTWriter> getWriter() override {
if (!m_writer) {
m_writer = std::make_shared<podio::ROOTWriter>(m_writingFileName.value());
}
return m_writer;
}

// Gaudi doesn't always run the destructor of the Services so we have to
// manually ask for the writer to be deleted so it will call finish()
void deleteWriter() override { m_writer.reset(); }
void deleteReader() override { m_reader.reset(); }

SmartIF<IDataProviderSvc> m_dataSvc;
SmartIF<IIncidentSvc> m_incidentSvc;
SmartIF<IHiveWhiteBoard> m_hiveWhiteBoard;
void handle(const Incident& incident) override;

int m_entries{0};
int m_nextEntry{0};

bool checkIfWriteCollection(const std::string& collName) override;
};

#endif
Loading

0 comments on commit 5dc2a63

Please sign in to comment.