Select unique candidates based on their ID and variable

Dear Experts,

I have an RDataFrame that contains multiple candidates per event, i.e. several entries sharing the same event number ID (eventNumber). I would like to filter these candidates based on the value of another variable in the tree. I tried to follow @eguiraud's example, “A thread-safe stateful Filter for RDataFrame” on GitHub, extending his approach to compare the values of another variable and return the entry with the highest value of that variable. As an example, the following creates an RDataFrame whose entries have non-unique IDs (“category”), each with its own value of a variable (“x”):

import ROOT
ROOT.gInterpreter.Declare(
"""
#include <vector>
#include <algorithm>
#include <map>
#include <random>

std::random_device rd;
std::mt19937 e{rd()}; // or std::default_random_engine e{rd()};
std::normal_distribution<float> gaus(10., 1.);

float generateGaussNumber(){
    return gaus(e);
}

""")

df = ROOT.RDataFrame(20).Define("category", "int(rdfentry_ % 10)").Define("x", "generateGaussNumber()")

print(df.AsNumpy())

Which gives the following RDF (category, x):

array([[ 0.        , 11.11326599],
       [ 1.        , 10.15282154],
       [ 2.        , 10.73499107],
       [ 3.        , 10.45162868],
       [ 4.        , 10.2524004 ],
       [ 5.        , 11.10525703],
       [ 6.        ,  9.92098904],
       [ 7.        , 11.83233261],
       [ 8.        , 11.15157604],
       [ 9.        ,  9.56202412],
       [ 0.        ,  8.48303699],
       [ 1.        ,  9.08411694],
       [ 2.        ,  9.12842464],
       [ 3.        , 10.6169529 ],
       [ 4.        , 10.09912395],
       [ 5.        , 12.2478199 ],
       [ 6.        ,  8.52565002],
       [ 7.        , 12.2470808 ],
       [ 8.        ,  9.79731941],
       [ 9.        , 11.51276779]])

By defining the ElementTracker:


ROOT.gInterpreter.Declare("""
// A stateful filter that lets an entry pass if its "x" is the highest seen so far
// for its "category". It only knows about the entries that were already processed.
// It takes a write lock on gCoreMutex because any call may modify the map.
#include <unordered_map>
class ElementTracker {
public:
    // Return true if (id, value) has the highest value seen so far among entries with the same id
    bool operator()(int id, float value) {
        R__WRITE_LOCKGUARD(ROOT::gCoreMutex); // exclusive lock: the map may be modified below
        auto it = highestValues.find(id);
        if (it == highestValues.end() || value > it->second) {
            highestValues[id] = value;
            return true;
        }
        return value == it->second;
    }

private:
    // Map to store the highest value for each id
    std::unordered_map<int, float> highestValues;
};
""")

And by calling:


cols = ROOT.std.vector['string'](["category","x"])
df_with_unique_categories = df.Filter(ROOT.ElementTracker(), cols)

I would like to reduce the above RDF to the following one:

array([[ 0.        , 11.11326599],
       [ 1.        , 10.15282154],
       [ 2.        , 10.73499107],
       [ 4.        , 10.2524004 ],
       [ 6.        ,  9.92098904],
       [ 8.        , 11.15157604],
       [ 3.        , 10.6169529 ],
       [ 5.        , 12.2478199 ],
       [ 7.        , 12.2470808 ],
       [ 9.        , 11.51276779]])

i.e. for each unique “category” I’d like to keep only the entry with the highest “x”. Any idea how I could enhance my function to do this? Or would you suggest a different method than Filter?

Thanks,
Davide

ROOT Version: 6.30/04


Hi Davide,

Thanks for the interesting question and the nice code example!
You have a dataset in which some entries (events) are logically grouped via a certain category variable, the “evt number ID”. For each value of this ID, you would like to select only one of the events that share it, based on a second variable called x (please correct me if I am wrong).
Have you tried to enhance your filtering class to somehow store the pairs category-x during the first loop and to use them for filtering during subsequent loops? Or to extract the “analysis state”, the collection of category-x pairs, and use it to initialise a second simple filtering class that just checks if the entry being processed is in the list (this second one, since we’ll just be reading will not need locks)?
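The two-loop idea above can be sketched in plain Python, independently of RDataFrame. This is only an illustration of the logic under stated assumptions: the names Entry, highest_per_category and KeepHighest are hypothetical, and the toy data mimic entries with category = i % 10 and x = i // 3. The first pass collects the highest x per category; the second pass uses a read-only callable initialised with that state, so it would not need any locking.

```python
from collections import namedtuple

# Hypothetical stand-in for one entry of the dataset
Entry = namedtuple("Entry", ["category", "x"])


def highest_per_category(entries):
    """First pass: record the highest x seen for each category."""
    highest = {}
    for e in entries:
        if e.category not in highest or e.x > highest[e.category]:
            highest[e.category] = e.x
    return highest


class KeepHighest:
    """Second pass: read-only filter initialised with the first-pass state."""

    def __init__(self, highest):
        self._highest = dict(highest)  # private copy, never modified afterwards

    def __call__(self, entry):
        # Entries that tie for the highest x in a category would all pass
        return entry.x == self._highest[entry.category]


# Toy data: 20 entries, 10 categories, x grows with the entry index
entries = [Entry(i % 10, float(i // 3)) for i in range(20)]

keep = KeepHighest(highest_per_category(entries))
selected = [e for e in entries if keep(e)]

for e in selected:
    print(e.category, e.x)
```

Because the second pass only reads the map, the order in which entries are processed no longer matters.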

Cheers,
Danilo

Thanks Danilo for your prompt answer.

You have a dataset in which some entries (events) are logically grouped via a certain category variable, the “evt number ID”. For each value of this ID, you would like to select only one of the events that share it, based on a second variable called x (please correct me if I am wrong).

Indeed that’s exactly the functionality I’m trying to implement.

Have you tried to enhance your filtering class to somehow store the pairs category-x during the first loop and to use them for filtering during subsequent loops? Or to extract the “analysis state”, the collection of category-x pairs, and use it to initialise a second simple filtering class that just checks if the entry being processed is in the list (this second one, since we’ll just be reading will not need locks)?

As I’m not very familiar with RDataFrames I wasn’t sure whether (and how to) perform two subsequent loops within one instance of Filter. Do you have a skeleton example or boiler plate I could have a look at?

Thanks in advance!
Davide

I tried with something easier and deterministic as opposed to the previous example which used a random number generator.
I modified the above function to the following:

ROOT.gInterpreter.Declare("""
// A stateful filter that lets an entry pass if its value is not lower than the
// highest value seen so far for the same id. It does no locking, so it is not thread-safe.
#include <unordered_map>
class ElementTracker {
public:
    // Return true if (id, value) is not below the highest value seen so far for this id
    bool operator()(int id, float value) {
        auto it = highestValues.find(id);
        if (it == highestValues.end()) {
            // First entry with this id: store it and let it pass
            highestValues.insert({id, value});
            return true;
        }
        if (value < it->second) {
            return false;
        }
        it->second = value;
        return true;
    }
private:
    // Map to store the highest value for each id
    std::unordered_map<int, float> highestValues;
};

""")

This yields the correct output only if the entries are already sorted from highest to lowest value of x. Here are two example dataframes: df1 is sorted from lowest to highest x, and df2 from highest to lowest:

import ROOT
import pandas as pd

df1 = ROOT.RDataFrame(20).Define("category", "int(rdfentry_ % 10)").Define("x", "float(rdfentry_ / 3)")
df2 = ROOT.RDataFrame(20).Define("category", "int(rdfentry_ % 10)").Define("x", "float(1.-rdfentry_ / 3)")



cols = ROOT.std.vector['string'](["category","x"])
df1_with_unique_categories = df1.Filter(ROOT.ElementTracker(), cols)
df2_with_unique_categories = df2.Filter(ROOT.ElementTracker(), cols)

Returns:

reduced_rdf_pd1 = pd.DataFrame(df1_with_unique_categories.AsNumpy())
reduced_rdf_pd1.to_numpy()

array([[0., 0.],
       [1., 0.],
       [2., 0.],
       [3., 1.],
       [4., 1.],
       [5., 1.],
       [6., 2.],
       [7., 2.],
       [8., 2.],
       [9., 3.],
       [0., 3.],
       [1., 3.],
       [2., 4.],
       [3., 4.],
       [4., 4.],
       [5., 5.],
       [6., 5.],
       [7., 5.],
       [8., 6.],
       [9., 6.]])


And

reduced_rdf_pd2 = pd.DataFrame(df2_with_unique_categories.AsNumpy())
reduced_rdf_pd2.to_numpy()
array([[ 0.,  1.],
       [ 1.,  1.],
       [ 2.,  1.],
       [ 3.,  0.],
       [ 4.,  0.],
       [ 5.,  0.],
       [ 6., -1.],
       [ 7., -1.],
       [ 8., -1.],
       [ 9., -2.]])

I.e. the correct behaviour is only achieved for df2. Any idea how to modify ElementTracker so that it works for an arbitrary ordering of the entries? Also, would a different approach, for example with Reduce, work?
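The order dependence can be reproduced outside ROOT with a plain-Python sketch of the same single-pass logic. The function name running_max_filter is hypothetical, and the two lists mimic df1 and df2 above. A single pass can only compare an entry with the entries it has already seen, so it cannot reject an early entry that is later superseded.

```python
def running_max_filter(entries):
    """Single pass: keep an entry unless its x is below the highest x
    seen so far for its category (same logic as the filter above)."""
    highest = {}
    kept = []
    for category, x in entries:
        if category in highest and x < highest[category]:
            continue  # a higher x was already seen for this category
        highest[category] = x
        kept.append((category, x))
    return kept


# Same toy data as df1 (x increasing) and df2 (x decreasing)
ascending = [(i % 10, float(i // 3)) for i in range(20)]
descending = [(i % 10, float(1 - i // 3)) for i in range(20)]

kept_ascending = running_max_filter(ascending)
kept_descending = running_max_filter(descending)

print(len(kept_ascending))   # every entry passes
print(len(kept_descending))  # one entry per category
```

With increasing x, each later entry beats the stored value, so nothing is ever rejected; with decreasing x, the first entry of each category is already the highest.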

For the moment, I could only solve the case of increasing values of x with a two-step approach, as follows.

Create the example dataset:

import ROOT

df1 = ROOT.RDataFrame(20).Define("event_category", "int(rdfentry_ % 10)").Define("event_x", "double(rdfentry_ / 3)")

fileName = "test.root"
treeName = "DecayTree"

df1.Snapshot(treeName,fileName)

Declare a function that fills the unordered_map where we’ll store the highest x for each category:

ROOT.gInterpreter.Declare('''
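#include <iostream>
#include <unordered_map>

// Keep in `map` the highest `value` seen so far for each `id`
void fillMap(int id, double value, std::unordered_map<int, double> &map) {
    auto it = map.find(id);
    if (it == map.end()) {
        // First time this id is seen
        map.insert({id, value});
        std::cout << "new insertion" << std::endl;
        std::cout << "id " << id << " value " << value << " map[id] " << map[id] << std::endl;
        return;
    }
    std::cout << "contains already" << std::endl;
    std::cout << "id " << id << " map[id] " << it->second << " value " << value << std::endl;
    if (value > it->second) {
        it->second = value;
    } else {
        std::cout << "pass" << std::endl;
    }
}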
''')

Fill the highestValues map

event_code=f"""

    std::unordered_map<int, double> highestValues;
    TFile f("{fileName}");
    TTree* t = (TTree*)f.Get("{treeName}");
    int cat;
    double x;
    t->SetBranchStatus("*", 0);   
    t->SetBranchStatus("event_category", 1);
    t->SetBranchStatus("event_x", 1);
    
    t->SetBranchAddress("event_category", &cat);
    t->SetBranchAddress("event_x", &x);
    
    int nEntries(t->GetEntries());
    cout<<nEntries<<endl;
    for(int i(0); i<nEntries; ++i){{
        t->GetEntry(i);
        fillMap(cat, x, highestValues);
    }}
    f.Close();
"""

ROOT.gROOT.ProcessLine(event_code)

And finally feed this to Filter

ROOT.gInterpreter.Declare('''
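// Read-only filter: relies on the global highestValues map filled in the previous step
class ElementTracker {
public:
    bool operator()(int id, double value) {
        const bool isHighest = (value == highestValues[id]);
        std::cout << "returning: " << isHighest << std::endl;
        return isHighest;
    }
};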

''')

cols = ROOT.std.vector['string'](["event_category","event_x"])
df1_with_unique_categories = df1.Filter(ROOT.ElementTracker(), cols)

df1_with_unique_categories.AsNumpy()

Yields the desired result:

{'event_category': ndarray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32),
 'event_x': ndarray([3., 3., 4., 4., 4., 5., 5., 5., 6., 6.])}

This is really not an elegant solution; any feedback on how to rewrite this with dedicated RDataFrame functions, or in a cleaner and more concise way, is very welcome. The other option would be to sort the RDataFrame entries by the value of x. This would make everything cleaner, but I couldn’t find a quick way to do it. Any ideas?
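For reference, the sorting alternative mentioned above looks like this in plain Python on an in-memory list. It is only a sketch of the logic, not an RDataFrame feature; the function name select_by_sorting is hypothetical and the data mimic df1. After sorting by decreasing x, the first entry met for each category is the one with the highest x.

```python
def select_by_sorting(entries):
    """Sort by decreasing x, then keep the first entry of each category."""
    ordered = sorted(entries, key=lambda e: e[1], reverse=True)
    seen = set()
    selected = []
    for category, x in ordered:
        if category not in seen:
            seen.add(category)
            selected.append((category, x))
    return selected


# Same toy data as df1: category = i % 10, x = i // 3
entries = [(i % 10, float(i // 3)) for i in range(20)]
selected = select_by_sorting(entries)

# Print in category order for readability
for category, x in sorted(selected):
    print(category, x)
```

Unlike the two-step filter, this keeps exactly one entry per category even when several entries tie for the highest x, but it requires all entries to be held in memory and does not preserve the original entry order.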

Thanks,
Davide
