I am trying to run a Python script that constructs a Spark distributed dataframe (ROOT.RDF.Experimental.Distributed.Spark.RDataFrame) on a Spark cluster (after clicking the star button in SWAN). Here is my script:
- A function that builds a ROOT dataframe (distributed) and ultimately converts it into a pandas dataframe (not distributed):

```python
import ROOT
import pandas as pd

RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

def extract_Df(files, treename, branches):
    # sc is the SparkContext that SWAN provides
    rdf = RDataFrame(treename, files, npartitions=15, sparkcontext=sc)
    ndf = pd.DataFrame(rdf.AsNumpy(branches))  # which is now a pandas dataframe
    return ndf
```
Why go to a pandas df? Because I want to compare two dataframes generated by running this function on two different ROOT files, and for pandas dataframes I can use merge(on=['runNumber']). I couldn't find an equivalent operation for a ROOT dataframe, which is why I had to bring pandas into the picture. Is there such an operation?
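For reference, the comparison step looks roughly like this (the file lists, tree name, and branch names below are placeholders for my actual inputs):

```python
# placeholder inputs -- my real files/tree/branches differ
branches = ["runNumber", "observable"]
df_a = extract_Df(files_a, "myTree", branches)
df_b = extract_Df(files_b, "myTree", branches)

# pandas merge on runNumber; this is the step I couldn't reproduce with RDataFrame
merged_df = df_a.merge(df_b, on=["runNumber"], suffixes=("_a", "_b"))
```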
My subsequent operations are all based on that pandas df (merged_df obtained after merge()). Basically, I create different histograms corresponding to 300 different weights.
What do I want to do?
Parallelize the histogram creation.
Currently I just have a for loop, roughly for weight in weight_list: process_weight(weight) (sketched below). I allocated 5 executors with 10 cores each and 3 GB of memory per executor, but somehow it utilizes only 1 executor and queues up all the weights. I suspect this happens because extract_Df eventually returns a pandas dataframe, which is not distributed and hence doesn't fit the Spark model. I would really appreciate any ideas on how best to parallelize my script on the Spark cluster.
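Schematically, the loop looks like this (process_weight is a stand-in for my actual histogramming code, and the column names are placeholders):

```python
import numpy as np

# merged_df is the pandas dataframe from the merge() step above
def process_weight(weight):
    # placeholder body: histogram some observable, weighted by this variation
    counts, edges = np.histogram(merged_df["observable"],
                                 bins=50,
                                 weights=merged_df[f"weight_{weight}"])
    return counts, edges

# sequential loop over the ~300 weights -- this is what I want to parallelize
for weight in weight_list:
    process_weight(weight)
```

One idea I had was to broadcast the merged dataframe and map over the weights with plain PySpark, something like the sketch below, but I am not sure whether this is the right approach:

```python
# broadcast the (local) pandas dataframe to the executors
bc_df = sc.broadcast(merged_df)

def process_weight_spark(weight):
    df = bc_df.value
    counts, edges = np.histogram(df["observable"],
                                 bins=50,
                                 weights=df[f"weight_{weight}"])
    return weight, counts, edges

# one task per weight, spread over the executors
results = sc.parallelize(weight_list, numSlices=50).map(process_weight_spark).collect()
```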
Thanks in advance!