Dear ROOT experts,
(Tagging @Pedro_Silva, @vpadulan)
We really enjoy RDataFrame and its many nice features. We are looking into moving our setup for the calibration of CMS HGCal to the distributed RDataFrame via Dask on lxplus. Our current setup creates a “classic” RDataFrame instance from a JSON file with ROOT::RDF::Experimental::FromSpec, roughly like this:
import ROOT
rdf = ROOT.RDF.Experimental.FromSpec(specs)  # specs: path to the JSON file
rdf = rdf.DefinePerSample("mymodule", 'rdfsampleinfo_.GetI("module")')
rdf = rdf.Filter("module==mymodule")
graphs = [
    # histogram model: (name, title, nbinsx, xlow, xup, nbinsy, ylow, yup)
    rdf.Histo2D(("x", "x vs. y;x;y", 10, 0, 10, 10, 0, 10), "x", "y"),
]
ROOT.RDF.RunGraphs(graphs)
Could you advise us on how we can achieve the same with Dask, please? Does support for creating a distributed RDataFrame from a JSON via FromSpec already exist, and if not, is this a feature we could request, please? This would be really useful! (Related post: “Distributed RDataFrame fromSpec()”)
We have tried several workarounds, described below, but we hit dead ends.
Thanks!
Izaak
Attempt 1: RDatasetSpec
Following this post, we realized that since ROOT v6.32 (via PR #1480, thanks to @gwmyers), one can create a distributed RDataFrame from an RDatasetSpec, which in turn is created from a JSON. Something like:
import json
import ROOT

def create_rds(spec):
    """Create RDatasetSpec from JSON spec."""
    # https://root.cern.ch/doc/master/RDataFrame_8cxx_source.html#l01729
    print(f">>> create_rds: Create RDatasetSpec from {spec}...")
    with open(spec, 'r') as file:
        data = json.load(file)
    rds = ROOT.RDF.Experimental.RDatasetSpec()
    for sname, sinfo in data['samples'].items():
        # copy the per-sample metadata (e.g. "module") into an RMetaData object
        meta = ROOT.RDF.Experimental.RMetaData()
        for key, value in sinfo['metadata'].items():
            meta.Add(key, value)
        rs = ROOT.RDF.Experimental.RSample(sname, sinfo['trees'], sinfo['files'], meta)
        rds.AddSample(rs)
    return rds
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
rds = create_rds("spec.json")
rdf = RDataFrame(rds, daskclient=client)  # needs ROOT >= v6.32...
rdf = rdf.DefinePerSample("mymodule", 'rdfsampleinfo_.GetI("module")')
rdf = rdf.Filter("module==mymodule")
graphs = [
    # histogram model: (name, title, nbinsx, xlow, xup, nbinsy, ylow, yup)
    rdf.Histo2D(("x", "x vs. y;x;y", 10, 0, 10, 10, 0, 10), "x", "y"),
]
ROOT.RDF.RunGraphs(graphs)
The above works fine for the “classic” RDataFrame, but for a distributed RDataFrame we get a RuntimeError [1] with
logic_error: No key with name xsec in the metadata object.
when we load metadata via DefinePerSample. Attached is a standalone Python script with the minimal reproducible example.
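To rule out the spec file itself as the cause, a check along the following lines can confirm that every sample defines the metadata keys requested in DefinePerSample. This is only a sketch: the sample names, file names, metadata values and the helper find_missing_metadata are hypothetical, and the dictionary layout is simply the one that create_rds above reads from the JSON.

```python
# Hypothetical spec mirroring the layout that create_rds reads; the sample
# names, file names and metadata values are placeholders, not real data.
EXAMPLE_SPEC = {
    "samples": {
        "module_1": {
            "trees": ["Events"],
            "files": ["run1_module1.root"],
            "metadata": {"module": 1, "category": "A"},
        },
        "module_2": {
            "trees": ["Events"],
            "files": ["run1_module2.root"],
            "metadata": {"module": 2},  # "category" deliberately missing
        },
    }
}


def find_missing_metadata(spec, required_keys):
    """Return {sample name: [missing keys]} for samples lacking a required key."""
    missing = {}
    for sname, sinfo in spec["samples"].items():
        meta = sinfo.get("metadata", {})
        absent = [key for key in required_keys if key not in meta]
        if absent:
            missing[sname] = absent
    return missing


if __name__ == "__main__":
    # prints {'module_2': ['category']}
    print(find_missing_metadata(EXAMPLE_SPEC, ["module", "category"]))
```

If such a check returns an empty dictionary for the real spec, the keys are present in the JSON and the missing-key error would have to originate after the spec is handed to the distributed RDataFrame.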
We set up our environment with the following:
. /cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/setup.sh
pip3 install --user dask_lxplus
export PYTHONPATH=~/.local/lib/python3.11/site-packages:$PYTHONPATH
We set up our client for the HTCondor cluster on lxplus as shown in [2]. This runs successfully with a distributed RDataFrame initiated with just RDataFrame('Events',fnames,daskclient=client).
Attempt 2: ChangeSpec
Because I prefer to run our setup in CMSSW 14, which has only ROOT v6.30 at the moment, I naively tried to “sideload” the RDatasetSpec via ChangeSpec after creating the distributed RDataFrame, but this failed with a TypeError: Template method resolution failed [3], probably because the distributed RDataFrame object cannot be cast by ROOT.RDF.AsRNode like the “classic” RDataFrame:
# create RDataFrame with same files
rdf = RDataFrame('Events',fnames,daskclient=client)
# copy specs from RDatasetSpec
ROOT.Internal.RDF.ChangeSpec(ROOT.RDF.AsRNode(rdf),ROOT.std.move(rds))
The environment was set up with:
#. /cvmfs/sft.cern.ch/lcg/views/LCG_105/x86_64-el9-gcc12-opt/setup.sh # ROOT v6.30/02
. /cvmfs/sft.cern.ch/lcg/views/LCG_105c/x86_64-el9-gcc13-opt/setup.sh # ROOT v6.30/08
pip3 install --user dask_lxplus
export PYTHONPATH=~/.local/lib/python3.9/site-packages:$PYTHONPATH
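Since the two attempts depend on the ROOT version (creating the distributed RDataFrame from an RDatasetSpec needs v6.32 or later, while the CMSSW 14 environment has v6.30), a small guard like the one below could be used to pick the code path at run time. It is a sketch only: the helper names parse_root_version and supports_distributed_spec are hypothetical, and the (6, 32) threshold is just the version quoted above.

```python
import re


def parse_root_version(version):
    """Turn a ROOT version string such as '6.30/08' or '6.32.02' into a tuple of ints."""
    return tuple(int(part) for part in re.findall(r"\d+", version))


def supports_distributed_spec(version, minimum=(6, 32)):
    """True if the (major, minor) part of the version is at least `minimum`."""
    return parse_root_version(version)[:2] >= minimum


if __name__ == "__main__":
    for version in ("6.30/08", "6.32.02"):
        print(version, supports_distributed_spec(version))
```

In a ROOT session the version string would come from ROOT.gROOT.GetVersion().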
Footnotes
[1]
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
>>> run_rdf: Plotting...
Error in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
Error in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
[ ] | 0% Completed | 0.4sError in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
cppyy.CPPExcInstance: <cppyy.gbl.std.logic_error object at 0x(nil)>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 291, in <module>
main()
File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 287, in main
run_rdf_fromspec(client,nevts)
File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 272, in run_rdf_fromspec
Error in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
Error in <TClass::New>: cannot create object of class logic_error
Error in <TBufferFile::ReadObject>: could not create object of class logic_error
fname = f"rdf_fromsspec_{hist.GetName()}.png"
^^^^^^^^^^^^^^
File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Proxy.py", line 201, in _call_action_result
return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
^^^^^^^^^^^^^^^
File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Proxy.py", line 193, in GetValue
execute_graph(self.proxied_node)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Proxy.py", line 58, in execute_graph
node.get_head().execute_graph()
File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/HeadNode.py", line 247, in execute_graph
returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Backends/Dask/Backend.py", line 205, in ProcessAndMerge
return final_results.compute()
^^^^^^^^^^^^^^^^^^^^^^^
File "/afs/cern.ch/user/i/ineuteli/.local/lib/python3.11/site-packages/dask/base.py", line 372, in compute
(result,) = compute(self, traverse=False, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/afs/cern.ch/user/i/ineuteli/.local/lib/python3.11/site-packages/dask/base.py", line 660, in compute
results = schedule(dsk, keys, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Backends/Dask/Backend.py", line 157, in dask_mapper
return mapper(current_range)
^^^^^^^^^^^^^^^^^
File "/cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/lib/DistRDF/Backends/Base.py", line 104, in distrdf_mapper
raise RuntimeError(f"C++ exception thrown:\n\t{type(e).__name__}: {e.what()}")
^^^^^^^^^^^^^^^^^
RuntimeError: C++ exception thrown:
logic_error: No key with name category in the metadata object.
[2]
import os
import socket
import sys
from dask.distributed import Client
from dask_lxplus import CernCluster

def create_connection(jobs=5):
    """Start a CernCluster on HTCondor and return (cluster, client)."""
    print(">>> create_connection:")
    port = 8786  # only one per lxplus node available
    user = os.getenv('USER')
    logdir = f"/afs/cern.ch/user/{user[0]}/{user}/test"
    outdir = f"/eos/user/{user[0]}/{user}/test"
    cluster = CernCluster(
        cores=1,
        memory='2000MB',
        disk='1000MB',
        death_timeout='60',
        lcg=True,
        nanny=False,
        container_runtime='none',
        log_directory=logdir,
        scheduler_options={
            'port': port,
            'host': socket.gethostname(),
        },
        job_extra={
            '+JobFlavour': '"espresso"',
            '+AccountingGroup': '"group_u_CMST3"',
        },
        worker_command='distributed.cli.dask_worker',  # prevent `KeyError: 'worker-command'`
        python=sys.executable,  # prevent `KeyError: 'python'`
        extra=['--worker-port 10000:10100']
    )
    #cluster.scale(jobs=jobs)
    cluster.adapt(minimum=jobs, maximum=50)
    indent = ">>> "
    print(">>> create_connection: job script")
    print(indent+cluster.job_script().replace('\n','\n'+indent))
    print(">>> create_connection: create client")
    client = Client(cluster)
    return cluster, client
[3]
Traceback (most recent call last):
File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 292, in <module>
main()
File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 288, in main
run_rdf_fromspec(client,nevts)
File "/afs/cern.ch/user/i/ineuteli/hgcal/CMSSW_14_1_0_pre4/src/HGCalCommissioning/LocalCalibration/test_RDF_dask.py", line 246, in run_rdf_fromspec
ROOT.Internal.RDF.ChangeSpec(ROOT.RDF.AsRNode(rdf),ROOT.std.move(rds)) # add RDatasetSpec
^^^^^^^^^^^^^^^^^^^^^
TypeError: Template method resolution failed:
Failed to instantiate "AsRNode(RDataFrame)"
Minimal reproducible example:
test_RDF_dask.py (9.3 KB)
ROOT Version: 6.32.02
Platform: lxplus9
Compiler: g++ (GCC) 13.1.0
In /cvmfs/sft.cern.ch/lcg/views/LCG_106_ATLAS_11/x86_64-el9-gcc13-opt/bin/root