Migrate from PROOF to RDataFrame + Spark

Hi,
I have previously developed a TSelector based simulation code which I ran on small (Clients + 4 Nodes with each one capable of 48 workers) PROOF cluster.
I am trying to figure out ways to migrate to RDataFrame, but I am unable to figure out equivalent of PROOF datasets. PROOF datasets are very crucial to me, as I generate simulated events and save them in PROOF datasets over the cluster. Many of the analysis inputs are then generated from these datasets, on the fly, using ProofDraw methods.
I know little bit of python, and I do not know anything about Spark (or the MapReduce paradigm of RDataFrames). So can someone suggest me with some good beginner’s material for all these ? Also
How much will be the learning curve. The simulation software is fairly big code and I am the sole developer/maintainer.


Please read tips for efficient and successful posting and posting code

ROOT Version: Not Provided
Platform: Not Provided
Compiler: Not Provided


I think @eguiraud can help you.

Hi @Chinmay ,
I don’t know all the ins and outs of PROOF datasets and ProofDraw but we can certainly try to work things out :smiley:

In (distributed) RDF there is no equivalent of PROOF datasets, but depending on what features you need, a simple list of filenames might work: e.g. when writing data from a distributed job, files are produced that you can immediately read into another distributed job.

The ProofDraw technology should not be required, as the filling and merging of histograms happens transparently under the hood.

Learning material: here is the RDataFrame user guide and tutorials (you can directly run them on SWAN by clicking on the orange badge), and here is a tutorial that shows how to run a RDF workload in a distributed fashion.

I don’t think the learning curve will be particularly steep, the one more complicated thing that you might need to learn (if you don’t know already) is how to compile a bunch of C++ functions into a library and then load the library into Python (an explanation can be found here). Of course feel free to just ask questions here in case something in the guide is not clear.

To form an idea of the amount of work the migration would require I would suggest that you take the simplest usecase (like making one histogram out of an already available file) and try to do that in (distributed) RDF. Then you can add more pieces, e.g. producing output files, and see what the challenges are. After a few iterations, and after figuring out how things map from one API into the other, you should have a good understanding of how much work it is to add all the pieces.

Cheers,
Enrico

P.S.
is it ok if I move this question out of the “newbie” category? I think it might be useful to other users.

From the tutorials parallelizing the analysis seems much more painless with RDataFrames. However,since simulation runs are computationally intensive, they are carried only once for each set of operational parameters. So e.g. lets say I have 5 operational parameters (of telescope), I take 100 combinations of these 5 parameter values and then simulate 100 telescope runs one for each of 100 combinations. Each of the 100 simulation runs will generate set of files on distributed job. The features in PROOF which were usefull in this type of work are

  1. Possibility of sending the configuration objects (instances of TObject derived classes) from client node to machines over the cluster , so that initialisations can be done on all the nodes independently
  2. saving and maintaining the ‘catalog’ of outputs generated in each simulation run at master node so that the (distributed) data can be retrieved and processed any time after the simulation run is over.

Sure. I didn’t know where to put this.

I am not sure how all of this translates to a RDF application, but we can probably work it out. In a distributed RDF application you can Snapshot (i.e. write) ROOT TTrees from the worker jobs. @vpadulan can provide more details.

Is there a possibility of organizing’’ the "Snapshot"ed TTrees of worker jobs in Spark DataSets ?
(Sorry if this question doesn’t make any sense. As I said I am trying to learn about spark and RDF and currently my understanding about both is nearly zero).

Dear @Chinmay ,

Distributed RDataFrame aims at being as close to local RDataFrame as possible. Snapshot shows some differences when executed distributedly though, which I will try to explain here.

First off, you need an environment with ROOT 6.24 and pyspark on both the client machine and all the cluster machines (scheduler + workers). This can be set up in SWAN or using cvmfs for example.

Then, you also need to have access to some storage from all the workers. For example a call like
Snapshot("mytree", "/path/to/my/snapshot.root") means that “/path/to/my/snapshot.root” must be a valid path from any worker of your cluster.

Now we can imagine a very simple distributed RDataFrame application with the Snapshot operation:

import ROOT
SparkRDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
# Create a distributed RDataFrame that will run on Spark
df = SparkRDataFrame(100)
# Define a single column of 100 consecutive numbers
df_x = df.Define("x","rdfentry_")
# Snapshot the dataset distributedly
snapdf = df_x.Snapshot("mytree","myfile.root")

If I run this on my laptop (where I have both ROOT 6.24 and pyspark), I will see that the snapshot operation has created two files in the folder where I ran the application:

$: ls 
myfile_0_49.root  myfile_50_99.root

Note that the filenames correspond to the second argument given to the Snapshot operation, plus 2 numbers representing the ranges of entries that are being saved to that particular file.

This derives from the fact that distributed RDataFrame splits the workload of the application along the entries of the dataset in multiple (exclusive) ranges. For our case, we had 100 entries, they were split by default in two ranges of 50 entries each, thus we get two snapshotted files with entries [0,49] and [50,99] respectively.

This behaviour will be the same on a set of distributed resources. Each worker node will get some ranges of entries to process and will make a Snapshot of that range. Note that you can also pass an optional argument to the distributed RDataFrame constructor to signal in how many ranges you would like to split your dataset:

import ROOT
SparkRDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
df = SparkRDataFrame(1000, npartitions=10)

If you call Snapshot on this particular df for example you would save your output tree in 10 separate files.

As for your last question, Spark doesn’t know about the internals of ROOT files or trees. So you cannot make a Spark DataSet/DataFrame out of a snapshotted tree . But in principle RDataFrame should already provide you with the functions to run your analysis, so there shouldn’t be the need to read ROOT data in a Spark DataFrame.

Cheers,
Vincenzo

1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.