Issue with RDataFrame using Spark Cluster on LCG 102 and Higher

Hello everyone,

I'd like to follow up here on a conversation I had privately with @vpadulan, which relates to the earlier topic “Error while using RDataFrame with spark cluster (analytix)”.

While working with RDataFrame on Spark I hit a reproducible error. To rule out anything specific to my own analysis code, I reproduced the issue by running the standard RDataFrame demo from the documentation: distrdf001_spark_connection.py.
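
For reference, the core of that tutorial looks roughly as follows (a sketch from memory of distrdf001_spark_connection.py; the configuration values are illustrative):

import pyspark
import ROOT

# Point RDataFrame calls to the Spark-backed distributed RDataFrame
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame

# Create a SparkContext (the application name is illustrative)
sparkconf = pyspark.SparkConf().setAppName("distrdf001_spark_connection")
sparkcontext = pyspark.SparkContext(conf=sparkconf)

# Create a distributed RDataFrame over an empty source of 1000 entries
df = RDataFrame(1000, sparkcontext=sparkcontext)

# Define two columns of random numbers and book one histogram per column
df = df.Define("gaus", "gRandom->Gaus(10, 1)").Define("exponential", "gRandom->Exp(10)")
h_gaus = df.Histo1D(("gaus", "Normal distribution", 50, 0, 30), "gaus")
h_exp = df.Histo1D(("exponential", "Exponential distribution", 20, 0, 100), "exponential")

# Drawing triggers the distributed event loop; this is where the error surfaces
c = ROOT.TCanvas("distrdf001", "distrdf001", 800, 400)
c.Divide(2, 1)
c.cd(1)
h_gaus.DrawCopy()
c.cd(2)
h_exp.DrawCopy()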

On SWAN with LCG 101 everything works as expected. However, with LCG 102 and every subsequent release, the same code consistently fails with an error stack.

Would appreciate any insights or solutions.

Warm regards,
Luca


Welcome to the ROOT Forum!
I’m sure @vpadulan will be able to help. Maybe you could attach the error(s) you got?

Hi,

As mentioned, the error is fully reproducible using the demo code from the RDataFrame documentation on LCG 102 or any later release.

For convenience, I paste the full error stack below.

Best,
Luca

Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_596/2414637524.py in <module>
      3 c.Divide(2, 1)
      4 c.cd(1)
----> 5 h_gaus.DrawCopy()
      6 c.cd(2)
      7 h_exp.DrawCopy()

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in _call_action_result(self, *args, **kwargs)
    196         result of the current action node.
    197         """
--> 198         return getattr(self.GetValue(), self._cur_attr)(*args, **kwargs)
    199 
    200     def create_variations(self) -> VariationsProxy:

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in GetValue(self)
    188         returning the value.
    189         """
--> 190         execute_graph(self.proxied_node)
    191         return self.proxied_node.value
    192 

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py in execute_graph(node)
     55             # All the information needed to reconstruct the computation graph on
     56             # the workers is contained in the head node
---> 57             node.get_head().execute_graph()
     58 
     59 

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py in execute_graph(self)
    205         # Execute graph distributedly and return the aggregated results from all
    206         # tasks
--> 207         returned_values = self.backend.ProcessAndMerge(self._build_ranges(), mapper, distrdf_reducer)
    208         # Perform any extra checks that may be needed according to the
    209         # type of the head node

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Spark/Backend.py in ProcessAndMerge(self, ranges, mapper, reducer)
    133 
    134         # Map-Reduce using Spark
--> 135         return parallel_collection.map(spark_mapper).treeReduce(reducer)
    136 
    137     def distribute_unique_paths(self, paths):

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeReduce(self, f, depth)
   1295                 return f(x[0], y[0]), False
   1296 
-> 1297         reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
   1298         if reduced[1]:
   1299             raise ValueError("Cannot reduce empty RDD.")

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in treeAggregate(self, zeroValue, seqOp, combOp, depth)
   1437             )
   1438 
-> 1439         return partiallyAggregated.reduce(combOp)
   1440 
   1441     @overload

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in reduce(self, f)
   1248             yield reduce(f, iterator, initial)
   1249 
-> 1250         vals = self.mapPartitions(func).collect()
   1251         if vals:
   1252             return reduce(f, vals)

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/rdd.py in collect(self)
   1195         with SCCallSiteSync(self.context):
   1196             assert self.ctx._jvm is not None
-> 1197             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   1198         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
   1199 

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/sql/utils.py in deco(*a, **kw)
    188     def deco(*a: Any, **kw: Any) -> Any:
    189         try:
--> 190             return f(*a, **kw)
    191         except Py4JJavaError as e:
    192             converted = convert_exception(e.java_exception)

/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/python3.9/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 18) (ithdp3103.cern.ch executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/worker.py", line 668, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 19, in <module>
    import ROOT
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/__init__.py", line 38, in <module>
    _register_pythonizations()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/__init__.py", line 351, in _register_pythonizations
    importlib.import_module(__name__ + '.' + module_name)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tmva/__init__.py", line 25, in <module>
    hasRDF = gSystem.GetFromPipe("root-config --has-dataframe") == "yes"
ValueError: TString TSystem::GetFromPipe(const char* command) =>
    ValueError: nullptr result where temporary expected

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/worker.py", line 668, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/python/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 19, in <module>
    import ROOT
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/__init__.py", line 38, in <module>
    _register_pythonizations()
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/__init__.py", line 351, in _register_pythonizations
    importlib.import_module(__name__ + '.' + module_name)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.9.12-9a1bc/x86_64-centos7-gcc11-opt/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "/cvmfs/sft.cern.ch/lcg/views/LCG_103swan/x86_64-centos7-gcc11-opt/lib/ROOT/_pythonization/_tmva/__init__.py", line 25, in <module>
    hasRDF = gSystem.GetFromPipe("root-config --has-dataframe") == "yes"
ValueError: TString TSystem::GetFromPipe(const char* command) =>
    ValueError: nullptr result where temporary expected

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
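
From the bottom of the trace, the failure happens while importing ROOT on the Spark executors: the TMVA pythonization calls gSystem.GetFromPipe("root-config --has-dataframe"), which returns a null result on the workers. For what it's worth, a minimal sketch that isolates that call on the executors (hypothetical; it assumes the sparkcontext created for the demo) would be:

def check_root_config(_):
    # Importing ROOT on the executor triggers the same pythonization
    # registration that fails in the trace above
    import ROOT
    return str(ROOT.gSystem.GetFromPipe("root-config --has-dataframe"))

# Run the check on two partitions and collect whatever the executors return
print(sparkcontext.parallelize(range(2), 2).map(check_root_config).collect())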

Dear @lucacanali ,

Thanks for adding your report too. This issue is known and I’m working on it. I will keep you updated with news.

Cheers,
Vincenzo

