Skip to content

[WIP][df] Introduce RDatasetSpec for RNTuple #19341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tree/dataframe/inc/ROOT/RNTupleDS.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,18 @@ class RNTupleDS final : public ROOT::RDF::RDataSource {
std::vector<std::vector<ROOT::Internal::RDF::RNTupleColumnReader *>> fActiveColumnReaders;

ULong64_t fSeenEntries = 0; ///< The number of entries so far returned by GetEntryRanges()
ULong64_t fSeenEntriesRange = 0;
ULong64_t fSeenEntriesNonRange = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the name hard to understand. A small comment could help or if it's not too much work, one could rename it to something like fSeenEntriesOutOfRange (if that's what it does).

Copy link
Contributor Author

@martamaja10 martamaja10 Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it's not what it does, it counts entries if no global range is requested, I'll add a comment, I could also rename it but I'm not sure what to rename it to to be more evident. It used to be just fSeenEntries which was simply for counting processed entries but now I need to have two separate counters depending on the case. I still need the fSeenEntries to count correctly at the end of processing but depending on whether I have the global ranges or not, I need to count differently first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe fSeenEntriesNonRange --> fSeenEntriesNoGlobalRange and fSeenEntriesRange --> fSeenEntriesWithGlobalRange

ULong64_t counterFileEmpty = 0;

std::vector<REntryRangeDS> fCurrentRanges; ///< Basis for the ranges returned by the last GetEntryRanges() call
std::vector<REntryRangeDS> fNextRanges; ///< Basis for the ranges populated by the PrepareNextRanges() call
/// Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in
/// the fCurrentRanges vectors. This is necessary because the returned ranges get distributed arbitrarily
/// onto slots. In the InitSlot method, the column readers use this map to find the correct range to connect to.
std::unordered_map<ULong64_t, std::size_t> fFirstEntry2RangeIdx;
// Keep track of the scheduled entries - necessary for processing of GlobalEntries
std::vector<std::pair<ULong64_t, ULong64_t>> fOriginalRanges;
/// One element per slot, corresponding to the current range index for that slot, as filled by InitSlot
std::vector<std::size_t> fSlotsToRangeIdxs;

Expand Down
2 changes: 0 additions & 2 deletions tree/dataframe/src/RDataSource.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
ROOT::RDF::RSampleInfo ROOT::RDF::RDataSource::CreateSampleInfo(
unsigned int, const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &) const
{
// Currently not implemented for the generic data source, only works correctly for TTree.
// TODO: Implement the feature also for the generic data source.
return ROOT::RDF::RSampleInfo{};
}

Expand Down
208 changes: 132 additions & 76 deletions tree/dataframe/src/RLoopManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
#include "ROOT/RDF/RFilterBase.hxx"
#include "ROOT/RDF/RLoopManager.hxx"
#include "ROOT/RNTupleProcessor.hxx"
#include "ROOT/RDF/RRangeBase.hxx"
#include "ROOT/RDF/RVariationBase.hxx"
#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
Expand Down Expand Up @@ -317,6 +318,50 @@ auto MakeDatasetColReadersKey(std::string_view colName, const std::type_info &ti
}
} // anonymous namespace

/**
* \brief Helper function to open a file (or the first file from a glob).
* This function is used at construction time of an RDataFrame, to check the
* concrete type of the dataset stored inside the file.
*/
std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
{
// Follow same logic in TChain::Add to find the correct string to look for globbing:
// - If the extension ".root" is present in the file name, pass along the basename.
// - If not, use the "?" token to delimit the part of the string which represents the basename.
// - Otherwise, pass the full filename.
auto &&baseNameAndQuery = [&fileNameGlob]() {
constexpr std::string_view delim{".root"};
if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
it != fileNameGlob.end()) {
auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
} else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
else
return std::make_pair(fileNameGlob, std::string_view{});
}();
// Captured structured bindings variable are only valid since C++20
auto &&baseName = baseNameAndQuery.first;
auto &&query = baseNameAndQuery.second;

std::string fileToOpen{fileNameGlob};
if (baseName.find_first_of("[]*?") != std::string_view::npos) { // Wildcards accepted by TChain::Add
const auto expanded = ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName});
if (expanded.empty())
throw std::invalid_argument{"RDataFrame: The glob expression '" + std::string{baseName} +
"' did not match any files."};

fileToOpen = expanded.front() + std::string{query};
}

::TDirectory::TContext ctxt; // Avoid changing gDirectory;
std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
if (!inFile || inFile->IsZombie())
throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");

return inFile;
}

namespace ROOT {
namespace Detail {
namespace RDF {
Expand Down Expand Up @@ -450,41 +495,96 @@ std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::str
*/
void RLoopManager::ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
{
// Change the range of entries to be processed
fBeginEntry = spec.GetEntryRangeBegin();
fEndEntry = spec.GetEntryRangeEnd();

// Store the samples
fSamples = spec.MoveOutSamples();
fSampleMap.clear();

// Create the internal main chain
auto chain = ROOT::Internal::TreeUtils::MakeChainForMT();
for (auto &sample : fSamples) {
const auto &trees = sample.GetTreeNames();
const auto &files = sample.GetFileNameGlobs();
for (std::size_t i = 0ul; i < files.size(); ++i) {
// We need to use `<filename>?#<treename>` as an argument to TChain::Add
// (see https://github.com/root-project/root/pull/8820 for why)
const auto fullpath = files[i] + "?#" + trees[i];
chain->Add(fullpath.c_str());
// ...but instead we use `<filename>/<treename>` as a sample ID (cannot
// change this easily because of backward compatibility: the sample ID
// is exposed to users via RSampleInfo and DefinePerSample).
const auto sampleId = files[i] + '/' + trees[i];
fSampleMap.insert({sampleId, &sample});
auto filesVec = spec.GetFileNameGlobs();
auto inFile = OpenFileWithSanityChecks(
filesVec[0]); // we only need the first file, we assume all files are either TTree or RNTuple
auto datasetName = spec.GetTreeNames();

if (inFile->Get<TTree>(datasetName[0].data())) {
// Change the range of entries to be processed
fBeginEntry = spec.GetEntryRangeBegin();
fEndEntry = spec.GetEntryRangeEnd();

// Store the samples
fSamples = spec.MoveOutSamples();
fSampleMap.clear();

// Create the internal main chain
auto chain = ROOT::Internal::TreeUtils::MakeChainForMT();
for (auto &sample : fSamples) {
const auto &trees = sample.GetTreeNames();
const auto &files = sample.GetFileNameGlobs();
for (std::size_t i = 0ul; i < files.size(); ++i) {
// We need to use `<filename>?#<treename>` as an argument to TChain::Add
// (see https://github.com/root-project/root/pull/8820 for why)
const auto fullpath = files[i] + "?#" + trees[i];
chain->Add(fullpath.c_str());
// ...but instead we use `<filename>/<treename>` as a sample ID (cannot
// change this easily because of backward compatibility: the sample ID
// is exposed to users via RSampleInfo and DefinePerSample).
const auto sampleId = files[i] + '/' + trees[i];
fSampleMap.insert({sampleId, &sample});
#ifdef R__UNIX
// Also add redirected EOS xroot URL when available
if (auto redirectedSampleId = GetRedirectedSampleId(files[i], trees[i]))
fSampleMap.insert({redirectedSampleId.value(), &sample});
// Also add redirected EOS xroot URL when available
if (auto redirectedSampleId = GetRedirectedSampleId(files[i], trees[i]))
fSampleMap.insert({redirectedSampleId.value(), &sample});
#endif
}
}
}
fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
fDataSource->SetNSlots(fNSlots);
for (unsigned int slot{}; slot < fNSlots; slot++) {
for (auto &v : fDatasetColumnReaders[slot])
v.second.reset();

fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());

fDataSource->SetNSlots(fNSlots);
for (unsigned int slot{}; slot < fNSlots; slot++) {
for (auto &v : fDatasetColumnReaders[slot])
v.second.reset();
}
} else if (inFile->Get<ROOT::RNTuple>(datasetName[0].data())) {

// Change the range of entries to be processed
fBeginEntry = spec.GetEntryRangeBegin();
fEndEntry = spec.GetEntryRangeEnd();

// Store the samples
fSamples = spec.MoveOutSamples();
fSampleMap.clear();

std::vector<std::string> fileNames;
std::set<std::string> rntupleNames;

for (auto &sample : fSamples) {
const auto &trees = sample.GetTreeNames();
const auto &files = sample.GetFileNameGlobs();
for (std::size_t i = 0ul; i < files.size(); ++i) {
const auto sampleId = files[i] + '/' + trees[i];
fSampleMap.insert({sampleId, &sample});
fileNames.push_back(files[i]);
rntupleNames.insert(trees[i]);

#ifdef R__UNIX
// Also add redirected EOS xroot URL when available
if (auto redirectedSampleId = GetRedirectedSampleId(files[i], trees[i]))
fSampleMap.insert({redirectedSampleId.value(), &sample});
#endif
}
}

if (rntupleNames.size() == 1) {
fDataSource = std::make_unique<ROOT::RDF::RNTupleDS>(*rntupleNames.begin(), fileNames);
fDataSource->SetNSlots(fNSlots);

for (unsigned int slot{}; slot < fNSlots; slot++) {
for (auto &v : fDatasetColumnReaders[slot])
v.second.reset();
}

} else {
throw std::runtime_error(
"More than one RNTuple name was found, please make sure to use RNTuples with the same RnTuple name.");
}
Comment on lines +582 to +584
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a temporary limitation?

Copy link
Contributor Author

@martamaja10 martamaja10 Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but this is what we have in any RNTuple chaining at this point.

} else {
throw std::invalid_argument(
"RDataFrame: unsupported data format for dataset. Make sure you use TTree or RNTuple.");
}
}

Expand Down Expand Up @@ -1185,50 +1285,6 @@ void ROOT::Detail::RDF::RLoopManager::SetTTreeLifeline(std::any lifeline)
fTTreeLifeline = std::move(lifeline);
}

/**
* \brief Helper function to open a file (or the first file from a glob).
* This function is used at construction time of an RDataFrame, to check the
* concrete type of the dataset stored inside the file.
*/
std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
{
// Follow same logic in TChain::Add to find the correct string to look for globbing:
// - If the extension ".root" is present in the file name, pass along the basename.
// - If not, use the "?" token to delimit the part of the string which represents the basename.
// - Otherwise, pass the full filename.
auto &&baseNameAndQuery = [&fileNameGlob]() {
constexpr std::string_view delim{".root"};
if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
it != fileNameGlob.end()) {
auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
} else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
else
return std::make_pair(fileNameGlob, std::string_view{});
}();
// Captured structured bindings variable are only valid since C++20
auto &&baseName = baseNameAndQuery.first;
auto &&query = baseNameAndQuery.second;

std::string fileToOpen{fileNameGlob};
if (baseName.find_first_of("[]*?") != std::string_view::npos) { // Wildcards accepted by TChain::Add
const auto expanded = ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName});
if (expanded.empty())
throw std::invalid_argument{"RDataFrame: The glob expression '" + std::string{baseName} +
"' did not match any files."};

fileToOpen = expanded.front() + std::string{query};
}

::TDirectory::TContext ctxt; // Avoid changing gDirectory;
std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
if (!inFile || inFile->IsZombie())
throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");

return inFile;
}

std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob,
const ROOT::RDF::ColumnNames_t &defaultColumns, bool checkFile)
Expand Down
Loading
Loading