RDataFrame is going distributed!

So you love RDataFrame, but would like to use it on a cluster? We hear you! In fact, we have just introduced a Python package in ROOT that distributes RDataFrame workloads to a set of remote resources. It has been available as an experimental feature since the latest ROOT 6.24 release, allowing users to write and run their applications from within the same interface while steering the computations to, for instance, an Apache Spark cluster.


This is a companion discussion topic for the original entry at https://root.cern/blog/distributed-rdataframe/

Hi,

All this is very nice, but I have barely heard of Spark; I think I heard it is used in industry. As far as I know, most institutes use HTCondor, Slurm, Lustre or Torque. Can we use any of these workload management systems?

Cheers.

According to documentation, Dask-Jobqueue can use Slurm, HTCondor and such. Otherwise Dask-MPI may be usable to distribute tasks.

Ganga is a tool to compose, run and track computing jobs across a variety of backends and application types.


Hi,

Yes, we can use Ganga to submit jobs. I have used Ganga for years; however, Ganga is not a job management system. Ganga is simply a layer that can be used to communicate with a job management system, i.e.

Ganga → HTCondor → Job

Could you please explain to me how Ganga is relevant here? Just getting a link to the GitHub page of Ganga is not informative enough.

Cheers.

Hi,

Ok, this seems more relevant and probably something that deserves a far more detailed discussion. However, what would be even more useful is a set of detailed instructions to make this new feature work with Dask-Jobqueue + HTCondor/Slurm, etc. Otherwise no one is going to use this new feature and you will be writing code that no one will use.

One good thing is that I see Dask available through CVMFS:

http://lcginfo.cern.ch/

which is really nice.

Hi,
The experimental distributed RDataFrame Python module aims at making it easy to take an existing RDataFrame application and distribute it to a cluster. The module has interactive workflows in mind (think about running an entire analysis from a Jupyter notebook without needing to wait in a queue or processing partial results from the distributed tasks in a separate step). This kind of workflow is quite common in industry; Apache Spark and Dask are prime examples of software tools in this context. Naturally, it is not completely aligned with the job-queue model used by HTCondor or Slurm. While Apache Spark completely neglects that model, Dask tries to be a bit more flexible with its dask-jobqueue module, which was mentioned in a previous comment.

That being said, distributed RDataFrame is designed with modularity in mind, so that we can support running on Spark, Dask or really any computing framework, as long as we can use its concrete API to call our abstractions (i.e. calling the RDataFrame graph of computations on a range of entries from the original dataset).
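The idea of running the same computation on ranges of entries and merging the partial results can be sketched in plain Python. This is only an illustration of the pattern, not the module's actual code: `split_entries`, `mapper` and `reducer` are hypothetical names, the dataset is a toy list, and the list comprehension stands in for whatever a backend such as Spark or Dask would do to run the tasks remotely.

```python
from functools import reduce


def split_entries(n_entries, n_partitions):
    """Split [0, n_entries) into contiguous (start, end) ranges of near-equal size."""
    n_partitions = max(1, min(n_partitions, n_entries))
    base, extra = divmod(n_entries, n_partitions)
    ranges, start = [], 0
    for i in range(n_partitions):
        # The first `extra` ranges take one additional entry each.
        end = start + base + (1 if i < extra else 0)
        ranges.append((start, end))
        start = end
    return ranges


def mapper(dataset, entry_range):
    """Run the (toy) computation graph on one range of entries."""
    start, end = entry_range
    selected = [x for x in dataset[start:end] if x > 3]
    return {"count": len(selected), "sum": sum(selected)}


def reducer(a, b):
    """Merge two partial results into one."""
    return {"count": a["count"] + b["count"], "sum": a["sum"] + b["sum"]}


dataset = list(range(10))  # toy stand-in for the entries of a dataset
ranges = split_entries(len(dataset), 3)
# A real backend would ship each mapper call to a remote worker here.
partials = [mapper(dataset, r) for r in ranges]
result = reduce(reducer, partials)
```

Only the line that builds `partials` depends on where the tasks run, which is what makes it possible to swap one computing framework for another.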

We already have in mind to test this new module with Dask + HTCondor, stay tuned for that :smiley:

Cheers,
Vincenzo


Hi,

Thanks for your reply. The problem I normally have (and given that I have been doing physics analysis for years, it’s probably the problem that many others in my situation have) is transitioning from a piece of code that does the job locally on our laptop to something that scales up to a cluster.

Ideally, we would do something like:

ROOT.EnableCluster('Condor', queue='short')
df.Filter("var>3")

and the analysis would be done on 30 machines in the computing cluster without us having to do anything extra. In practice we have to take care of telling the software:

  1. How to set up the environment.
  2. Where the script to run is and whether to copy input files from one place to another.
  3. The names of the log files where the output should go.
  4. Memory requirements, CPU requirements, etc.
  5. Which machine should process what part of the data. E.g. I have 1000 files of different sizes. Machine 1 should take ~30, but because the files have different sizes, some machines would take 100 files and others 5. I have to figure out how to split things.
  6. Resubmission. If some files were not processed, I need to figure out which ones and resubmit only for those files.
  7. Checking outputs. Make sure that the events after the processing are the same as the ones in the input. If a selection was involved, we need to check whether the efficiencies make sense.

and:

A. Doing this correctly requires a lot of code, time and effort: our time.
B. We mostly process stuff in computing clusters through the shell. If we process stuff through a web browser in a Jupyter notebook, I assume that we will need more software to be installed by whoever manages the computing cluster in our institutes. That person might not be happy to put in all the extra work to get that software in place.
C. Working in a shell vs a GUI with a browser has a lot of advantages. Although it might seem daunting at the beginning, as soon as you learn the commands and get through the learning curve, shells provide a lot of flexibility. And although a GUI looks pretty, I would rather use a tool that gets the job done and saves me time.
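To make items 5 and 6 of the list above concrete, here is a minimal sketch of the kind of bookkeeping involved, assuming the file sizes are known in advance. It uses a simple greedy heuristic (largest file first, always to the least loaded worker), which is one common choice and not necessarily optimal; `balance_files`, `files_to_resubmit` and the file names are hypothetical.

```python
import heapq


def balance_files(file_sizes, n_workers):
    """Greedily assign files to workers, largest file first.

    file_sizes maps a file name to its size (any consistent unit).
    Returns (assignments, loads): the file names and total size per worker.
    """
    # Min-heap of (current load, worker index): the least loaded worker pops first.
    heap = [(0, worker) for worker in range(n_workers)]
    heapq.heapify(heap)
    assignments = [[] for _ in range(n_workers)]
    by_size = sorted(file_sizes.items(), key=lambda item: item[1], reverse=True)
    for name, size in by_size:
        load, worker = heapq.heappop(heap)
        assignments[worker].append(name)
        heapq.heappush(heap, (load + size, worker))
    loads = [0] * n_workers
    for load, worker in heap:
        loads[worker] = load
    return assignments, loads


def files_to_resubmit(all_files, processed_files):
    """Return the files that still need processing, in a stable order."""
    return sorted(set(all_files) - set(processed_files))


sizes = {"a.root": 8, "b.root": 7, "c.root": 6, "d.root": 5, "e.root": 4}
assignments, loads = balance_files(sizes, n_workers=2)
missing = files_to_resubmit(sizes, processed_files=["a.root", "c.root", "e.root"])
```

Even this toy version shows why the splitting is not trivial: the greedy split gives loads of 17 and 13, while a perfectly even 15/15 split exists for these sizes.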

Finally, remember this. Our goal (yours too) is to do work that has an impact. If you write code that we do not need/cannot use, your code will be forgotten and your work will be wasted.

Cheers.


Hi,

Ok, I think I got your point. Probably what you wanted to express by posting that link is that we do not need this new feature for this type of job because this is already what Ganga does. I guess I did not get it at the beginning because I have never used Ganga for cluster jobs, but only for jobs in the Grid.

You are probably right here and I should have tried to profit from Ganga also for this type of job. I might give it a try in the future.

Cheers.

I was not aware of Ganga, thanks @Wile_E_Coyote! In fact it seems a good candidate for a new RDataFrame backend.

https://doi.org/10.1088/1742-6596/119/7/072021

Yes! Totally! That’s the goal: keep the analysis code exactly the same and switch between single-thread, multi-thread, distributed execution with minimal configuration boilerplate. RDataFrame is the ideal ROOT interface for that because it’s so high-level, so we can switch how the event loop is run under the hood, transparently.
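A rough sketch of what that looks like in practice, assuming ROOT 6.24 with the experimental Spark backend and a working Spark setup. The tree name, file name, column `var` and histogram binning are placeholders, and the helper function names are made up for this example; ROOT is imported lazily so the snippet can be read and loaded without it.

```python
def analysis(df):
    """Analysis code that relies only on the common RDataFrame interface."""
    # An explicit histogram model (name, title, bins, min, max) is passed
    # so that the binning does not depend on how the dataset is split.
    return df.Filter("var > 3").Histo1D(("h", "var", 100, 0, 10), "var")


def make_local_df(treename, filename, multithreaded=False):
    """Build a local RDataFrame, optionally with implicit multi-threading."""
    import ROOT  # imported lazily: needs a ROOT installation
    if multithreaded:
        ROOT.EnableImplicitMT()
    return ROOT.RDataFrame(treename, filename)


def make_spark_df(treename, filename):
    """Build a distributed RDataFrame using the experimental Spark backend."""
    import ROOT  # imported lazily: needs ROOT >= 6.24 and pyspark
    SparkRDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
    return SparkRDataFrame(treename, filename)


# Usage (placeholders): only the constructor changes, analysis() stays the same.
# hist = analysis(make_local_df("mytree", "myfile.root"))
# hist = analysis(make_local_df("mytree", "myfile.root", multithreaded=True))
# hist = analysis(make_spark_df("mytree", "myfile.root"))
```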

To add my two cents to Vincenzo’s excellent reply:

Why Spark at all? Because users with a CERN account have access to a Spark cluster via SWAN.

What about HTCondor? As mentioned above, Dask can attach to an existing HTCondor cluster.

Jupyter notebook vs shell submission: distRDF supports both. We keep an eye on making interactive workflows nice (we expect to see a lot of that in the future), but you can just put the exact same code in a Python script and run it as usual.

What about Ganga? We kept interfaces and backends decoupled precisely because we want to be able to address this kind of future extension if there is demand for it.

I hope this clarifies a few points. Thank you very much for the feedback, that’s exactly why we advertise our experimental features! :slight_smile:
