KeyError while running dask with distRDF

Hello,

We’re trying to use the distributed feature of RDF through the wrappers in our Python library bamboo. One user persistently hits the error below. Do you have any idea what could be causing it?

Traceback (most recent call last):
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Dask/Backend.py", line 70, in get_total_cores_jobqueuecluster
    return sum(spec["options"]["cores"] for spec in workers_spec.values())
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Dask/Backend.py", line 70, in <genexpr>
    return sum(spec["options"]["cores"] for spec in workers_spec.values())
KeyError: 'cores'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/afs/cern.ch/work/a/.../HH_Analysis_v3/bamboovenv_v3/bin/bambooRun", line 8, in <module>
    sys.exit(main())
  File "/afs/cern.ch/work/a/.../HH_Analysis_v3/bamboovenv_v3/lib/python3.9/site-packages/bamboo/scripts/bambooRun.py", line 75, in main
    modInst.run()
  File "/afs/cern.ch/work/a/.../HH_Analysis_v3/bamboovenv_v3/lib/python3.9/site-packages/bamboo/analysismodules.py", line 313, in run
    run_notworker(self)
  File "/afs/cern.ch/work/a/.../HH_Analysis_v3/bamboovenv_v3/lib/python3.9/site-packages/bamboo/workflow.py", line 804, in run_notworker
    stats = backend.writeResults(
  File "/afs/cern.ch/work/a/.../HH_Analysis_v3/bamboovenv_v3/lib/python3.9/site-packages/bamboo/dataframebackend.py", line 940, in writeResults
    h.Write()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 198, in _call_action_result
    return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 190, in GetValue
    execute_graph(self.proxied_node)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 57, in execute_graph
    node.get_head().execute_graph()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 190, in execute_graph
    self.npartitions = self.backend.optimize_npartitions()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Dask/Backend.py", line 112, in optimize_npartitions
    return get_total_cores(self.client)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Dask/Backend.py", line 85, in get_total_cores
    return get_total_cores_jobqueuecluster(client.cluster)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Dask/Backend.py", line 72, in get_total_cores_jobqueuecluster
    raise RuntimeError("Could not retrieve the provided worker specification from the Dask cluster object. "
RuntimeError: Could not retrieve the provided worker specification from the Dask cluster object. Please report this as a bug.

Hello @oguz.guzel,

welcome to the forum! Maybe @vpadulan could take a look at this?

Cheers,
Marta

Dear @oguz.guzel ,
Thank you for reporting this! This is one of those errors I was waiting for, as it links directly to the dask API and it should not happen. So I would need some more information from your side:

  1. Versions of all the packages you are using (ROOT, dask, distributed)
  2. Platform and compiler
  3. A code example of what you are doing, especially including the dask related setup (e.g. where you are creating LocalCluster, SSHCluster, HTCondorCluster or any other type of dask cluster).

Cheers,
Vincenzo

Dear @vpadulan,
Thanks for your reply! We’re working on lxplus with LCG_103 (packages listed here). Basically, it has

  • ROOT 6.28/00, dask and distributed version 2022.6.1, and Python 3.9.12.

A piece of code from the bamboo library is as follows,

RDataFrame = root.ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
self.distDF = RDataFrame(self._tree, daskclient=daskclient, npartitions=nPartitions)

from here, and

from dask_jobqueue import HTCondorCluster
cluster = HTCondorCluster(log_directory=log_dir)

from here.
Hope these help. Please let me know if you need further information.

I can’t really understand this line. How can you set up the connection to the cluster without referring to some specification of the jobs, i.e. core count, RAM, disk, etc., as in a traditional HTCondor job description file?

Yes, I forgot to add that. We use a configuration file for dask, i.e.

# jobqueue.yaml file
jobqueue:
  htcondor:
    processes: 1
    cores: 4
    memory: 3GiB
    disk: 1GiB
    extra:
      - '--worker-port 10000:10100'
    scheduler_options:
      port: 8786
    job-extra:
      +JobFlavour: '"longlunch"'
      should_transfer_files: Yes
      when_to_transfer_output: ON_EXIT
      getenv: True

Hi,

I was also following this since I am also getting this error.
@vpadulan do you have any further insights on this?

Thanks a lot for your help,
Abhisek

Hi,

The problem appears when trying to retrieve a sensible default amount of dataset chunks for the RDataFrame distributed execution. This default is computed by retrieving the “expected” amount of cores the cluster will have available. Unfortunately, the dask interface is quite lacking in this regard. Namely, for simple cluster deployments (e.g. LocalCluster on a single node), it is enough to call client.ncores(). For anything more complex, the only public API available is the worker specification held by the cluster object in cluster.worker_spec.
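As a rough illustration of the simple case, the sketch below sums the per-worker counts in the kind of dictionary that client.ncores() returns (worker address mapped to core count). The helper name total_cores_from_ncores and the sample addresses are made up for this example, and this is not the actual DistRDF code.

```python
def total_cores_from_ncores(ncores_by_worker):
    """Sum the per-worker core counts of a {worker_address: cores} mapping."""
    return sum(ncores_by_worker.values())


# With a live cluster the mapping would come from the client, e.g.:
#   from dask.distributed import Client, LocalCluster
#   client = Client(LocalCluster(n_workers=2, threads_per_worker=2))
#   total_cores_from_ncores(client.ncores())
# Here a hand-written mapping with made-up addresses stands in for it.
sample = {"tcp://127.0.0.1:40001": 2, "tcp://127.0.0.1:40002": 2}
print(total_cores_from_ncores(sample))
```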

In the case of HTCondorCluster, if one writes

c = HTCondorCluster(n_workers=1, cores=1, memory="2GiB", disk="1GiB")

or similar, then the c object will present a worker_spec attribute like

{'HTCondorCluster-0': {
  'cls': <class 'dask_jobqueue.htcondor.HTCondorJob'>,
  'options': {
    'cores': 1, 'memory': '2GiB', 'disk': '1GiB', 'config_name': 'htcondor',
    'interface': None, 'protocol': None, 'security': None
}}}

which makes it possible to compute the “expected” number of cores.
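To make that concrete, here is a minimal sketch of the summation, modelled on the line shown in the traceback. It runs on a plain dictionary shaped like the worker_spec above; the string "HTCondorJob" is only a placeholder for the real class object, and the second dictionary is a made-up example of an entry whose options carry no 'cores' key.

```python
def total_cores(worker_spec):
    """Sum options['cores'] over every worker entry in the specification."""
    return sum(spec["options"]["cores"] for spec in worker_spec.values())


# Same shape as the worker_spec shown above (class replaced by a placeholder).
worker_spec = {
    "HTCondorCluster-0": {
        "cls": "HTCondorJob",
        "options": {"cores": 1, "memory": "2GiB", "disk": "1GiB"},
    }
}
print(total_cores(worker_spec))

# Made-up entry without 'cores': the same expression raises KeyError,
# which is the first exception in the reported traceback.
incomplete = {"HTCondorCluster-0": {"options": {"memory": "2GiB"}}}
try:
    total_cores(incomplete)
except KeyError as exc:
    print("missing key:", exc)
```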

It seems that when a YAML file is used, this interface is completely broken. In fact, I placed the jobqueue.yaml file you provided on my machine and got an empty dictionary for the worker_spec attribute.

Bottom line, I have to find a more resilient way of retrieving the “expected” amount of cores from the cluster, because the dask API won’t help me much there (as discussed here).
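Just to illustrate what "more resilient" could mean here, the sketch below tolerates both failure modes seen in this thread: an empty worker_spec and an entry without a 'cores' key. The helper expected_cores and its fallback parameter are hypothetical and not part of DistRDF or dask; where the fallback value would come from (for instance a number the user supplies) is left open.

```python
def expected_cores(worker_spec, fallback=None):
    """Return the total cores in worker_spec, or fallback if it cannot be determined."""
    if not worker_spec:
        # Empty specification: nothing to sum.
        return fallback
    total = 0
    for spec in worker_spec.values():
        cores = spec.get("options", {}).get("cores")
        if cores is None:
            # At least one worker does not declare its cores.
            return fallback
        total += cores
    return total


complete = {
    "w0": {"options": {"cores": 4}},
    "w1": {"options": {"cores": 4}},
}
print(expected_cores(complete))
print(expected_cores({}, fallback=2))
print(expected_cores({"w0": {"options": {"memory": "3GiB"}}}, fallback=2))
```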

Possible workaround:

In the meantime, could you perhaps write the HTCondor configuration directly in the call to HTCondorCluster?
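A minimal sketch of what that could look like, using the resource values from the jobqueue.yaml posted above. The names CLUSTER_KWARGS and make_cluster are made up for this example. Only the resource keywords are passed explicitly; the other settings in the YAML file (job-extra, extra, scheduler_options) are left out because, as far as I know, some of their keyword names differ between dask_jobqueue versions, and they should presumably still be picked up from the configuration file.

```python
# Resource settings copied from the jobqueue.yaml quoted earlier in the thread.
CLUSTER_KWARGS = {
    "processes": 1,
    "cores": 4,
    "memory": "3GiB",
    "disk": "1GiB",
}


def make_cluster(log_dir):
    """Create an HTCondorCluster with the resources passed explicitly.

    dask_jobqueue is imported inside the function so that this module can be
    loaded on machines where it is not installed.
    """
    from dask_jobqueue import HTCondorCluster

    return HTCondorCluster(log_directory=log_dir, **CLUSTER_KWARGS)
```

With the keywords given in the call, the cluster's worker_spec should then carry a 'cores' entry like the one in the example dictionary above.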

Cheers,
Vincenzo

Hi, we confirm that the workaround you proposed solves our issue. Thanks!

Approximately when do you think a fix for this issue could be available? Just so we know.

Let’s ping @vpadulan

Hi @oguz.guzel,

I would say this should be fixed for the next ROOT release, 6.30, before the end of this year.

Cheers,
Vincenzo

Hi, thanks a lot!
