RuntimeError while running RDataFrame with Dask HTCondor cluster


Dear experts,

I want to use distributed RDataFrame to run my analysis on a Dask HTCondor cluster.
Unfortunately, I always get the following error.

File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/DistRDF/Backends/Dask/Backend.py", line 171, in dask_mapper
    return mapper(current_range)
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/DistRDF/Backends/Base.py", line 117, in distrdf_mapper
    raise RuntimeError(f"C++ exception thrown:\n\t{type(e).__name__}: {e.what()}")
RuntimeError: C++ exception thrown:
        exception: std::exception
2023-11-09 01:50:43,873 - distributed.batched - INFO - Batched Comm Closed <TCP (closed)  local=tcp://202.140.187.218:16063 remote=tcp://202.140.187.218:52252>
Traceback (most recent call last):
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/distributed/comm/tcp.py", line 268, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

The minimal reproducible script is below. I am trying to use RDataFrame to read a single CMS NanoAOD file and calculate the sum of nElectron. This does not work as expected; only the Count operation succeeds.

from dask.distributed import LocalCluster, Client
from dask_jobqueue import HTCondorCluster
import ROOT

# setup the cluster and client
def create_connection():
    # cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True, memory_limit="2GiB")
    cluster = HTCondorCluster(
        cores=1,
        memory="3000MB",  
        disk="2000MB",
    )
    # Use the scale method to send as many jobs as needed
    cluster.scale(jobs=4)
    client = Client(cluster)
    return client

# create distributed RDF
if __name__ == "__main__":
    DistRDF = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
    df = DistRDF("Events", "/dicos_ui_home/chenghan/data/8d7ce473-38e4-43b3-8878-0dd4c6376b1e.root", daskclient=create_connection(), npartitions=4)
    print(df.Count().GetValue())  # works
    print(df.Sum("nElectron").GetValue())  # doesn't work

The versions of the related packages are
ROOT: 6.28/04
dask: 2023.10.1
dask_jobqueue: 0.8.2
condor: 9.4.0 Dec 02 2021

Can you give me some suggestions on this?
By the way, everything works fine when I switch to a Dask LocalCluster.
Hope the information provided above is clear to you.

Thank you!

ROOT Version: 6.28/04
Platform: CentOS 7


Hi,

Can someone give me some ideas on this?
Any suggestion is appreciated.

Thanks!

Dear @Cheng-Han ,

Thank you for posting on the forum! Sorry for the delayed reply, here are a couple of ideas to start the debugging process.

First, looking at your reproducer above, it is not clear to me how you are actually configuring the connection to the HTCondor jobs. Sure, the cores, memory and disk parameters are very important to specify, but usually there is also something like a queue and other parameters specific to the particular deployment, which are more or less necessary. If you try an application without RDataFrame that submits a simple Python function to one of the workers spawned via HTCondorCluster, does that work?

Second, you supply a path to a file /dicos_ui_home/chenghan/data/8d7ce473-38e4-43b3-8878-0dd4c6376b1e.root. Is this path accessible transparently from all the machines of the HTCondor cluster?

Third, could you try executing only the line with the Sum, without executing the previous line with the Count?
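
The question about file accessibility can be checked directly from the workers. The following is a minimal sketch, assuming `client` is the dask.distributed.Client returned by create_connection(); the helper names check_path, check_path_on_workers and workers_missing_file are hypothetical and only for illustration. It relies on Client.run, which executes a function on every connected worker and returns a dictionary keyed by worker address.

```python
import os
import socket


def check_path(path):
    """Runs on one worker: report its host name and whether `path` is readable there."""
    return {
        "host": socket.gethostname(),
        "readable": os.path.isfile(path) and os.access(path, os.R_OK),
    }


def check_path_on_workers(client, path):
    """Run check_path on every connected worker.

    `client` is assumed to be a dask.distributed.Client; Client.run returns
    a dict of the form {worker_address: result}.
    """
    return client.run(check_path, path)


def workers_missing_file(results):
    """Return the sorted addresses of the workers that could not read the file."""
    return sorted(addr for addr, info in results.items() if not info["readable"])


# Usage (hypothetical), once at least one worker has connected:
# client.wait_for_workers(1)
# results = check_path_on_workers(
#     client, "/dicos_ui_home/chenghan/data/8d7ce473-38e4-43b3-8878-0dd4c6376b1e.root")
# print(workers_missing_file(results))
```

An empty list would suggest that the file is visible from every worker that was connected when the check ran; workers that join later are not covered.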

Cheers,
Vincenzo

Hi @vpadulan,

Thanks for your help!
I followed the tutorial distrdf002_dask_connection.py to set up the connection between the HTCondor jobs and RDataFrame.
Could you please specify which options I should add to the HTCondorCluster?

First, HTCondorCluster works fine without RDataFrame. The following code gives me the output successfully.

from distributed import Client, LocalCluster
from dask_jobqueue import HTCondorCluster
import ROOT
import dask.array as da

def create_connection():   
    cluster = HTCondorCluster(
        cores=1,
        memory="3000MB",  
        disk="2000MB",
        silence_logs="debug",
        job_extra_directives={
            "log": "dask_job.log",
            "output": "dask_job.out",
            "error": "dask_job.err"
        }
    )
    # Use the scale method to send as many jobs as needed
    cluster.scale(3)
    client = Client(cluster)
    return client

if __name__ == "__main__":
    connection = create_connection()
    x = da.random.random((5000))
    print(x.mean().compute())  # 0.49516575027742377

Second, I tried executing only the line with the Sum, without the previous line with the Count. However, it still failed.
In addition, if the Count line can be executed successfully, the file should be accessible from all the machines, right?
Just in case this is the problem, I switched to an open data file, which should be available from all of the machines.

root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/ZZTo2e2mu.root

This time it gave me a different error. It seems that this file cannot be opened by ROOT.TFile.Open. However, I can open the file without any problem when I call ROOT.TFile.Open locally.

File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/DistRDF/Ranges.py", line 329, in <genexpr>
    get_clusters_and_entries(treename, filename)
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/DistRDF/Ranges.py", line 165, in get_clusters_and_entries
    with ROOT.TFile.Open(filename, "READ_WITHOUT_GLOBALREGISTRATION") as tfile:
  File "/dicos_ui_home/chenghan/miniconda3/envs/ROOT/lib/python3.10/site-packages/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
    raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/ZZTo2e2mu.root

Best,
Cheng-Han

> First, HTCondorCluster works fine without RDataFrame. The following code gives me the output successfully.

The snippet you sent me doesn’t really show that you are connecting to any HTCondor job. You should also try to run some computations (without RDataFrame). Try for example with this one to start.

Then, the second error is probably what we are after. For some reason the file cannot be opened on the workers. Since you are not calling client.wait_for_workers after creating your cluster object, this probably means the application is starting without any workers connected. So I would solve that first before continuing with RDataFrame itself; for now it seems that the baseline situation is not clear enough.
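
As a rough illustration of that baseline check, the sketch below waits for workers and then asks each of them for its host name. It assumes `client` is a dask.distributed.Client connected to the HTCondorCluster; the helper name worker_hostnames and the default timeout are hypothetical choices. Client.wait_for_workers blocks until the requested number of workers have registered with the scheduler, and raises a timeout error if that does not happen in time.

```python
import socket


def worker_hostnames(client, n_workers=1, timeout=300):
    """Wait until `n_workers` workers are connected, then collect their host names.

    `client` is assumed to be a dask.distributed.Client. The returned dict maps
    each worker address to the host name reported by that worker.
    """
    # Blocks until enough workers have registered with the scheduler.
    client.wait_for_workers(n_workers, timeout=timeout)
    # Client.run executes the function on every connected worker.
    return client.run(socket.gethostname)


# Usage (hypothetical):
# client = create_connection()
# print(worker_hostnames(client, n_workers=3))
```

If the host names printed are those of HTCondor execute nodes rather than the submit machine, the tasks are actually reaching the jobs.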

Cheers,
Vincenzo

Thanks for your prompt reply.
I tried the example and made HTCondorCluster connect to the client. It works without any problem.

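from dask import compute, delayed
from dask.distributed import Client
from dask_jobqueue import HTCondorCluster

def create_connection():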
    # cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True, memory_limit="5GiB")
    cluster = HTCondorCluster(
        cores=1,
        memory="3000MB",  
        disk="2000MB",
        job_extra_directives={
            "log": "dask_job.log",
            "output": "dask_job.out",
            "error": "dask_job.err"
        }
    )
    # Use the scale method to send as many jobs as needed
    cluster.scale(3)
    client = Client(cluster)
    client.wait_for_workers(1)
    return client
    
@delayed
def fib(n):
    if n < 2:
        return n
    # We can use dask.delayed and dask.compute to launch
    # computation from within tasks
    a = fib(n - 1)  # these calls are delayed
    b = fib(n - 2)
    a, b = compute(a, b)  # execute both in parallel
    return a + b

if __name__ == "__main__":
    # these features require the dask.distributed scheduler
    client = create_connection()

    result = fib(10).compute()
    print(result)  # prints "55"

In addition, even if I request the client to wait for the workers by adding the line below, it still fails to read the open data file.

client.wait_for_workers(1)
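
One way to narrow this down is to call ROOT.TFile.Open directly on each worker and collect the outcome, independently of RDataFrame. This is only a sketch: the helper names try_open and try_open_on_workers are hypothetical, `client` is assumed to be a dask.distributed.Client, and the thread does not establish why the open fails, so the report is a starting point rather than a diagnosis.

```python
import socket


def try_open(url):
    """Runs on one worker: try to open `url` with ROOT and report what happened."""
    report = {"host": socket.gethostname(), "ok": False, "error": ""}
    try:
        # Imported here so that a broken ROOT installation on the worker
        # is reported instead of crashing the task.
        import ROOT

        tfile = ROOT.TFile.Open(url)
        # Some ROOT versions return a null or zombie file instead of raising.
        if not tfile or tfile.IsZombie():
            report["error"] = "TFile.Open returned an invalid file"
        else:
            report["ok"] = True
            tfile.Close()
    except Exception as exc:  # e.g. OSError from TFile.Open, or ImportError
        report["error"] = f"{type(exc).__name__}: {exc}"
    return report


def try_open_on_workers(client, url):
    """Run try_open on every connected worker via Client.run."""
    return client.run(try_open, url)


# Usage (hypothetical):
# url = "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/ZZTo2e2mu.root"
# for addr, rep in try_open_on_workers(client, url).items():
#     print(addr, rep)
```

Comparing the per-worker reports with the result of calling try_open(url) locally should show whether the failure is specific to the worker environment.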

Best,
Cheng-Han