I am new to RDataFrame and tried to convert some old code to RDataFrame syntax.
It works without implicit MT but, as I expected, not with implicit MT: there I am missing events.
Can someone help me do it right? I looked at the tutorials and found df018_customActions, which seems to be what I need, but I do not really understand it.
The code I have used so far is:
TH2D spectra2d {tfilename.c_str()
    , Form("%s;frequency; run #", tfilename.c_str())
    , values, min_freq, max_freq, n_entries, 1, n_entries
};
d.Foreach(
    [&spectra2d] (const std::vector<double>& f, const std::vector<double>& r, int run)
    {
        for (size_t i = 0; i < f.size(); ++i) {
            spectra2d.SetBinContent(spectra2d.FindBin(f[i], run), r[i]);
        }
    }
    , {"f", "r", "run"}
);
As you say, this code cannot work in parallel because the SetBinContent method is not thread safe (and for a good reason!).
What you can do is fill N different histograms, one per processing slot (there is one slot per worker thread in the RDF model). The idea is to avoid imposing thread safety through a synchronisation mechanism: a lock around SetBinContent would work, but the code would be tremendously inefficient, de facto sequential.
Your code may therefore become:
std::vector<TH2D> spectra2d_vec;
// One histogram per processing slot; the pool size is 0 when implicit MT is off.
auto nSlots = ROOT::GetImplicitMTPoolSize();
if (nSlots == 0) nSlots = 1;
for (auto i : ROOT::TSeqU(nSlots)) spectra2d_vec.emplace_back(tfilename.c_str()
    , Form("%s;frequency; run #", tfilename.c_str())
    , values, min_freq, max_freq, n_entries, 1, n_entries);
// ForeachSlot passes the slot index as the first argument of the callable.
d.ForeachSlot(
    [&spectra2d_vec] (unsigned int slot, const std::vector<double>& f, const std::vector<double>& r, int run)
    {
        auto &h = spectra2d_vec[slot]; // only this slot's thread touches h
        for (size_t i = 0; i < f.size(); ++i) {
            h.SetBinContent(h.FindBin(f[i], run), r[i]);
        }
    }
    , {"f", "r", "run"}
);
// Merge the per-slot histograms into the first one.
auto &mergedSpectra = spectra2d_vec[0];
for (auto i : ROOT::TSeqU(1, nSlots)) mergedSpectra.Add(&spectra2d_vec[i]);
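To make the one-accumulator-per-slot idea concrete outside ROOT, here is a minimal sketch using only the standard library. It is not RDataFrame code: std::thread stands in for the RDF worker threads, and the names Bins and fillPerSlot, as well as the strided split of the entries, are made up for illustration.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for a histogram: a fixed number of bins holding doubles.
using Bins = std::vector<double>;

// Fill one accumulator per slot without any locking, then merge sequentially.
// Entry i is added to bin i % nBins; this binning rule is illustrative only.
Bins fillPerSlot(const std::vector<double>& values, std::size_t nBins, unsigned nSlots)
{
    if (nSlots == 0) nSlots = 1; // treat "no worker pool" as a single slot
    std::vector<Bins> perSlot(nSlots, Bins(nBins, 0.0));

    std::vector<std::thread> workers;
    for (unsigned slot = 0; slot < nSlots; ++slot) {
        workers.emplace_back([&perSlot, &values, nBins, nSlots, slot] {
            // Each worker reads a strided subset of the entries and writes
            // only to its own accumulator, so no synchronisation is needed.
            for (std::size_t i = slot; i < values.size(); i += nSlots)
                perSlot[slot][i % nBins] += values[i];
        });
    }
    for (auto& w : workers) w.join();

    // Single-threaded merge once all workers have finished.
    Bins merged = perSlot[0];
    for (unsigned slot = 1; slot < nSlots; ++slot)
        for (std::size_t b = 0; b < nBins; ++b)
            merged[b] += perSlot[slot][b];
    return merged;
}
```

Calling fillPerSlot(values, 10, 4) should give the same bins as fillPerSlot(values, 10, 1) whenever the additions are exact. One caveat carries over to the histogram case: merging by addition reproduces a sequential SetBinContent result only if each bin is written by at most one slot, since contents set in two different slots are summed in the merge.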
It is strange, though, that 2 threads (ROOT::EnableImplicitMT(2)) run faster than 4 threads (ROOT::EnableImplicitMT(0) or ROOT::EnableImplicitMT(4)) on a machine with 4 physical CPUs. I guess that is because the hard drive slows multiple threads down when they read the data from the tree.
The code I use now is:
ROOT::RDataFrame d("spectra1", tfilename);
auto n_slots = ROOT::GetImplicitMTPoolSize();
if (n_slots == 0) n_slots = 1;
std::vector<TH2D> spectra2d_vec;
for (auto&& i : ROOT::TSeqU(n_slots)) {
    spectra2d_vec.emplace_back(
        TH2D {tfilename.c_str()
            , Form("%s;frequency; run #", tfilename.c_str())
            , Int_t(*values), *min_freq / harmonic, *max_freq / harmonic, Int_t(*n_entries), 1, Double_t(*n_entries)
        }
    );
}
d.ForeachSlot(
    [&spectra2d_vec] (unsigned int slot, const std::vector<double>& f, const std::vector<double>& r, int run)
    {
        for (size_t i = 0; i < f.size(); ++i) {
            spectra2d_vec[slot].SetBinContent(spectra2d_vec[slot].FindBin(f[i] / harmonic, run), r[i]);
        }
    }
    , {"frequencies", "rates", "run"}
);
for (auto&& i : ROOT::TSeqU(1, n_slots)) spectra2d_vec[0].Add(&spectra2d_vec[i]);
return spectra2d_vec[0];