Using numba declare for python MVA node processing

Dear experts

I have a question about usage of _rdfentry as trick suggested in various threads in the forum to interoperate RDataFrame with python modules to attach BDT weights.

I have ended up with a code like this in my analysis :

def reweight_node(
        self,
        node: ROOT.RDF.RNode,
        particle_kin: str = "B",
        #will use "B_PT", "B_ETA" for example if particle_kin is "B" (if kin_variables is PT ETA)
        label_weight_postfix: str = "",
        sample_tag_train: list = ["bp2jpsikp", "bs2jpsiphi"],
        blocks: list = [
            "block1", "block2", "block5", "block6", "block7", "block8",
            "25c1_up", "25c1_down"
        ],
        versions: list = ["pidselect", "pidweight", "pid_trackweight"]
    ) -> ROOT.RDF.RNode:
        cols_kin = [f"{particle_kin}_{var}" for var in self.kin_variables]
        cols_mult = self.mult_variables  # assuming multiplicity variables are not particle specific, as in the current setup where they are nPVs and nLongTracks, but can be easily changed if needed.
        cols = list(set(cols_kin + cols_mult))
        arrays_df = node.AsNumpy(columns=cols)
        logger.info(f"""
            Columns used for kinematic reweighter (ordered): {cols_kin},
            Columns used for multiplicity reweighter (ordered) : {cols_mult}
        """)
        # to trigger the RDF and check that the columns are there, otherwise the reweighter loading will be done for nothing and the error will come later when trying to compute the weights without the needed columns.
        X_kin = np.column_stack([arrays_df[c] for c in cols_kin])
        X_mult = np.column_stack([arrays_df[c] for c in cols_mult])
        all_weights = {}
        for sample in sample_tag_train:
            # loop on sample input b+/b0
            for block in blocks:
                # loop on block
                for version in versions:
                    # loop on trainign version
                    logger.info(
                        f"Loading reweighters for sample {sample}, block {block}, version {version}..."
                    )
                    kin_pkl = self.map_cfg[sample][block][version]["kin"]
                    mult_pkl = self.map_cfg[sample][block][version]["mult"]
                    if os.path.exists(self.eos_path + kin_pkl):
                        kin_reweighter = joblib.load(self.eos_path + kin_pkl)
                        weight_label_kin = f"wkin_{block}_{version}_{sample}"
                        kin_weights = np.asarray(
                            kin_reweighter.predict_weights(X_kin))
                        all_weights[weight_label_kin] = kin_weights
                        if self.normalize_check:
                            all_weights[weight_label_kin] = kin_weights * len(
                                kin_weights
                            ) / kin_weights.sum(
                            )  # renormalize to the original number of events, to check that it does not change the normalization (it should not, but just in case to be sure that there is no bug in the code that creates the reweighters)
                    else:
                        logger.error(
                            f"Kinematic reweighter pickle file not found for sample {sample}, block {block}, version {version} at location {self.eos_path + kin_pkl}. Please check the path and the maps.yaml configuration."
                        )
                        logger.error("SKIP attaching, but must be fixed!")
                    if os.path.exists(self.eos_path + mult_pkl):
                        mult_reweighter = joblib.load(self.eos_path + mult_pkl)
                        weight_label_mult = f"wmult_{block}_{version}_{sample}"
                        mult_weights = np.asarray(
                            mult_reweighter.predict_weights(X_mult))
                        all_weights[weight_label_mult] = mult_weights
                        if self.normalize_check:
                            all_weights[
                                weight_label_mult] = mult_weights * len(
                                    mult_weights) / mult_weights.sum()
                    else:
                        logger.error(
                            f"Multiplicity reweighter pickle file not found for sample {sample}, block {block}, version {version} at location {self.eos_path + mult_pkl}. Please check the path and the maps.yaml configuration."
                        )
                        logger.error("SKIP attaching, but must be fixed!")

        # ============================================================
        # 2. Helper: create one unique Numba getter per weight array
        # ============================================================
        def make_weight_getter(weight_array, unique_name):
            """Returns a Numba-declared function that closes over this specific array."""

            @ROOT.Numba.Declare(["unsigned long"], "float", unique_name)
            def _getter(entry):
                return weight_array[entry]

            return unique_name

        # ============================================================
        # 3. Create getters + Define columns on the RDataFrame
        # ============================================================
        for key in all_weights.keys():
            col_name = f"{key}{label_weight_postfix}"
            getter_name = f"get_{col_name}"
            unique_name = make_weight_getter(all_weights[key], getter_name)
            logger.info(
                f"Declaring column {col_name} using Numba::{unique_name}")
            node = node.Define(col_name, f"Numba::{unique_name}(rdfentry_)")
        return node

Which basically

  • input a RDataFrame node:
  • load 20 MVAs from some configuration file
  • do a single AsNumpy( columns= [ cols_needed_for_mva])
  • predict weights,
  • declare a numba for each attacher, push back to the node the column added.

The ‘issue’ with this is the following :
1- can’t work multi-threaded, it’s ok for now
2- if the inputted Node is “Filtered” the rdfentry trick doesn’t work or at least according to my collaborators this doesn’t, for my use case i just attach this ‘upfront’ , but it would be nice to make this work regardless of the inputted node.

As this operation is quite fast to do on the fly rather than saving intermediate tuples, i wonder if you have any suggestion on how to do this.

Is there an alternative of “rdfentry” which works on “filtered nodes” ?

Thanks in advance ,
Renato

Hello @rquaglia,

As this question is quite elaborate I will redirect you to the RDataframe experts, @vpadulan and @StephanH for a complete answer.

However I’m not sure I understand the rdfentry_ problem with filtering: if the problem is a mismatch between entry index and underlying entry after the filtering, maybe it’s just a matter of defining a custom column before the Filter and assign it to rdfentry_, so that its index remains associated to the same entry as before. If that is not the problem you were talking about, then nevermind :slight_smile:

Hi @silverweed , thanks. I think my question is indeed long but the point is exactly this , as I understood with Numba Delcare we attach colums based on “array[0,1,2,3,4…]” index which works with rdfentry only if the node is not filtered before being consumed.

Your suggestion is the correct one, the point is that using a lookup table for a “uniqueID” of an entry might be heavy in accessing it.

basically if one wants to do :

node = node.Filter("xx")
node = addReweight( node) 

And in addReweight you use

def addReweight( node) : 
    nodeNp = node.AsNumpy( colsNeed)
    weight_array = evalWeights( nodeNp)
    @ROOT.Numba.Declare(["unsigned long"], "float", "myfunction")
    def _getter(entry):
      return weight_array[entry]
    node = node.Define(col_name, f"Numba::myfunction(rdfentry_)")
    return node

I tried to make it as short as possible, the point being that rdfentry_ here is assumed to be
0,1,2,3,4,5....,N-1 with N = number of Filtered entries, but the rdfentry_ on a filtered node is only a sub-set of the N with values for example being 0,4,13 (if i understood correctly, which i hope i did). It would be nice basically to have something rdfentry_ like but being re-mapped post filtering to an ordered index like

Ah, I see, so the problem is actually the inverse of what I thought: you would need an rdfentry_ equivalent that “resets” after the filtering, right?

I’m not sure if there is such a mechanism built into RDF.
You can of course use a Define after the Filter to increment a lambda-captured variable and assign it as the column value, but that requires C++ so it’s not exactly ideal.

i mean it would be fine to do like that if it’s a small C++ code to create, it just needs to create ordered entries somewhat so that the addReweight can be applied “to any node” coming form any pre-processing step.

You mean doing something like :

ROOT.gInterpreter.Declare( """
RDF::RNode EntryPosSelectionCounter( RDF::RNode ){
    int counter = 0 ;
    if( node.HasColumn("dummy") ){
       auto retnode =  node.Define("index_nodestatus", [ & counter]( float dummy){ return counter++; }, "dummy")
    } else{

        auto retnode = node.Define("dummy" , "1.")
                .Define("index_nodestatus", [ & counter]( float dummy){ return counter++; }, "dummy")
    }
    return retnode; 
};
""")
node = EntryPosSelectionCounter( node)

def addReweight( node) : 
    nodeNp = node.AsNumpy( colsNeed)
    weight_array = evalWeights( nodeNp)
    @ROOT.Numba.Declare(["unsigned long"], "float", "myfunction")
    def _getter(entry):
      return weight_array[entry]
    node = node.Define(col_name, f"Numba::myfunction(index_nodestatus)")
    return node

Issue with this is that multiple instances of addReweight might be happening at different selection stage, so that the index_nodestatus has the same isue of rdfentry, of course one can make this safer parsing a different “index_nodestatus” for each call of addReweight.

Yes, something like that. Of course you need to make sure that counter is visible when you actually run the graph computation (in your example you’d get a dangling reference since counter is local to the C++ function).

My general idea was (pseudo-code):

int counter = 0;
# `df` is the already-filtered dataframe
df.Define("my_index", [&counter] { return counter++; })

so that the lambda is only run for entries that survived the filtering. Ideally you would define "my_index" immediately before using it, so that there is no further filtering involved that would invalidate it.
Clearly this doesn’t work as neatly in a more complicated scenario (as yours might be), like if you need multiple counters and so on. Sorry if my example is simplifying too much.