
Trivial questions about RDataFrame

Hi,
I have a couple of questions about the RDataFrame concept and how it works. I went through the docs and the examples but I still have a few doubts.

  1. RDataFrame seems to be (mostly) a high level interface for analyses of data stored in TTree/CSV. If I were to deal with events in memory, it seems that Fill is my only way to do this (as shown in df005_fillAnyObject.C). Am I missing something?
  2. Related to 1), is there a way to define an empty RDataFrame whose number of entries depends on the input source (could be 1k, could be 1B entries)? In most examples I see RDataFrame(n), with n the number of entries, declared before any operation.
  3. Assuming a multi-threaded application that feeds a data sink on a first-come, first-served basis, am I allowed to Snapshot to the same RDataFrame? In RSnapshotOptions I see fMode and fOverwriteIfExists, with the latter described as follows: “If fMode is “UPDATE”, overwrite object in output file if it already exists.” Does it mean that you can’t “append” to the RDataFrame?

Thanks for the help
Best wishes
giordano



ROOT Version: Not Provided
Platform: Not Provided
Compiler: Not Provided


I am not sure what you mean exactly with “events in memory”. You can just create an empty RDataFrame with RDataFrame(nEntries) and then do anything you want in its event loop. You can “capture” variables from outside the RDF object via C++11 lambdas.

What’s the input source? If it has a corresponding RDataSource implementation, RDataFrame will ask the data source for the number of entries.

RDataFrame is not the data format – data is written in ROOT TTrees. You can append data to TTrees, although the simplest way would probably be to simply write another TTree (to different files if the two writes happen concurrently, otherwise also to the same file) and later read back all trees as a “chain”.

You can’t write bytes to the same file concurrently from multiple threads without some synchronization mechanism – RDataFrame::Snapshot uses TBufferMerger to synchronize multi-thread writes, see below.


Trying to guess what you might really want to know based on your questions: if your application has multiple threads (not managed by RDataFrame) that should produce a ROOT file as output, one option you have is TBufferMerger, which lets you write to the same TTree from multiple threads concurrently.

Another option is to create one RDataFrame per thread, each performing a Snapshot to a separate ROOT file (different RDataFrame objects cannot “coordinate” their concurrent writes to the same ROOT file – you’ll end up with garbled bytes).

A third option is to write a custom RDataSource that feeds data coming from all your threads to RDataFrame, FIFO-style (not trivial, but the most flexible, probably).

Feel free to ask more questions :grinning_face_with_smiling_eyes:

Dear Enrico,

I should have been more clear on this point. I was trying to be general and I ended up being confusing.
My problem is twofold: 1) I don’t have a ROOT file as input, and 2) I don’t know a priori how many entries are in my input file. Hence my confusion. Is RDataFrame the right tool to do the conversion to ROOT format? This is a new problem for me, as I moved to a multi-threaded application and the standard TFile and TTree are my main bottleneck.

You gave me several options that I need to think about. But to keep things as simple as possible for me, let’s take option number 2, one Snapshot per thread. The only question I have left is: how do I dynamically process events without knowing a priori the number of entries I need to construct my RDataFrame with?

Thank you so much for your help.
giordano

What’s your input format?

EDIT: …and do I understand correctly that you want to access the same input file/input data from multiple threads?

It’s a binary format.
To give you the full picture of my system: I have an online data flow which is handled by a load-balancing pattern over N threads and pushed to a data sink for histogram viewing on a first-come, first-served basis. I would like to add to the histogramming part some code that fills ROOT trees. The old ROOT approach bottlenecks badly, so after finding out about RDataFrame I tried to understand whether it is an option for what I want to do.

Ok, it sounds like you are looking for TBufferMerger: you can create the output TFile using TBufferMerger, and use it/write to it via per-thread pointers that take care of the synchronization.

Here is a simple usage example: the ROOT tutorial tutorials/multicore/mt103_fillNtupleFromMultipleThreads.C

Awesome! Thank you Enrico! I’m sure if I have more questions I’ll get back to you.


Dear Enrico,
I do have a question on how TBufferMerger and TBufferMergerFiles work together with respect to concurrency. I did write a small example:

#include <ROOT/TBufferMerger.hxx>
#include <ROOT/RDataFrame.hxx>
#include <ROOT/TSeq.hxx>
#include <TNtuple.h>
#include <TRandom.h>
#include <iostream>
#include <pthread.h>
struct arg_struct {
  long thread_id;
  std::shared_ptr<ROOT::Experimental::TBufferMergerFile> thread_rbuf;
};
static void *worker_test(void *args) {
  struct arg_struct* a = (struct arg_struct*)(args);
  long tid = (long)a->thread_id;
  std::shared_ptr<ROOT::Experimental::TBufferMergerFile> f = a->thread_rbuf;
  TNtuple ntrand("ntrand", "Random Numbers", "r");
  TRandom rnd(tid);
  for (auto i : ROOT::TSeqI(100))
    ntrand.Fill(rnd.Gaus());
  f->Write();
  pthread_exit(NULL);
}
int main () {
  const int NUM_THREADS  = 5;
  pthread_t* workers = new pthread_t[NUM_THREADS];
  int rc;
  struct arg_struct* args;
  ROOT::EnableThreadSafety();
  std::string fileName = "test.root";
  ROOT::Experimental::TBufferMerger merger(fileName.c_str());
  for(int i = 0; i < NUM_THREADS; ++i) {
    args = new struct arg_struct;
    args->thread_id = i;
    args->thread_rbuf = merger.GetFile();
    rc = pthread_create(workers+i, NULL, worker_test, (void *)args);
    if (rc) {
      std::cout << "Error:unable to create thread," << rc << std::endl;
      exit(-1);
    }
  }
  for (int worker_nbr = 0; worker_nbr < NUM_THREADS; ++worker_nbr) {
    pthread_join(workers[worker_nbr], NULL);
  }
  pthread_exit(NULL);
}

If I run this program, it segfaults with the following error:

Fatal in : TBufferMergerFiles must be destroyed before the server
aborting

I looked for this message in the source code and found it in the TBufferMerger destructor.

So here my doubts:

  1. Have I run into a case where I am bypassing the mutex system set up by ROOT::EnableThreadSafety()?
  2. Is there any visible control for the user where I can turn on/off the mutex?

FYI, the fillNtupleFromMultipleThreads tutorial example runs fine for me, but when I use the same method (TBufferMerger called inside each worker_task) in my program, the ntuple is simply not created and the reason is not clear to me, so I took a detour to better understand how the pieces work together.

Thanks in advance
Best wishes
giordano

Hi @gcerizza ,

To your first question: I don’t think so.

To your second: I don’t understand the question, but:

The error message indicates exactly what the problem is: the TBufferMerger destructor is called before all TBufferMergerFiles are destructed, and that’s not supported. In fact the snippet you shared has a bug: it leaks the arg_struct objects created with new, so the TBufferMergerFile shared pointers they hold are never released.

std::thread offers a type-safe alternative to direct pthreads usage that might make it simpler to manage lifetimes.

Cheers,
Enrico

Dear Enrico,

My question “Is there any visible control for the user where I can turn on/off the mutex?” was related to the ROOT::EnableThreadSafety() global mutex system.

You have a good point about the leak. I missed that while writing the example. Then, for the same example, should I explicitly destroy the TBufferMergerFiles before exiting the worker task, to make sure they are gone before the TBufferMerger is destroyed?

Thanks again
giordano

You can either destroy them at the end of the worker task or after you join the threads.

The internal ROOT mutexes needed to make certain operations thread-safe are created when ROOT::EnableThreadSafety is called the first time and cannot be destroyed (i.e. there is no DisableThreadSafety).

You are right. It’s also written in the docs.

Dear Enrico,
I’m very sorry, but there is still something that is not clear to me.
Look at this even simpler code:

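#include <ROOT/TBufferMerger.hxx>
#include <TROOT.h>
#include <iostream>
#include <memory>
#include <pthread.h>

static const int NUM_THREADS = 5;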

struct arg_struct {
  int  thread_id;
  std::shared_ptr<ROOT::Experimental::TBufferMergerFile> thread_rbuf;
};

void *worker_task(void *threadarg) {
  struct arg_struct *my_data;
  my_data = (struct arg_struct *) threadarg;

  std::cout << "Thread ID : " << my_data->thread_id << std::endl;
  auto f = new std::shared_ptr<ROOT::Experimental::TBufferMergerFile>(my_data->thread_rbuf);
  std::cout << f << std::endl;
  delete f;
  std::cout << "deleting shared pointer for thread " << my_data->thread_id << std::endl;
  pthread_exit(NULL);
}
int main () {
   pthread_t threads[NUM_THREADS];
   struct arg_struct td[NUM_THREADS];
   int rc;

   ROOT::EnableThreadSafety();
   ROOT::Experimental::TBufferMerger merger("test.root");

   for(int i = 0; i < NUM_THREADS; i++ ) {
     td[i].thread_id = i;
     td[i].thread_rbuf = merger.GetFile();
     rc = pthread_create(&threads[i], NULL, worker_task, (void *)&td[i]);
   }

   for (int i = 0; i < NUM_THREADS; ++i) {
     pthread_join(threads[i], NULL);
   }

   pthread_exit(NULL);
}

Either I’m missing something so obvious that I don’t see it, or I don’t understand how I’m breaking the concurrency. See the output:

Thread ID : Thread ID : 0
1
0x7f74a4000f30
deleting shared pointer for thread 0
Thread ID : 2
0x7f7494000b20
deleting shared pointer for thread 2
0x7f749c000b20
deleting shared pointer for thread 1
Thread ID : 3
0x7f748c000b20
deleting shared pointer for thread 3
Joining thread 0
Thread ID : 4
0x7f748c000b20
deleting shared pointer for thread Joining thread 41
Joining thread 2

Joining thread 3
Joining thread 4
Fatal in : TBufferMergerFiles must be destroyed before the server
aborting
#0 0x00007f74b9a6946a in __GI___waitpid (pid=42786, stat_loc=stat_loc@entry=0x7ffe789090c8, options=options@entry=0) at …/sysdeps/unix/sysv/linux/waitpid.c:30
#1 0x00007f74b99e75df in do_system (line=) at …/sysdeps/posix/system.c:149
#2 0x00007f74bb7b31a3 in TUnixSystem::StackTrace() () from /usr/opt/root/root-6.22.06/lib/libCore.so
#3 0x00007f74bb691be1 in DefaultErrorHandler(int, bool, char const*, char const*) () from /usr/opt/root/root-6.22.06/lib/libCore.so
#4 0x00007f74bb6916ca in ErrorHandler () from /usr/opt/root/root-6.22.06/lib/libCore.so
#5 0x00007f74bb691b02 in Fatal(char const*, char const*, …) () from /usr/opt/root/root-6.22.06/lib/libCore.so
#6 0x00007f74bb211eae in ROOT::Experimental::TBufferMerger::~TBufferMerger() () from /usr/opt/root/root-6.22.06/lib/libRIO.so
#7 0x00005610bcb0c1a8 in main ()
Aborted

What am I missing? Thanks for any hint on this.
Best wishes
giordano

Hi,
you are deleting f, a copy of my_data->thread_rbuf, before the end of the task. That’s ok (although note that the new is redundant, you can just construct the shared_ptr by value).

However, arg_struct td[NUM_THREADS];, declared at the beginning of main, contains another copy of those TBufferMergerFiles and is destroyed after merger: it is declared before merger, and C++ destroys local objects in reverse order of declaration.

Changing the order of declarations so that merger is constructed before arg_struct td[NUM_THREADS]; should fix the issue.


Passing the TBufferMerger into the task and calling GetFile in there might be a simpler approach:

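#include <ROOT/TBufferMerger.hxx>
#include <functional>
#include <thread>
#include <vector>

using ROOT::Experimental::TBufferMerger;

void task(TBufferMerger &merger) {
   auto f = merger.GetFile(); // per-thread file, released when the task returns
}

int main() {
  const int nThreads = 5; // same thread count as in your example
  TBufferMerger merger(...);
  std::vector<std::thread> threads;
  for (int i = 0; i < nThreads; ++i)
    threads.emplace_back(task, std::ref(merger)); // std::ref: std::thread copies its arguments otherwise
  for (auto &t : threads)
    t.join();

  return 0;
}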

I haven’t tested the snippet but it should give you an idea.

Cheers,
Enrico

Oh my god, I would never have noticed the declaration-ordering issue in C++.
Thanks and sorry for wasting your time

Best wishes
giordano


Just call GetFile() in the task and all that goes away :smiley: