Hello!
I’m trying to use a Spark cluster in the SWAN system.
For that, I chose PyRDF as the DataFrame handler.
It shows an error: TypeError: can not resolve method template call for 'Define'
when I try to use RDataFrame with several Define calls.
The same code with ROOT.RDataFrame has no issue, so I suspect that this error is related to PyRDF.
The core part of the code is:
files = [
"../data/PbPb5TeV/treno20200703/LHC18qr.root"
]
treeName = "RTree"
listOfFiles = ROOT.vector('string')()
for f in files:
listOfFiles.push_back(f)
df = PyRDF.RDataFrame(treeName, listOfFiles)
df_new = df.Define("npt", "2*pt").Define("apt", "-2*pt")
print(df_new.Count().GetValue())
The error was (quite long):
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-5-901a0ff138f6> in <module>
----> 1 print(df_new.Count().GetValue())
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/Proxy.py in GetValue(self)
107 if not self.proxied_node.value: # If event-loop not triggered
108 generator = CallableGenerator(self.proxied_node.get_head())
--> 109 current_backend.execute(generator)
110
111 return self.proxied_node.value
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py in execute(self, generator)
580
581 # Values produced after Map-Reduce
--> 582 values = self.ProcessAndMerge(mapper, reducer)
583 # List of action nodes in the same order as values
584 nodes = generator.get_action_nodes()
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py in ProcessAndMerge(self, mapper, reducer)
111
112 # Map-Reduce using Spark
--> 113 return parallel_collection.map(spark_mapper).treeReduce(reducer)
114
115 def distribute_files(self, includes_list):
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in treeReduce(self, f, depth)
879 return f(x[0], y[0]), False
880
--> 881 reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
882 if reduced[1]:
883 raise ValueError("Cannot reduce empty RDD.")
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in treeAggregate(self, zeroValue, seqOp, combOp, depth)
1003 .values()
1004
-> 1005 return partiallyAggregated.reduce(combOp)
1006
1007 def max(self, key=None):
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in reduce(self, f)
842 yield reduce(f, iterator, initial)
843
--> 844 vals = self.mapPartitions(func).collect()
845 if vals:
846 return reduce(f, vals)
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py in collect(self)
814 """
815 with SCCallSiteSync(self.context) as css:
--> 816 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
817 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
818
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 0.0 failed 4 times, most recent failure: Lost task 18.3 in stage 0.0 (TID 44, p06636710h82992.cern.ch, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/worker.py", line 377, in main
process()
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 352, in func
return f(iterator)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 1861, in combineLocally
merger.mergeValues(iterator)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/shuffle.py", line 238, in mergeValues
for k, v in iterator:
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 997, in mapPartition
for obj in iterator:
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 983, in aggregatePartition
for obj in iterator:
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py", line 104, in spark_mapper
return mapper(current_range)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py", line 460, in mapper
output = callable_function(rdf, rdf_range=current_range)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 133, in mapper
prev_vals = mapper(parent_node, node_py=n, rdf_range=rdf_range)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 109, in mapper
**operation.kwargs)
TypeError: can not resolve method template call for 'Define'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/worker.py", line 377, in main
process()
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 2499, in pipeline_func
return func(split, prev_func(split, iterator))
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 352, in func
return f(iterator)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 1861, in combineLocally
merger.mergeValues(iterator)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/shuffle.py", line 238, in mergeValues
for k, v in iterator:
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 997, in mapPartition
for obj in iterator:
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/rdd.py", line 983, in aggregatePartition
for obj in iterator:
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/python/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Spark.py", line 104, in spark_mapper
return mapper(current_range)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/backend/Dist.py", line 460, in mapper
output = callable_function(rdf, rdf_range=current_range)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 133, in mapper
prev_vals = mapper(parent_node, node_py=n, rdf_range=rdf_range)
File "/cvmfs/sft.cern.ch/lcg/views/LCG_97python3/x86_64-centos7-gcc8-opt/lib/python3.7/site-packages/PyRDF/CallableGenerator.py", line 109, in mapper
**operation.kwargs)
TypeError: can not resolve method template call for 'Define'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
You can find the full code and error here:
https://cernbox.cern.ch/index.php/s/GP1RYnPRwAkV1Qi
Any idea on this?
Regards,
Bong-Hwi
ROOT Version: 6.20/02
Platform: SWAN, SLC
Compiler: Not Provided