RDataFrame & Dask

ROOT Version: 6.28/04
Platform: el8.x86_64
Compiler: GCC 11.2.0

Hello,

I’m trying to set up a simple example using RDataFrame and the Dask plug-in. Without Dask the job completes successfully on the same input data, but with Dask it fails with an error message that isn’t descriptive enough to allow much debugging. I’d appreciate any advice you can give! The reproduction instructions are below.

Thanks in advance and best wishes,

James Catmore

The following works seamlessly:

import ROOT
df = ROOT.RDataFrame("CollectionTree", "./DAOD_PHYSLITE.33080395._000001.pool.root.1")
df_two_muons = df.Filter("AnalysisMuonsAuxDyn.pt.size() > 1", "At least two muons")
ROOT.gSystem.AddDynamicPath("./commontools/.")
ROOT.gROOT.ProcessLine(".include ./commontools")
ROOT.gInterpreter.AddIncludePath("./commontools")
ROOT.gSystem.Load("./commontools/example_cxx.so")
df_two_muons = df_two_muons.Define("m_mumu", 
                                "ExampleInvariantMass(AnalysisMuonsAuxDyn.pt,AnalysisMuonsAuxDyn.eta,AnalysisMuonsAuxDyn.phi,AnalysisMuonsAuxDyn.charge,105.66)"
                                  )
c = ROOT.TCanvas()
hist = df_two_muons.Histo1D(("m_mumu","m_mumu",100,0,150000.0), "m_mumu")  
hist.Draw()
c.Draw()

The C++ example.cxx and example.h are:

#define helperFunctions_cxx


#include <ROOT/RVec.hxx>
#include "TLorentzVector.h"

using VecF_t = const ROOT::RVec<float>&;
using VecI_t = const ROOT::RVec<int>&;

// Compute the invariant mass for oppositely charged pairs of objects
// Note that this function "sees" one event at a time, so the entries in each vector are for a single event
ROOT::RVec<float> ExampleInvariantMass(VecF_t& pt, VecF_t& eta, VecF_t& phi, VecI_t& charge, float mass) {
  // Construct oppositely charged pairs. 
  // This is done by constructing index pairs where each index is the same length as the vectors
  // and then checking whether they are oppositely charged. Selected index pairs are then used later
  // to select the correct values from the other vectors, to calculate the mass
  std::vector<std::pair<unsigned int, unsigned int> > indices{};
  unsigned int length{(unsigned int)charge.size()};
  for (unsigned int i=0; i<length; ++i) {
    for (unsigned int j=i+1; j<length; ++j) {
      if (charge[i]!=charge[j]) indices.emplace_back(std::make_pair(i,j));
    }
  }
  // Calculate mass for each pair
  ROOT::RVec<float> masses{};
  for (unsigned int k=0; k<indices.size(); ++k) {
    TLorentzVector p1;
    TLorentzVector p2;
    unsigned int index1{indices[k].first};
    unsigned int index2{indices[k].second};
    p1.SetPtEtaPhiM(pt[index1], eta[index1], phi[index1], mass);
    p2.SetPtEtaPhiM(pt[index2], eta[index2], phi[index2], mass);
    masses.push_back((p1 + p2).M());
  }
  return masses;
}
#ifndef example_h
#define example_h

#include "ROOT/RDF/RInterface.hxx"
#include <ROOT/RDataSource.hxx>
#include <ROOT/RCsvDS.hxx>
#include <iostream>
#include "TLorentzVector.h"
#include "TParameter.h"
#include <ROOT/RVec.hxx>
using VecF_t = const ROOT::RVec<float>&;
using VecI_t = const ROOT::RVec<int>&;

ROOT::RVec<float> ExampleInvariantMass(VecF_t& pt, VecF_t& eta, VecF_t& phi, VecI_t& charge, float mass);
#endif

The input can be found here:

/afs/cern.ch/work/j/catmore/public/DAOD_PHYSLITE.33080395._000001.pool.root.1

Now, I make the following adjustments:

(1) Provide a method to create a Dask connection:

from dask.distributed import LocalCluster, Client
# Point RDataFrame calls to Dask RDataFrame object
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
def create_connection(n_workers):
    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1, processes=True, memory_limit="5GiB")
    try:
        client = Client(cluster, timeout='2s')
    except TimeoutError:
        client = None  # avoid a NameError below if the connection attempt times out
    return client

(2) Set up the RDataFrame as follows:

connection = create_connection(64)
df = RDataFrame("CollectionTree", "./DAOD_PHYSLITE.33080395._000001.pool.root.1", daskclient=connection)

Then the job crashes as follows:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
/tmp/ipykernel_2552559/3239614451.py in <module>
      1 c = ROOT.TCanvas()
      2 hist = df_two_muons.Histo1D(("m_mumu","m_mumu",100,0,150000.0), "m_mumu")
----> 3 hist.Draw()
      4 c.Draw()

/storage/shared/root_install/lib/DistRDF/Proxy.py in _call_action_result(self, *args, **kwargs)
    196         result of the current action node.
    197         """
--> 198         return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
    199 
    200     def create_variations(self) -> VariationsProxy:

/storage/shared/root_install/lib/DistRDF/Proxy.py in GetValue(self)
    188         returning the value.
    189         """
--> 190         execute_graph(self.proxied_node)
    191         return self.proxied_node.value
    192 

/storage/shared/root_install/lib/DistRDF/Proxy.py in execute_graph(node)
     55             # All the information needed to reconstruct the computation graph on
     56             # the workers is contained in the head node
---> 57             node.get_head().execute_graph()
     58 
     59 

/storage/shared/root_install/lib/DistRDF/HeadNode.py in execute_graph(self)
    205         # Execute graph distributedly and return the aggregated results from all
    206         # tasks
--> 207         returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
    208         # Perform any extra checks that may be needed according to the
    209         # type of the head node

/storage/shared/root_install/lib/DistRDF/Backends/Dask/Backend.py in ProcessAndMerge(self, ranges, mapper, reducer)
    192         progress(final_results)
    193 
--> 194         return final_results.compute()
    195 
    196     def distribute_unique_paths(self, paths):

/storage/shared/root_install/lib/DistRDF/Backends/Dask/Backend.py in dask_mapper()
    169             Utils.declare_shared_libraries(shared_libs_on_ex)
    170 
--> 171             return mapper(current_range)
    172 
    173         dmapper = dask.delayed(dask_mapper)

/storage/shared/root_install/lib/DistRDF/Backends/Base.py in distrdf_mapper()
    115             mergeables = None
    116     except ROOT.std.exception as e:
--> 117         raise RuntimeError(f"C++ exception thrown:\n\t{type(e).__name__}: {e.what()}")
    118 
    119     return TaskResult(mergeables, rdf_plus.entries_in_trees)

RuntimeError: C++ exception thrown:
	runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RJittedFilter,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RJittedFilter,void>::Define(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.

Hi @jcatmore,

Thanks for reporting this issue; I’m sure @vpadulan can shed some light on the topic.

Cheers,
J.

Dear @jcatmore ,

Sorry for the late reply; I’m taking a look at this now.

The problem lies in the lines where you modify the include path and then load the shared library (the AddDynamicPath/AddIncludePath/gSystem.Load calls above). This works locally precisely because there is only one process (and one machine). Once you turn the execution into a distributed one, you must make sure that all the different processes (on the different nodes) use the same environment and libraries.

Practically speaking, you need to make sure that gSystem.Load and the other calls are also executed on the workers of your cluster. In your case the cluster is a LocalCluster, so everything runs on your machine, but there are still separate worker processes, and they do not see the same interpreter state as the driver unless you set it up for them (as you already do in the single-process case).

You are right that the error thrown is not sufficiently explanatory; the only real hint is given by the line

RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash

This tells you that the just-in-time compilation failed (somehow). I would also have expected a stack trace from the worker processes, which would have told you that the symbols from the ./commontools/example_cxx.so library are missing.

A tentative solution

Now, to give you some practical help: one thing we can try is telling RDataFrame to initialize every distributed task with the code that loads the library. We can do this via the initialize function.


initialize = ROOT.RDF.Experimental.Distributed.initialize

def myinit():
  ROOT.gSystem.AddDynamicPath("./commontools/.")
  ROOT.gROOT.ProcessLine(".include ./commontools")
  ROOT.gInterpreter.AddIncludePath("./commontools")
  ROOT.gSystem.Load("./commontools/example_cxx.so")

initialize(myinit)
# proceed as usual
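
For completeness, here is a minimal sketch of how the full workflow could then look with the Dask backend. I have not run this exact script; the paths, tree name, histogram binning and function names are simply taken from your post:

import ROOT
from dask.distributed import LocalCluster, Client

# Distributed (Dask) flavour of RDataFrame and the per-task initialization hook
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
initialize = ROOT.RDF.Experimental.Distributed.initialize

def myinit():
    # Runs in every distributed task, so each worker process loads the library too
    ROOT.gSystem.AddDynamicPath("./commontools/.")
    ROOT.gROOT.ProcessLine(".include ./commontools")
    ROOT.gInterpreter.AddIncludePath("./commontools")
    ROOT.gSystem.Load("./commontools/example_cxx.so")

initialize(myinit)

cluster = LocalCluster(n_workers=64, threads_per_worker=1, processes=True, memory_limit="5GiB")
client = Client(cluster)

df = RDataFrame("CollectionTree", "./DAOD_PHYSLITE.33080395._000001.pool.root.1", daskclient=client)
df_two_muons = df.Filter("AnalysisMuonsAuxDyn.pt.size() > 1", "At least two muons")
df_two_muons = df_two_muons.Define("m_mumu",
    "ExampleInvariantMass(AnalysisMuonsAuxDyn.pt,AnalysisMuonsAuxDyn.eta,AnalysisMuonsAuxDyn.phi,AnalysisMuonsAuxDyn.charge,105.66)")

hist = df_two_muons.Histo1D(("m_mumu", "m_mumu", 100, 0, 150000.0), "m_mumu")

c = ROOT.TCanvas()
hist.Draw()  # triggers the distributed event loop
c.Draw()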

Let me know if it helps,
Vincenzo

Hi @vpadulan ,

many thanks for the clear explanation and solution. It worked perfectly, first time 🙂

Best wishes,

James.
