Dear experts,
We are trying to use ROOT.RDF.Experimental.Distributed.Dask.RDataFrame with a dask_jobqueue SLURMCluster. We encountered some issues with this, and would appreciate your assistance in debugging them.
The computation appears to finish successfully for the most part, and no errors show up in the job logs or elsewhere, yet something seems to kill the worker(s) prematurely, which results in an error when executing the script. Here is the full output:
$ python test.py
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
2024-07-23 06:23:46,988 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2024-07-23 06:23:47,068 - distributed.scheduler - INFO - State start
2024-07-23 06:23:47,081 - distributed.scheduler - INFO - Scheduler at: tcp://18.4.134.162:12831
2024-07-23 06:23:47,082 - distributed.scheduler - INFO - dashboard at: http://18.4.134.162:8000/status
2024-07-23 06:23:47,083 - distributed.scheduler - INFO - Registering Worker plugin shuffle
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
2024-07-23 06:23:47,096 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=1
2024-07-23 06:23:48,448 - distributed.scheduler - INFO - Receive client connection: Client-a55b3500-48dd-11ef-abd3-043f72dc3564
2024-07-23 06:23:48,450 - distributed.core - INFO - Starting established connection to tcp://18.4.134.162:48286
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
#!/usr/bin/env bash
#SBATCH -J test1
#SBATCH -A Hrare_Slurm
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=8G
#SBATCH -t 00:30:00
#SBATCH --output=/home/submit/lavezzo/submit/dask/logs/dask_job_output_%j.out
#SBATCH --error=/home/submit/lavezzo/submit/dask/logs/dask_job_output_%j.err
#SBATCH --partition=submit,submit-gpu
export DASK_DISTRIBUTED__COMM__ALLOWED_TRANSPORTS=["tcp://[::]:0"]
export XRD_RUNFORKHANDLER=1
export XRD_STREAMTIMEOUT=10
echo "Landed on $HOSTNAME"
export DASK_CONFIG=/home/submit/lavezzo/submit/dask/dask.yaml
source /home/submit/lavezzo/.bashrc
conda activate rootdf2
/work/submit/lavezzo/miniforge3/envs/rootdf2/bin/python -m distributed.cli.dask_worker tcp://18.4.134.162:12831 --name dummy-name --nthreads 1 --memory-limit 7.45GiB --nanny --death-timeout 60
<Client: 'tcp://18.4.134.162:12831' processes=0 threads=0, memory=0 B>
TClass::Init:0: RuntimeWarning: no dictionary for class edm::Hash<1> is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ParameterSetBlob is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ProcessHistory is available
TClass::Init:0: RuntimeWarning: no dictionary for class edm::ProcessConfiguration is available
TClass::Init:0: RuntimeWarning: no dictionary for class pair<edm::Hash<1>,edm::ParameterSetBlob> is available
2024-07-23 06:23:50,877 - distributed.deploy.adaptive - INFO - Retiring workers ['SLURMCluster-0']
2024-07-23 06:23:50,883 - distributed.scheduler - INFO - Retire worker names ('SLURMCluster-0',)
2024-07-23 06:23:50,888 - distributed.scheduler - INFO - Retire worker addresses ('SLURMCluster-0',)
[ ] | 0% Completed | 0.2s
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
[ ] | 0% Completed | 16.2s
2024-07-23 06:24:07,163 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://18.4.134.83:22429', name: SLURMCluster-0, status: init, memory: 0, processing: 0>
2024-07-23 06:24:07,166 - distributed.scheduler - INFO - Starting worker compute stream, tcp://18.4.134.83:22429
2024-07-23 06:24:07,166 - distributed.core - INFO - Starting established connection to tcp://18.4.134.83:56304
2024-07-23 06:24:31,441 - distributed.deploy.adaptive_core - INFO - Adaptive stop
/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
2024-07-23 06:24:31,445 - distributed.scheduler - INFO - Retire worker addresses ('SLURMCluster-0',)
2024-07-23 06:24:31,473 - distributed.scheduler - INFO - Scheduler closing due to unknown reason...
2024-07-23 06:24:31,474 - distributed.scheduler - INFO - Scheduler closing all comms
2024-07-23 06:24:31,476 - distributed.core - INFO - Connection to tcp://18.4.134.83:56304 has been closed.
2024-07-23 06:24:31,476 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://18.4.134.83:22429', name: SLURMCluster-0, status: running, memory: 1, processing: 0> (stimulus_id='handle-worker-cleanup-1721730271.476543')
2024-07-23 06:24:31,477 - distributed.scheduler - WARNING - Removing worker 'tcp://18.4.134.83:22429' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {'distrdf_reducer-a64d7211-dea1-4037-85a3-6f2df9b62376'} (stimulus_id='handle-worker-cleanup-1721730271.476543')
2024-07-23 06:24:31,486 - distributed.scheduler - INFO - Lost all workers
2024-07-23 06:24:31,505 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Scheduler->Client local=tcp://18.4.134.162:12831 remote=tcp://18.4.134.162:48286>
Traceback (most recent call last):
File "/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
^^^^^^^^^^
File "/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
value = future.result()
^^^^^^^^^^^^^^^
File "/work/submit/lavezzo/miniforge3/envs/rootdf2/lib/python3.12/site-packages/distributed/comm/tcp.py", line 262, in write
raise CommClosedError()
distributed.comm.core.CommClosedError
The output is still produced, but when scaling up it seems to be missing contributions from some of the workers, presumably those that were killed ungracefully.
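For concreteness, this is the kind of cross-check that shows the missing contributions. It is only a sketch with placeholder file names, comparing a plain local RDataFrame against the test.root produced by the distributed script further down:

import ROOT

# Placeholder file list: the same NanoAOD files used in the reproducer below
files = ["nanoaod_1.root"]

# Same selection and histogram, run with the plain (non-distributed) RDataFrame
df_local = ROOT.RDataFrame("Events", files)
h_local = df_local.Filter("nMuon == 1").Histo1D(
    ("Muon_pt", "Muon_pt", 100, 0, 1000), "Muon_pt"
)

# Histogram written by the distributed run (test.root from the script below)
f_dist = ROOT.TFile.Open("test.root")
h_dist = f_dist.Get("Muon_pt")

# When workers are killed prematurely, the distributed histogram ends up
# with fewer entries than the local reference
print("local entries:      ", h_local.GetEntries())
print("distributed entries:", h_dist.GetEntries())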
The worker logs don't contain any errors either, as far as I can tell, though you can see from them that the worker was stopped:
2024-07-23 04:17:54,156 - distributed.nanny - INFO - Start Nanny at: 'tcp://18.4.134.76:29633'
2024-07-23 04:17:56,239 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-6q27qs3h', purging
2024-07-23 04:17:56,240 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-cv2cjabd', purging
2024-07-23 04:17:56,240 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-xinv594r', purging
2024-07-23 04:17:56,241 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-a8s9m9jn', purging
2024-07-23 04:17:56,241 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-i3pgeccy', purging
2024-07-23 04:17:56,241 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-210253/worker-7zkqnby2', purging
2024-07-23 04:17:57,765 - distributed.worker - INFO - Start worker at: tcp://18.4.134.76:20323
2024-07-23 04:17:57,765 - distributed.worker - INFO - Listening to: tcp://18.4.134.76:20323
2024-07-23 04:17:57,765 - distributed.worker - INFO - Worker name: SLURMCluster-9
2024-07-23 04:17:57,765 - distributed.worker - INFO - dashboard at: 18.4.134.76:26557
2024-07-23 04:17:57,765 - distributed.worker - INFO - Waiting to connect to: tcp://18.4.134.163:29015
2024-07-23 04:17:57,765 - distributed.worker - INFO - -------------------------------------------------
2024-07-23 04:17:57,765 - distributed.worker - INFO - Threads: 1
2024-07-23 04:17:57,765 - distributed.worker - INFO - Memory: 7.45 GiB
2024-07-23 04:17:57,765 - distributed.worker - INFO - Local Directory: /tmp/dask-scratch-space-210253/worker-hr_s2lak
2024-07-23 04:17:57,765 - distributed.worker - INFO - -------------------------------------------------
2024-07-23 04:17:59,186 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-07-23 04:17:59,187 - distributed.worker - INFO - Registered to: tcp://18.4.134.163:29015
2024-07-23 04:17:59,187 - distributed.worker - INFO - -------------------------------------------------
2024-07-23 04:17:59,189 - distributed.core - INFO - Starting established connection to tcp://18.4.134.163:29015
Warning in <TClass::Init>: no dictionary for class edm::Hash<1> is available
Warning in <TClass::Init>: no dictionary for class edm::ParameterSetBlob is available
Warning in <TClass::Init>: no dictionary for class edm::ProcessHistory is available
Warning in <TClass::Init>: no dictionary for class edm::ProcessConfiguration is available
Warning in <TClass::Init>: no dictionary for class pair<edm::Hash<1>,edm::ParameterSetBlob> is available
2024-07-23 04:18:03,518 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.33s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-07-23 04:18:10,581 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.96s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
slurmstepd: error: *** JOB 1518229 ON submit66 CANCELLED AT 2024-07-23T04:18:13 ***
2024-07-23 04:18:13,136 - distributed.worker - INFO - Stopping worker at tcp://18.4.134.76:20323. Reason: scheduler-close
I copy here, in full, a minimal version of a script that still reproduces the issue. You can verify with the LocalCluster that the script itself is fine and doesn't consume crazy amounts of memory, staying well within what is requested from Slurm.
import os
import socket
import time
import ROOT
from distributed import Client
from dask_jobqueue import SLURMCluster
import distributed
def create_remote_connection():
    slurm_env = [
        'export DASK_DISTRIBUTED__COMM__ALLOWED_TRANSPORTS=["tcp://[::]:0"]',
        'export XRD_RUNFORKHANDLER=1',
        'export XRD_STREAMTIMEOUT=10',
        'echo "Landed on $HOSTNAME"',
        'export DASK_CONFIG=dask.yaml',
        f'source {os.getenv("HOME")}/.bashrc',
        'conda activate myenv',
    ]
    extra_args = [
        "--output=dask_job_output_%j.out",
        "--error=dask_job_output_%j.err",
        "--partition=<partitions>",
    ]
    cluster = SLURMCluster(
        job_name="test",
        cores=1,
        memory='8GB',
        scheduler_options={
            'dashboard_address': 8000,
            'host': socket.gethostname()
        },
        silence_logs="debug",
        job_extra_directives=extra_args,
        job_script_prologue=slurm_env
    )
    cluster.scale(2)
    cluster.adapt(minimum=0, maximum=2)
    client = Client(cluster, heartbeat_interval='5s', timeout='60s')
    print(cluster.job_script())
    return client

def create_local_connection(n_workers):
    from dask.distributed import LocalCluster
    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1, processes=True,
                           memory_limit="8GB", dashboard_address=":0", worker_dashboard_address=":0")
    client = Client(cluster)
    return client

if __name__ == "__main__":
    # with slurm
    client = create_remote_connection()
    # with local
    # client = create_local_connection(2)
    print(client)

    files = [...<some standard nanoaod files>...]
    NPARTITIONS = 10

    RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
    df = RDataFrame("Events", files, daskclient=client, npartitions=NPARTITIONS)
    df_1mu = df.Filter("nMuon == 1", "Events with one muon.")
    # df_1mu = df_1mu.Snapshot('event', 'snapshot.root', ['Muon_pt'])
    h = df_1mu.Histo1D(("Muon_pt", "Muon_pt", 100, 0, 1000), "Muon_pt")

    file = ROOT.TFile("test.root", "RECREATE")
    h.Write()
    file.Close()
The nanoaod files are ~1GB each, and the script fails even when using just one.
I installed a fresh conda environment with conda install root dask dask-jobqueue, which, at the time of writing, results in the following versions:
root==6.32.2
dask==2024.7.1
dask-jobqueue==0.8.5
distributed==2024.7.1
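In case it is useful, this is how one can dump the versions from inside the environment and let distributed check that client, scheduler and workers agree on them (a convenience sketch, not part of the reproducer; client refers to the Dask client created in the script above):

import ROOT
import dask
import distributed
import dask_jobqueue

# Package versions of the environment in use
print("ROOT         ", ROOT.gROOT.GetVersion())
print("dask         ", dask.__version__)
print("distributed  ", distributed.__version__)
print("dask-jobqueue", dask_jobqueue.__version__)

# distributed's built-in consistency check: with check=True it raises
# if client, scheduler and workers disagree on package versions
# (client is the Dask client from the script above)
client.get_versions(check=True)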
The dask.yaml only configures logging at DEBUG level, in order to get as much output as possible; I copy it here for completeness:
logging:
  version: 1
  disable_existing_loggers: false
  formatters:
    simple:
      format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  handlers:
    console:
      class: logging.StreamHandler
      formatter: simple
      level: DEBUG  # Log all messages at DEBUG level and above
  loggers:
    distributed:
      level: DEBUG  # Increase verbosity for all distributed logs
      handlers: [console]
    distributed.scheduler:
      level: DEBUG  # Specifically increase scheduler logging
      handlers: [console]
    distributed.worker:
      level: DEBUG  # Specifically increase worker logging
      handlers: [console]
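The DASK_CONFIG variable exported in the job prologue points at this file. To double-check that the logging section is actually picked up, something like the following can be run on a node (a small sketch using dask's public config API; that a plain file path in DASK_CONFIG is honoured is an assumption on my part):

import os
import dask.config

# Show where the configuration is expected to come from and re-read it
print("DASK_CONFIG =", os.environ.get("DASK_CONFIG"))
dask.config.refresh()

# Print the logging section, which should match the dask.yaml above
print(dask.config.get("logging", default={}))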
Let me know if you have any ideas; they would be greatly appreciated.
Best,
Luca