Best way to parallelize script involving RDataframe

Dear experts,

I am trying to run a Python script that constructs a Spark distributed dataframe (ROOT.RDF.Experimental.Distributed.Spark.RDataFrame) on a Spark cluster (after connecting via the star button in SWAN). Here is my script:

# 1. Function to make a (distributed) ROOT dataframe and ultimately convert it into an (undistributed) pandas dataframe
import ROOT
import pandas as pd

RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

def extract_Df(files, treename, branches):
    # sc is the SparkContext provided by SWAN after connecting to the cluster
    rdf = RDataFrame(treename, files, npartitions=15, sparkcontext=sc)
    ndf = pd.DataFrame(rdf.AsNumpy(branches))  # convert the extracted columns to pandas
    return ndf  # which is now a pandas dataframe

Why convert to a pandas dataframe? Because I want to compare two dataframes generated by running this function on two different ROOT files. For the pandas dataframes I use merge(on=['runNumber']); I couldn't find an equivalent operation for ROOT dataframes, which is why I had to bring pandas into the picture. Is there any such operation?
My subsequent operations are based on the pandas dataframe obtained after merge(). Basically, I create different histograms corresponding to 300 different weights.
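
For context, here is a rough, untested sketch of that step; the file names, tree name, observable column and weight names are placeholders:

import numpy as np

# Extract the two datasets (file and tree names are placeholders)
df_a = extract_Df("fileA.root", "myTree", branches)
df_b = extract_Df("fileB.root", "myTree", branches)

# Align the two datasets on the run number
merged = df_a.merge(df_b, on=["runNumber"], suffixes=("_a", "_b"))

# One histogram per weight column ("observable_a" is a placeholder column name)
histograms = {
    weight: np.histogram(merged["observable_a"], bins=50, weights=merged[weight])
    for weight in weight_list
}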

What I want to do:
Parallelize the histogram creation.

Currently I just loop: for weight in weight_list: process_weight(weight). I allocated 5 executors with 10 cores each and 3 GB of memory per executor, but somehow it only uses one executor and queues up all the weights. I suspect this happens because extract_Df eventually returns a pandas dataframe, which is not distributed and hence does not fit the Spark model. I would really appreciate any ideas on the best way to parallelize my script on a Spark cluster.

Thanks in advance!
Nilima

I guess @vpadulan or @eguiraud can help

Dear @wandering_particle,

Indeed, there is no equivalent to merging two different ROOT RDataFrames (you can find many related posts, e.g. here). The closest equivalent is producing a merged dataset (i.e. a merged TTree via the AddFriend mechanism) and then wrapping the merged dataset in an RDataFrame.
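
A minimal sketch of the friend-tree approach (file and tree names are placeholders):

import ROOT

# Open the two files and get the trees to be merged column-wise
f_main = ROOT.TFile.Open("main.root")
f_friend = ROOT.TFile.Open("friend.root")
t_main = f_main.Get("events")
t_friend = f_friend.Get("events")

# Attach the second tree as a friend: its branches become readable
# alongside the branches of the main tree
t_main.AddFriend(t_friend, "fr")

# Wrap the merged dataset with a (local) RDataFrame; friend columns
# are accessible as "fr.<branchname>"
rdf = ROOT.RDataFrame(t_main)

Note that friend trees assume the entries of the two trees are aligned (same number and order), unless you build an index with TTree::BuildIndex.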

As for how to parallelize the distributed execution even further, one approach could be to book all your RDataFrame graphs upfront and then send all of them together to the cluster via RunGraphs. An example:


RunGraphs = ROOT.RDF.Experimental.Distributed.RunGraphs

numpy_dicts = []

for weight in weight_list:
    # Book the AsNumpy action lazily: nothing is executed yet
    numpy_dicts.append(RDataFrame(...).AsNumpy(branches, lazy=True))

RunGraphs(numpy_dicts)  # Triggers all the computation graphs concurrently

Note the usage of the kwarg lazy=True in the call to AsNumpy, which ensures the operation is not triggered immediately at the call site. RunGraphs then takes the list of AsNumpy futures and triggers the corresponding computation graphs concurrently, so they all get executed together on the Spark cluster.

Afterwards, you can create pandas dataframes from the values contained in the list of numpy_dicts.
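
For instance, something along these lines (assuming each lazy AsNumpy result exposes its dictionary of arrays via GetValue() once RunGraphs has run):

import pandas as pd

# Each result now holds a {branch name: numpy array} dictionary
pandas_dfs = [pd.DataFrame(res.GetValue()) for res in numpy_dicts]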

Cheers,
Vincenzo

Dear @vpadulan ,

Thank you very much for your response and for educating me regarding RunGraphs. I will try out the method you suggested.

Regards,
Nilima.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.