RDataFrame on SWAN-Spark

Dear RDataFrame experts.

I'm trying to run distributed RDF on SWAN-Spark.
SWAN is initialised with /cvmfs/sft.cern.ch/lcg/views/LCG_105a_swan.

I have two questions.

  1. How do I pass the voms-proxy to RDF?
  2. How do I pass my user .h library with basic functions to RDF?

Below are more details of the problems encountered.

Thanks

Maria

=======================================================================
Regarding 1), I initialise Spark like this (after checking with the SWAN experts):


[Spark connector configuration screenshot]

I can read the remote file from a notebook cell, but I encounter this error when I read it with RDF:

  File "/cvmfs/sft.cern.ch/lcg/views/LCG_105a_swan/x86_64-el9-gcc13-opt/lib/ROOT/_pythonization/_tfile.py", line 103, in _TFileOpen
    raise OSError('Failed to open file {}'.format(str(args[0])))
OSError: Failed to open file root://xrootd.cmsaf.mit.edu//store/user/paus/nanohr/D02/VBF_HToPhiGamma_M125_TuneCP5_PSWeights_13TeV_powheg_pythia8+RunIISummer20UL18MiniAODv2-106X_upgrade2018_realistic_v16_L1v1-v1+MINIAODSIM/CAAC1BB3-38EA-824C-9E9E-B4A5E2A34CB4.root

Note: it works when reading a dataset in my /eos/user/d/dalfonso/ …
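
For reference, forwarding a grid proxy to the executors is typically expressed with spark.files plus the X509_USER_PROXY environment variable that XRootD reads. A minimal sketch, assuming the proxy sits at the conventional /tmp/x509up_u<uid> path; on SWAN these options go into the Spark connector configuration rather than a hand-built SparkConf:

import os
from pyspark import SparkConf

# Sketch only: ship the VOMS proxy to every executor and point XRootD at
# the shipped copy. Paths and option keys may differ per setup.
proxy = "/tmp/x509up_u{}".format(os.getuid())

conf = (
    SparkConf()
    # Copy the proxy file into each executor's working directory.
    .set("spark.files", proxy)
    # XRootD consults X509_USER_PROXY to locate the proxy; the relative
    # path resolves inside the executor's working directory.
    .set("spark.executorEnv.X509_USER_PROXY", os.path.basename(proxy))
)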

=======================================================================
Regarding 2)

I initialise RDF in this way:

def init():
    ROOT.gInterpreter.ProcessLine('#include "/eos/home-d/dalfonso/SWAN_projects/Hrare/JULY_exp/myLibrary.h"')

import pyspark
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame # for CERN
RunGraphs = ROOT.RDF.Experimental.Distributed.RunGraphs
ROOT.RDF.Experimental.Distributed.initialize(init)
dfINI = RDataFrame("Events", files, sparkcontext=sc, npartitions=NPARTITIONS) # at CERN
sc.addPyFile("/eos/home-d/dalfonso/SWAN_projects/Hrare/JULY_exp/utilsAna.py")
df = dfINI.Define("myInt", "ROOT.CountCharacters('Hello, let me try again today.')")

with a simple function like this:

[dalfonso@jupyter-dalfonso ~]$ cat /eos/home-d/dalfonso/SWAN_projects/Hrare/JULY_exp/myLibrary.h

#include <iostream>
#include <typeinfo>


/// A trivial function
int CountCharacters(const std::string s)
{
   return s.size();
}

I can call
ROOT.CountCharacters("Hello, let me try again today.")
in a notebook cell, but when RDF evaluates the Define I get:

    return rdf._OriginalDefine(col_name, callable_or_str)
cppyy.gbl.std.runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RJittedFilter,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RJittedFilter,void>::Define(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.


Dear Mariarosaria,

Let me add our DistRDF experts to the loop, @mczurylo and @vpadulan.

Cheers,
Danilo

Dear @dalfonso,

It's unfortunate that these problems occurred - we'll make sure the experience becomes smoother. I don't immediately see anything wrong code-wise in what you have tried.

  1. Regarding the problem with accessing the ROOT file - have you tried using the Spark k8s cluster instead of Analytix?

  2. Regarding the second issue with the header file distribution, I have started to investigate the issue and I will update you once I understand the problem better.

Best wishes,
Marta

Dear @dalfonso,

I managed to debug your issue a bit further - the second part, regarding the .h library and the function.

There are two issues we need to tackle. Firstly, since the function lives in a header file, we need to make it available to all the workers. Your init function, and a function that constructs an RDF and distributes those files to the workers, should look like this:

import ROOT
from pathlib import Path
from pyspark import SparkFiles

def init():
    # Runs on each worker: locate the distributed header and include it.
    localdir = SparkFiles.getRootDirectory()
    lib_path = Path(localdir) / "example_count_characters.h"
    ROOT.gInterpreter.Declare(f'#include "{lib_path}"')

def make_rdf():
    df = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame(10, sparkcontext=sc) # for CERN
    # Ship the header to the workers; it lands in their SparkFiles directory.
    df._headnode.backend.distribute_unique_paths(
            [
                "/eos/user/m/mczurylo/SWAN_projects/example_count_characters.h",
            ]
    )
    return df

Then, to check that this works, I did:

df = make_rdf()
ROOT.RDF.Experimental.Distributed.initialize(init)
df_define = df.Define("countchar", "CountCharacters(\"hi\")")
histo = df_define.Histo1D(("name", "title", 10, 0, 10), "countchar")
histo.GetEntries()

I would also like to mention that we are now finalising a new distRDF interface with new functions that will make adding external headers or libraries much more straightforward for users. For the time being, though, this is the solution to your problem.

The second problem was in this line:

df.Define("countchar", "CountCharacters('hi')")

which should be changed to

df.Define("countchar", "CountCharacters(\"hi\")")

in order for the compiler to infer the correct types (i.e. "hi" needs to be a string, which in C++ requires double quotes).
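
To illustrate (a minimal sketch; the Define expression string is JIT-compiled as C++, where single quotes denote character literals rather than strings):

# The expression is compiled as C++, so C++ quoting rules apply:
# 'hi' is a (multi-)character literal, not a string, and the call fails.
df.Define("countchar", "CountCharacters('hi')")   # fails to JIT-compile

# Using Python single quotes outside avoids escaping the C++ double quotes.
df.Define("countchar", 'CountCharacters("hi")')   # compiles and runs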

I ran this with Spark Analytix. Since I do not have access to your private files, I cannot investigate the voms-proxy issue further on my own. I believe this might be more of a SWAN-related issue, but we could discuss it offline, in person at CERN for example. I would suggest that you first try the Spark k8s cluster and see if the same issue appears. Another option could be SWAN-Dask; I can guide you further with that if you'd like to try it.

Please let me know if you have more questions.

Cheers,
Marta

Dear Marta,
Thank you for looking at the issue and finding a solution (*).

The df._headnode.backend.distribute_unique_paths call uploaded the file to the worker nodes, and I could verify this in the /tmp/spark-…/userFiles… directory.
With the order of commands you suggested (construct the RDF, upload the files, then call init), it worked, at least with the simple .h file containing the CountCharacters function.
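
A quick way to do that check from the notebook (a small sketch, assuming sc is the SWAN-provided SparkContext) is to ask an executor for its SparkFiles directory:

import os
from pyspark import SparkFiles

def ls_sparkfiles(_):
    # Runs on an executor: return its SparkFiles directory and contents,
    # which should include the distributed header.
    d = SparkFiles.getRootDirectory()
    return d, os.listdir(d)

print(sc.parallelize(range(1), 1).map(ls_sparkfiles).collect())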

With the real-physics .h (*), it worked with ProcessLine instead of Declare (otherwise I run into the RVec/std::vector problem reported in [**]).
[*] Hrare/analysis/config/functions.h at main · mariadalfonso/Hrare · GitHub
[**] RVec and std::vector in ROOT 6.30/04
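
For reference, the difference between the two interpreter calls, as an illustrative sketch (the header path here is hypothetical):

import ROOT

# Declare compiles the code as a global declaration and returns True/False,
# so a failing include (like the RVec/std::vector clash in [**]) is rejected.
ok = ROOT.gInterpreter.Declare('#include "/tmp/functions.h"')  # hypothetical path

# ProcessLine hands the line to the interpreter as if typed at the ROOT
# prompt, which is what worked here for the real-physics header.
ROOT.gInterpreter.ProcessLine('#include "/tmp/functions.h"')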

I will try the Spark k8s and SWAN-Dask as well.

Maria

P.S. Below is my notebook snippet again, for reference:

def initSpark():
    from pathlib import Path
    from pyspark import SparkFiles
    print('loadUserCode.h')
    localdir = SparkFiles.getRootDirectory()
    lib_path = Path(localdir) / "functions.h"
#    lib_path = Path(localdir) / "myLibrary.h"
#    ROOT.gInterpreter.Declare(f'#include "{lib_path}"')
    ROOT.gInterpreter.ProcessLine(f'#include "{lib_path}"')
def makeRDF(files):

.....

        elif AF == "cern-spark":
            df = RDataFrame("Events", files, sparkcontext=sc, npartitions=NPARTITIONS) # at CERN
            df._headnode.backend.distribute_unique_paths(
                [
                    "/eos/user/d/dalfonso/SWAN_projects/Hrare/JULY_exp/config/functions.h",
                    #"/eos/user/d/dalfonso/SWAN_projects/Hrare/JULY_exp/myLibrary.h",                    
                ]
            )
            sc.addPyFile("/eos/user/d/dalfonso/SWAN_projects/Hrare/JULY_exp/utilsAna.py")
            print(sc.environment)
            ROOT.RDF.Experimental.Distributed.initialize(initSpark)

....

Hi Mariarosaria,

Sorry for the slightly delayed response, but I can see you found the solution in the meantime - yes, ProcessLine is the correct approach in this case.

Please let me know in case k8s/Dask don't solve your other issue, and we can debug further together.

Cheers,
Marta
