KeyError while running dask with distRDF


We’re trying to use the distributed feature of RDF through the wrappers in our python library bamboo. One user has a persistent error, shown below. Do you have any idea about this?

Traceback (most recent call last):
  File "/cvmfs/", line 70, in get_total_cores_jobqueuecluster
    return sum(spec["options"]["cores"] for spec in workers_spec.values())
  File "/cvmfs/", line 70, in <genexpr>
    return sum(spec["options"]["cores"] for spec in workers_spec.values())
KeyError: 'cores'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/afs/", line 8, in <module>
  File "/afs/", line 75, in main
  File "/afs/", line 313, in run
  File "/afs/", line 804, in run_notworker
    stats = backend.writeResults(
  File "/afs/", line 940, in writeResults
  File "/cvmfs/", line 198, in _call_action_result
    return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
  File "/cvmfs/", line 190, in GetValue
  File "/cvmfs/", line 57, in execute_graph
  File "/cvmfs/", line 190, in execute_graph
    self.npartitions = self.backend.optimize_npartitions()
  File "/cvmfs/", line 112, in optimize_npartitions
    return get_total_cores(self.client)
  File "/cvmfs/", line 85, in get_total_cores
    return get_total_cores_jobqueuecluster(client.cluster)
  File "/cvmfs/", line 72, in get_total_cores_jobqueuecluster
    raise RuntimeError("Could not retrieve the provided worker specification from the Dask cluster object. "
RuntimeError: Could not retrieve the provided worker specification from the Dask cluster object. Please report this as a bug.

Hello @oguz.guzel,

welcome to the forum! Maybe @vpadulan could take a look at this?



Dear @oguz.guzel ,
Thank you for reporting this! This is one of those errors I was waiting for, as it links directly to the dask API and it should not happen. So I would need some more information from your side:

  1. Versions of all the packages you are using (ROOT, dask, distributed)
  2. Platform and compiler
  3. A code example of what you are doing, especially including the dask related setup (e.g. where you are creating LocalCluster, SSHCluster, HTCondorCluster or any other type of dask cluster).


Dear @vpadulan,
Thanks for your reply! We’re working on lxplus with LCG 103 (packages listed here). Basically, it has

  • ROOT 6.28/00, dask and distributed version 2022.6.1, and Python 3.9.12.

A piece of code from the bamboo library is as follows,

RDataFrame = root.ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
self.distDF = RDataFrame(self._tree, daskclient=daskclient, npartitions=nPartitions)

from here, and

from dask_jobqueue import HTCondorCluster
cluster = HTCondorCluster(log_directory=log_dir)

from here.
Hope these help. Please let me know if you need further information.

I can’t really understand this line. How can you set up the connection to the cluster without referring to some specification of the jobs, i.e. core count, RAM, disk, etc., like in a traditional HTCondor job description file?

I forgot to add: yes, we use a configuration file for dask, i.e.

# jobqueue.yaml file
jobqueue:
  htcondor:
    processes: 1
    cores: 4
    memory: 3GiB
    disk: 1GiB
    extra:
      - '--worker-port 10000:10100'
    scheduler-options:
      port: 8786
    job-extra:
      +JobFlavour: '"longlunch"'
      should_transfer_files: Yes
      when_to_transfer_output: ON_EXIT
      getenv: True


I have also been following this thread, since I am getting the same error.
@vpadulan do you have any further insights on this?

Thanks a lot for your help,


The problem appears when trying to retrieve a sensible default amount of dataset chunks for the RDataFrame distributed execution. This default is computed by retrieving the “expected” amount of cores the cluster will have available. Unfortunately, the dask interface is quite lacking in this regard. Namely, for simple cluster deployments (e.g. LocalCluster on a single node), it is enough to call client.ncores(). For anything more complex, the only public API available is the worker specification held by the cluster object in cluster.worker_spec.
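The failing lookup can be sketched as follows. This is a minimal reconstruction of the logic visible in the traceback at the top of the thread, for illustration only, not the actual distRDF source:

```python
# Sketch of the core-counting logic seen in the traceback
# (a reconstruction for illustration, not the actual distRDF source).
def get_total_cores_jobqueuecluster(worker_spec):
    """Sum the 'cores' option over all entries of cluster.worker_spec."""
    try:
        return sum(spec["options"]["cores"] for spec in worker_spec.values())
    except KeyError as err:
        raise RuntimeError(
            "Could not retrieve the provided worker specification "
            "from the Dask cluster object. Please report this as a bug."
        ) from err

# A well-formed spec, as produced when resources are passed to
# HTCondorCluster directly:
spec = {"HTCondorCluster-0": {"options": {"cores": 4, "memory": "3GiB"}}}
print(get_total_cores_jobqueuecluster(spec))  # 4

# Any entry missing the 'cores' key triggers the KeyError and the
# RuntimeError chained on top of it, exactly as in the report above.
```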

In the case of HTCondorCluster, if one writes

c = HTCondorCluster(n_workers=1, cores=1, memory="2GiB", disk="1GiB")

or similar, then the c object will present a worker_spec attribute like

{'HTCondorCluster-0': {
  'cls': <class 'dask_jobqueue.htcondor.HTCondorJob'>,
  'options': {
    'cores': 1, 'memory': '2GiB', 'disk': '1GiB', 'config_name': 'htcondor',
    'interface': None, 'protocol': None, 'security': None, ...}}}
which allows computing the “expected” number of cores.

It seems that, when a yaml file is used, this interface is completely broken. In fact, I placed the jobqueue.yaml file you provided on my machine, and I then get an empty dictionary for the worker_spec attribute.

Bottom line, I have to find a more resilient way of retrieving the “expected” amount of cores from the cluster, because the dask API won’t help me much there (as discussed here).

Possible workaround:

Meanwhile, could you perhaps write the HTCondor configuration directly in the call to HTCondorCluster?
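For example, a sketch reusing the values from the jobqueue.yaml posted above (log_dir is assumed to be defined as in the earlier snippet):

```python
# Sketch of the proposed workaround: pass the resource specification
# directly to HTCondorCluster instead of relying only on jobqueue.yaml,
# so that cluster.worker_spec is populated. The values are taken from
# the yaml file above; log_dir is assumed defined as in the earlier
# snippet from bamboo.
from dask_jobqueue import HTCondorCluster

cluster = HTCondorCluster(
    cores=4,
    processes=1,
    memory="3GiB",
    disk="1GiB",
    log_directory=log_dir,
)
```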


Hi, we confirm that the workaround you proposed solves our issue 🙂 Thanks!
