Hi @Chinmay,
As written by Enrico, distributed RDataFrame is Python-only. This is in fact needed since the distributed backends (like Spark and Dask) usually expose Python APIs. Now, in order to send C++ code to the computing nodes so that it can then be passed to Define and Filter operations, you have two options:
Initialize function paired with gInterpreter.Declare
The ROOT.gInterpreter.Declare function can be used in any Python script to declare C++ code to the ROOT interpreter of the Python session. The function expects a string containing the C++ code you want to declare.
The distributed RDataFrame module offers the possibility to register a single function that each computing node will run before starting to execute the RDataFrame computations. The function is ROOT.RDF.Experimental.Distributed.initialize.
You can pair the two together and create a function, executed once per computing node, that just calls gInterpreter.Declare, thus making the ROOT interpreter on each node aware of your C++ code.
This currently looks like:

import ROOT

initialize = ROOT.RDF.Experimental.Distributed.initialize
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

def myfun():
    ROOT.gInterpreter.Declare("""
    int get_int(){
        return 111;
    }
    """)

# Signal function to be called once on each node
initialize(myfun)

df = RDataFrame(100)

# Function is available in calls to Define or Filter
m = df.Define("x", "get_int()").Mean("x")
print(f"Value returned by declared function: {m.GetValue()}")
Distribute C++ headers to computing nodes
Another functionality in distributed RDataFrame is sending a copy of the C++ header files containing functions and variables needed in the analysis to each computing node (see the code). The function expects a path to a single header, a path to a directory containing all the headers you need, or an iterable of paths to various headers. The files will be sent to the nodes before the analysis starts, and the headers will be declared to the ROOT interpreter before the computations begin.
Example:
myheader.hpp
#ifndef myheader
#define myheader

bool check_number_less_than_5(int num){
    // Checks if the input number is less than 5
    return num < 5;
}

#endif // myheader
analysis.py
import ROOT
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
df = RDataFrame(100)
# Signal the distributed RDataFrame backend instance that there is a header to be sent
df._headnode.backend.distribute_headers("myheader.hpp")
df_filtered = df.Filter("check_number_less_than_5(rdfentry_)")
histo = df_filtered.Histo1D(("distributeheader","distributeheader",10,0,100), "rdfentry_")
c = ROOT.TCanvas("", "", 600, 600)
histo.Draw()
c.Draw()
Hope this helps to set up your use case; reach back to us if you have problems or feedback.
Cheers,
Vincenzo