I’m working with ROOT’s RDataFrame, distributed with Dask, to process very large (terabyte-scale) datasets. I want to save selected branches of a distributed RDF to a CSV file, but memory usage becomes a bottleneck when converting the RDF to a pandas DataFrame. Here’s a minimal snippet:
```python
import ROOT
import pandas as pd

df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(
    tree_name, root_file_list, daskclient=client, npartitions=800
)
chunk_size = 400000
for start in range(0, num_entries, chunk_size):
    end = min(start + chunk_size, num_entries)
    # AsNumpy([col]) pulls the *full* column to the client on every iteration,
    # so each chunk re-triggers the distributed event loop once per column and
    # peak memory is unchanged; the chunking only limits the pandas/CSV step.
    df_pd = pd.DataFrame({col: list(df.AsNumpy([col])[col]) for col in column_order})
    df_pd.to_csv("output.csv", index=False, mode='a', header=(start == 0))
```
I have also tried other approaches (full snippets under "Other methods" below):

- Dask Delayed to DataFrame, then `.compute()` to pandas — still memory-heavy.
- Manual partition writing after `client.gather()` — doesn’t scale well.

However, chunking significantly slows down the process and is now the main bottleneck.
Full notebook: https://github.com/asrithhep/Anomaly-detection-with-transformer-model-in-HEP/blob/main/RDataframe_main.ipynb (see cell 8 for the problem area)
Is there a way to save a ROOT RDataFrame (distributed with Dask) directly to CSV without converting the full dataset to pandas or running into memory limits? Any suggestions for optimizing this I/O step while keeping performance acceptable for TB-scale data?
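To make concrete what I mean by "directly to CSV": the pattern I am after is a streaming, chunk-by-chunk write that never builds one big pandas DataFrame. Below is a minimal sketch on plain numpy arrays (`stream_to_csv` is a hypothetical helper, not part of any API); the missing piece is how to get such slices out of the distributed RDataFrame without `AsNumpy()` first pulling every column to the client.

```python
import pandas as pd

def stream_to_csv(arrays, columns, path, chunk_size=400_000):
    """Append fixed-size slices of 1-D numpy arrays (e.g. an AsNumpy() result) to a CSV."""
    n = len(arrays[columns[0]])
    for start in range(0, n, chunk_size):
        end = min(start + chunk_size, n)
        # Only this slice is ever materialized as a pandas DataFrame.
        chunk = pd.DataFrame({c: arrays[c][start:end] for c in columns})
        chunk.to_csv(path, index=False, mode="a", header=(start == 0))
```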
ROOT Version: 6.34
Platform: swan.cern.ch
Other methods
- Dask Delayed + Dask DataFrame conversion:

```python
import dask
import dask.dataframe as dd
import pandas as pd

# data_dict: dict of numpy column arrays (e.g. the AsNumpy(columns) result),
# already fully materialized on the client at this point.
delayed_df = dask.delayed(pd.DataFrame)(data_dict)

# Wrap into a Dask DataFrame and repartition it
dask_df = dd.from_delayed([delayed_df])
dask_df = dask_df.repartition(npartitions=1000).persist()

# Compute back to a single pandas DataFrame and append to CSV
# (mode/header are set per chunk, as in the loop above)
df_pd = dask_df.compute()
df_pd.to_csv("output.csv", index=False, mode=mode, header=header)
```
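A variant of the same idea, shown only as a sketch (it is not in my notebook): let Dask write one CSV per partition instead of computing back to a single pandas DataFrame. This drops the final `compute()`, but since `data_dict` still has to be fully gathered by `AsNumpy()` first, the peak client memory is essentially unchanged.

```python
import os
import dask
import dask.dataframe as dd
import pandas as pd

# data_dict: dict of numpy column arrays already on the client (e.g. from AsNumpy).
delayed_df = dask.delayed(pd.DataFrame)(data_dict)
dask_df = dd.from_delayed([delayed_df]).repartition(npartitions=1000)

# One file per partition; '*' in the path is replaced by the partition index.
os.makedirs("csv_chunks", exist_ok=True)
written = dask_df.to_csv("csv_chunks/part-*.csv", index=False)
```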
- Manual partition writer:

```python
import os

# columns: list of branch names to export (the same list passed to AsNumpy below)
def write_partition(partition_id, data):
    output_dir = "csv_chunks"
    os.makedirs(output_dir, exist_ok=True)
    filename = os.path.join(output_dir, f"part_{partition_id:04d}.csv")
    with open(filename, "w") as f:
        f.write(",".join(columns) + "\n")
        for row in zip(*[data[col] for col in columns]):
            f.write(",".join(map(str, row)) + "\n")
    return filename

# The full result for every column is gathered to the client
# before a single partition file is written.
partition_data = client.gather(df.AsNumpy(columns))
written_files = [write_partition(0, partition_data)]
```
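For completeness, writing more than a single partition with this scheme would mean slicing the gathered arrays by hand, roughly as in the sketch below (it reuses `partition_data`, `columns`, and `write_partition` from above; `n_parts` is an arbitrary choice). The full `AsNumpy()` result still has to fit in client memory first, which is exactly the constraint I am trying to avoid.

```python
import numpy as np

n_parts = 100
n_rows = len(partition_data[columns[0]])
bounds = np.linspace(0, n_rows, n_parts + 1, dtype=int)

written_files = [
    write_partition(i, {c: partition_data[c][lo:hi] for c in columns})
    for i, (lo, hi) in enumerate(zip(bounds[:-1], bounds[1:]))
]
```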