'Define' error in PyRDF

Hello!

I’m trying to use a Spark cluster in the SWAN system, where PyRDF was chosen as the DataFrame handler.
It shows an error, TypeError: can not resolve method template call for 'Define', when I try to use an RDataFrame with several Define calls.
The same code with ROOT.RDataFrame has no issue, so I suspect this error is related to PyRDF.

The core part of the code is:

import ROOT
import PyRDF

files = [
    "../data/PbPb5TeV/treno20200703/LHC18qr.root"
]
treeName = "RTree"
listOfFiles = ROOT.vector('string')()
for f in files:
    listOfFiles.push_back(f)
df = PyRDF.RDataFrame(treeName, listOfFiles)
df_new = df.Define("npt", "2*pt").Define("apt", "-2*pt")
print(df_new.Count().GetValue())

And the error was (quite long):

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-901a0ff138f6> in <module>
----> 1 print(df_new.Count().GetValue())

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/Proxy.py in GetValue(self)
    107             if not self.proxied_node.value:  # If event-loop not triggered
    108                 generator = CallableGenerator(self.proxied_node.get_head())
--> 109                 current_backend.execute(generator)
    110 
    111         return self.proxied_node.value

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py in execute(self, generator)
    580 
    581         # Values produced after Map-Reduce
--> 582         values = self.ProcessAndMerge(mapper, reducer)
    583         # List of action nodes in the same order as values
    584         nodes = generator.get_action_nodes()

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py in ProcessAndMerge(self, mapper, reducer)
    111 
    112         # Map-Reduce using Spark
--> 113         return parallel_collection.map(spark_mapper).treeReduce(reducer)
    114 
    115     def distribute_files(self, includes_list):

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in treeReduce(self, f, depth)
    879                 return f(x[0], y[0]), False
    880 
--> 881         reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
    882         if reduced[1]:
    883             raise ValueError("Cannot reduce empty RDD.")

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in treeAggregate(self, zeroValue, seqOp, combOp, depth)
   1003                 .values()
   1004 
-> 1005         return partiallyAggregated.reduce(combOp)
   1006 
   1007     def max(self, key=None):

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in reduce(self, f)
    842             yield reduce(f, iterator, initial)
    843 
--> 844         vals = self.mapPartitions(func).collect()
    845         if vals:
    846             return reduce(f, vals)

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in collect(self)
    814         """
    815         with SCCallSiteSync(self.context) as css:
--> 816             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    817         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    818 

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 0.0 failed 4 times, most recent failure: Lost task 18.3 in stage 0.0 (TID 44, p06636710h82992.cern.ch, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/worker.py", line 377, in main
    process()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 1861, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 997, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 983, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py", line 104, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py", line 460, in mapper
    output = callable_function(rdf, rdf_range=current_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 133, in mapper
    prev_vals = mapper(parent_node, node_py=n, rdf_range=rdf_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 109, in mapper
    **operation.kwargs)
TypeError: can not resolve method template call for 'Define'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/worker.py", line 377, in main
    process()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 1861, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 997, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 983, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py", line 104, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py", line 460, in mapper
    output = callable_function(rdf, rdf_range=current_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 133, in mapper
    prev_vals = mapper(parent_node, node_py=n, rdf_range=rdf_range)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 109, in mapper
    **operation.kwargs)
TypeError: can not resolve method template call for 'Define'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

You can find the full code and error here:
https://cernbox.cern.ch/index.php/s/GP1RYnPRwAkV1Qi

Any idea about this?

Regards,
Bong-Hwi


ROOT Version: 6.20/02
Platform: SWAN, SLC
Compiler: Not Provided


I think @eguiraud or @etejedor might help you with this

Hi Bong-Hwi,

Our PyRDF expert @vpadulan will be able to help you.

Hello @bellenot @etejedor ,

Thanks for the redirection!

Hi @blim,
Thank you for reporting the issue. Could you tell me which SWAN configuration you are using?

Hi @vpadulan,

I’m using SWAN with this configuration:

  • Software stack: 97
  • Platform: CentOS 7 (gcc8)
  • Environment script: none
  • Number of cores: 4
  • Memory: 16GB
  • Spark cluster: General Purpose (Analytix)

Thanks @blim,
I think the problem is the file name you are giving to the dataframe. The remote workers do not have direct access to the local files present in your SWAN driver session, so you should give them a valid handle to a remote file (usually through xrootd or https).
So in your script:

"../data/PbPb5TeV/treno20200703/LHC18qr.root"

should become something like

"root://eosuser.cern.ch//youruserhome/..../data/PbPb5TeV/treno20200703/LHC18qr.root"
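As an illustration, the mapping from an absolute EOS path to an xrootd URL can be sketched like this (to_xrootd_url is just a hypothetical helper for this thread, not part of PyRDF; note the double slash between the server and the absolute /eos path):

```python
def to_xrootd_url(eos_path, server="eosuser.cern.ch"):
    """Turn an absolute EOS path into an xrootd URL the Spark workers can open."""
    # xrootd URLs use a double slash between server and absolute path:
    # root://<server>//eos/user/...
    assert eos_path.startswith("/eos/"), "expected an absolute EOS path"
    return "root://{}/{}".format(server, eos_path)

# For example:
print(to_xrootd_url("/eos/user/b/blim/data/PbPb5TeV/treno20200703/LHC18qr.root"))
```

You can then push the resulting URL into the `listOfFiles` vector instead of the relative local path.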

Then I suggest you switch to the K8s Spark cluster, since your Kerberos token will already be forwarded to the workers and you shouldn’t run into any “permission denied” errors. To do so, please select the “Bleeding Edge” stack in SWAN. With the latest PyROOT, the error you got is a little more verbose:

TypeError: can not resolve method template call for 'Define'

Becomes

cppyy.gbl.std.runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void>::Define(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: GetBranchNames: error in opening the tree MyTree

Let me know if this helps.
Cheers,
Vincenzo

Thanks for the suggestion.

First, I tried with stack 97.
File path: "root://eosuser.cern.ch//eos/user/b/blim/SWAN_projects/He3-PbPb-2018/data/PbPb5TeV/treno20200703/LHC18qr.root"

but it looks like it can't access the file, failing with a permission error:

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/Proxy.py:109: UserWarning: No entries in the Tree, falling back to local execution!
  current_backend.execute(generator)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-901a0ff138f6> in <module>
----> 1 print(df_new.Count().GetValue())

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/Proxy.py in GetValue(self)
    107             if not self.proxied_node.value:  # If event-loop not triggered
    108                 generator = CallableGenerator(self.proxied_node.get_head())
--> 109                 current_backend.execute(generator)
    110 
    111         return self.proxied_node.value

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py in execute(self, generator)
    577             PyRDF.use("local")
    578             from .. import current_backend
--> 579             return current_backend.execute(generator)
    580 
    581         # Values produced after Map-Reduce

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Local.py in execute(self, generator)
     61             self.pyroot_rdf = ROOT.ROOT.RDataFrame(*generator.head_node.args)
     62 
---> 63         values = mapper(self.pyroot_rdf)  # Execute the mapper function
     64 
     65         # Get the action nodes in the same order as values

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py in mapper(node_cpp, node_py, rdf_range)
    131             for n in node_py.children:
    132                 # Recurse through children and get their output
--> 133                 prev_vals = mapper(parent_node, node_py=n, rdf_range=rdf_range)
    134 
    135                 # Attach the output of the children node

/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py in mapper(node_cpp, node_py, rdf_range)
    107                 else:
    108                     pyroot_node = RDFOperation(*operation.args,
--> 109                                                **operation.kwargs)
    110 
    111                 # The result is a pyroot object which is stored together with

TypeError: can not resolve method template call for 'Define'

Error in <TNetXNGFile::Open>: [ERROR] Server responded with an error: [3010] Unable to give access - user access restricted - unauthorized identity used ; Permission denied
INFO:SparkMonitorKernel:Scala socket closed - empty data
INFO:SparkMonitorKernel:Socket Exiting Client Loop
INFO:SparkMonitorKernel:Starting socket thread, going to accept
Error in <TNetXNGFile::Open>: [ERROR] Server responded with an error: [3010] Unable to give access - user access restricted - unauthorized identity used ; Permission denied
Error in <TNetXNGFile::Open>: [ERROR] Server responded with an error: [3010] Unable to give access - user access restricted - unauthorized identity used ; Permission denied

Is this related to the cluster, as you mentioned?

Second, I tried to use the Bleeding Edge stack with the K8s cluster.
I can't connect to the cluster:

Error while connecting to Spark cluster

Invalid kube-config file. Expected key token in kube-config/users[name=spark]/user

Maybe I don’t have access permission to the K8s cluster.

Regards,
Bong-Hwi

Hi @blim,
Yes, the error you see with the Analytix cluster is because your access token is not forwarded to the workers.

For the K8s cluster you need to request access permission; you can open a SNOW ticket for IT-DB with your error and they can set you up to use the cluster.

Cheers,
Vincenzo

Thanks, I opened a ticket for K8s cluster access.
I’ll update once I get the result.

Regards,
Bong-Hwi

Hi @vpadulan,

I tried with the K8s cluster and it still shows an error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-901a0ff138f6> in <module>
----> 1 print(df_new.Count().GetValue())

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/Proxy.py in GetValue(self)
    107             if not self.proxied_node.value:  # If event-loop not triggered
    108                 generator = CallableGenerator(self.proxied_node.get_head())
--> 109                 current_backend.execute(generator)
    110 
    111         return self.proxied_node.value

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py in execute(self, generator)
    580 
    581         # Values produced after Map-Reduce
--> 582         values = self.ProcessAndMerge(mapper, reducer)
    583         # List of action nodes in the same order as values
    584         nodes = generator.get_action_nodes()

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py in ProcessAndMerge(self, mapper, reducer)
    111 
    112         # Map-Reduce using Spark
--> 113         return parallel_collection.map(spark_mapper).treeReduce(reducer)
    114 
    115     def distribute_files(self, includes_list):

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in treeReduce(self, f, depth)
    879                 return f(x[0], y[0]), False
    880 
--> 881         reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
    882         if reduced[1]:
    883             raise ValueError("Cannot reduce empty RDD.")

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in treeAggregate(self, zeroValue, seqOp, combOp, depth)
   1003                 .values()
   1004 
-> 1005         return partiallyAggregated.reduce(combOp)
   1006 
   1007     def max(self, key=None):

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in reduce(self, f)
    842             yield reduce(f, iterator, initial)
    843 
--> 844         vals = self.mapPartitions(func).collect()
    845         if vals:
    846             return reduce(f, vals)

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in collect(self)
    814         """
    815         with SCCallSiteSync(self.context) as css:
--> 816             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    817         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    818 

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 4.0 failed 4 times, most recent failure: Lost task 5.3 in stage 4.0 (TID 92, 10.100.11.75, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/latest/spark/2.4.6-cern1-7dc07/x86_64-centos7-gcc8-opt/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/latest/spark/2.4.6-cern1-7dc07/x86_64-centos7-gcc8-opt/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 1861, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/latest/spark/2.4.6-cern1-7dc07/x86_64-centos7-gcc8-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 997, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 983, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/latest/spark/2.4.6-cern1-7dc07/x86_64-centos7-gcc8-opt/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py", line 104, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py", line 460, in mapper
    output = callable_function(rdf, rdf_range=current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 131, in mapper
    prev_vals = mapper(parent_node, node_py=n, rdf_range=rdf_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 107, in mapper
    **operation.kwargs)
cppyy.gbl.std.runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void>::Define(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.

  ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void>::Define(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.


	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft-nightlies.cern.ch/lcg/latest/spark/2.4.6-cern1-7dc07/x86_64-centos7-gcc8-opt/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/latest/spark/2.4.6-cern1-7dc07/x86_64-centos7-gcc8-opt/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 1861, in combineLocally
    merger.mergeValues(iterator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/latest/spark/2.4.6-cern1-7dc07/x86_64-centos7-gcc8-opt/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 997, in mapPartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 983, in aggregatePartition
    for obj in iterator:
  File "/cvmfs/sft-nightlies.cern.ch/lcg/latest/spark/2.4.6-cern1-7dc07/x86_64-centos7-gcc8-opt/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py", line 104, in spark_mapper
    return mapper(current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py", line 460, in mapper
    output = callable_function(rdf, rdf_range=current_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 131, in mapper
    prev_vals = mapper(parent_node, node_py=n, rdf_range=rdf_range)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/Thu/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 107, in mapper
    **operation.kwargs)
cppyy.gbl.std.runtime_error: Template method resolution failed:
  ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void>::Define(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.

  ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void> ROOT::RDF::RInterface<ROOT::Detail::RDF::RRange<ROOT::Detail::RDF::RLoopManager>,void>::Define(basic_string_view<char,char_traits<char> > name, basic_string_view<char,char_traits<char> > expression) =>
    runtime_error: 
RDataFrame: An error occurred during just-in-time compilation. The lines above might indicate the cause of the crash
 All RDF objects that have not run an event loop yet should be considered in an invalid state.


	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

I tried with both paths:

  • root://eosuser.cern.ch//eos/user/b/blim/SWAN_projects/He3-PbPb-2018/data/PbPb5TeV/treno20200703/LHC18qr.root
  • /eos/user/b/blim/SWAN_projects/He3-PbPb-2018/data/PbPb5TeV/treno20200703/LHC18qr.root

but the error was the same in both cases, so I suspect it occurs while reading the ROOT file.
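Since the JIT error from `Define` usually means a column name is not known to the dataframe, a quick pure-Python check could help separate a missing-branch problem from a file-access one. This is only a sketch: `required` is my guess at the columns used in the snippets in this thread, and the ROOT calls are shown in comments because they need the actual file.

```python
# Sketch: verify that the columns used in Define()/Filter() actually exist
# in the tree before building the graph. `required` is an assumption based
# on the snippets in this thread.

def missing_columns(available, required):
    """Return the required column names not present in the tree."""
    return sorted(set(required) - set(available))

required = ["pt", "trackingPID", "centrality", "tpcNsigma", "dcaxy", "dcaz"]

# With ROOT and the file at hand, the branch list would come from the tree:
#   import ROOT
#   f = ROOT.TFile.Open("root://eosuser.cern.ch//eos/user/b/blim/...")
#   available = [b.GetName() for b in f.RTree.GetListOfBranches()]
available = ["pt", "centrality", "tpcNsigma"]  # stand-in for illustration

print(missing_columns(available, required))
```

Any name printed here would make the jitted `Define`/`Filter` expressions fail to compile.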
You can find the full code here:
https://cernbox.cern.ch/index.php/s/GP1RYnPRwAkV1Qi

Regards,
Bong-Hwi

Dear @blim,
Ok, the code should work now, so maybe there is some issue with the file itself. Would you mind sharing some sample data on CERNBox so that I can try to reproduce your issue?
Cheers,
Vincenzo

Dear @vpadulan,

Sorry for the delay!
I’ll try to make a sample tree; I hope sharing it is not forbidden by our collaboration’s rules.

Meanwhile, I managed to connect to and run the job on the K8s cluster with LCG_96bpython3.
Since LCG_97 cannot connect to the K8s cluster, I tried 96Python3 instead, and that worked.

The code ran without problems on LCG_96.
If I use Bleeding Edge, the script shows an error even when I use ROOT.RDataFrame,
so I think the error is related to the version of some software in Bleeding Edge.

While I could run some jobs on the K8s cluster, there are still some crucial problems.

  1. The outputs from RDataFrame are not saved.
    The error output is here:
Error in <TROOT::WriteTObject>: The current directory (PyROOT) is not associated with a file. The object (fATPCcounts) has not been written.
Error in <TROOT::WriteTObject>: The current directory (PyROOT) is not associated with a file. The object (fADCAxyTPC) has not been written.
Error in <TROOT::WriteTObject>: The current directory (PyROOT) is not associated with a file. The object (fADCAzTPC) has not been written.
Error in <TROOT::WriteTObject>: The current directory (PyROOT) is not associated with a file. The object (fMTPCcounts) has not been written.
Error in <TROOT::WriteTObject>: The current directory (PyROOT) is not associated with a file. The object (fMDCAxyTPC) has not been written.
Error in <TROOT::WriteTObject>: The current directory (PyROOT) is not associated with a file. The object (fMDCAzTPC) has not been written.

From the code:

listOfFiles = ROOT.vector('string')()
for f in files:
    listOfFiles.push_back(f)
df = ROOT.RDataFrame(treeName, listOfFiles)
df_ch1 = df.Define("npt", "2*pt")#.Filter("trackingPID==7")
df_new = df_ch1.Define("apt", "-2*pt").Filter("trackingPID==7")

oFile = ROOT.TFile.Open(outputFilePath, "RECREATE")
listOfHistograms = dict()
listOfDirectories = []
listOfHistogramsAnti = []
listOfDCAxyHistogramsAnti = []
listOfDCAzHistogramsAnti = []
listOfHistogramsMatter = []
listOfDCAxyHistogramsMatter = []
listOfDCAzHistogramsMatter = []

# DATA
particles = df_new.Filter("apt > 0")
antiparticles = df_new.Filter("pt > 0")

for sys in list(dictOfSystematics.keys()):
    tempFolder = f"{BasicInputDir}{sys}"
    print(f"Current Folder: {tempFolder}")
    listOfDirectories.append(oFile.mkdir(tempFolder))
    listOfHistogramsAnti.append(antiparticles.Filter(dictOfSystematics[sys]).Histo3D(("fATPCcounts",";Centrality (%);#it{p}_{T} (GeV/#it{c}); n_{#sigma} d",nCentBins,centBins,nPtBins,pTbins,fSigmaNbins,sigmaBins),"centrality","apt","tpcNsigma"))
    listOfDCAxyHistogramsAnti.append(antiparticles.Filter(dictOfSystematics[sys]).Histo3D(("fADCAxyTPC",";Centrality (%);#it{p}_{T} (GeV/#it{c}); DCA_{xy} (cm)",nCentBins,centBins,nPtBins,pTbins,nDCAbins,dcaBins),"centrality","apt","dcaxy"))
    listOfDCAzHistogramsAnti.append(antiparticles.Filter(dictOfSystematics[sys]).Histo3D(("fADCAzTPC",";Centrality (%);#it{p}_{T} (GeV/#it{c}); DCA_{z} (cm)",nCentBins,centBins,nPtBins,pTbins,fDCAzNbins,dcazBins),"centrality","apt","dcaz"))
    listOfHistogramsMatter.append(particles.Filter(dictOfSystematics[sys]).Histo3D(("fMTPCcounts",";Centrality (%);#it{p}_{T} (GeV/#it{c}); n_{#sigma} d",nCentBins,centBins,nPtBins,pTbins,fSigmaNbins,sigmaBins),"centrality","npt","tpcNsigma"))
    listOfDCAxyHistogramsMatter.append(particles.Filter(dictOfSystematics[sys]).Histo3D(("fMDCAxyTPC",";Centrality (%);#it{p}_{T} (GeV/#it{c}); DCA_{xy} (cm)",nCentBins,centBins,nPtBins,pTbins,nDCAbins,dcaBins),"centrality","npt","dcaxy"))
    listOfDCAzHistogramsMatter.append(particles.Filter(dictOfSystematics[sys]).Histo3D(("fMDCAzTPC",";Centrality (%);#it{p}_{T} (GeV/#it{c}); DCA_{z} (cm)",nCentBins,centBins,nPtBins,pTbins,fDCAzNbins,dcazBins),"centrality","npt","dcaz"))

%%time
for directory,antiTPCsigma,matterTPCsigma,antiDCAxy,antiDCAz,matterDCAxy,matterDCAz in zip(listOfDirectories, listOfHistogramsAnti, listOfHistogramsMatter, listOfDCAxyHistogramsAnti, listOfDCAzHistogramsAnti, listOfDCAxyHistogramsMatter, listOfDCAzHistogramsMatter):
    directory.cd()
    antiTPCsigma.Write()
    antiDCAxy.Write()
    antiDCAz.Write()
    matterTPCsigma.Write()
    matterDCAxy.Write()
    matterDCAz.Write()
    for f in rootfiles:
        for obj in listOfHistogramWantToSave:
            hTemp = f.Get(f"{BasicInputDir}_").FindObject(obj)
            directory.cd()
            hTemp.Write()

print('finished!')
oFile.Close()

So basically, I’m trying to define the histograms first, and trying to save them on each directory.
Full script can be found here: https://cernbox.cern.ch/index.php/s/bADyJKxWjE2CvsH

  2. It’s slower than running without Spark.
    This may be related to how my tree is configured or how I define the histograms, but it takes longer than the traditional job.
    In the link above, you can see that it took 11 min 35 s (even though it fails to save).
    Without Spark, it takes around 10 min. This is strange, since I’m using more threads to run the same job.
    Could you give me any suggestions to improve the parallel processing?
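For reference, the two wall-clock times quoted above compare as follows (simple arithmetic on the numbers in this thread):

```python
# Compare the quoted wall-clock times: 11 min 35 s with Spark vs ~10 min without.
spark_s = 11 * 60 + 35  # 695 s
local_s = 10 * 60       # 600 s
slowdown = spark_s / local_s

print(f"The Spark run takes {slowdown:.2f}x the local runtime "
      f"({spark_s - local_s} s more).")
```

A gap of this size on a ~10-minute job is in the range where cluster scheduling and start-up overhead can plausibly dominate, which matches the explanation given in the reply below.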

Regards,
Bong-Hwi

Dear @blim,
The error with WriteTObject is related to an old bug in PyRDF solved by #85; unfortunately, LCG_96 still doesn’t include that patch. I suggest you check out the new LCG_97a configuration on SWAN, which should also work with K8s, and tell me if you see the same error again.

The execution time can depend on the different configurations you have tried. It would be helpful if you sent me more information, e.g. how many threads you are using in the multithreaded ROOT RDF case, and how many pods and cores in the PyRDF-with-Spark-on-K8s case. Bear in mind that for small datasets with a relatively short runtime, the overhead of starting the pods on the K8s cluster is non-negligible and could contribute to the higher execution time you are experiencing.

You can try tweaking the Spark parameters to add more pods and cores by changing the values of spark.executor.instances and spark.executor.cores in the SWAN Spark configuration of your notebook when connecting (docs).
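Outside the SWAN connection dialog, the same knobs correspond to standard Spark properties; a sketch with illustrative values only (not tuned for this job):

```
# spark-defaults style; total parallelism = instances x cores
spark.executor.instances  4
spark.executor.cores      4
```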

Cheers,
Vincenzo

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.