Dask-distributed RDataFrame on a SlurmCluster, failing

Dear experts,

We are trying to use ROOT.RDF.Experimental.Distributed.Dask.RDataFrame with SlurmCluster. We encountered some issues with this, and would appreciate your assistance in debugging them.

While the computation seems for the most part to finish successfully, as no errors are present in the job logs or elsewhere, something seems to kill the worker(s) prematurely, resulting in an error when executing the script. Here it is,

$ python test.py
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
2024-07-23 06:23:46,988 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2024-07-23 06:23:47,068 - distributed.scheduler - INFO - State start
2024-07-23 06:23:47,081 - distributed.scheduler - INFO -   Scheduler at:  tcp://18.4.134.162:12831
2024-07-23 06:23:47,082 - distributed.scheduler - INFO -   dashboard at:  http://18.4.134.162:8000/status
2024-07-23 06:23:47,083 - distributed.scheduler - INFO - Registering Worker plugin shuffle
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
2024-07-23 06:23:47,096 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=1
2024-07-23 06:23:48,448 - distributed.scheduler - INFO - Receive client connection: Client-a55b3500-48dd-11ef-abd3-043f72dc3564
2024-07-23 06:23:48,450 - distributed.core - INFO - Starting established connection to tcp://18.4.134.162:48286
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
#!/usr/bin/env bash

#SBATCH -J test1
#SBATCH -A Hrare_Slurm
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=8G
#SBATCH -t 00:30:00
#SBATCH --output=/home/submit/lavezzo/submit/dask/logs/dask_job_output_%j.out
#SBATCH --error=/home/submit/lavezzo/submit/dask/logs/dask_job_output_%j.err
#SBATCH --partition=submit,submit-gpu
export DASK_DISTRIBUTED__COMM__ALLOWED_TRANSPORTS=["tcp://[::]:0"]
export XRD_RUNFORKHANDLER=1
export XRD_STREAMTIMEOUT=10
echo "Landed on $HOSTNAME"
export DASK_CONFIG=/home/submit/lavezzo/submit/dask/dask.yamlsource /home/submit/lavezzo/.bashrc
conda activate rootdf2
/work/submit/lavezzo/miniforge3/envs/rootdf2/bin/python -m distributed.cli.dask_worker tcp://18.4.134.162:12831 --name dummy-name --nthreads 1 --memory-limit 7.45GiB --nanny --death-timeout 60

<Client: 'tcp://18.4.134.162:12831' processes=0 threads=0, memory=0 B>
TClass::Init:0: RuntimeWarning: no dictionary for class edm::Hash<1> is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ParameterSetBlob is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ProcessHistory is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ProcessConfiguration is available
TClass::Init:0: RuntimeWarning: no dictionary for class pair<edm::Hash<1>,edm::ParameterSetBlob> is available
2024-07-23 06:23:50,877 - distributed.deploy.adaptive - INFO - Retiring workers ['SLURMCluster-0']
2024-07-23 06:23:50,883 - distributed.scheduler - INFO - Retire worker names ('SLURMCluster-0',)
2024-07-23 06:23:50,888 - distributed.scheduler - INFO - Retire worker addresses ('SLURMCluster-0',)
[                                        ] | 0% Completed |  0.2s/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
[                                        ] | 0% Completed | 16.2s2024-07-23 06:24:07,163 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://18.4.134.83:22429', name: SLURMCluster-0, status: init, memory: 0, processing: 0>
2024-07-23 06:24:07,166 - distributed.scheduler - INFO - Starting worker compute stream, tcp://18.4.134.83:22429
2024-07-23 06:24:07,166 - distributed.core - INFO - Starting established connection to tcp://18.4.134.83:56304
2024-07-23 06:24:31,441 - distributed.deploy.adaptive_core - INFO - Adaptive stop
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
2024-07-23 06:24:31,445 - distributed.scheduler - INFO - Retire worker addresses ('SLURMCluster-0',)
2024-07-23 06:24:31,473 - distributed.scheduler - INFO - Scheduler closing due to unknown reason...
2024-07-23 06:24:31,474 - distributed.scheduler - INFO - Scheduler closing all comms
2024-07-23 06:24:31,476 - distributed.core - INFO - Connection to tcp://18.4.134.83:56304 has been closed.
2024-07-23 06:24:31,476 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://18.4.134.83:22429', name: SLURMCluster-0, status: running, memory: 1, processing: 0> (stimulus_id='handle-worker-cleanup-1721730271.476543')
2024-07-23 06:24:31,477 - distributed.scheduler - WARNING - Removing worker 'tcp://18.4.134.83:22429' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'distrdf_reducer-a64d7211-dea1-4037-85a3-6f2df9b62376'} (stimulus_id='handle-worker-cleanup-1721730271.476543')
2024-07-23 06:24:31,486 - distributed.scheduler - INFO - Lost all workers
2024-07-23 06:24:31,505 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Scheduler->Client local=tcp://18.4.134.162:12831 remote=tcp://18.4.134.162:48286>
Traceback (most recent call last):
  File "/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/distributed/comm/tcp.py", line 262, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

The output is still produced, but when scaling up, seems to miss contributions from some of the workers. Presumably, those that were killed ungracefully.

The worker nodes donā€™t contain any errors, as far as I can judge, though you can see from them that the worker was stopped.

2024-07-23 04:17:54,156 - distributed.nanny - INFO -         Start Nanny at: 'tcp://18.4.134.76:29633'
2024-07-23 04:17:56,239 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-6q27qs3h', purging
2024-07-23 04:17:56,240 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-cv2cjabd', purging
2024-07-23 04:17:56,240 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-xinv594r', purging
2024-07-23 04:17:56,241 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-a8s9m9jn', purging
2024-07-23 04:17:56,241 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-i3pgeccy', purging
2024-07-23 04:17:56,241 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-7zkqnby2', purging
2024-07-23 04:17:57,765 - distributed.worker - INFO -       Start worker at:    tcp://18.4.134.76:20323
2024-07-23 04:17:57,765 - distributed.worker - INFO -          Listening to:    tcp://18.4.134.76:20323
2024-07-23 04:17:57,765 - distributed.worker - INFO -           Worker name:             SLURMCluster-9
2024-07-23 04:17:57,765 - distributed.worker - INFO -          dashboard at:          18.4.134.76:26557
2024-07-23 04:17:57,765 - distributed.worker - INFO - Waiting to connect to:   tcp://18.4.134.163:29015
2024-07-23 04:17:57,765 - distributed.worker - INFO - -------------------------------------------------
2024-07-23 04:17:57,765 - distributed.worker - INFO -               Threads:                          1
2024-07-23 04:17:57,765 - distributed.worker - INFO -                Memory:                   7.45 GiB
2024-07-23 04:17:57,765 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space-210253/worker-hr_s2lak
2024-07-23 04:17:57,765 - distributed.worker - INFO - -------------------------------------------------
2024-07-23 04:17:59,186 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-07-23 04:17:59,187 - distributed.worker - INFO -         Registered to:   tcp://18.4.134.163:29015
2024-07-23 04:17:59,187 - distributed.worker - INFO - -------------------------------------------------
2024-07-23 04:17:59,189 - distributed.core - INFO - Starting established connection to tcp://18.4.134.163:29015
Warning in <TClass::Init>: no dictionary for class edm::Hash<1> is available
Warning in <TClass::Init>: no dictionary for class edm::ParameterSetBlob is available
Warning in <TClass::Init>: no dictionary for class edm::ProcessHistory is available
Warning in <TClass::Init>: no dictionary for class edm::ProcessConfiguration is available
Warning in <TClass::Init>: no dictionary for class pair<edm::Hash<1>,edm::ParameterSetBlob> is available
2024-07-23 04:18:03,518 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.33s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-07-23 04:18:10,581 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.96s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
slurmstepd: error: *** JOB 1518229 ON submit66 CANCELLED AT 2024-07-23T04:18:13 ***
2024-07-23 04:18:13,136 - distributed.worker - INFO - Stopping worker at tcp://18.4.134.76:20323. Reason: scheduler-close

I copy here a minimal version of a script that still reproduces the issue, in full. You can verify by using the LocalCluster that the script itself is fine, and doesnā€™t consume crazy amounts of memory, well within what is request to slurm.

import os
import socket
import time
import ROOT
from distributed import Client
from dask_jobqueue import SLURMCluster
import distributed

def create_remote_connection():

    slurm_env = [
        'export DASK_DISTRIBUTED__COMM__ALLOWED_TRANSPORTS=["tcp://[::]:0"]',
        'export XRD_RUNFORKHANDLER=1',
        'export XRD_STREAMTIMEOUT=10',
        'echo "Landed on $HOSTNAME"',
        'export DASK_CONFIG=dask.yaml',
        f'source {os.getenv("HOME")}/.bashrc',
        f'conda activate myenv',
    ]

    extra_args=[
        "--output=dask_job_output_%j.out",
        "--error=dask_job_output_%j.err",
        "--partition=<partitions>",
    ]


    cluster = SLURMCluster(
        job_name="test",
        cores=1,
        memory='8GB',
        scheduler_options={
            'dashboard_address': 8000,
            'host': socket.gethostname()
        },
        silence_logs="debug",
        job_extra_directives=extra_args,
        job_script_prologue=slurm_env
    )

    cluster.scale(2)
    cluster.adapt(minimum=0, maximum=2)
    client = Client(cluster, heartbeat_interval='5s', timeout='60s')

    print(cluster.job_script())

    return client

def create_local_connection(n_workers):

    from dask.distributed import LocalCluster

    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1, processes=True, memory_limit="8GB" ,dashboard_address=":0", worker_dashboard_address=":0")
    client = Client(cluster)

    return client

if __name__ == "__main__":

    # with slurm
    client = create_remote_connection()
    # with local
    # client = create_local_connection(2)

    print(client)

    files = [...<some standard nanoaod files>...]
    NPARTITIONS = 10

    RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
    df = RDataFrame("Events", files, daskclient=client, npartitions=NPARTITIONS)

    df_1mu = df.Filter("nMuon == 1", "Events with one muon.")
    #df_1mu = df_1mu.Snapshot('event', 'snapshot.root', ['Muon_pt'])
    h = df_1mu.Histo1D(("Muon_pt", "Muon_pt", 100, 0, 1000), "Muon_pt")

    file = ROOT.TFile("test.root", "RECREATE")
    h.Write()
    file.Close()

The nanoaod files are ~1GB each, and the script fails even when using just one.

I installed a fresh conda environment with conda install root dask dask-jobqueue, which, at the time of writing, results in the following versions,

root==6.32.2 
dask==2024.7.1 
dask-jobqueue==0.8.5
distributed==2024.7.1

The dask.yaml only configures the debugging configuration, in order to get as much output as possible; I copy it here for completeness,

logging:
  version: 1
  disable_existing_loggers: false
  formatters:
    simple:
      format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  handlers:
    console:
      class: logging.StreamHandler
      formatter: simple
      level: DEBUG  # Log all messages at DEBUG level and above
  loggers:
    distributed:
      level: DEBUG  # Increase verbosity for all distributed logs
      handlers: [console]
    distributed.scheduler:
      level: DEBUG  # Specifically increase scheduler logging
      handlers: [console]
    distributed.worker:
      level: DEBUG  # Specifically increase worker logging
      handlers: [console]

Let me know if you have any ideas, they would be greatly appreciated.

Best,
Luca

Hi @lucalavezzo, welcome to the forum!

Let me add @vpadulan in the loop.

Cheers,
Devajith

1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.

Dear Luca,

In the meantime the issue has been solved (see Memory issues for Dask Distributed RDataframe with a lot of entries per file? - #4 by vpadulan )

I add in the loop both @vpadulan and @mczurylo to provide additional support if needed.

Cheers,
D

Dear Danilo,

Thank you for your response. I have built the current version of ROOTā€™s master branch on github in a new environment, I am now running with,

python==3.11.0
dask==2024.8.0
dask-jobqueue==0.8.5
distributed==2024.8.0

I am executing the same code that I sent in my previous message, and still see failures. (The only change is that now I source the thisroot.sh file after conda activating the environment.) The post you linked seemed to address memory issues, I am not sure if memory is what is causing the failures here.

The output of the python command:

$ python test.py
2024-08-12 12:47:41,222 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2024-08-12 12:47:41,231 - distributed.scheduler - INFO - State start
2024-08-12 12:47:41,240 - distributed.scheduler - INFO -   Scheduler at:   tcp://18.4.134.165:6069
2024-08-12 12:47:41,241 - distributed.scheduler - INFO -   dashboard at:  http://18.4.134.165:8000/status
2024-08-12 12:47:41,241 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2024-08-12 12:47:41,252 - distributed.deploy.adaptive_core - INFO - Adaptive scaling started: minimum=0 maximum=2
2024-08-12 12:47:41,297 - distributed.scheduler - INFO - Receive client connection: Client-970b7c1e-58ca-11ef-b7f5-b8cef6003cfc
2024-08-12 12:47:41,299 - distributed.core - INFO - Starting established connection to tcp://18.4.134.165:31608
#!/usr/bin/env bash

#SBATCH -J test1
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=8G
#SBATCH -t 00:30:00
#SBATCH --output=/home/submit/lavezzo/submit/dask/logs/dask_job_output_%j.out
#SBATCH --error=/home/submit/lavezzo/submit/dask/logs/dask_job_output_%j.err
#SBATCH --partition=submit,submit-gpu
export DASK_DISTRIBUTED__COMM__ALLOWED_TRANSPORTS=["tcp://[::]:0"]
export XRD_RUNFORKHANDLER=1
export XRD_STREAMTIMEOUT=10
echo "Landed on $HOSTNAME"
export DASK_CONFIG=/home/submit/lavezzo/submit/dask/dask.yaml
source /home/submit/lavezzo/.bashrc
conda activate rootdf3
source /work/submit/lavezzo/root_install/bin/thisroot.sh
echo "which root"
which root
echo "which python"
which python
/work/submit/lavezzo/miniforge3/envs/rootdf3/bin/python -m distributed.cli.dask_worker tcp://18.4.134.165:6069 --name dummy-name --nthreads 1 --memory-limit 7.45GiB --nanny --death-timeout 60

<Client: 'tcp://18.4.134.165:6069' processes=0 threads=0, memory=0 B>
TClass::Init:0: RuntimeWarning: no dictionary for class edm::Hash<1> is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ParameterSetBlob is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ProcessHistory is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ProcessConfiguration is available
TClass::Init:0: RuntimeWarning: no dictionary for class pair<edm::Hash<1>,edm::ParameterSetBlob> is available
[                                        ] | 0% Completed |  2.6s2024-08-12 12:47:47,167 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://18.4.134.74:31697', name: SLURMCluster-1, status: init, memory: 0, processing: 0>
2024-08-12 12:47:47,170 - distributed.scheduler - INFO - Starting worker compute stream, tcp://18.4.134.74:31697
2024-08-12 12:47:47,171 - distributed.core - INFO - Starting established connection to tcp://18.4.134.74:4894
2024-08-12 12:47:47,172 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://18.4.134.74:24425', name: SLURMCluster-0, status: init, memory: 0, processing: 0>
2024-08-12 12:47:47,173 - distributed.scheduler - INFO - Starting worker compute stream, tcp://18.4.134.74:24425
2024-08-12 12:47:47,173 - distributed.core - INFO - Starting established connection to tcp://18.4.134.74:4908
2024-08-12 12:48:03,636 - distributed.deploy.adaptive_core - INFO - Adaptive scaling stopped: minimum=0 maximum=2
2024-08-12 12:48:03,638 - distributed.scheduler - INFO - Retire worker addresses ('SLURMCluster-1', 'SLURMCluster-0')
2024-08-12 12:48:03,661 - distributed.scheduler - INFO - Scheduler closing due to unknown reason...
2024-08-12 12:48:03,661 - distributed.scheduler - INFO - Scheduler closing all comms
2024-08-12 12:48:03,663 - distributed.core - INFO - Connection to tcp://18.4.134.74:4894 has been closed.
2024-08-12 12:48:03,663 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://18.4.134.74:31697', name: SLURMCluster-1, status: running, memory: 1, processing: 0> (stimulus_id='handle-worker-cleanup-1723481283.6633608')
2024-08-12 12:48:03,663 - distributed.scheduler - WARNING - Removing worker 'tcp://18.4.134.74:31697' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'distrdf_reducer-0ce9199d-3d16-4523-958b-11c2dba4d868'} (stimulus_id='handle-worker-cleanup-1723481283.6633608')
2024-08-12 12:48:03,670 - distributed.core - INFO - Connection to tcp://18.4.134.74:4908 has been closed.
2024-08-12 12:48:03,671 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://18.4.134.74:24425', name: SLURMCluster-0, status: running, memory: 0, processing: 2> (stimulus_id='handle-worker-cleanup-1723481283.6711125')
2024-08-12 12:48:03,671 - distributed.scheduler - INFO - Lost all workers
2024-08-12 12:48:03,684 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Scheduler->Client local=tcp://18.4.134.165:6069 remote=tcp://18.4.134.165:31608>
Traceback (most recent call last):
  File "/work/submit/lavezzo/miniforge3/envs/rootdf3/lib/python3.11/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/work/submit/lavezzo/miniforge3/envs/rootdf3/lib/python3.11/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/work/submit/lavezzo/miniforge3/envs/rootdf3/lib/python3.11/site-packages/distributed/comm/tcp.py", line 262, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

I note that the progess bar actually reaches 100%, but is overwritten by the next line in the output. This to me indicates that the workers are actually processing the files as expected, and I think indicates a failure in the communicating with the scheduler about shutting down gracefully.

And here is also the logs of one of the workers:

dask_job_output_1604481.txt (141.2 KB)

Thanks again a lot for the help,
Luca

Perhaps of interest: I have ran the same ā€œworkflowā€ with the same files on the SWAN Spark Cluster, using 4g of memory, without any issues. I think that indicates an issue in the specific interplay between all three of Slurm, Dask, and RDF, since Dask on a Local cluster + RDF works, Spark + RDF works.

Hi @lucalavezzo,

thank you for the thorough report and thanks for also trying out your workflow with different setups - this helps a lot to narrow down the problem. We will now do some debugging using your example, I will keep you updated how it is going and in case we need some more inputs from you.

Cheers,
Marta

1 Like

Hi @lucalavezzo,

Iā€™m sorry for the very delayed reply - I got the time to work on your issue now, but to debug it fully, it will be much easier if you could provide some input files as well?

Cheers,
Marta

Hi @mczurylo,

No worries, thank you for following up on this. I was reading any nanoaod files for this script. You can find the specific ones Iā€™m using at the following xrootd address:

xrdfs root://xrootd.cmsaf.mit.edu/ ls /store/user/paus/nanosu/A02/WJetsToLNu_Pt-100To250_MatchEWPDG20_TuneCP5_13TeV-amcatnloFXFX-pythia8+RunIISummer20UL18MiniAODv2-106X_upgrade2018_realistic_v16_L1v1-v1+MINIAODSIM

However, my suspicion is that this has nothing to do with the processing per se, so I assume that swapping out my code for anything else that takes some non-negligible amount of time to run would show the same issue. If I get some time this week I will try that too and let you know.

Thanks again,
Luca

1 Like

Hi @lucalavezzo,

it is indeed a bit hard to exactly figure out what is going on as the error message only says: Scheduler closing due to unknown reason...

So we brainstormed a bit on our side and we have a few extra thoughts/questions:

  • could you try running a slightly modified code where we GetValue of the histogram first, so something like:
h = df_1mu.Histo1D(("Muon_pt", "Muon_pt", 100, 0, 1000), "Muon_pt")
h_val = h.GetValue() 
file = ROOT.TFile("test.root", "RECREATE")
h_val.Write()
file.Close()

and letā€™s see if the same error messages appear

  • we can see in your script that you set up the slurm environment on the workers, but what about the scheduler, is it on the same machine?

  • lastly, have you tried profiling the memory usage of the scheduler during the execution?

We believe this is something rather subtle regarding the specific configuration of the cluster you are using to which we donā€™t have access - are you by any chance based at CERN then we could also meet up in person and try things together?

Sorry for the lack of some more concrete solutions, at least for now.

Cheers,
Marta

1 Like

Dear Marta,

I am not based at CERN unfortunately, I agree that sometimes sitting down together and debugging is the best thingā€¦ I am happy to set up a zoom chat, if itā€™s ever useful, please reach out to me!

Yes, the scheduler is on the same machine, and also runs with the same conda env / root versions, to keep everything consistent. Iā€™ve tried the GetValue(), to no avail. Iā€™ve monitored ā€˜by eyeā€™ (via topā€¦) the memory usage, I can set up something more sophisticated, but it didnā€™t seem to be a spike significantly.

However, Iā€™ve actually found something that is quite strange, though Iā€™m not sure if itā€™s anything interesting. I was messing with the dask logging verbosity via the ~/.config/dask.yaml file (which I understand is automatically loaded by dask, without configuring it), which before, when the error was happening, I did not have at all. I created it, and put in it the following configuration,

logging:
  distributed: warning
  distributed.client: warning
  bokeh: warning

With this, I do not see anymore the Traceback error that I first reported. I am not sure if Iā€™m simply not seeing it because the logging is preventing me from seeing it, though I doubt it, as this logging should still show any warnings and errors, or whether itā€™s actually some aspect of the logging communication that is causing the error itself.

I am very happy to test this on another Slurm set up: indeed, I am operating out of MITā€™s SubMIT cluster, which I help administer, so I can also help understand if itā€™s some issue of Slurm itself, though we have a fairly standard setup. Is there one available at CERN, or elsewhere we can use for debugging whether this is a Slurm/cluster issue?

Thanks,
Luca

Hello all,

I am observing the same issue reported by Luca. In my case, I am also using dask distributed RDF on a Slurm cluster. May I kindly ask if there has been any update on a solution for this problem?

Thank you in advance.

Best,
Jindrich

Hi Jindrich,

Interesting that Iā€™m not the only one seeing this! Out of curiosity, did you toggle the dask logging? For me, it seemed that the logging caused communication errors between the scheduler and the workers.

In particular, I set the file ~/.config/dask.yaml with,

logging:
  distributed: warning
  distributed.client: warning
  bokeh: warning

The other thing I noted is to make sure that the dask workers have enough memory, which can cause issues that arenā€™t properly handled by the scheduler.

Best,
Luca

Hello Luca,

Yes, I can confirm that your solution works for me as well.
Iā€™m not sure if I should consider it a final solution or just a temporary workaround.

Best,
Jindrich

Hi Jindrich,

I believe that this might be an issue with the SlurmCluster in dask-distributed, rather than RDF, but I cannot confirm it. I suppose it is a workaround; Iā€™d be curious if someone gets the chance to dig deeper and find the actual source of the issue.

Best,
Luca