Dear experts,
I want to use distributed RDataFrame to run my analysis on a Dask HTCondor cluster.
Unfortunately, I always get the following error:
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/DistRDF/Backends/Dask/Backend.py", line 171, in dask_mapper
    return mapper(current_range)
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/DistRDF/Backends/Base.py", line 117, in distrdf_mapper
    raise RuntimeError(f"C++ exception thrown:\n\t{type(e).__name__}: {e.what()}")
RuntimeError: C++ exception thrown:
    exception: std::exception
2023-11-09 01:50:43,873 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) local=tcp://202.140.187.218:16063 remote=tcp://202.140.187.218:52252>
Traceback (most recent call last):
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/distributed/comm/tcp.py", line 268, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
Below is a minimal reproducible script. It uses distributed RDataFrame to read a single CMS NanoAOD file and compute the sum of the nElectron branch. Only Count() succeeds; Sum("nElectron") fails with the error above.
from dask.distributed import LocalCluster, Client
from dask_jobqueue import HTCondorCluster
import ROOT

# set up the cluster and client
def create_connection():
    # cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True, memory_limit="2GiB")
    cluster = HTCondorCluster(
        cores=1,
        memory="3000MB",
        disk="2000MB",
    )
    # Use the scale method to send as many jobs as needed
    cluster.scale(jobs=4)
    client = Client(cluster)
    return client

# create distributed RDF
if __name__ == "__main__":
    DistRDF = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
    df = DistRDF("Events", "/dicos_ui_home/chenghan/data/8d7ce473-38e4-43b3-8878-0dd4c6376b1e.root", daskclient=create_connection(), npartitions=4)
    print(df.Count().GetValue())  # works
    print(df.Sum("nElectron").GetValue())  # doesn't work
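Because the tasks run on HTCondor worker nodes rather than on the submit machine, one way to narrow this down could be to check whether every worker sees the same input file and Python/ROOT setup as the client. The function below is a hypothetical helper (not part of ROOT or Dask), shown only as a sketch; it uses the standard library and imports ROOT only if the interpreter can find it.

```python
import importlib.util
import os
import socket
import sys


def check_worker_environment(path):
    """Describe what the current Python process can see.

    Hypothetical helper: meant to be shipped to each Dask worker
    (e.g. via client.run) so its report can be compared with the client's.
    """
    root_version = None
    # Import ROOT only if this interpreter can find it.
    if importlib.util.find_spec("ROOT") is not None:
        import ROOT

        root_version = ROOT.gROOT.GetVersion()
    return {
        "host": socket.gethostname(),
        "python": sys.executable,
        "file_exists": os.path.isfile(path),
        "file_readable": os.access(path, os.R_OK),
        "root_version": root_version,
    }


if __name__ == "__main__":
    # Input file from the reproducer; this call only checks the local machine.
    path = "/dicos_ui_home/chenghan/data/8d7ce473-38e4-43b3-8878-0dd4c6376b1e.root"
    print(check_worker_environment(path))
```

With a client like the one returned by create_connection(), the same check could be run on every worker with client.run(check_worker_environment, path), which returns one report per worker address. A worker reporting file_exists False, or a different python path or root_version than the client, would suggest an environment difference rather than a problem in RDataFrame itself.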
The versions of the relevant packages are:
ROOT: 6.28/04
dask: 2023.10.1
dask_jobqueue: 0.8.2
condor: 9.4.0 Dec 02 2021
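To confirm that the HTCondor workers run the same versions as the client, a small helper based on importlib.metadata could report them per process. This is a hypothetical sketch; the default package list is an assumption chosen to match the Dask-related entries above.

```python
from importlib.metadata import PackageNotFoundError, version


def collect_versions(packages=("dask", "distributed", "dask-jobqueue")):
    """Return {package: version string, or None if not installed} for this process."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None  # package not installed in this environment
    return found


if __name__ == "__main__":
    print(collect_versions())
```

Running client.run(collect_versions) would return the same report for each worker. dask.distributed's Client.get_versions(check=True) performs a similar consistency check for Dask's own dependencies, and ROOT's version can be queried with ROOT.gROOT.GetVersion().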
By the way, everything works fine when I switch to a Dask LocalCluster.
Could you give me some suggestions on this?
I hope the information above is clear.
Thank you!
ROOT Version: 6.28/04
Platform: CentOS 7