How to expand RDataframe functionalities in a custom compiled code

Hi experts
I would like to understand if and how i can implement some new calls for RDataFrame like the Define or Min, Max. In particular i would like to implement a Covariance( T column1, T column2) function returning for 2 columns the calculated Covariance element.
In practice, i think it might be a good idea to have the analogous of pandas :

df = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
                  columns=['dogs', 'cats'])
df.cov()

in the RDataFrame context, but possibly specifying a ColumnNames_t.
In principle i guess this could return directly a TMatrixSym object if possible if more than 2 columns are asked for…

Any suggestion from where to start looking at would be appreciated, as well as maybe a rooot macro where extensions of functionalities can be designed.
Thanks
Renato


Please read tips for efficient and successful posting and posting code

ROOT Version: Not Provided
Platform: Not Provided
Compiler: Not Provided


Hi,
looks like you are looking for the Book method, which lets you book an arbitrary action and is designed to also support multi-thread execution.

This tutorial shows how to use Book to implement an action that fills a THn, for instance.

Cheers,
Enrico

Hi @eguiraud, so if i get it right one should implement something like :

template <typename T, unsigned int NDIM>
class Covariance : public ROOT::Detail::RDF::RActionImpl<THnHelper<T, NDIM>> {
public:
   TMatrixDSym covMatrix; // my final cov Matrix or whatever has to contain the cov(x,y);
   /// This type is a requirement for every helper.
   using Result_t = TORETURNTYPE;
 
private:
   //some private memembers to cache the ````x^{2}, x*y ,...``` to calculate final cov matrix ...
 
public:
   //Methods to do the math ? 
};

I probably start simple with a 2x2 covMatrix and then expand to have a TMatrixD object using templated dimensionality to accept many arguments

Dear @dpiparo , i tried to follow the example.
I came up with this class to call at the end

   // We book the action: it will be treated during the event loop.
   auto covXY = dd.Book(CovarianceHelper(), {"x0", "x1"});


class CovarianceHelper : public ROOT::Detail::RDF::RActionImpl<CovarianceHelper>{ 
    public : 
        using Result_t = double;
    private : 
        std::vector<  std::shared_ptr< double > > _xyproductSUM;  //one per data processing slot
        std::vector<  std::shared_ptr< double > > _xStatsSUM; //one per data processing slot
        std::vector<  std::shared_ptr< double > > _yStatsSUM; //one per data processing slot
    public : 
        /// This constructor takes all the parameters necessary to build the covariance. 
        /// In addition, it requires the names of
        //  the columns which will be used.        
        CovarianceHelper( ){
            const auto nSlots = ROOT::IsImplicitMTEnabled() ?  ROOT::GetThreadPoolSize() : 1;
            for (auto i : ROOT::TSeqU(nSlots)){
                 _xyproductSUM.emplace_back(std::make_shared<double>);
                 _xStatsSUM.emplace_back(std::make_shared<double>);
                 _yStatsSUM.emplace_back(std::make_shared<double>);
                 (void)i;
            }
        }
        CovarianceHelper( CovarianceHelper &&)= default;
        CovarianceHelper( const CovarianceHelper &) = delete;
        std::shared_ptr<double> GetResultPtr() const { 
            return (1./99.) * ( _xyproductSUM[0] - 100 * _xStatsSUM[0] & _xStatsSUM[0])
        }
        void Initialize() {}
        void InitTask(TTreeReader *, unsigned int) {}
        void Exec(unsigned int slot,double X, double Y){
            _xyproductSUM[slot] += X*Y;
            _xStatsSUM[slot] += X;
            _yStatsSUM[slot] += Y;
        }
        void Finalize(){
            auto & xyproductSUM = _xyproductSUM[0];
            auto & xStats       = _xStatsSUM[0];
            auto & yStats       = _yStatsSUM[0];
            for( auto  slot : ROOT::TSeqU(1, _xyproductSUM.size())){
                _xyproductSUM += _xyproductSUM[slot].get();
                _xStats       += _xStatsSUM[slot].get();
                _yStats       += _yStatsSUM[slot].get();
            }
        }
        std::string GetActionName(){
            return "CovarianceHelper";
        }
}

I will need to further expand it to accept n-arguments as n columns .
I wonder if i get the logics right.
You basically have private members to collect each slot results and in the Finalize stage you fill-up the one associated to slot=0 and the function to make the return you do the proper math for the final return using the [0] slot.
Correct?
Thanks
Renato

PS: given that the math depends on the nEntries processed, i currently have it hard-coded with n-1 and n since i know there are 100 entries, but i wonder how i can keep track of n to do the averaging.

The code as it is now is not compiling, i am prototyping it in a Macro, when i try to compile it .x FILE.C i get :

ROOT_prompt_2:1:7: error: no member named 'GetThreadPoolSize' in namespace 'ROOT'

among other issues. Is this due to ROOT version i am using? ( Welcome to ROOT 6.18/04 )
?

Yes, ROOT::GetThreadPoolSize was called ROOT::GetImplicitMTPoolSize in 6.18 (deprecated in 6.20, removed in 6.22 if I remember correctly).

The implementation looks ok at a first glance, but the best and only way to be sure is write a test :slight_smile: I did not double-check your math, for instance.

You can see several other action helper examples at https://github.com/root-project/root/blob/master/tree/dataframe/inc/ROOT/RDF/ActionHelpers.hxx (these are the ones RDF uses for its own actions).
MeanHelper, for instance, keeps a count of events processed by each thread/slot.

Cheers,
Enrico

Ok, i get it compiling, but i had to remove all the shared_ptr stuff.

class CovarianceHelper : public ROOT::Detail::RDF::RActionImpl<CovarianceHelper>{ 
    public : 
        using Result_t = double;
    private : 
        std::vector<  double  > _xyproductSUM;  //one per data processing slot
        std::vector<  double  > _xStatsSUM; //one per data processing slot
        std::vector<  double  > _yStatsSUM; //one per data processing slot
    public : 
        /// This constructor takes all the parameters necessary to build the covariance. 
        /// In addition, it requires the names of
        //  the columns which will be used.        
        CovarianceHelper( ){
            const auto nSlots = ROOT::IsImplicitMTEnabled() ?  ROOT::GetImplicitMTPoolSize() : 1;
            for (auto i : ROOT::TSeqU(nSlots)){
                 _xyproductSUM.emplace_back(0.);
                 _xStatsSUM.emplace_back(0.);
                 _yStatsSUM.emplace_back(0.);
                 (void)i;
            }
        }
        CovarianceHelper( CovarianceHelper &&)= default;
        CovarianceHelper( const CovarianceHelper &) = delete;
        std::shared_ptr<double> GetResultPtr() const { 
            // 1/(100-1) * ( Sum( x* y ) - 100 * <x> * <y> )
            double result = (1./99.) *  ( _xyproductSUM[0] - 100 * (_xStatsSUM[0] / 100.) * ( _yStatsSUM[0]/100.) ) ;
            return std::make_shared<double>( result);
        }
        void Initialize() {}
        void InitTask(TTreeReader *, unsigned int) {}
        void Exec(unsigned int slot,double X, double Y){
            _xyproductSUM[slot] += X*Y;
            _xStatsSUM[slot] += X;
            _yStatsSUM[slot] += Y;
        }
        void Finalize(){
            auto xyproductSUM = _xyproductSUM[0];
            auto xStats = _xStatsSUM[0];
            auto yStats = _yStatsSUM[0];
            for( auto  slot : ROOT::TSeqU(1, _xyproductSUM.size())){
                xyproductSUM += _xyproductSUM[slot];
                xStats       += _xStatsSUM[slot];
                yStats       += _yStatsSUM[slot];
            }
            _xyproductSUM[0] = xyproductSUM;
            _xStatsSUM[0] = xStats ;
            _yStatsSUM[0] = yStats;
        }
        std::string GetActionName(){
            return "CovarianceHelper";
        }
};

What is the reason of having a shared pointer for the private members? I guess with a simple counting example i have, i don’t need them, but i might be wrong.

You don’t need all those shared ptrs, you only need one.
If you test this implementation, I think you will see that it does not return the expected result.
The reason is that GetResultPtr is called before the event loop starts, but your implementation assumes that it is called after the event loop. The docs could be clearer about that, admittedly.

I suggest to:

  • add a single shared_ptr<double> _result; private data member
  • have GetResultPtr return a copy of _result
  • add a last line to Finalize that computes the result and assigns it to _result
  • write a test :smiley:

Cheers,
Enrico

Hi @eguiraud, here the working test i have

template< typename T> 
class CovarianceHelper : public ROOT::Detail::RDF::RActionImpl<CovarianceHelper<T>>{ 
    public : 
        using Covariance_t = T;    
        using Result_t = Covariance_t;
    private : 
        std::vector<  std::shared_ptr<Covariance_t>  > _xyproductSUM;  //one per data processing slot
        std::vector<  std::shared_ptr<Covariance_t>  > _xStatsSUM; //one per data processing slot
        std::vector<  std::shared_ptr<Covariance_t>  > _yStatsSUM; //one per data processing slot
        std::shared_ptr<Covariance_t> _result; 
    public : 
        /// This constructor takes all the parameters necessary to build the covariance. 
        /// In addition, it requires the names of
        //  the columns which will be used.        
        CovarianceHelper( ){
            const auto nSlots = ROOT::IsImplicitMTEnabled() ?  ROOT::GetImplicitMTPoolSize() : 1;
            for (auto i : ROOT::TSeqU(nSlots)){
                 _xyproductSUM.emplace_back(std::make_shared<double>(0.) );
                 _xStatsSUM.emplace_back(std::make_shared<double>(0.) );
                 _yStatsSUM.emplace_back(std::make_shared<double>(0.) );                 
                 (void)i;
            }        
            _result =  std::make_shared<double>(0.);
        }
        CovarianceHelper( CovarianceHelper &&)= default;
        CovarianceHelper( const CovarianceHelper &) = delete;
        std::shared_ptr<Covariance_t> GetResultPtr() const { 
            // 1/(100-1) * ( Sum( x* y ) - 100 * <x> * <y> )
            std::cout<< "Result xySum = " << *_xyproductSUM[0] << std::endl;
            std::cout<< "Result xSum = " << *_xStatsSUM[0] << std::endl;
            std::cout<< "Result ySum = " << *_yStatsSUM[0] << std::endl;
            // Covariance_t result = (1./99.) *  ( _xyproductSUM[0] - 100 * (_xStatsSUM[0] / 100.) * ( _yStatsSUM[0]/100.) ) ;
            return  _result;
        }
        void Initialize() {}
        void InitTask(TTreeReader *, unsigned int) {}
        template <typename... ColumnTypes>
        void Exec(unsigned int slot, ColumnTypes... values){
            // void Exec(unsigned int slot,double X, double Y){
            std::array<double, sizeof...(ColumnTypes)> valuesArr{static_cast<double>(values)...};     
            std::cout<< "  valuesArr[0] " <<  valuesArr[0] << std::endl;
            std::cout<< "  valuesArr[1] " <<  valuesArr[1] << std::endl;

            *_xyproductSUM[slot] += valuesArr[0]*valuesArr[1];
            *_xStatsSUM[slot] += valuesArr[0];
            *_yStatsSUM[slot] += valuesArr[1];
        }
        void Finalize(){
            std::cout<<"Finalize"<<std::endl;        
            for( auto  slot : ROOT::TSeqU(1, _xyproductSUM.size())){
                *_xyproductSUM[0] += *_xyproductSUM[slot];
                *_xStatsSUM[0]    += *_xStatsSUM[slot];
                *_yStatsSUM[0]    += *_yStatsSUM[slot];
            }
            *_result =  (1./100.) *  ( *_xyproductSUM[0] ) -  ( (*_xStatsSUM[0]) / 100.) * ( (*_yStatsSUM[0])/100.)  ;
            std::cout<<"Finalize done"<<std::endl;
        }
        std::string GetActionName(){
            return "CovarianceHelper";
        }
};
void Covariance_Handling(){
    ROOT::RDataFrame df( "Base_Augmented", "AllEffs.root");
    auto covi = CovarianceHelper<double>();
    auto cov = df.Book<double,double>(std::move(covi), {"RKst_MM_12_MD_effnorm_L0L_exclusive_wBp_kde", "RKst_MM_12_MD_effnorm_L0L_exclusive_wBp_kde"} );
    auto stdev = df.StdDev("RKst_MM_12_MD_effnorm_L0L_exclusive_wBp_kde" );
    std::cout << "MYCOV(X,X) " << * cov << std::endl;
    std::cout << "UnbiasedSTDV (X) * UnbiasedSTDV (X) " << (* stdev ) * ( * stdev) << std::endl;
    /*
    For each term we need to make Col1*Col2 element by element and also do the Average of each column indepenedntly     
    Cov(X,Y) = 1/(100-1) * ( Sum( x* y ) - 100 * <x> * <y> )
    */
}

The code seems to work fine, however when i compare cov(X,X) with the StdDev squared value from RDataFrame i get slightly different valuees:

From my code : 1.20521e-07
UnbiasedSTDV (X) * UnbiasedSTDV (X) 1.21738e-07

In principle Cov(X,X) / StdDev^{2}(x) = 1 , It’s almost correct, but not entirely. I am not sure now where is my mistake.
This code can be used as reproducer i think .

Note that 1.21738e-07 * 99 / 100 equals 1.2052062e-07, i.e. RDF is using N-1 as denominator and your code is using N.

Cheers,
Enrico

1 Like

Indeed, changing :

            *_covariance =  (1./99.) * (  ( *_xyproductSUM[0] ) -  1./100. * ( (*_xStatsSUM[0])) * ( (*_yStatsSUM[0])) )  ;

All works fine.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.