//C++
#include <algorithm>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
//root
#include <TChain.h>
#include <TFile.h>
#include <TStopwatch.h>
#include <TTree.h>
#include <ROOT/TProcessExecutor.hxx>
#include <ROOT/TSeq.hxx>
//BOOST
#include <boost/algorithm/string.hpp>
#include <boost/optional.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/property_tree/ptree.hpp>

template <typename STR> //template because we want to use r- and l-value std::strings
std::vector<std::string> parse_filenames(STR&& input_file_names, const std::string& workdir = "."){
  std::vector<std::string> input_files;
  //if the input string starts with '@', the rest is the name of a text file listing one input file per line
  if(boost::algorithm::starts_with(input_file_names,"@")){
    const std::string list_name = input_file_names.substr(1);//substr returns the string without the '@'
    std::ifstream myfile(list_name);
    if(!myfile) myfile.open(workdir + "/" + list_name);
    std::string line;
    while(std::getline(myfile,line)){
      boost::algorithm::trim(line);//remove surrounding whitespace, e.g. a '\r' left by Windows line endings
      if(!line.empty()) input_files.push_back(line);
    }
  }
  else //the input string doesn't start with '@': it is a ';'-separated list and boost does the work for us :)
    boost::algorithm::split(input_files, input_file_names, boost::algorithm::is_any_of(";"));
  //drop empty entries, e.g. from a trailing ';' or an empty input string
  input_files.erase(std::remove(input_files.begin(), input_files.end(), std::string{}), input_files.end());
  if(input_files.empty())
    throw std::runtime_error("No input files found. Is \"input_file_names\" empty?");
  return input_files;
}

template <typename STR1, typename STR2> //template because we want to use r- and l-value std::strings
std::unique_ptr<TChain> get_chain(STR1&& treename, STR2&& input_file_names, const std::string& workdir = ".",
                                  const std::string& chain_title = "Chain"){
  const auto input_files = parse_filenames(input_file_names,workdir);
  std::unique_ptr<TChain> ch{new TChain(treename.data(),chain_title.data())};
  for(const auto& infn : input_files)
    ch->Add(infn.data());
  if(!ch->GetFile()){
    std::cout << "Cannot find input files. Trying to prepend workdir." << std::endl;
    //start from a fresh chain so that the unreadable paths of the first attempt are not kept in it
    ch.reset(new TChain(treename.data(),chain_title.data()));
    for(const auto& infn : input_files)
      ch->Add((workdir+"/"+infn).data());
    if(!ch->GetFile())
      throw std::runtime_error("TChain is empty. Check inputs!");
  }
  return ch;//no std::move needed when returning a local variable
}

int tree_trimmerTPE(const std::string& ifn, const std::string& itn, const std::string& ofn,
                    const std::string& cfn, const std::string& wd = "."){
  TStopwatch clock;
  clock.Start();

  //get input as TChain
  //gEnv->SetValue("XNet.ForceParallelOpen",1); <-- doesn't work
  auto chain = get_chain(itn,ifn,wd);

  //get config-file (boost INFO format); configtree is also passed as the default,
  //so an unreadable config file leaves it empty and all defaults below apply
  boost::property_tree::ptree configtree;
  boost::property_tree::read_info(cfn,configtree,configtree);
  auto trim_vars = configtree.get_child_optional("variables");
  //NOTE: "basiccuts" is only reported here; loop_and_fill below does not apply it
  const auto tmp_msg = "Trimming input tree(s) with cut " + configtree.get("basiccuts","");
  if(trim_vars)
    printf("%s and writing out %zu variables\n",tmp_msg.data(),trim_vars->size());
  else
    printf("%s and writing out full tree\n",tmp_msg.data());

  // Like in PROOF, each worker process produces its own output, which has to be merged later.
  // The chain is split into (almost) equal ranges of entries, one per worker. This assumes
  // that there are many entries and that the workers process them at similar speed.
  //
  //first, we need to know how many workers to use. hardware_concurrency() returns 0
  //if the value cannot be determined, so fall back to 1 in that case
  const unsigned int max_workers = std::max(1u,std::thread::hardware_concurrency());
  auto n_workers = configtree.get("threads",max_workers);
  printf("Trying to run with %u parallel workers\n",n_workers);
  if(n_workers > max_workers){
    printf("Current machine only supports %u concurrent threads. Using this number of workers instead\n",max_workers);
    n_workers = max_workers;
  }
  if(n_workers == 0) n_workers = 1;//avoid a division by zero below

  //now we can split the number of events into (almost) equal sized parts (assuming nevents >> n_workers)
  const Long64_t nevents = chain->GetEntries();
  const Long64_t range = nevents/n_workers;

  //other stuff to be done on the master
  const auto outtreename = configtree.get("outtreename",itn);
  const auto outpfopt = configtree.get("outfopt","recreate");

  //now we need the function that should be executed on the workers
  auto loop_and_fill = [&] (unsigned int i_worker) {
    //entry range [range_lo, range_hi) of this worker; the last worker also takes the
    //remainder of the integer division so that no events are cut off
    const Long64_t range_lo = i_worker*range;
    const Long64_t range_hi = (i_worker+1 < n_workers) ? (i_worker+1)*range : nevents;
    printf("Worker %u: First entry: %lld, last entry (exclusive): %lld\n",i_worker,range_lo,range_hi);
    //open the chain in each worker
    auto wchain = get_chain(itn,ifn,wd);//this is where it breaks with xrootd
    printf("Worker %u: Got chain for reading\n",i_worker);
    if(trim_vars){
      //deactivate all branches, then re-activate only the configured ones
      wchain->SetBranchStatus("*",0);
      for(const auto& var : *trim_vars)
        wchain->SetBranchStatus(var.first.data(),1);
    }
    //define the output file of this worker: <wd>/<i_worker>_<ofn>
    TFile* of = TFile::Open((wd + "/" + std::to_string(i_worker) + "_" + ofn).data(),outpfopt.data());
    if(!of || of->IsZombie()){
      printf("Worker %u: Cannot open output file\n",i_worker);
      delete of;
      return 1u;
    }
    //empty clone containing only the active branches, attached to the output file
    auto otree = wchain->CloneTree(0);
    otree->SetNameTitle(outtreename.data(),outtreename.data());
    otree->SetDirectory(of);
    for(Long64_t ev = range_lo; ev < range_hi; ++ev){
      wchain->GetEntry(ev);
      otree->Fill();
    }
    //TTree::Fill switches to a new file once the maximum tree size is reached,
    //so write and close the tree's current file rather than the original pointer
    TFile* cf = otree->GetCurrentFile();
    cf->cd();
    cf->Write();
    cf->Close();
    delete cf;
    return 0u;
  };

  //let the games begin: loop_and_fill(0) ... loop_and_fill(n_workers-1) run in forked worker processes
  ROOT::TProcessExecutor pex(n_workers);
  const auto results = pex.Map(loop_and_fill,ROOT::TSeqU(n_workers));
  const bool failed = std::any_of(results.begin(),results.end(),[](unsigned int r){ return r != 0u; });
  if(failed) printf("At least one worker failed\n");
  //the per-worker outputs <wd>/<i_worker>_<ofn> still have to be merged, e.g. with ROOT's hadd

  clock.Stop();
  clock.Print();
  return failed ? 1 : 0;
}
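The way `tree_trimmerTPE` hands out entries can be checked without ROOT. Each worker gets an equal block of `nevents / n_workers` entries, and the last worker also takes the remainder. The following is a minimal sketch under that assumption. `split_entries` is a hypothetical helper that does not appear in the original code, and `long long` stands in for ROOT's `Long64_t`. Each returned pair is a half-open range `[lo, hi)`.

```cpp
#include <utility>
#include <vector>

// Hypothetical helper (not part of the original code): split nevents entries into
// n_workers half-open ranges [lo, hi), mirroring range_lo/range_hi in loop_and_fill.
// The last worker also takes the remainder of the integer division.
std::vector<std::pair<long long, long long>> split_entries(long long nevents, unsigned int n_workers){
  if(n_workers == 0) n_workers = 1; // avoid a division by zero
  const long long range = nevents / n_workers;
  std::vector<std::pair<long long, long long>> ranges;
  ranges.reserve(n_workers);
  for(unsigned int i = 0; i < n_workers; ++i){
    const long long lo = i * range;
    const long long hi = (i + 1 < n_workers) ? (i + 1) * range : nevents;
    ranges.emplace_back(lo, hi);
  }
  return ranges;
}
```

For example, `split_entries(10, 3)` gives `[0, 3)`, `[3, 6)` and `[6, 10)`. When `nevents >> n_workers`, the few extra entries handled by the last worker do not matter, which is the assumption stated in the comments of the code.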
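`parse_filenames` accepts two input formats: a `;`-separated list of files, or `@` followed by the name of a text file with one file per line. Both can also be sketched with the standard library alone. The helpers `trim_ws`, `split_semicolon_list` and `read_list_file` are hypothetical and not part of the original code. Reading the list from a `std::istream` is an assumption, made so that the logic can be tried without a file on disk. This sketch also strips surrounding whitespace (e.g. a `'\r'` left by Windows line endings) and skips empty entries.

```cpp
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: remove leading and trailing whitespace, including '\r'.
std::string trim_ws(const std::string& s){
  const char* ws = " \t\r\n";
  const auto first = s.find_first_not_of(ws);
  if(first == std::string::npos) return "";
  const auto last = s.find_last_not_of(ws);
  return s.substr(first, last - first + 1);
}

// Hypothetical helper: split a ';'-separated list, dropping empty entries.
std::vector<std::string> split_semicolon_list(const std::string& list){
  std::vector<std::string> out;
  std::istringstream ss(list);
  std::string item;
  while(std::getline(ss, item, ';')){
    item = trim_ws(item);
    if(!item.empty()) out.push_back(item);
  }
  return out;
}

// Hypothetical helper: read one file name per line, e.g. from the file named after '@'.
std::vector<std::string> read_list_file(std::istream& in){
  std::vector<std::string> out;
  std::string line;
  while(std::getline(in, line)){
    line = trim_ws(line);
    if(!line.empty()) out.push_back(line);
  }
  return out;
}
```

In a full program, `read_list_file` would be given a `std::ifstream` opened on the name that follows the `@`.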