Creating distributed RDataFrame from JSON with FromSpec

Dear ROOT experts,

(Tagging @Pedro_Silva, @vpadulan)

We really enjoy RDataFrame and its many nice features. We are looking into moving our setup for the calibration of CMS HGCal to the distributed RDataFrame via Dask on lxplus. Our current setup creates a “classic” RDataFrame instance from a JSON file with ROOT::RDF::Experimental::FromSpec, roughly like

import ROOT
rdf = ROOT.RDF.Experimental.FromSpec(specs)
rdf = rdf.DefinePerSample("mymodule", 'rdfsampleinfo_.GetI("module")')
rdf = rdf.Filter("module==mymodule")
graphs = [
  rdf.Histo2D(("x", "x vs. y;x;y", (10,0,10), (10,0,10)), "x", "y"),
]
ROOT.RDF.RunGraphs(graphlist)
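For context, a minimal spec file of the shape consumed above could be generated like this (the sample name, tree, file, and metadata values are illustrative placeholders, not our real setup):

```python
import json

# Hypothetical minimal spec: one sample with one tree/file pair and
# per-sample metadata, following the samples/trees/files/metadata layout
# that FromSpec expects.
spec = {
  "samples": {
    "run123": {
      "trees": ["Events"],
      "files": ["nano_1.root"],
      "metadata": {"module": 1},
    }
  }
}
with open("spec.json", "w") as f:
  json.dump(spec, f, indent=2)
```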

Could you advise us on how to achieve the same with Dask, please? Does support for creating a distributed RDataFrame from a JSON file via FromSpec already exist, and if not, is this a feature we could request, please? This would be really useful! (Related post: “Distributed RDataFrame fromSpec()”)

We have tried several workarounds, described below, but we hit dead ends.

Thanks!
Izaak

Attempt 1: RDatasetSpec

Following this post, we realized that since ROOT v6.32 (via PR #1480, thanks to @gwmyers) one can create a distributed RDataFrame from an RDatasetSpec, which in turn is created from a JSON file. Something like:

import json
import ROOT

def create_rds(spec):
  """Create an RDatasetSpec from a JSON spec file."""
  # https://root.cern.ch/doc/master/RDataFrame_8cxx_source.html#l01729
  print(f">>> create_rds: Create RDatasetSpec from {spec}...")
  with open(spec, 'r') as file:
    data = json.load(file)
  rds = ROOT.RDF.Experimental.RDatasetSpec()
  for sname, sinfo in data['samples'].items():
    meta = ROOT.RDF.Experimental.RMetaData()
    for key, value in sinfo['metadata'].items():
      meta.Add(key, value)
    rs = ROOT.RDF.Experimental.RSample(sname, sinfo['trees'], sinfo['files'], meta)
    rds.AddSample(rs)
  return rds

rds = create_rds("spec.json")
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
rdf = RDataFrame(rds, daskclient=client) # needs ROOT >= v6.32...
rdf = rdf.DefinePerSample("mymodule", 'rdfsampleinfo_.GetI("module")')
rdf = rdf.Filter("module==mymodule")
graphs = [
  rdf.Histo2D(("x", "x vs. y;x;y", 10, 0, 10, 10, 0, 10), "x", "y"),
]
ROOT.RDF.RunGraphs(graphs)

The above works fine for the “classic” RDataFrame, but for a distributed RDataFrame we get a RuntimeError [1] with

logic_error: No key with name xsec in the metadata object.

when we load metadata via DefinePerSample. Attached is a standalone Python script with a minimal reproducible example.

We set up our environment as follows.

. /cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/setup.sh
pip3 install --user dask_lxplus
export PYTHONPATH=~/.local/lib/python3.11/site-packages:$PYTHONPATH

We set up our client for the HTCondor cluster on lxplus as shown in [2]. This runs successfully with a distributed RDataFrame created with just RDataFrame('Events',fnames,daskclient=client).

Attempt 2: ChangeSpec

Because I prefer to run our setup in CMSSW 14, which only ships ROOT v6.30 at the moment, I naively tried to “sideload” the RDatasetSpec via ChangeSpec after creating the distributed RDataFrame. This failed with a TypeError: Template method resolution failed [3], probably because the distributed RDataFrame object cannot be cast by ROOT.RDF.AsRNode like the “classic” RDataFrame:

# create RDataFrame with same files
rdf = RDataFrame('Events',fnames,daskclient=client)

# copy specs from RDatasetSpec
ROOT.Internal.RDF.ChangeSpec(ROOT.RDF.AsRNode(rdf),ROOT.std.move(rds))

The environment was set up with

#. /cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/setup.sh  # ROOT v6.30/02
. /cvmfs/sft.cern.ch/lcg/views/LCG_105c/x86_64-el9-gcc13-opt/setup.sh # ROOT v6.30/08
pip3 install --user dask_lxplus
export PYTHONPATH=~/.local/lib/python3.9/site-packages:$PYTHONPATH

Footnotes

[1]

Error in <TBufferFile::ReadObject>: could not create object of class logic_error
>>> run_rdf: Plotting...
Error in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
Error in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
[                                        ] | 0% Completed |  0.4sError in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
cppyy.CPPExcInstance: <cppyy.gbl.std.logic_error object at 0x(nil)>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 291, in <module>
    main()
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 287, in main
    run_rdf_fromspec(client,nevts)
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 272, in run_rdf_fromspec
Error in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
Error in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
    fname = f"rdf_fromsspec_{hist.GetName()}.png"
                             ^^^^^^^^^^^^^^
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Proxy.py", line 201, in _call_action_result
    return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
                   ^^^^^^^^^^^^^^^
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Proxy.py", line 193, in GetValue
    execute_graph(self.proxied_node)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Proxy.py", line 58, in execute_graph
    node.get_head().execute_graph()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/HeadNode.py", line 247, in execute_graph
    returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Backends/Dask/Backend.py", line 205, in ProcessAndMerge
    return final_results.compute()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/afs/cern.ch/user/i/ineuteli/.local/lib/python3.11/site-packages/dask/base.py", line 372, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/afs/cern.ch/user/i/ineuteli/.local/lib/python3.11/site-packages/dask/base.py", line 660, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Backends/Dask/Backend.py", line 157, in dask_mapper
    return mapper(current_range)
  ^^^^^^^^^^^^^^^^^
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Backends/Base.py", line 104, in distrdf_mapper
    raise RuntimeError(f"C++ exception thrown:\n\t{type(e).__name__}: {e.what()}")
  ^^^^^^^^^^^^^^^^^
RuntimeError: C++ exception thrown:
	logic_error: No key with name category in the metadata object.

[2]

def create_connection(jobs=5):
  print(">>> create_connection:")
  port = 8786 # only one per lxplus node available
  user = os.getenv('USER')
  logdir = f"/afs/cern.ch/user/{user[0]}/{user}/test"
  outdir = f"/eos/user/{user[0]}/{user}/test"
  cluster = CernCluster(
    cores=1,
    memory='2000MB',
    disk='1000MB',
    death_timeout='60',
    lcg=True,
    nanny=False,
    container_runtime='none',
    log_directory=logdir,
    scheduler_options={
      'port': port,
      'host': socket.gethostname(),
    },
    job_extra={
      '+JobFlavour': '"espresso"',
      '+AccountingGroup': '"group_u_CMST3"',
    },
    worker_command='distributed.cli.dask_worker', # prevent `KeyError: 'worker-command'`
    python=sys.executable, #  prevent `KeyError: 'python'`
    extra=['--worker-port 10000:10100']
  )
  #cluster.scale(jobs=jobs)
  cluster.adapt(minimum=jobs, maximum=50)
  indent = ">>>   "
  print(">>> create_connection: job script")
  print(indent+cluster.job_script().replace('\n','\n'+indent))
  print(">>> create_connection: create client")
  client = Client(cluster)
  return cluster, client

[3]

Traceback (most recent call last):
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 292, in <module>
    main()
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 288, in main
    run_rdf_fromspec(client,nevts)
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 246, in run_rdf_fromspec
    ROOT.Internal.RDF.ChangeSpec(ROOT.RDF.AsRNode(rdf),ROOT.std.move(rds)) # add RDatasetSpec
                                 ^^^^^^^^^^^^^^^^^^^^^
TypeError: Template method resolution failed:
  Failed to instantiate "AsRNode(RDataFrame)"

Minimal reproducible example:

test_RDF_dask.py (9.3 KB)


ROOT Version: 6.32.02
Platform: lxplus9
Compiler: g++ (GCC) 13.1.0

In /cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/bin/root


Hi Izaak,

Thanks for this post, and the work you invested in describing the attempts.
Let me add in the loop @mczurylo , the expert of DistRDF, as well as @StephanH .

Cheers,
Danilo

Hello @IzaakWN,

as you saw, it’s not implemented yet. :sweat_smile:

Distributed RDF understands RDatasetSpec, but it does not propagate the metadata part of the spec into the distributed computation, just the names of trees and files. This is why your attempt #1 failed.

We are aiming to include a remedy in the plan of work for 2025, by the way, but that plan is only being drafted at the moment.

For a similar reason, there is no distributed equivalent of FromSpec yet. One can only manually create the dataset spec and pass it to the RDF constructor, but you saw the limitations of that.

I believe your attempt #2 failed because the distributed RDataFrame is not the C++ RDataFrame object, but a Python facade that cannot be cast to RNode. That is why the template resolution failed.

Hi @StephanH,

Thank you for the quick reply!

Good to know! :sweat_smile: It seems this is planned for the future, so we are looking forward to support for metadata & FromSpec in the distributed RDataFrame.

Yeah, I figured as much… I brought it to your attention since it might be handy if a distributed RDataFrame could be cast with ROOT.RDF.AsRNode, so that other “classic” features become available. I don’t know if that’s planned (or straightforward).

Cheers,
Izaak

P.S.

There were a couple of other snags I ran into. They are probably more of an issue with lxplus/HTCondor/Dask than ROOT/RDataFrame, but I bring them up in case other people run into them as well, or you have feedback.

First, to get CernCluster running in most LCG environments, I had to add some extra keys to avoid KeyError: 'python' and KeyError: 'worker-command'. I am not sure these settings are correct:

cluster = CernCluster(
    ...
    worker_command='distributed.cli.dask_worker', # prevent `KeyError: 'worker-command'`
    python=sys.executable, #  prevent `KeyError: 'python'`
    ...
  )

Second, is it possible that RDataFrame jobs do not create log files for the HTCondor jobs, even if log_directory is set in CernCluster? I wanted to check these for debugging. The output of cluster.job_script() contains

LogDirectory = /afs/cern.ch/user/i/ineuteli/test
Output = $(LogDirectory)/worker-$F(MY.JobId).out
Error = $(LogDirectory)/worker-$F(MY.JobId).err
Log = $(LogDirectory)/worker-$(ClusterId).log

Third, the HTCondor jobs would fail for local files on AFS passed with a relative path (e.g. just nano_1.root). It works with an absolute path on AFS (e.g. /afs/cern.ch/user/i/ineuteli/test/nano_1.root) or EOS (e.g. /eos/user/i/ineuteli/test/nano_1.root). The following error occurred when running with relative paths in RDataFrame('Events',fnames,daskclient=client):

Traceback (most recent call last):
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 292, in <module>
    main()
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 288, in main
    run_rdf_fromspec(client,nevts)
  File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 273, in run_rdf_fromspec
    fname = f"rdf_fromsspec_{hist.GetName()}.png"
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Proxy.py", line 197, in _call_action_result
    return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Proxy.py", line 189, in GetValue
    execute_graph(self.proxied_node)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Proxy.py", line 57, in execute_graph
    node.get_head().execute_graph()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/HeadNode.py", line 236, in execute_graph
    returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Backends/Dask/Backend.py", line 205, in ProcessAndMerge
    return final_results.compute()
  File "/afs/cern.ch/user/i/ineuteli/.local/lib/python3.9/site-packages/dask/base.py", line 376, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/afs/cern.ch/user/i/ineuteli/.local/lib/python3.9/site-packages/dask/base.py", line 662, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Backends/Dask/Backend.py", line 157, in dask_mapper
    return mapper(current_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Backends/Base.py", line 97, in distrdf_mapper
    rdf_plus = build_rdf_from_range(current_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/HeadNode.py", line 527, in build_rdf_from_range
    clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Ranges.py", line 349, in get_clustered_range_from_percs
    all_clusters, all_entries = zip(*all_clusters_entries)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Ranges.py", line 346, in <genexpr>
    get_clusters_and_entries(treename, filename)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/DistRDF/Ranges.py", line 180, in get_clusters_and_entries
    with ROOT.TFile.Open(filename, "READ_WITHOUT_GLOBALREGISTRATION") as tfile:
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/lib/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
    raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file test_1.root
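A simple workaround (a sketch, independent of ROOT) is to expand relative paths to absolute ones on the submitting node before constructing the distributed RDataFrame, since the Dask workers run with a different working directory:

```python
import os

def absolutize(fnames):
  """Expand relative file paths to absolute ones on the submitting node,
  so that remote Dask workers can resolve them."""
  return [os.path.abspath(f) for f in fnames]

# e.g. before creating the distributed RDataFrame:
# rdf = RDataFrame('Events', absolutize(fnames), daskclient=client)
```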