Another use case for RDataFrame "groupby" like feature

Dear ROOT developers,
we have another use case for a groupby feature in RDF.
In practice we have ntuples where a single event enters multiple times (let say K times) and in order to fill some summary statistics (e.g. TH1F) we need first to compute the distribution of that variable on the K copies, and then use this information to do the actual TH1F fill (the idea is that the K copies should be taken as correlated, hence their weights cannot be summed in quadrature). The final fill operation can be done only after all copies are processed (due to Filters it could be than less than K are actually processed) and only at that point the temporary memory needed to compute the partial statistics can be freed.

In principle we can have the input file ordered so that the K folds are contiguous, hence the boundaries can be detected as a change in the event number, but this is immediately spoiled when using multithread.
The point is that if one doesn’t want to keep everything in memory it should be possible to detect when enough data is processed so that for sure some information can be flushed.

Is it guaranteed that while globally events are not processed in the same order as the input file(s), at least in each thread they are? Would it also be possible to add an rdfentry_ equivalent that corresponds to the global input file order?

Hi @arizzi ,

Yes, threads take a contiguous chunk of events and process them in order. Not only, but the chunks will always begin and end at TTree cluster boundaries, so in principle if the K correlated events never (or vanishingly often) cross TTree cluster boundaries you are good.

This is Have rdfentry_ represent the global entry number in the TChain even in MT runs · Issue #12190 · root-project/root · GitHub, it requires some work but it’s in our plans (the last comment in the conversation proposes a development roadmap).

I see how in general this would be solved well by a group-by operation. There was some more discussion of that at Groupby in RDataFrame . The problem, of course, is making it efficient for larger-than-memory datasets. I do not think ROOT has that in the plan of work at the moment.

Cheers,
Enrico

Hi Enrico,
thanks for your inputs.
Let say that I store in the input file that I pass to RDF a progressive number implementing the “global” rdfentry_, lets call it “progressiveEvent”
I assume that then if I get the “minimum” of progressiveEvent across all threads it is guaranteed that all events with progressiveEvent < minimum are already processed, is that right?

Cheers,
Andrea

I guess this is probably the case most of the time. That is, I can’t come up with a race condition that would make this false at this time.

However the most robust assumption is that entry clusters will be processed out of order, in any order; and for scaling it’s best not to require synchronization between threads.

Hi Enrico,
one problem we are facing with this approach is that we do not know the effective number of threads RDF decided to use, is there a way to access this info?
We can detect the number of available slots, but it seems some of them never see any data so we are waiting forever to know which event they are processing and we can never flush out the accumulated data.
One option of course is to limit the number of threads but it would be nicer to know what we expect.
How is RDF deciding the effective splitting?

RDataFrame uses df.GetNSlots() slots which will be juggled between ROOT::GetThreadPoolSize() threads.

However if there are very few TTree clusters it could very well be that some threads (and therefore some slots) never get to do anything. I can’t think of any workaround other than limiting the number of threads when the input is small :confused: