Saving a large RDF distributed with Dask to CSV

I’m working with ROOT’s RDataFrame distributed with Dask to process very large datasets (terabyte scale). I want to save selected branches of a distriburted_rdf to a CSV file, but memory usage becomes a bottleneck when converting the RDF to a pandas DataFrame. Here’s a minimal snippet:

df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(
    tree_name, root_file_list, daskclient=client, npartitions=800
)

chunk_size = 400000
for start in range(0, num_entries, chunk_size):
    end = min(start + chunk_size, num_entries)
    df_pd = pd.DataFrame({col: list(df.AsNumpy([col])[col]) for col in column_order})
    df_pd.to_csv("output.csv", index=False, mode='a', header=(start == 0))

I have also tried other approaches like

  • Dask Delayed to DataFrame, then .compute() to pandas — still memory-heavy.
  • Manual partition writing after client.gather() — doesn’t scale well.
  • However, chunking significantly slows down the process and is now the main bottleneck.

:link: Full notebook: /github.com/asrithhep/Anomaly-detection-with-transformer-model-in-HEP/blob/main/RDataframe_main.ipynb (see cell 8 for the problem area)

Is there a way to directly save a ROOT RDataFrame (distributed with Dask) to CSV efficiently without converting the full dataset to pandas or hitting memory constraints? Any suggestions for optimizing this IO operation while maintaining performance for TB-scale data?
ROOT Version: 6.34
Platform:_ swan.cern.ch

Other methods

  1. Dask Delayed + Dask DataFrame conversion:
# Convert dict to delayed DataFrame
delayed_df = dask.delayed(pd.DataFrame)(data_dict)

# Wrap into Dask dataframe
dask_df = dd.from_delayed([delayed_df])
dask_df = dask_df.repartition(npartitions=1000).persist()

# Compute and write to CSV
df_pd = dask_df.compute()
df_pd.to_csv("output.csv", index=False, mode=mode, header=header)
  1. Manual partition writer:
def write_partition(partition_id, data):
    import os
    output_dir = "csv_chunks"
    os.makedirs(output_dir, exist_ok=True)
    filename = os.path.join(output_dir, f"part_{partition_id:04d}.csv")
    
    with open(filename, "w") as f:
        f.write(",".join(columns) + "\n")
        for row in zip(*[data[col] for col in columns]):
            f.write(",".join(map(str, row)) + "\n")
    return filename

partition_data = client.gather(df.AsNumpy(columns))
written_files = [write_partition(0, partition_data)]

First, welcome to the ROOT Forum!
Then I think maybe @vpadulan can help

Just posting here for others who might find this topic.

As discussed in person, TB-scale CSV is absolutely not advisable. Due to ROOT’s very efficient compression, the dataset would blow up to several its current size.
For looking at a few events, the way to go via numpy is viable, but when feeding ML, something like a batch generator from ROOT directly to ML is a better way forward: ROOT: tutorials/machine_learning/RBatchGenerator_PyTorch.py File Reference