//C++
#include <algorithm>
#include <string>
#include <thread>
#include <type_traits>
//root
#include <TChain.h>
#include <TFile.h>
#include <TTree.h>
#include <ROOT/TProcessExecutor.hxx>
#include <ROOT/TSeq.hxx>
//local
#include "../IOjuggler/ProgressBar.h"
#include "../IOjuggler/IOjuggler.h"

#ifndef NTUPLE_GIZMO_GIT_HASH
#define NTUPLE_GIZMO_GIT_HASH " "
#endif

int main(int argc, char** argv){

  //parse command line options
  const auto options = IOjuggler::parse_options(argc, argv, "c:d:i:o:t:v:h","nonoptions: any number of combinations");
  const auto wd  = options.get<std::string>("workdir");
  const auto ofn = options.get<std::string>("outfilename");
  const auto ifn = options.get<std::string>("infilename");
  const auto itn = options.get<std::string>("treename");

  //init MessageService for easy and nice printout
  //(assumption: MSG_LVL is the verbosity enum declared by IOjuggler)
  MessageService msgsvc("selector_rootMT",static_cast<MSG_LVL>(options.get<int>("verbosity")));
  msgsvc.debugmsg("Current ntuple-gizmo git hash: " + static_cast<std::string>(NTUPLE_GIZMO_GIT_HASH));

  //get input as TChain
  //gEnv->SetValue("XNet.ForceParallelOpen",1); <-- doesn't work
  auto chain = IOjuggler::get_chain(itn,ifn,wd,msgsvc);

  //get config-file
  auto configtree = IOjuggler::get_ptree(options.get<std::string>("config"));
  //manipulations of the config file
  IOjuggler::auto_replace_in_ptree(configtree);
  IOjuggler::auto_append_in_ptree(configtree);

  //optional list of branches to keep; if absent, the full tree is written
  auto trim_vars = configtree.get_child_optional("variables");
  //note: "basiccuts" is only reported here, the event loop below does not apply it
  const std::string tmp_msg = "Trimming input tree(s) with cut " + configtree.get("basiccuts","");
  if(trim_vars)
    msgsvc.infomsg(tmp_msg + " and writing out " + std::to_string(trim_vars->size()) + " variables");
  else
    msgsvc.infomsg(tmp_msg + " and writing out full tree");

  const bool noImplicitCuts = static_cast<bool>(configtree.get_child_optional("noImplicitCuts"));
  if(noImplicitCuts)
    msgsvc.infomsg("Implicit cuts switched off");

  // Like in PROOF, each worker produces its own output, which is merged later.
  // The chain is split into equal ranges of entries to process,
  // so it is assumed that we have many entries and that the workers process them at similar speed.
  // Note that ROOT::TProcessExecutor forks worker processes rather than spawning threads.
  //
  //first, we need to know how many events there are and how many workers we use
  //(hardware_concurrency() may return 0 if the value cannot be determined)
  const auto max_workers = std::max(1u, std::thread::hardware_concurrency());
  auto n_workers = configtree.get("threads",max_workers);
  if(n_workers == 0u) n_workers = 1u;//avoid a division by zero below
  msgsvc.infomsg("Trying to run with " + std::to_string(n_workers) + " parallel threads");
  if(n_workers > max_workers){
    msgsvc.infomsg("Current machine only supports " + std::to_string(max_workers) + " parallel threads. I will be using this number");
    n_workers = max_workers;
  }

  //now we can split the number of events into (almost) equal sized parts (assuming nevents >> n_workers)
  const auto nevents = chain->GetEntries();
  const auto range   = nevents/n_workers;

  //init counter for the progress bar (cast target assumed to be unsigned int)
  ProgressBar pb;
  pb.start(static_cast<unsigned int>(range),msgsvc,n_workers);

  //other stuff to be done on the master
  const auto outtreename = configtree.get("outtreename",itn);
  const auto outpfopt    = configtree.get("outfopt","recreate");

  //now we need the function that should be executed on the workers
  auto loop_and_fill = [&] (unsigned int i_worker) {
    //get range [range_lo, range_hi)
    const auto range_lo = i_worker*range;
    //the last worker runs up to nevents, so that no events are cut off by the integer division
    const std::decay<decltype(nevents)>::type range_hi =
      i_worker < static_cast<unsigned int>(n_workers - 1) ? (i_worker+1)*range : nevents;
    msgsvc.debugmsg("Worker " + std::to_string(i_worker) + ": First entry: " + std::to_string(range_lo)
                    + " last entry: " + std::to_string(range_hi));

    //open the chain in each worker
    auto wchain = IOjuggler::get_chain(itn,ifn,wd,msgsvc);//this is where it breaks with xrootd
    if(trim_vars){
      //switch off all branches, then enable only the requested ones
      wchain->SetBranchStatus("*",0);
      for(const auto& var : *trim_vars)
        wchain->SetBranchStatus(var.first.data(),true);
    }

    //define the output: one file per worker, named <i_worker>_<outfilename>
    TFile* of = new TFile((wd + "/" + std::to_string(i_worker) + "_" + ofn).data(),outpfopt.data());
    auto otree = wchain->CloneTree(0);
    otree->SetNameTitle(outtreename.data(),outtreename.data());
    otree->SetDirectory(of);

    for(std::decay<decltype(nevents)>::type ev = range_lo; ev < range_hi; ev++) {
      wchain->GetEntry(ev);
      //only the first worker reports progress
      if(i_worker == 0) pb.update(ev);
      otree->Fill();
    }

    of->cd();
    otree->GetCurrentFile()->Write();
    otree->GetCurrentFile()->Close();
    SafeDelete(of);
    return 0u;
  };

  //let the games begin
  ROOT::TProcessExecutor pex(n_workers);
  pex.Map(loop_and_fill,ROOT::TSeqU(n_workers));

  const int time = pb.stop();
  msgsvc.infomsg("Elapsed time for processing chain: "+std::to_string(time)+" s");
  return 0;
}
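/*
The entry-range arithmetic of loop_and_fill can be tried out in isolation.
What follows is a minimal sketch, not part of the program: split_ranges and
EntryRange are hypothetical names, plain long long stands in for the entry
count type of the chain, and no ROOT or IOjuggler code is involved. As in
main(), every worker gets nevents/n_workers entries and the last worker also
takes the remainder of the integer division.

```cpp
#include <cassert>
#include <utility>
#include <vector>

//hypothetical helper type: half-open entry range [first, second)
using EntryRange = std::pair<long long, long long>;

//hypothetical helper that mirrors the range computation in loop_and_fill
std::vector<EntryRange> split_ranges(const long long nevents, const unsigned int n_workers){
  assert(n_workers > 0u);//a zero divisor would be undefined behaviour
  const long long range = nevents / n_workers;
  std::vector<EntryRange> ranges;
  ranges.reserve(n_workers);
  for(unsigned int i_worker = 0u; i_worker < n_workers; ++i_worker){
    const long long range_lo = i_worker * range;
    //the last worker runs up to nevents, so no entries are cut off
    const long long range_hi = i_worker < n_workers - 1u ? (i_worker + 1u) * range : nevents;
    ranges.emplace_back(range_lo, range_hi);
  }
  return ranges;
}
```

For 10 entries and 3 workers this gives the ranges [0,3), [3,6) and [6,10):
the last worker processes one entry more than the others.
*/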