Dear experts
I have a question about usage of _rdfentry as trick suggested in various threads in the forum to interoperate RDataFrame with python modules to attach BDT weights.
I have ended up with a code like this in my analysis :
def reweight_node(
self,
node: ROOT.RDF.RNode,
particle_kin: str = "B",
#will use "B_PT", "B_ETA" for example if particle_kin is "B" (if kin_variables is PT ETA)
label_weight_postfix: str = "",
sample_tag_train: list = ["bp2jpsikp", "bs2jpsiphi"],
blocks: list = [
"block1", "block2", "block5", "block6", "block7", "block8",
"25c1_up", "25c1_down"
],
versions: list = ["pidselect", "pidweight", "pid_trackweight"]
) -> ROOT.RDF.RNode:
cols_kin = [f"{particle_kin}_{var}" for var in self.kin_variables]
cols_mult = self.mult_variables # assuming multiplicity variables are not particle specific, as in the current setup where they are nPVs and nLongTracks, but can be easily changed if needed.
cols = list(set(cols_kin + cols_mult))
arrays_df = node.AsNumpy(columns=cols)
logger.info(f"""
Columns used for kinematic reweighter (ordered): {cols_kin},
Columns used for multiplicity reweighter (ordered) : {cols_mult}
""")
# to trigger the RDF and check that the columns are there, otherwise the reweighter loading will be done for nothing and the error will come later when trying to compute the weights without the needed columns.
X_kin = np.column_stack([arrays_df[c] for c in cols_kin])
X_mult = np.column_stack([arrays_df[c] for c in cols_mult])
all_weights = {}
for sample in sample_tag_train:
# loop on sample input b+/b0
for block in blocks:
# loop on block
for version in versions:
# loop on trainign version
logger.info(
f"Loading reweighters for sample {sample}, block {block}, version {version}..."
)
kin_pkl = self.map_cfg[sample][block][version]["kin"]
mult_pkl = self.map_cfg[sample][block][version]["mult"]
if os.path.exists(self.eos_path + kin_pkl):
kin_reweighter = joblib.load(self.eos_path + kin_pkl)
weight_label_kin = f"wkin_{block}_{version}_{sample}"
kin_weights = np.asarray(
kin_reweighter.predict_weights(X_kin))
all_weights[weight_label_kin] = kin_weights
if self.normalize_check:
all_weights[weight_label_kin] = kin_weights * len(
kin_weights
) / kin_weights.sum(
) # renormalize to the original number of events, to check that it does not change the normalization (it should not, but just in case to be sure that there is no bug in the code that creates the reweighters)
else:
logger.error(
f"Kinematic reweighter pickle file not found for sample {sample}, block {block}, version {version} at location {self.eos_path + kin_pkl}. Please check the path and the maps.yaml configuration."
)
logger.error("SKIP attaching, but must be fixed!")
if os.path.exists(self.eos_path + mult_pkl):
mult_reweighter = joblib.load(self.eos_path + mult_pkl)
weight_label_mult = f"wmult_{block}_{version}_{sample}"
mult_weights = np.asarray(
mult_reweighter.predict_weights(X_mult))
all_weights[weight_label_mult] = mult_weights
if self.normalize_check:
all_weights[
weight_label_mult] = mult_weights * len(
mult_weights) / mult_weights.sum()
else:
logger.error(
f"Multiplicity reweighter pickle file not found for sample {sample}, block {block}, version {version} at location {self.eos_path + mult_pkl}. Please check the path and the maps.yaml configuration."
)
logger.error("SKIP attaching, but must be fixed!")
# ============================================================
# 2. Helper: create one unique Numba getter per weight array
# ============================================================
def make_weight_getter(weight_array, unique_name):
"""Returns a Numba-declared function that closes over this specific array."""
@ROOT.Numba.Declare(["unsigned long"], "float", unique_name)
def _getter(entry):
return weight_array[entry]
return unique_name
# ============================================================
# 3. Create getters + Define columns on the RDataFrame
# ============================================================
for key in all_weights.keys():
col_name = f"{key}{label_weight_postfix}"
getter_name = f"get_{col_name}"
unique_name = make_weight_getter(all_weights[key], getter_name)
logger.info(
f"Declaring column {col_name} using Numba::{unique_name}")
node = node.Define(col_name, f"Numba::{unique_name}(rdfentry_)")
return node
Which basically
- input a RDataFrame node:
- load 20 MVAs from some configuration file
- do a single AsNumpy( columns= [ cols_needed_for_mva])
- predict weights,
- declare a numba for each attacher, push back to the node the column added.
The âissueâ with this is the following :
1- canât work multi-threaded, itâs ok for now
2- if the inputted Node is âFilteredâ the rdfentry trick doesnât work or at least according to my collaborators this doesnât, for my use case i just attach this âupfrontâ , but it would be nice to make this work regardless of the inputted node.
As this operation is quite fast to do on the fly rather than saving intermediate tuples, i wonder if you have any suggestion on how to do this.
Is there an alternative of ârdfentryâ which works on âfiltered nodesâ ?
Thanks in advance ,
Renato