RDataFrame Foreach causing memory leak

Hello,

I’m applying different combinations of filters on a Dataframe and then I’m using the Foreach method to perform a custom operation on every filtered dataset separately.
Maybe this is not the best strategy possible, but it’s quite simple and fits perfectly my needs.

The problem is that the Foreach operation seems leaking memory: I’m performing this action hundreds of times on ~5000 rows with 5 columns and the memory consumption keeps growing up (~10 Mb every Forech)

I have attached a small script that reproduces the problem. You can find the root file for the test at
http://dvalsecc.web.cern.ch/dvalsecc/test.root

Thanks for your help!


_ROOT Version: 6.17/01
_Platform: /cvmfs/sft-nightlies.cern.ch/lcg/views/dev3python3/latest/x86_64-slc6-gcc7-opt
_Compiler: gcc 7.3.0


test.cpp (989 Bytes)

@dpiparo @eguiraud perhaps you can help here please?

Ciao Davide,

thanks for taking the time to share such a clear example!
Short answer. The solution is to replace your jitted filter by a compiled one, i.e. change this

df.Filter("(nPU=="+ to_string(pu) +")&&(signalTruth=="+to_string(s) +")")

to this

df.Filter([pu,s](int nPU, double signalTruth){return nPU == pu && signalTruth == s;}, {"nPU", "signalTruth"})

As a side effect, the performance of your program will also increase.

Longer answer :slight_smile: The behaviour you stumbled on is not a memory leak, but rather a memory hog. This is due to the fact that for each of the pairs, a new filter was just-in-time compiled. Every time cling, ROOT’s interpreter, compiles some code, some entites are put in memory, for example the compiled binaries, the nodes of the AST (clang representation of code).

We are aware of this and working towards a solution.

I also prototyped thre alternatives for you, minimally modifying your code. For me alternative 3) was by far the fastest (it runs in parallel…)

  1. Cache the full dataset in memory once, then run on it N times (as a macro):
using namespace ROOT; 
using namespace ROOT::RDF;
using namespace ROOT::VecOps;


void dostuff(int nPU, double E_pu, double signalTruth,double amplitudeTruth, RVec<double>digis){
     cout << nPU << " " << E_pu << " " << signalTruth <<endl;
}   

int test(){
    auto df = RDataFrame("weights", "test.root");

    vector<int> PUs = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90 ,100};
    vector<float> Ss = {10.,12.,14.,16.,18.,20.,30.,40.,50.,60.,70};

    // Cache in memory the dataset
    auto dfCached = df.Cache();

    for (int pu: PUs){
        for (float s: Ss){
            cout << "PU:"<< pu << " | S:" << s <<endl;

            dfCached.Filter([pu,s](int nPU, double signalTruth){return nPU == pu && signalTruth == s;}, {"nPU", "signalTruth"})
                    .Foreach(dostuff, {"nPU", "E_pu", "signalTruth", "amplitudeTruth", "digis"});
        }            
    }
    return 0;
        
}
  1. Reorder the loops (not sure it’s really possible in your case) and run on the data once:
using namespace ROOT; 
using namespace ROOT::RDF;
using namespace ROOT::VecOps;

vector<int> PUs = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90 ,100};
vector<float> Ss = {10.,12.,14.,16.,18.,20.,30.,40.,50.,60.,70};

void dostuff(int nPU, double E_pu, double signalTruth,double amplitudeTruth, RVec<double> &digis){
   cout << nPU << " " << E_pu << " " << signalTruth <<endl;
}

void dostuff2(int nPU, double E_pu, double signalTruth,double amplitudeTruth, RVec<double> &digis){
    for (int pu: PUs){
        for (float s: Ss){
           dostuff( nPU, E_pu, signalTruth, amplitudeTruth, digis); // The original function :)
        }
    }
}   

int test(){
    auto df = RDataFrame("weights", "test.root");

    // Cache in memory the dataset
    df.Foreach(dostuff2, {"nPU", "E_pu", "signalTruth", "amplitudeTruth", "digis"});
        
    return 0;
}
  1. Like 1), but runs in parallel. Note the new signature of dostuff. That implies that an integer is passed to it which represents the worker id running the function (it is more or less what in parallel CMSSW are called ‘Streams’)
using namespace ROOT; 
using namespace ROOT::RDF;
using namespace ROOT::VecOps;

// Note the first parameter!!!
void dostuff(unsigned int slot, int nPU, double E_pu, double signalTruth,double amplitudeTruth, RVec<double>digis){
  //   cout << nPU << " " << E_pu << " " << signalTruth <<endl;
}   

int test(){

    ROOT::EnableImplicitMT();

    auto df = RDataFrame("weights", "test.root");

    vector<int> PUs = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90 ,100};
    vector<float> Ss = {10.,12.,14.,16.,18.,20.,30.,40.,50.,60.,70};

    // Cache in memory the dataset
    auto dfCached = df.Cache();
    
    for (int pu: PUs){
        for (float s: Ss){
            cout << "PU:"<< pu << " | S:" << s <<endl;

            dfCached.Filter([pu,s](int nPU, double signalTruth){return nPU == pu && signalTruth == s;}, {"nPU", "signalTruth"})
                    .ForeachSlot(dostuff, {"nPU", "E_pu", "signalTruth", "amplitudeTruth", "digis"});
        }            
    }
    return 0;
}

It looks like a lot, but all three should be very easily testable.

Cheers,
D

2 Likes

Thanks you very much! This solves completely my problem.
The purpose of Cache is to avoid many readings from the disk?

Thanks also for your snippets. Unfortunately the complete program deals with multiple dataframes that have to be “synchronized”, so I cannot use multi-threading this time.

Cheer
Davide

Hi Davide,

the cache is indeed there for usecases like yours.
About the MT: too bad! At least the examples are there for future questions :slight_smile:

Cheers,
D

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.