RDataFrame performance processing many small files

ROOT Version: 6.26/10
Platform: Ubuntu 18.04 on local machine with i5-8250U CPU
Compiler: prebuilt binary


Dear ROOT experts!

I want to ask a question on the RDataFrame performance when dealing with a lot of small files.
I’ve attached the files that mimic the logic of my actual analysis code. I’ve got processes that are split in multiple sample files containing 3 trees and a histogram with normalization information. For every process I want to

  1. Select events with slight variations for every tree
  2. Select branches and compute new variables with slight variations between trees
  3. Modify the event weight with a distinct normalisation coefficient for every sample

The (3) requirement prevents me from joining all of the samples of one process into a single TChain. Instead, I need to process every sample individually. This greatly reduces the performance of my code when I need to deal with the processes that consist of multiple small files, even when I’m using the lazy Snapshot() and RunGraphs() functionality.
To be noted: this code is to be run on the lxplus accessing the files on eos (via root:// as advised here), but the benchmark are done on a local machine since it seems to provide much stable results.

Here’s the code that mimics my overall analysis structure.
ConvertDatasets.py (8.7 KB)
ConvertTree.py (4.3 KB)
RdfHelpers.py (1.7 KB)

The handling of the processes is done with ConvertDatasets.py that uses ConvertTree.py as the code for generating Snapshot() handles. RdfHelpers.py contain some C++ functions to speed up some of the calculations.

I’ve tested this code on 3 types of processes (that I’m unfortunately not able to share publicly at the moment, but can do in the PMs):

Process 1
	Number of files:		9
	Input events:		5 917 598.0
	Passed events:		192 944.0
	Selection efficiency:	3.26%

Process 2
	Number of files:		84
	Input events:		15 011.0
	Passed events:		328.0
	Selection efficiency:	2.19%

Process 3
	Number of files:		6
	Input events:		2 167 265.0
	Passed events:		1 117 982.0
	Selection efficiency:	51.58%

Process 2 even has some of the trees empty.

Running ConvertDatasets.py for 1 thread and averaging over 3 runs yields following results:

======>Mean performance<==========
Process 1
	Number of files:	9
	Input events:		5 917 598.0
	Passed events:		192 944.0
	Selection efficiency:	3.26%

	Setting up handles:	1.7929 +- 1.9828 s
	RDF RunGraphs():	31.7754 +- 2.6518 s
	Get Sum() and Count():	0.0191 +- 0.0261 s
	Merge trees:		0.3354 +- 0.0166 s
		Total: 		33.9228 +- 4.6773 s
Process 2
	Number of files:	84
	Input events:		15 011.0
	Passed events:		328.0
	Selection efficiency:	2.19%

	Setting up handles:	3.0023 +- 0.3269 s
	RDF RunGraphs():	29.7169 +- 3.1373 s
	Get Sum() and Count():	0.0166 +- 0.0017 s
	Merge trees:		0.1429 +- 0.0026 s
		Total: 		32.8787 +- 3.4599 s
Process 3
	Number of files:	6
	Input events:		2 167 265.0
	Passed events:		1 117 982.0
	Selection efficiency:	51.58%

	Setting up handles:	0.4955 +- 0.0469 s
	RDF RunGraphs():	59.6083 +- 2.3774 s
	Get Sum() and Count():	0.0004 +- 0.0000 s
	Merge trees:		2.0597 +- 1.0171 s
		Total: 		62.1638 +- 1.3134 s
Total time for ConvertDatasets(): 		128.9653 +- 6.8237 s

There are few things to note

  1. The run time per process depends on the final file size, not on the initial one (this one was a surprise for me). Process 1 initially has more events over 3 trees than Process 3, but in the Process 3 more events pass the filters and thus higher processing time. But I guess it mirrors the solution to my previous question.
  2. Process 2 while significantly smaller than any other two takes comparable time to process.

When increasing the CPU count to 4 (my machine only has 4 physical cores), the overall time does go down, but the proportion between processes stays the same.
2 threads

======>Mean performance<==========
Process 1
	Number of files:	9
	Input events:		5 917 598.0
	Passed events:		192 944.0
	Selection efficiency:	3.26%

	Setting up handles:	1.8021 +- 1.9737 s
	RDF RunGraphs():	25.6703 +- 5.9387 s
	Get Sum() and Count():	0.0174 +- 0.0235 s
	Merge trees:		0.4043 +- 0.0392 s
		Total: 		27.8941 +- 7.9750 s
Process 2
	Number of files:	84
	Input events:		15 011.0
	Passed events:		328.0
	Selection efficiency:	2.19%

	Setting up handles:	3.0808 +- 0.2372 s
	RDF RunGraphs():	31.0479 +- 2.9642 s
	Get Sum() and Count():	0.0176 +- 0.0026 s
	Merge trees:		0.1823 +- 0.0414 s
		Total: 		34.3286 +- 3.2402 s
Process 3
	Number of files:	6
	Input events:		2 167 265.0
	Passed events:		1 117 982.0
	Selection efficiency:	51.58%

	Setting up handles:	0.8561 +- 0.0197 s
	RDF RunGraphs():	35.4611 +- 2.1408 s
	Get Sum() and Count():	0.0005 +- 0.0000 s
	Merge trees:		1.5322 +- 0.0245 s
		Total: 		37.8498 +- 2.1360 s
Total time for ConvertDatasets(): 		100.0725 +- 9.0792 s

4 threads

======>Mean performance<==========
Process 1
	Number of files:	9
	Input events:		5 917 598.0
	Passed events:		192 944.0
	Selection efficiency:	3.26%

	Setting up handles:	1.7606 +- 1.8464 s
	RDF RunGraphs():	20.1267 +- 3.8619 s
	Get Sum() and Count():	0.0189 +- 0.0252 s
	Merge trees:		0.4359 +- 0.0669 s
		Total: 		22.3421 +- 5.8004 s
Process 2
	Number of files:	84
	Input events:		15 011.0
	Passed events:		328.0
	Selection efficiency:	2.19%

	Setting up handles:	3.0255 +- 0.3126 s
	RDF RunGraphs():	29.6785 +- 2.1564 s
	Get Sum() and Count():	0.0302 +- 0.0151 s
	Merge trees:		0.1906 +- 0.0298 s
		Total: 		32.9248 +- 2.5140 s
Process 3
	Number of files:	6
	Input events:		2 167 265.0
	Passed events:		1 117 982.0
	Selection efficiency:	51.58%

	Setting up handles:	0.8742 +- 0.0785 s
	RDF RunGraphs():	26.1213 +- 0.9291 s
	Get Sum() and Count():	0.0004 +- 0.0001 s
	Merge trees:		2.1728 +- 0.9996 s
		Total: 		29.1687 +- 2.0071 s
Total time for ConvertDatasets(): 		84.4356 +- 6.3073 s

I have also tried to run all of the Snapshot() handles in a single RunGraph() using this code
ConvertDatasetsSingleLoop.py (7.1 KB)

1 thread
Separate RunGraphs() per process: 		128.9653 +- 6.8237 s
One RunGraphs() for all processes: 		110.3038 +- 5.2638 s

2 threads
Separate RunGraphs() per process: 		100.0725 +- 9.0792 s
One RunGraphs() for all processes:		 95.7382 +- 4.9013 s

4 threads
Separate RunGraphs() per process: 		84.4356 +- 6.3073 s
One RunGraphs() for all processes: 		67.3011 +- 5.9421 s

Though there is an improvement comparing to the previous version, the Process 2 still seems to take up way more time than expected. This might not seem much right now, but this code is to be run for every systematic variation resulting in hundreds of calls of ConvertDatastes() so it will quickly add up.

So my question is whether there is anything that I’ve missed that would help processing this kind of processes in my setup (i.e. unable to create one Snapshot() for all of the sample files in a process)? Or any further improvement will be incremental while requiring me to abandon simplicity of the python code for the speed and complexity of C++

Thanks in advance,
Aleksandr

I think @eguiraud can help.

Hi Aleksandr,

In practice it depends on both, with different weights: a fraction of your runtime is spent reading the data in, and another fraction (the larger fraction, given your measurements) is spent writing new data out after computing some derived quantities. More events in output → more computation and writes, more events in input → more reads.

  1. I haven’t looked at the code yet, but something that might give you a speed-up for free is to set the environment variable export EXTRA_CLING_ARGS='-O2' before running. It asks the ROOT interpreter to just-in-time-compile code at optimization level 2 instead of 1 (the default in v6.26.10). -O2 will be the default in future ROOT versions.

  2. when you say Input events: 15 011.0 for process 2, does that mean you have ~180 events per file divided in 84 files? That’s pretty bad, it means that ROOT has to spend some time opening the file and setting things up for processing it only to read 180 events (which takes very little to no time depending on how many branches you are reading) and then start again for the next file. ROOT I/O is optimized for much larger sizes. This is independent of RDF. Can you merge together files that will be processed the same way, possibly re-optimizing the “baskets” (e.g. with hadd -O)?

  3. RDF-wise, as you know we want to do as much as possible in a single computation graph. I would have to study your program logic to tell whether that’s possible. For that, and also to simply check where the program spends time, it would be nice if you could provide a single stripped down Python script that can act as a reproducer (that only runs process 2 on some input files that you can share with me, without any other logic).

Cheers,
Enrico

Hi @apetukho ,

thank you for the reproducer (shared privately). By activating RDF logs and adding a couple of timers in the Python program one can see that, roughly:

  • the total runtime for the stripped down reproducer is 20 seconds
  • 16 of those seconds are spent in the actual RunGraphs call
  • 14 of those seconds are spent in just-in-time-compilation of the expressions required by the 252(!) separate computation graphs that the program runs (84 files x 3 trees per file)
  • then actually running those 252 graphs actually takes ~1.5 seconds, a few milliseconds per tree (since every tree only contains ~180 events). That’s basically the time it takes to open the file and extract the data, the actual computation does not matter much

So the bottleneck is really the number of separate RDataFrame objects: each needs to just-in-time-compile a few things, but with 252 of them that time adds up.

The ideal solution here would be to merge some of these computation graphs together: if they only differ in some weights, you can use DefinePerSample for that.

Cheers,
Enrico

P.S.

note that jitting times (which currently dominate your total runtimes) will not scale linearly with the number of systematics, so this might not be a problem in practice.

Dear Enrico,

Thank you for the breakdown of the runtime. I’m trying to rewrite the code using the DefinePerSample, but I’ve run into some problems implementing it in python.

As the documentation and this helpful thread suggest, I need to pass the function used in DefinePerSample() some C++ objects that contain information on the pairs of "sample identifier: sample weight". I’m using std::vector<std::string> and std::vector<std::double> as in the thread. I’ve declared this function

ROOT.gInterpreter.Declare('''
float GetSampleWeight(unsigned int slot, const ROOT::RDF::RSampleInfo &id, std::vector<std::string> filePathVector, std::vector<double> weightVector) {
    for (unsigned int i = 0; i < filePathVector.size(); i++) {
        if (id.Contains(filePathVector[i])) {
            return weightVector[i];
        }
    }
    return -1.;
}
''')

to later use it in the RDataFrame as

df = df.DefinePerSample("sampleWeight", "GetSampleWeight(rdfslot_, rdfsampleinfo_, filePathVector, weightVector)")

but the question is how exactly do I get the filePathVector and weightVector to pass it to this function?
I’ve tried creating strings like this and feeding them to ROOT.gInterpreter.Declare()

ROOT.gInterpreter.Declare('std::vector<std::string> filePathVector {"file1.root", "file2.root", };')
ROOT.gInterpreter.Declare('std::vector<double> weightVector {1, 2, };')

But when I run my program over processes (which all result in a new filePathVector and weightVector) in a loop I get a C++ error about std::vector<std::string> filePathVector and std::vector<double> redefinition.

The reproducer and the input files files can be found here.

So my question is how to properly use the DefinePerSample when working in python?

Best regards,
Aleksandr

One way to do that is with a small helper struct that captures the two vectors. PyROOT automatically takes care of converting Python lists into std::vectors:

ROOT.gInterpreter.Declare('''
struct SampleWeightEvaluator {
    std::vector<std::string> _paths;
    std::vector<double> _weights;

    SampleWeightEvaluator(const std::vector<std::string> &paths, const std::vector<double> &weights)
        : _paths(paths), _weights(weights) {}

    float operator()(unsigned int slot, const ROOT::RDF::RSampleInfo &id) {
        for (unsigned int i = 0; i < _paths.size(); i++) {
            if (id.Contains(_paths[i])) {
                return _weights[i];
            }
        }
        return -1.;
    }
};
''')

def main():
    sampleWeightDictList = [
            {'input/Light_a_1.root': 1., 
            'input/Light_a_2.root': 2., 
            'input/Light_a_3.root': 3.},
            {'input/Light_d_1.root': 4., 
            'input/Light_d_2.root': 5., 
            'input/Light_d_3.root': 6.}
    ]

    for idx, sampleWeightDict in enumerate(sampleWeightDictList):
        inputTreeName = 'output_tree_sys_default'
        inputFiles = sampleWeightDict.keys()
        weights = sampleWeightDict.values()
        df = ROOT.RDataFrame(inputTreeName, inputFiles)     
        sampleWeightEvaluator = ROOT.SampleWeightEvaluator(inputFiles, weights)
        df = df.DefinePerSample("sampleWeight", sampleWeightEvaluator)

P.S.
I love these well-scoped questions with small reproducers that I can play with :smiley:

That’s perfect, thank you very much!

I’ve got one final question. In my code I want to also apply weight corrections that depend on kinematic variables to one of the samples in one of the process. Since this correction will also be applied to the experimental systematic uncertainty variations (i.e. different versions of the same samples) I want to do it in the RDataFrame and not in some external script.

Using the same files as in this post lets say the correction function is

ROOT.gInterpreter.Declare('''
double WeightCorrection(Float_t m_jj)
{
    if (m_jj <= 2)
        return 0.8;
    if (m_jj > 2 && m_jj <= 8)
        return 0.2;
    if (m_jj > 8 && m_jj <= 17)
        return 0.15;
    return 0.1;
}
''')

And I want to only apply it to the Light_a_2 sample.

When I was creating individual graphs for each file I could do something like

ewkCorr = 1
 if 'Light_a_2' in inputFilePath:
	processedDataFrame = processedDataFrame.Define('corr', 'WeightCorrection(met_tst)')
	ewkCorr = 'ewkCorr'
processedDataFrame = processedDataFrame.Define('weightModified', 'weight * {} '.format(corr))

But with one graph for all of the files I can’t do that. I figured, that I could define a function like

ROOT.gInterpreter.Declare(
"""
bool EstimateNeedsCorrection (unsigned int slot, const ROOT::RDF::RSampleInfo &id){
   return id.Contains("Light_a_2");
}
""")

and then use it in the code

df = df.DefinePerSample("needsCorrection", "EstimateNeedsCorrection(rdfslot_, rdfsampleinfo_)")
df = df.Define("weightCorrection", 'Double_t x; if (needsCorrection) {return WeightCorrection(m_jj);} else {return 1.;}')
df = df.Define("weightModified", 'weight * weightCorrection')

Is this the right way to do it? What I’m afraid of is that only 3 samples out of ~200 need this corrections and I would waste time and space creating useless columns filled with 1 for all other files.

P.S. Thanks! :slight_smile:

Hi,

you can either do that or just for those samples create a separate RDataFrame object.
You are right that the way with DefinePerSample is a bit more wasteful but if the result is that you can run 1 RDF computation graph instead of 252 it should be worth it.

Although I just realized I’m not sure how you are still keeping the outputs of the Snapshots separate when using a single RDF?

As an aside, as I mentioned above, I’m not sure that the original issue is really an issue, because jitting times, which dominate the total runtime for this case with 252 very small computation graphs, will not scale linearly with the dataset size or the number of samples, so it could be that it never becomes a real issue. I can’t tell upfront, would need to try it out.

Cheers,
Enrico

Hi,

Let me be more explicit about structure of the data I’m processing. The nominal files look like this

Process 1
	|__ Sample 1
	|__ Sample 2
	...
	|__ Sample 9
Process 2
	|__ Sample 1
	|__ Sample 2
	...
	|__ Sample 84
...
Process 14
	|__ Sample 1
	|__ Sample 2
	...
	|__ Sample 6 

and there are two types of systematic uncertainties

  1. Encoded as a weights in the nominal files
  2. Encoded in a different version of the nominal files

Thank you for pointing that out. If I only need this column-depending scaling only for the Sample 6 of Process 1, I can pass the name of the process to the function that creates the RDF graphs (ConvertTree in my case) and only do the operations described in my previous post for Process 1.

I don’t need the separate files for the Samples, only for Processes and I’ve been merging them in my previous code. The approach of creating the RDF for a Process with a list of Sample files and DefinePerSample() does exactly what I need.

As I understand, you are talking about the systematic encoded with weights (1) in the same file? I don’t think it should be a problem. What I’m worried is the (2) case, with the systematic uncertainties in the different versions of the nominal dataset. It results in 109 copies of the nominal dataset which means the 252 small computation graphs will need to be created 109 times which is a problem.

Anyway, I’m yet to test it on the full scale, but I do see significant total speed-up of ~3-4 times for the files I’ve used for my first post (even the Process 3 with only 6 big files is faster, though not as much). So thank you for your help :slight_smile:

I’m saying jitting times will not increase by a factor 109 in that case, so total runtimes might still be OK.

But of course if you can avoid creating many mini RDFs that’s even better :smile:

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.