Multithread RDataFrame.Snapshot with big files and lots of columns


ROOT Version: 6.32.11
Python Version: 3.9.14
Platform: AlmaLinux 9 (el9_amd64_gcc12)
Compiler: g++ (GCC) 12.3.1 20230527


Hi, all!

I’ve stumbled upon a strange error while saving large RDataFrames to disk with a Snapshot. I’ve been doing this for a while with data without any problem, but now that I’m working with MC I keep hitting the same error when using multithreading. I usually get a big crash, always from the same source:

...

*** Break *** segmentation violation
Fatal in <TFileMerger::RecursiveRemove>: Output file of the TFile Merger (targeting /my/path/to/the/output.root) has been deleted (likely due to a TTree larger than 100Gb)
aborting

preceded by some 3000+ lines of crash dump that are mostly just memory addresses (full crash report here).

I’ve tried enabling MT with the number of threads set to 1, and this runs with no problems, but as soon as I run with 2, I get the crash again.

I suspect this may come from multiple threads accessing the file unsafely, but the intricacies of ROOT elude me.

A temporary solution is to just use 1 thread (or no multithreading at all), but then I lose the parallelization of the intermediate steps, which is why I started using RDataFrames in the first place… Does anybody know how I could solve this?

Thanks a lot in advance!

Kind regards,
Martín.

Hello Martín,
that’s a tricky one to debug just from the description. There is one big difference between single-threaded and multi-threaded Snapshot: in MT mode, each thread writes its tree to a temporary in-memory location, and the pieces are merged under a lock. That would explain why it works in single-threaded mode.
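The mechanism can be sketched in plain Python (no ROOT required; all names here are illustrative, not the actual TBufferMerger API): each worker fills a local buffer, and only the merge into the shared output is serialized under a lock.

```python
import threading

# Shared output that all workers merge into, guarded by a lock.
output = []
output_lock = threading.Lock()

def worker(rows):
    local_buffer = list(rows)   # per-thread "in-memory tree"
    with output_lock:           # only the merge step is serialized
        output.extend(local_buffer)

# Four workers, ten rows each.
threads = [threading.Thread(target=worker, args=(range(i * 10, (i + 1) * 10),))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(output))  # all 40 rows survive the merge
```

The point is that the workers never touch the output concurrently; any crash in the real thing would have to come from the merge machinery itself or from what the per-thread computations feed into it.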

Some questions to understand a bit better:

  • Is there some truth to the 100 Gb limit that’s mentioned in the error message?
  • Does it work with ROOT 6.34?

But these might not point us to the problem. To really understand it, we would need code that can be run.

Hi Stephan,

  • As for the 100 Gb limit, I don’t think that’s the problem. The data production I mentioned produces files of ~600 GB, so I’m not sure this message is really related.
  • I’ve tried with a newer ROOT version (6.36.02) / g++ 15.1.0 and I see the same error. Was ROOT 6.34 specifically required, or is this OK?

I’m now trying to write an MWE so that this can be investigated further.

Thank you for your very prompt response and attention (:

That’s OK. It shows that it’s something that hasn’t been fixed yet. Best of luck with the MWE!

Hi again!

I’ve been trying to reproduce the exact error, but I don’t seem to be able to with a much simpler version of my code and my data. However, I believe I’ve found the source: TFileMerger’s handling of big files.

I’ve made a script for making pseudodata in Python:

make_data.py (1.2 KB)

(tqdm is optional, just an option for a nice progress bar)

It outputs a file with a given number of rows, with 2000 columns, each holding a vector of 20 doubles per entry. I was trying to merge the output files (a new .root file is produced before reaching 100 GB, which I believe is ROOT’s default maximum file size) and stumbled upon the original crash. I was using the option -j8 to parallelize, so I guessed that could be the cause.
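For scale, a back-of-the-envelope estimate (my own arithmetic: uncompressed, ignoring ROOT metadata and compression) of how quickly such a file approaches the 100 GB split point:

```python
# 2000 columns, each an RVec of 20 doubles (8 bytes each) per entry.
ncolumns = 2000
vector_size = 20
bytes_per_double = 8

bytes_per_row = ncolumns * vector_size * bytes_per_double
print(bytes_per_row)   # 320000 bytes, i.e. ~320 KB per row uncompressed

# Rows needed to hit the 100 GB default file-size limit:
rows_to_split = 100 * 10**9 // bytes_per_row
print(rows_to_split)   # ~312500 rows
```

So on the order of a few hundred thousand rows is already enough to trigger the file-splitting behaviour, before compression is taken into account.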

crash_hadd_j8.txt (12.8 KB)

However, I got the same crash when using just hadd -ff -fk target source1 source2 source3:

crash_hadd.txt (12.7 KB)

I also made a Python script to try to reproduce the behaviour of my actual code, which is considerably more complicated because I wrapped the default ROOT classes (via composition, not inheritance) to handle them differently. But the core functionality is the same: define, filter and store:

make_snapshot.py (1.9 KB)

I tried running this code on the problematic dataset from my original crash, with no luck, so I’m not sure it properly reproduces the full behaviour of my code. But maybe that isn’t necessary at all, since hadd alone reproduces the crash.

I hope this helps you in finding the source of the bug!

Thank you and best of luck (:

Hello again!

I believe your test with hadd ran into this issue:

Unfortunately, RDataFrame does not react to this setting, so I believe the Snapshot crash you encountered is different from the hadd 100 GB limit that your test revealed.

Is there a way of using ROOT.RDataFrame(<largeNumber>), i.e. an RDF that’s just counting up? You could attach a dummy computation graph and we try to snapshot it, no input data required.

Hi!

Oh, I didn’t know that setting affected hadd and not RDataFrame; it was one of the recommendations I had found previously, but it hadn’t fixed the issue. At least now I know why (:

Also, I didn’t know I could create an RDF that way, seems definitely easier hehe (:

I’ve been working on this for a couple of days and still haven’t found a way to reproduce the same issue with the pseudodata for the MWE, although the computation graphs look pretty much the same. (Please find them attached in this CERNbox folder.)

While trying to run my code again, I now get a different crash from the one originally reported. The stack trace is QUITE long (3000+ lines), but it reports:

 *** Break *** segmentation violation
Fatal in <TBufferMerger>:  TBufferMergerFiles must be destroyed before the server
aborting

which I hadn’t seen before.

I’m trying to see if the problem comes from any of the gen_* branches of my NTuple; this is the only difference I can see from what I’m doing in the MWE. Sorry I can’t be of any more help ):

PS: This seems to have solved the issue. I’ll check the types of the branches to see what may be causing the error.

Minimal Working Example

Here is a description of the current state of the MWE.

Pseudodata generation

The following code is used to generate a pseudo-data file:

rdf = ROOT.RDataFrame(int(args.nentries))
ROOT.RDF.Experimental.AddProgressBar(rdf)

for i in range(nvectors):
    rdf = rdf.Define(f"vec{i}", f"RandomVec({vector_size})")\
             .Define(f"vec{i}_to_mask", f"RandomVec({vector_size})") \
             .Define(f"vec{i}_to_select", f"RandomVec({vector_size})")

rdf.Snapshot('DDTree', str(temp_file))

where args.nentries is the number of entries in the output, nvectors is the number of vector columns that will be created, and vector_size is the number of entries per vector.

The macro RandomVec is defined as follows:

#include <ROOT/RVec.hxx>
#include <TRandom3.h>
#include <numeric>

ROOT::RVecD RandomVec(int length, double min_val = -999.0, double max_val = 999.0) {
    static thread_local TRandom3 rng;
    ROOT::RVecD vec(length);
    for (int i = 0; i < length; i++) {
        vec[i] = rng.Uniform(min_val, max_val);
    }
    return vec;
}

RDataFrame processing

A series of operations, similar in nature to those applied in my original code, are applied to this dataframe:

rdf = ROOT.RDataFrame('DDTree', str(temp_file))
ROOT.RDF.Experimental.AddProgressBar(rdf)

for i in range(nvectors):
    rdf = rdf.Define(f"vec{i}_sum",  f"ROOT::VecOps::Sum(vec{i})") \
             .Define(f"vec{i}_mean", f"ROOT::VecOps::Mean(vec{i})")


for i in range(nvectors):
    rdf = rdf.Define(f"sel{i}",         f"MaskedVec(vec{i}_to_select, vec{i}_to_mask > 0)") \
             .Define(f"sel{i}_sum",     f"ROOT::VecOps::Sum(sel{i})") \
             .Define(f"sel{i}_max_idx", f"ROOT::VecOps::ArgMax(sel{i})") \
             .Define(f"sel{i}_min_idx", f"ROOT::VecOps::ArgMin(sel{i})")

    rdf = rdf.Define(f"pair{i}", f"return ROOT::RVecD({{vec{i}[sel{i}_max_idx], vec{i}[sel{i}_min_idx]}})") \
             .Define(f"pair{i}_sum", f"ROOT::VecOps::Sum(pair{i})")

    
rdf = rdf.Filter("(" + " + ".join([f"pair{i}_sum" for i in range(nvectors)]) +") > 0")


ROOT.RDF.SaveGraph(rdf, "./mydot.dot")


snap_config = ROOT.RDF.RSnapshotOptions()
snap_config.fVector2RVec = False  # Disable conversion of vectors to RVecs
snap_config.fAutoFlush = 1000


rdf.Snapshot('DDTree', str(file_path), rdf.GetColumnNames(), snap_config)

where MaskedVec is defined in the following macro:

template <typename T>
ROOT::RVec<T> MaskedVec(const ROOT::RVec<T>& vec, const ROOT::RVec<bool>& mask) {
    ROOT::RVec<T> result;
    for (size_t i = 0; i < vec.size(); ++i) {
        if (mask[i]) {
            result.push_back(vec[i]);
        }
    }
    return result;
}

The corresponding #include statements have been omitted.
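For reference, the same masking logic in plain Python (no ROOT required), which may be handy for sanity-checking the macro's behaviour:

```python
def masked_vec(vec, mask):
    """Pure-Python equivalent of the MaskedVec macro: keep the elements
    of vec whose corresponding mask entry is truthy."""
    return [v for v, m in zip(vec, mask) if m]

print(masked_vec([1.0, 2.0, 3.0, 4.0], [True, False, True, False]))  # [1.0, 3.0]
```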

This code, however, doesn’t seem to reproduce the crash.

Logs

Some crash logs are available in this CERNbox folder. A list with a short description follows:

Crash running on my MC files with my code @ HTCondor [log]

This is the original crash as reported in the ROOT Forum. It uses the same data as described in the next section.

Crash running on my MC files with my code and 2 threads [log]

This source file is 35 GB on disk and has 409 columns of varying sizes, some being ints, floats, doubles, or vectors (or RVecs) of these types. The sample is a post-processing of Drell-Yan to 2 muons at MiniAOD level.

This crash is not the same as the one reported, but may be related. The original crash happens when running on HTCondor; this one was produced on a local machine.

Update: Found it!

I can confirm that the problem comes from two RVec<int> columns in my file that are computed using the following macros:

std::map<int, RVecI> findGenMatches(
    RVecF gen_eta,
    RVecF gen_phi,
    RVecF mu_eta,
    RVecF mu_phi,
    float min_dR,
    RVecB selection = {}
) {
    std::map<int, RVecI> map_gen2reco;
    
    if (selection.size() == 0) {
        selection = RVecB(mu_phi.size(), 1);
    }
 
    // For each GEN particle, count the number of possibly matched muons
    for (uint i=0; i < gen_eta.size(); i++) {
        for (uint j=0; j < gen_eta.size(); j++) {
            if (selection[j] == 0) continue;

            // If the GEN muon is within RECO range, add it as a possible match.
            float dR = deltaR(mu_eta[i], gen_eta[j], mu_phi[i], gen_phi[j]);
            
            if (dR < min_dR) {map_gen2reco[i].push_back(j);}
        }
        // Set the number of matched muons for this gen particle
    }

    return map_gen2reco;
}

RVecI countGenBoth(
    RVecF gen_eta,
    RVecF gen_phi,
    RVecF mu_eta,
    RVecF mu_phi,
    float min_dR,
    RVecB selection = {}
) {
    std::map<int, RVecI> map_gen2reco = findGenMatches(gen_eta, gen_phi, mu_eta, mu_phi, min_dR, selection);
    RVecI nMuons (gen_eta.size(), 0);

    for (uint i=0; i < 2; i++) {
        // See if PAT muons matched to GEN SMu are also matched to GEN Mu
        auto smu_idx = map_gen2reco[i];
        auto mu_idx  = map_gen2reco[i+2];

        for (uint j=0; j < smu_idx.size(); j++) {
            for (uint k=0; k < mu_idx.size(); k++) {
                if (smu_idx[j] == mu_idx[k]) {
                    nMuons[i]++;
                    nMuons[i+2]++;
                }
            }
        }

    }

    return nMuons;
}

RVecI countGenUnique(
    RVecF gen_eta,
    RVecF gen_phi,
    RVecF mu_eta,
    RVecF mu_phi,
    float min_dR,
    RVecB selection = {}
) {
    std::map<int, RVecI> map_gen2reco = findGenMatches(gen_eta, gen_phi, mu_eta, mu_phi, min_dR, selection);
    RVecI nMuons (gen_eta.size(), 0);

    for (uint i=0; i < 2; i++) {
        // See if PAT muons matched to GEN SMu are not matched to GEN Mu
        auto smu_idx = map_gen2reco[i];
        auto mu_idx  = map_gen2reco[i+2];

        for (uint j=0; j < smu_idx.size(); j++) {
            bool isUnique = true;
            for (uint k=0; k < mu_idx.size(); k++) {
                if (smu_idx[j] == mu_idx[k]) {isUnique = false;}
            }
            if (isUnique) {nMuons[i]++;}
        }
    }

    return nMuons;
}

Then, it is applied as:

  gen_nBothMatched -> countGenBoth(gen_eta, gen_phi, patmu_eta, patmu_phi,  0.3)
  gen_nUniqueMatched -> countGenUnique(gen_eta, gen_phi, patmu_eta, patmu_phi,  0.3)

These two branches can be used interactively with no problems, but the crash occurs upon saving a snapshot. When I remove them from the list of columns to save, there is no crash! (:

Maybe this helps or not:

Edit your find and count functions to return 0 or false/true at the very beginning, as if it were an early break, and see if the crash disappears. Then move those returns one line down and rerun, until you find exactly which line of code in the filter is the problematic one.

Maybe also verify that all these vectors have the same length within the function:

mu_eta[i], gen_eta[j], mu_phi[i], gen_phi[j]

And also that

nMuons[i+2]++;

never goes out of bounds with the i+2, and things like that.

You could also try running this with a debug-build of ROOT with gdb.
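To illustrate the kind of out-of-bounds access being suggested above, here is a plain-Python analogue of the matching loop (hypothetical sizes, no ROOT): the loop counter runs over the *gen* collection but is used to index the reco muons, so it breaks as soon as there are more gen entries than muons. Python raises an IndexError where `RVec::operator[]` would silently read out of bounds and can segfault.

```python
gen_eta = [0.1, 0.5, 1.2, 2.0]   # 4 gen particles
mu_eta = [0.1, 0.6]              # only 2 reco muons

caught = None
try:
    for i in range(len(gen_eta)):        # i runs over the gen collection...
        for j in range(len(gen_eta)):
            _ = mu_eta[i], gen_eta[j]    # ...but indexes mu_eta with it
except IndexError as exc:
    caught = exc

print(type(caught).__name__)  # IndexError
```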

Hello @martialc,

nice that we are getting closer! Does the crash only happen with a large output file when running MT, or can we reproduce it with a small RDF (maybe even a counting RDF)?