
Understanding usage of RDataFrame Filter and Define functions on distributed data



ROOT Version: 6.22
Platform: Ubuntu 20.04
Compiler: GCC >= 7.


Hi. I want to do something like the following:

int main(int argc, char** argv) {
Telescope T ; 
T.Configure(argv) ;  // A function which sets up configuration for this simulation run
ROOT::EnableImplicitMT() ;
ROOT::RDataFrame rdf("tree","file.root",{"ShowerHead","ShowerData"}) ; // ShowerHead and ShowerData are branches which hold some custom class instances 
auto rdf_dataset = rdf.Filter(T.SimulateTrigger(ShowerHead,ShowerData)).Define("OutputData",T.GetOutput()) ;  
// T.SimulateTrigger represents call to SimulateTrigger(ShowerHead&, ShowerData&) method of class 
// Telescope which returns true/false. The same functions stores output in case of successful trigger
// which is retrieved by GetOutput method of class Telescope, which is passed as argument in Define() 
 rdf.Snapshot() ;
}

Is this possible? Will it be thread-safe?

Is your class itself thread-safe?

It seems like you will instantiate exactly one instance of your class, which will be reused for each event through the SimulateTrigger and GetOutput methods. Presumably SimulateTrigger takes the inputs and returns a boolean decision, but it also calculates and stores some information internally, so that the same instance can return a result from GetOutput() without that method receiving its own inputs. So in general I wouldn't expect that to be thread-safe. In that case, I think you need a wrapper that instantiates multiple instances of the class (one per thread, i.e. per processing slot), and then you need to use the slot-aware versions of Define and friends to make the calls. It'll be something more akin to

// TWrapper::SimulateTrigger needs to take the thread number ("slot") as its first parameter, then the actual values,
// and call SimulateTrigger on the slot-specific Telescope instance.
// Storing a std::vector<Telescope> would probably work; the slot number is then just the index into that vector.
// Similarly, TWrapper::GetOutput needs to take the slot as its only argument. `wrapper` is a TWrapper instance.
auto trigger = [&wrapper](unsigned int slot, ShowerHead &head, ShowerData &data) { return wrapper.SimulateTrigger(slot, head, data); };
auto output = [&wrapper](unsigned int slot) { return wrapper.GetOutput(slot); };
// DefineSlot passes the slot number to the callable automatically:
auto rdf_dataset = rdf.Filter(trigger, {"rdfslot_", "ShowerHead", "ShowerData"}).DefineSlot("OutputData", output);
//or... read the slot from the implicit "rdfslot_" column:
auto rdf_dataset = rdf.Filter(trigger, {"rdfslot_", "ShowerHead", "ShowerData"}).Define("OutputData", output, {"rdfslot_"});
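The "one instance per slot" idea can be sketched without ROOT. This is a minimal standard-C++ illustration under stated assumptions: StatefulTrigger, SlotWrapper and RunEventLoop are hypothetical names, StatefulTrigger stands in for Telescope, and the worker threads take entries round-robin for simplicity, whereas RDataFrame itself hands ranges of entries to its slots.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for Telescope: SimulateTrigger() stores state that
// GetOutput() reads back later, so ONE shared instance is not thread-safe.
struct StatefulTrigger {
   int lastOutput = 0;
   bool SimulateTrigger(int head, int data) {
      lastOutput = head + data;            // internal state written here...
      return lastOutput % 2 == 0;
   }
   int GetOutput() const { return lastOutput; } // ...and read back here
};

// One instance per processing slot; a given slot is used by one thread at a time.
class SlotWrapper {
public:
   explicit SlotWrapper(unsigned int nSlots) : fInstances(nSlots) {}
   bool SimulateTrigger(unsigned int slot, int head, int data) {
      assert(slot < fInstances.size());
      return fInstances[slot].SimulateTrigger(head, data);
   }
   int GetOutput(unsigned int slot) const { return fInstances[slot].GetOutput(); }
private:
   std::vector<StatefulTrigger> fInstances;
};

// Mimics an event loop: worker `slot` processes entries slot, slot + nSlots, ...
// and, for each entry, calls the Filter-like and then the Define-like method on
// ITS OWN instance. Returns the output per entry if the trigger fired, else -1.
std::vector<int> RunEventLoop(const std::vector<int> &heads, const std::vector<int> &datas,
                              unsigned int nSlots) {
   SlotWrapper wrapper(nSlots);
   std::vector<int> outputs(heads.size(), -1);
   std::vector<std::thread> workers;
   for (unsigned int slot = 0; slot < nSlots; ++slot) {
      workers.emplace_back([&, slot] {
         for (std::size_t entry = slot; entry < heads.size(); entry += nSlots)
            if (wrapper.SimulateTrigger(slot, heads[entry], datas[entry]))
               outputs[entry] = wrapper.GetOutput(slot); // each entry written by one thread only
      });
   }
   for (auto &w : workers)
      w.join();
   return outputs;
}
```

Because each slot only touches its own instance, the value stored by SimulateTrigger is still there when GetOutput is called for the same entry on the same slot; with a single shared instance, another thread could overwrite it in between. The result is the same whether one or several slots are used.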

Thanks for the reply.
How do I get the thread numbers, and the tree data that the corresponding thread is operating on?
Also, how does this approach extend to more than one machine?

Inside RDataFrame a few columns are implicitly defined, two of which are “rdfentry_” (the entry number) and “rdfslot_” (the processing slot, i.e. which of the parallel workers is handling the entry). If you just print those numbers out in some bit of code, like this:

ROOT::EnableImplicitMT();
...
auto df2 = df.Define("junk", "std::cout << rdfslot_ << std::endl; return 0;");
auto min = df2.Min("junk"); // use "junk" in an action, otherwise the Define is never evaluated
auto c = df2.Count();
std::cout << c.GetValue() << std::endl; // accessing a result triggers the event loop

you will see them.

You should know that you're not guaranteed an individual tree per thread: RDF will happily partition a single tree into multiple entry ranges and farm these out to the worker threads.
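The entry-range splitting mentioned above can be pictured with a simplified standard-C++ sketch. SplitEntries and EntryRange are hypothetical names, and the real RDataFrame scheduler is more involved (as far as I know it aligns ranges with TTree clusters and hands them to a task pool, so which slot picks up which range is not fixed). The sketch only shows why two slots can end up working on different parts of the same tree.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using EntryRange = std::pair<std::uint64_t, std::uint64_t>; // [begin, end)

// Split [0, nEntries) into at most nChunks contiguous ranges of near-equal size.
std::vector<EntryRange> SplitEntries(std::uint64_t nEntries, std::uint64_t nChunks) {
   std::vector<EntryRange> ranges;
   if (nEntries == 0 || nChunks == 0)
      return ranges;
   nChunks = std::min(nChunks, nEntries); // never create empty ranges
   const std::uint64_t base = nEntries / nChunks;
   const std::uint64_t extra = nEntries % nChunks;
   std::uint64_t begin = 0;
   for (std::uint64_t i = 0; i < nChunks; ++i) {
      const std::uint64_t size = base + (i < extra ? 1 : 0); // first `extra` ranges get one more entry
      ranges.emplace_back(begin, begin + size);
      begin += size;
   }
   assert(begin == nEntries); // every entry is covered exactly once
   return ranges;
}
```

For example, 10 entries split into 4 ranges gives [0,3), [3,6), [6,8) and [8,10); any of these can be processed by any slot.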

Another thing is that, with this approach, you are working on precisely ONE machine, but one with multiple threads (hopefully with corresponding virtual/physical cores, or else this will not benefit you much!). If you want to use multiple machines, you need to either partition your work manually via some batch system or use the distributed backends that are becoming available for RDataFrame.

In general, make sure you understand what RDF is doing, because you are setting up a computation graph, and variables ‘inside’ that computation graph aren’t usually accessible ‘outside’ the computation graph. For example, you show a call to T.SimulateTrigger(ShowerHead, ShowerData), but neither of those variables ‘exists’ outside the dataframe where you’re referring to them. For this reason, you need to use one of the appropriate interfaces.
Either the JIT (just-in-time compiled) string interface:

auto rdf_dataset = rdf.Filter("T.SimulateTrigger(ShowerHead,ShowerData)")...

or the compiled interface, where you pass a callable and the list of input columns:

auto rdf_dataset = rdf.Filter([&T](ShowerHead &head, ShowerData &data) { return T.SimulateTrigger(head, data); }, {"ShowerHead", "ShowerData"})...

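In the former, the string is ‘inserted into the bottle’, i.e. compiled just in time inside RDataFrame, where both the function and its inputs are understood (this also means T itself must be known to the ROOT interpreter, not just to your compiled code); in the latter, you tell RDataFrame which columns of data to pass as inputs to the function.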
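To make the compiled-interface point concrete: the callable never sees ShowerHead or ShowerData as C++ variables; it only receives, entry by entry, the values of the columns named in the list. The toy below sketches that binding in plain C++. Columns and FilterEntries are hypothetical names, it supports exactly two columns for brevity, and it is not how RDataFrame is implemented internally.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical toy "dataframe": each column is a named vector of doubles.
using Columns = std::map<std::string, std::vector<double>>;

// Toy version of Filter(callable, {"colA", "colB"}): the caller names the columns,
// and for each entry the values of those columns are passed to the callable as
// arguments, in the listed order. Returns the indices of entries that pass.
template <typename F>
std::vector<std::size_t> FilterEntries(const Columns &cols, F pred,
                                       const std::string &colA, const std::string &colB) {
   const auto &a = cols.at(colA); // throws std::out_of_range for an unknown column name
   const auto &b = cols.at(colB);
   assert(a.size() == b.size());
   std::vector<std::size_t> passed;
   for (std::size_t i = 0; i < a.size(); ++i)
      if (pred(a[i], b[i]))
         passed.push_back(i);
   return passed;
}
```

With columns x = {1, 5, 3} and y = {2, 2, 2}, filtering with `[](double x, double y) { return x > y; }` on {"x", "y"} keeps entries 1 and 2.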

Okay. So let's say my TWrapper looks like the following:

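class TWrapper : public TObject {
public:
   std::vector<Telescope> tels; // one Telescope per RDataFrame processing slot

   TWrapper(unsigned int nslots /*, <other args> */) {
      tels.reserve(nslots);
      for (unsigned int i = 0; i < nslots; ++i) {
         tels.emplace_back();
         tels.back().Init(/* <other args> */);
      }
   }

   // "rdfslot_" is an unsigned int, so the slot parameter should be one too
   Bool_t SimulateTrigger(unsigned int slot, ShowerHead &head, ShowerData &data) {
      return tels[slot].SimulateTrigger(head, data);
   }

   auto GetOutput(unsigned int slot) { return tels[slot].GetOutput(); }
};

int main(int argc, char **argv)
{
   ROOT::EnableImplicitMT(); // call this before constructing the RDataFrame

   ROOT::RDataFrame rdf("tree", "file.root");

   TWrapper wrapper(rdf.GetNSlots() /*, argv */); // one Telescope per slot

   auto rdf_dataset =
      rdf.Filter([&wrapper](unsigned int slot, ShowerHead &head, ShowerData &data) {
            return wrapper.SimulateTrigger(slot, head, data); },
         {"rdfslot_", "ShowerHead", "ShowerData"})
         .DefineSlot("OutputData", [&wrapper](unsigned int slot) { return wrapper.GetOutput(slot); });

   // Snapshot the filtered node; the tree and file names are placeholders
   rdf_dataset.Snapshot("outtree", "out.root", {"OutputData"});
}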

Is this the right approach?
And if this is okay, what modifications do I need to make it run on multiple machines, besides using ROOT.RDF.Experimental.Distributed.Spark.RDataFrame?

Practice makes perfect: have you tried running it? Do you get results that make sense with ImplicitMT enabled/disabled?

Regarding the distributed systems, someone from the ROOT team will have to comment. They may be best handled from the Python side, which means encapsulating your C++ code, distributing headers to Spark nodes, etc.

Hi @Chinmay ,
sorry for the high latency, I was off last week.

I agree with everything said so far, and the snippet you shared looks good to me at a first look. The alternative is of course to make the Telescope class itself thread-safe, but that’s likely much trickier.

Regarding distributed execution: the distributed RDataFrame interfaces are Python-only, but you can import the C++ code and use it in RDataFrame string expressions in Defines and Filters as usual. @vpadulan should have an example at hand, let's ping him. When you have something set up, we can help iron out any remaining details.

Cheers,
Enrico

Hi @Chinmay ,
As written by Enrico, distributed RDataFrame is Python-only. This is in fact needed since the distributed backends (like Spark and Dask) usually have Python APIs. Now, in order to send C++ code to the computing nodes that can be then passed to Define and Filter operations, you have two options:

Initialize function paired with gInterpreter.Declare

The ROOT.gInterpreter.Declare function can be used in any Python script to declare C++ code to the ROOT interpreter in the Python session. The function expects a string containing the C++ code you want to declare.

The distributed RDataFrame module lets you register a single function that each computing node runs before starting to execute the RDataFrame computations. The function is ROOT.RDF.Experimental.Distributed.initialize.

You can pair the above together, and create a function that will be executed once per computing node that just calls gInterpreter.Declare, thus having the ROOT interpreter on each node aware of your C++ code.

This currently looks like

import ROOT

initialize = ROOT.RDF.Experimental.Distributed.initialize
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

def myfun():
    ROOT.gInterpreter.Declare("""
        int get_int(){
            return 111;
        }
        """)

# Signal function to be called once on each node 
initialize(myfun)

df = RDataFrame(100)
# Function is available in calls to Define or Filter
m = df.Define("x","get_int()").Mean("x")
print(f"Value returned by declared function: {m.GetValue()}")

Distribute C++ headers to computing nodes

Distributed RDataFrame can also send each computing node a copy of the C++ header files containing functions and variables needed in the analysis (see the code). The function expects a path to a single header, a directory containing all the headers you need, or an iterable of paths to various headers. The files are sent to the nodes before the analysis, and the headers are declared to the ROOT interpreter before the computations start.
Example:

myheader.hpp

#ifndef myheader
#define myheader

bool check_number_less_than_5(int num){
    // Checks if the input number is less than 5
    return num < 5;
}

#endif // myheader

analysis.py

import ROOT
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

df = RDataFrame(100)
# Signal the distributed RDataFrame backend instance that there is a header to be sent
df._headnode.backend.distribute_headers("myheader.hpp")
df_filtered = df.Filter("check_number_less_than_5(rdfentry_)")
histo = df_filtered.Histo1D(("distributeheader","distributeheader",10,0,100), "rdfentry_")
c = ROOT.TCanvas("", "", 600, 600)
histo.Draw()
c.Draw()

Hope this helps you set up your use case; get back to us if you have problems or feedback.
Cheers,
Vincenzo


Thanks for the reply…
The Telescope object comes from a (large) compiled library, whose source code I used to ship to the nodes using PROOF mechanisms. I was going through the PyRDF documentation, but the exact mechanism to build a library from source on the nodes is not clear to me. How can I achieve that in PyRDF? Can you suggest an example or a link? 'distribute_shared_libraries' seems to be my option, but I think it requires me to have the library installed once on each node. That is a good option for stable code, but during the development phase, having to build the code manually on each node before every test run is cumbersome.
On a separate problem: I recently made some modifications to the Telescope library. It now has two classes, Electronics and PMT. The Electronics class has a TF1 and a list of PMT objects. The PMT class has a member which is a pointer to a TF1. When an Electronics object is initialized, the TF1 pointer member of each object in its PMT list is set to the TF1 of the Electronics object. Each PMT uses the Electronics TF1 to evaluate the function, feeding it parameter values from its own random number generator. So the code looks like the following:

class PMT : public TObject
{
public:
   void SetPulseShape(TF1 *pulseshape)
   {
      fPulseShape = pulseshape; // non-owning: points to the TF1 owned by Electronics
   }
   Double_t AddPulse(Float_t time)
   {
      Double_t r = fRanGen.Gaus();     // per-PMT random parameter
      fPulseShape->SetParameter(0, r); // modifies the shared TF1
      return fPulseShape->Eval(time);
   }
   TF1 *fPulseShape = nullptr;
   TRandom2 fRanGen;
};

class Electronics : public TObject
{
public:
   void Configure(const char *config) // function that sets up the electronics
   {
      // read the formula from input files and create the TF1
      fPulseShape = new TF1("name", "formula", xmin, xmax);
      // create PMT objects, add them to fListOfPMTs, then give each one the shared TF1
      for (Int_t i = 0; i < fListOfPMTs.GetEntriesFast(); ++i)
         static_cast<PMT *>(fListOfPMTs[i])->SetPulseShape(fPulseShape);
   }
   TF1 *fPulseShape = nullptr;
   TObjArray fListOfPMTs; // PMT objects will be added to this list
};

This whole arrangement works fine as a standalone single-threaded process. As soon as I try to run on PROOF, passing the TF1 pointer of the Electronics object (created separately on each thread) to the PMT objects causes a crash, i.e. the SetPulseShape call in the Configure function of Electronics crashes. Strangely, the value of the Electronics object's fPulseShape pointer when it is created in Configure is different from the one passed in the subsequent SetPulseShape call.
So my question is: is TF1 thread-unsafe, or is something else going on? Will this problem persist with RDataFrame?

Re: thread-safety, you at least need to call ROOT::EnableThreadSafety() for that to work (see the "Multi-threading" page of the ROOT manual), but even then I think TF1 is not thread-safe (@moneta can confirm/deny). RDataFrame cannot do much to prevent thread-unsafe code from being run from multiple threads, but it provides a simple mechanism to duplicate the state nThreads times and use a different copy of the thread-unsafe state in each thread, effectively making the application thread-safe (see the ROOT::RDF::RInterface class reference).

Cheers,
Enrico

Okay…
So does each PMT object need its own TF1 object, or can I just not use thread-local TF1 objects at all? If the latter is the case, is there any possible workaround?

@moneta can you please clarify whether using different TF1 objects from different threads is safe?

Browsing the source code, it looks like TF1 is thread-unsafe… But in my case, TFormula can also do the job… Is TFormula thread-safe?

@vpadulan Is there a way to ship source code to the nodes, build it there, and then make it available to the threads in PyRDF?

If you are using a thread-local copy of TF1, it should work. You should probably call TF1::AddToGlobalList(false) to avoid having multiple functions with the same name registered in the global list of functions, available through gROOT.

Otherwise, you can also share a TF1 between different threads, but in that case you should either use the same set of parameters or pass the parameters to each call of TF1::EvalPar.
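The second option (share one function between threads, never modify it, and pass the parameters with each call, as TF1::EvalPar allows) can be illustrated without ROOT. This is a standard-C++ sketch: PulseShape and EvalConcurrently are hypothetical names, and PulseShape::EvalPar only mimics the idea of TF1::EvalPar (a point plus a parameter array), not its exact signature.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for a pulse shape: a Gaussian in x whose amplitude is p[0].
// The parameters are supplied per call, so the object itself is never modified and
// can be evaluated from several threads at once.
struct PulseShape {
   double sigma;
   double EvalPar(double x, const double *p) const {
      assert(sigma > 0.0);
      return p[0] * std::exp(-0.5 * (x / sigma) * (x / sigma));
   }
};

// Every thread evaluates the SAME const PulseShape with its own amplitude.
std::vector<double> EvalConcurrently(const PulseShape &shape,
                                     const std::vector<double> &amplitudes, double x) {
   std::vector<double> results(amplitudes.size());
   std::vector<std::thread> workers;
   for (std::size_t i = 0; i < amplitudes.size(); ++i)
      workers.emplace_back([&, i] {
         const double params[1] = {amplitudes[i]}; // per-call parameters on this thread's stack
         results[i] = shape.EvalPar(x, params);
      });
   for (auto &w : workers)
      w.join();
   return results;
}
```

By contrast, calling SetParameter on a shared object and then Eval (as PMT::AddPulse above does) lets one thread overwrite another thread's parameter between the two calls, which is a data race.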

Best regards

Lorenzo

Thanks for the reply.
I have instantiated all the TF1s using the TF1(“name”,“formula”,false) constructor,
which in my understanding doesn't add the TF1 to the global list.
In fact, having the TF1 added to the global list caused a crash at ROOT session exit (.q command),
even in a sequential run.

Now I have developed a TFormula-based workaround which I hope will run on PROOF as well as
in RDataFrame.

EDIT:
In hindsight, after @moneta's reply, I have the following doubt.
Suppose my input configuration object contains a TF1 object as a member, and the configuration object is sent from the client to the workers using the

TProof::AddInput(TObject*)

mechanism. If, in SlaveBegin() of my selector, I create a new thread-local object from the configuration object received in the input list, can such an arrangement cause threading issues?

Hi,
If you are running using PROOF, you will not run in multi-threads, but in multi-processes.
You need to make sure all objects are correctly streamable by ROOT, so that they can be communicated between different processes.
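One design point related to copying objects (for example, building a new object from a configuration object received on a worker): in the Electronics/PMT design above, each PMT holds a non-owning pointer into its parent's function. A plain member-wise C++ copy keeps pointing at the ORIGINAL parent's function, so the links have to be re-established in the copy. Whether this caused the PROOF crash is not established in this thread; the sketch below (standard C++, hypothetical names Shape, Pmt and Board) only illustrates the general pattern of deep-copying the owned object and re-pointing the children.

```cpp
#include <cassert>
#include <memory>
#include <vector>

struct Shape { double amplitude = 1.0; };       // hypothetical stand-in for the shared TF1
struct Pmt { const Shape *shape = nullptr; };   // hypothetical PMT: NON-owning pointer to its parent's shape

// Hypothetical stand-in for Electronics: owns the shape and the PMTs pointing at it.
class Board {
public:
   explicit Board(int nPmts) : fShape(std::make_unique<Shape>()), fPmts(nPmts) { Relink(); }
   // A member-wise copy would leave every copied Pmt pointing at the original
   // board's shape, so copy the shape deeply and then re-point the PMTs.
   Board(const Board &other)
      : fShape(std::make_unique<Shape>(*other.fShape)), fPmts(other.fPmts) { Relink(); }
   Board &operator=(const Board &) = delete; // left out of this sketch for brevity
   const Shape *GetShape() const { return fShape.get(); }
   const std::vector<Pmt> &GetPmts() const { return fPmts; }
private:
   void Relink() {
      assert(fShape);
      for (auto &pmt : fPmts)
         pmt.shape = fShape.get();
   }
   std::unique_ptr<Shape> fShape;
   std::vector<Pmt> fPmts;
};
```

After `Board b(a);`, every PMT in `b` points at `b`'s own shape, and `a` is left untouched.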

Lorenzo