# codex/data-analytics/polars_parallel_ingest.cat
# Polars: parallel ingestion with ND-map
#
# Run: catnip -m polars -m tempfile -m pathlib data-analytics/polars_parallel_ingest.cat
pl = import("polars")
tempfile = import("tempfile")
pathlib = import("pathlib")
# Mode ND pour paralléliser le map
pragma("nd_mode", "parallel")
pragma("nd_workers", "4")
# Crée un mini jeu de fichiers pour forcer des lectures concurrentes
tmp_dir = pathlib.Path(tempfile.mkdtemp(prefix="catnip-polars-"))
datasets = list(
dict(name="north.csv", content="year,temp\n2020,1.2\n2021,1.6\n2022,1.1\n"),
dict(name="south.csv", content="year,temp\n2020,2.4\n2021,2.1\n2022,2.8\n"),
dict(name="east.csv", content="year,temp\n2020,0.6\n2021,0.9\n2022,1.0\n")
)
for d in datasets {
(tmp_dir / d["name"]).write_text(d["content"])
}
paths = datasets.[(d) => { str(tmp_dir / d["name"]) }]
# ND-map: limiter chaque lecture à un résumé local
read_stats = (path) => {
df = pl.read_csv(path)
region = path.split("/")[-1].split(".")[0]
dict(
region=region,
rows=df.height,
mean_temp=round(df["temp"].mean(), 3)
)
}
stats = paths.[~> read_stats]
print("⇒ Parallel stats per region")
for s in stats {
print(" ", s["region"], "rows:", s["rows"], "| mean temp:", s["mean_temp"])
}
# ND-map encore: charger en parallèle, concaténer en une seule passe
frames = paths.[~> (path) => { pl.read_csv(path) }]
all_df = pl.concat(frames)
print("\n⇒ Total rows:", all_df.height)
# Variante lazy: construire le plan en parallèle, exécuter au collect
lazy_frames = paths.[~> (path) => { pl.scan_csv(path) }]
lazy_all = pl.concat(lazy_frames).collect()
print("\n⇒ Lazy total rows:", lazy_all.height)