ROOT Version: 6.28/04
Platform: el8.x86_64
Compiler: GCC 11.2.0
Hello,
I’m trying to set up a simple example using RDataFrame and the Dask plug-in. Without Dask on the same input data the job completes successfully, but with Dask an error message appears which isn’t sufficiently descriptive to allow much debugging. I’d appreciate any advice you can give! The reproduction instructions are below.
Thanks in advance and best wishes,
James Catmore
The following works seamlessly:
import ROOT
df = ROOT.RDataFrame("CollectionTree", "./DAOD_PHYSLITE.33080395._000001.pool.root.1")
df_two_muons = df.Filter("AnalysisMuonsAuxDyn.pt.size() > 1", "At least two muons")
ROOT.gSystem.AddDynamicPath("./commontools/.")
ROOT.gROOT.ProcessLine(".include ./commontools");
ROOT.gInterpreter.AddIncludePath("./commontools");
ROOT.gSystem.Load("./commontools/example_cxx.so")
df_two_muons = df_two_muons.Define("m_mumu",
"ExampleInvariantMass(AnalysisMuonsAuxDyn.pt,AnalysisMuonsAuxDyn.eta,AnalysisMuonsAuxDyn.phi,AnalysisMuonsAuxDyn.charge,105.66)"
)
c = ROOT.TCanvas()
hist = df_two_muons.Histo1D(("m_mumu","m_mumu",100,0,150000.0), "m_mumu")
hist.Draw()
c.Draw()
The C++ example.cxx and example.h are:
#define helperFunctions_cxx
#include <ROOT/RVec.hxx>
#include "TLorentzVector.h"
using VecF_t = const ROOT::RVec<float>&;
using VecI_t = const ROOT::RVec<int>&;
// Compute the invariant mass for oppositely charged pairs of objects
// Note that this function "sees" one event at a time, so the entries in each vector are for a single event
ROOT::RVec<float> ExampleInvariantMass(VecF_t& pt, VecF_t& eta, VecF_t& phi, VecI_t& charge, float mass) {
// Construct oppositely charged pairs.
// This is done by constructing index pairs where each index is the same length as the vectors
// and then checking whether they are oppositely charged. Selected index pairs are then used later
// to select the correct values from the other vectors, to calculate the mass
std::vector<std::pair<unsigned int, unsigned int> > indices{};
unsigned int length{(unsigned int)charge.size()};
for (unsigned int i=0; i<length; ++i) {
for (unsigned int j=i+1; j<length; ++j) {
if (charge[i]!=charge[j]) indices.emplace_back(std::make_pair(i,j));
}
}
// Calculate mass for each pair
ROOT::RVec<float> masses{};
for (unsigned int k=0; k<indices.size(); ++k) {
TLorentzVector p1;
TLorentzVector p2;
unsigned int index1{indices[k].first};
unsigned int index2{indices[k].second};
p1.SetPtEtaPhiM(pt[index1], eta[index1], phi[index1], mass);
p2.SetPtEtaPhiM(pt[index2], eta[index2], phi[index2], mass);
masses.push_back((p1 + p2).M());
}
return masses;
}
#ifndef example_h
#define example_h
#include "ROOT/RDF/RInterface.hxx"
#include <ROOT/RDataSource.hxx>
#include <ROOT/RCsvDS.hxx>
#include <iostream>
#include "TLorentzVector.h"
#include "TParameter.h"
#include <ROOT/RVec.hxx>
using VecF_t = const ROOT::RVec<float>&;
using VecI_t = const ROOT::RVec<int>&;
VecF_t ExampleInvariantMass(VecF_t& pt, VecF_t& eta, VecF_t& phi, VecI_t& charge, VecF_t& e);
#endif
The input can be found here:
/afs/cern.ch/work/j/catmore/public/DAOD_PHYSLITE.33080395._000001.pool.root.1
Now, I make the following adjustments:
(1) Provide a method to create a Dask connection:
from dask.distributed import LocalCluster, Client
# Point RDataFrame calls to Dask RDataFrame object
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
def create_connection(n_workers):
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1, processes=True, memory_limit="5GiB")
try:
client = Client(cluster,timeout='2s')
except TimeoutError:
pass
return client
(2) Set up the RDataFrame as follows:
connection = create_connection(64)
df = RDataFrame("CollectionTree", "./DAOD_PHYSLITE.33080395._000001.pool.root.1", daskclient=connection))
Then the job crashes as follows:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
/tmp/ipykernel_2552559/3239614451.py in <module>
1 c = ROOT.TCanvas()
2 hist = df_two_muons.Histo1D(("m_mumu","m_mumu",100,0,150000.0), "m_mumu")
----> 3 hist.Draw()
4 c.Draw()
/storage/shared/root_install/lib/DistRDF/Proxy.py in _call_action_result(self, *args, **kwargs)
196 result of the current action node.
197 """
--> 198 return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
199
200 def create_variations(self) -> VariationsProxy:
/storage/shared/root_install/lib/DistRDF/Proxy.py in GetValue(self)
188 returning the value.
189 """
--> 190 execute_graph(self.proxied_node)
191 return self.proxied_node.value
192
/storage/shared/root_install/lib/DistRDF/Proxy.py in execute_graph(node)
55 # All the information needed to reconstruct the computation graph on
56 # the workers is contained in the head node
---> 57 node.get_head().execute_graph()
58
59
/storage/shared/root_install/lib/DistRDF/HeadNode.py in execute_graph(self)
205 # Execute graph distributedly and return the aggregated results from all
206 # tasks
--> 207 returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
208 # Perform any extra checks that may be needed according to the
209 # type of the head node
/storage/shared/root_install/lib/DistRDF/Backends/Dask/Backend.py in ProcessAndMerge(self, ranges, mapper, reducer)
192 progress(final_results)
193
--> 194 return final_results.compute()
195
196 def distribute_unique_paths(self, paths):
/storage/shared/root_install/lib/DistRDF/Backends/Dask/Backend.py in dask_mapper()
169 Utils.declare_shared_libraries(shared_libs_on_ex)
170
--> 171 return mapper(current_range)
172
173 dmapper = dask.delayed(dask_mapper)
/storage/shared/root_install/lib/DistRDF/Backends/Base.py in distrdf_mapper()
115 mergeables = None
116 except ROOT.std.exception as e:
--> 117 raise RuntimeError(f"C++ exception thrown:\n\t{type(e).__name__}: {e.what()}")
118
119 return TaskResult(mergeables, rdf_plus.entries_in_trees)
RuntimeError: C++ exception thrown:
runtime_error: Template method resolution failed:
ROOT::RDF::RInterface<ROOT::Detail::RDF::RJittedFilter,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RJittedFilter,void>::Define(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
runtime_error:
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
All RDF objects that have not run an event loop yet should be considered in an invalid state.