Memory issues for Dask Distributed RDataframe with a lot of entries per file?

Hello,

I wanted to try out the new distributed processing with Dask on Swan as described in https://swan.docs.cern.ch/condor/configure_session/ (apparently I’m not allowed to put it as a link :man_facepalming: ), on my admittedly somewhat unusual case of 40 MHz scouting.

My input files have O(100M) events each, and while the standard RDataframe seems to be capable to process them, the distributed setup crashes with out-of-memory errors, even if I try with a single file.

So, e.g. for a regular DF on a single file of about 315M entries

import ROOT
ROOT.EnableImplicitMT()
d1 = ROOT.RDataFrame("Events", 'root://eoscms.cern.ch/eos/cms/store/cmst3/group/l1tr/gpetrucc/l1scout/2024/zerobias_run382250/v0/l1nano_job1.root')
#  The file is is accessed via xrootd on eoscms.cern.ch, but the security features of this form don't allow me to write it that way path because new users aren't allowed to post links....

I can do

d1.Filter("Sum(L1Mu_pt>18) >= 1").Count().GetValue()

and get the answer, 62339, in about 1 minute which is not bad considering swan has only 4 cores and so it’s running at 1MHz/core.

However if i try to open a dask RDataframe

DRDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
dd1 = DRDataFrame("Events", '/eos/cms/store/cmst3/group/l1tr/gpetrucc/l1scout/2024/zerobias_run382250/v0/l1nano_job1.root')
dd1.Filter("Sum(L1Mu_pt>18) >= 1").Count().GetValue()

then Swan starts reporting memory errors in the form

2024-06-27 14:00:16,433 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:36501 (pid=11453) exceeded 95% memory budget. Restarting...
2024-06-27 14:00:16,624 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:44505 (pid=11521) exceeded 95% memory budget. Restarting...
2024-06-27 14:00:16,634 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:38437 (pid=11474) exceeded 95% memory budget. Restarting...
2024-06-27 14:00:16,726 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:34547 (pid=11497) exceeded 95% memory budget. Restarting...

Any idea?

Thanks,

Giovanni

Hi @gpetruc,

Let me first add @vpadulan in the loop.

Thanks a lot for reporting the issue. I don’t have an immediate solution, but it would be great for us to try and reproduce your case and understand in detail what is happening. Maybe we can schedule a chat some time next week?

Cheers,
Marta

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.

Dear @gpetruc ,

We have just introduced a few improvements to the distributed RDataFrame scheduling that also affect positively the memory usage of the distributed tasks. Could I ask you to retry your analysis on SWAN, selecting the “Bleeding Edge” software stack so you get the latest ROOT master build? Let me know if you can see any improvements, otherwise we will continue the debugging of your specific case.

Thanks!
Vincenzo

Dear @gpetruc ,

I have reopened the post so we can continue the exchange over here. I have the following reproducer that I can run on SWAN (after selecting the “Bleeding Edge” and creating a SWANHTCondorCluster from the UI). For reference I have requested 8 cores.

import ROOT

from distributed import Client

from time import time

DRDF = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
treename = "Events"
filename = "FILENAME"

def run(client):
    begin = time()
    df = DRDF(treename, filename, daskclient=client)
    count = df.Filter("Sum(L1Mu_pt>18) >= 1").Count().GetValue()
    end = time()
    print(f"{count=}")
    print(f"Run took {end - begin} seconds")

if __name__ == "__main__":
    with Client("SCHEDULER_ADDRESS") as client:
        run(client)

I can see this running properly and the result is

[vpadulan@jupyter-vpadulan 60033]$ python repro.py 
TClass::Init:0: RuntimeWarning: no dictionary for class edm::Hash<1> is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ParameterSetBlob is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ProcessHistory is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ProcessConfiguration is available
TClass::Init:0: RuntimeWarning: no dictionary for class pair<edm::Hash<1>,edm::ParameterSetBlob> is available
count=401763
Run took 36.57305932044983 seconds

I can also see the workers memory consistently below 1 GiB

What could be the differences between our setups?

Cheers,
Vincenzo

Hi,

Thanks!

So, I confirm that with your code it works, while with mine it was failing also today with the newest root.
The only differences I see are

  • I had from dask.distributed import Client (which is what jupyterlab injects in the notebook) instead of from distributed import Client (I doubt it matters)
  • I had ROOT.EnableImplicitMT() since in the same script I was also running locally for comparison
  • I didn’t have , daskclient = client in the RDataframe constructor.

Dear @gpetruc ,

I didn’t have , daskclient = client in the RDataframe constructor.

Ok so I believe this is the main cause of the issues you saw. Without that extra keyword argument, you are not even using the SWANHTCondorCluster, but you are just asking the distributed RDataFrame to create a default Dask cluster for you. This means practically creating a mock LocalCluster that runs only on the SWAN session, without actually using any of the condor resources. Then the memory issues were probably a mixture of the fact the code is only running on the client SWAN session and not on the workers, together with the fact that EnableImplicitMT makes every task try to run on all the cores of the node with implicit MT enabled. In general, the two modes of operation should be used exclusively.

Cheers,
Vincenzo