#!/usr/bin/env catnip
# Polars: parallel ingestion with ND-map
pl = import('polars')
tempfile = import('tempfile')
import('pathlib', 'Path')
# ND mode to parallelize the map; ND.process presumably runs mapped work
# in separate worker processes — confirm against the catnip runtime docs.
pragma('nd_mode', ND.process)
pragma('nd_workers', 4)
# Create a small set of files to force concurrent reads.
# NOTE(review): the temp dir is never removed — consider cleaning it up at exit.
tmp_dir = Path(tempfile.mkdtemp(prefix="catnip-polars-"))
# Simple record pairing a CSV filename with its literal file content.
struct Dataset { name; content; }
# Three tiny per-region CSV datasets (year,temp rows) used as fixtures.
datasets = list(
Dataset(name="north.csv", content="year,temp\n2020,1.2\n2021,1.6\n2022,1.1\n"),
Dataset(name="south.csv", content="year,temp\n2020,2.4\n2021,2.1\n2022,2.8\n"),
Dataset(name="east.csv", content="year,temp\n2020,0.6\n2021,0.9\n2022,1.0\n"),
)
# Materialize each dataset as a real file under tmp_dir so the ND-map
# below performs genuine concurrent disk reads.
for d in datasets {
(tmp_dir / d.name).write_text(d.content)
}
# Project datasets to their on-disk path strings, one per file.
paths = datasets.[(d) => { str(tmp_dir / d.name) }]
# ND-map worker: keep each read local by reducing a file to a small summary.
# Takes a CSV path string; returns a dict with the region name, row count,
# and mean of the 'temp' column rounded to 3 decimals.
read_stats = (path) => {
df = pl.read_csv(path)
# Region = file stem, e.g. ".../north.csv" -> "north".
# NOTE(review): assumes "/" path separators — verify behavior on Windows-style paths.
region = path.split("/")[-1].split(".")[0]
dict(
region=region,
rows=df.height,
mean_temp=round(df['temp'].mean(), 3),
)
}
# Apply read_stats across all paths via the ND-map operator (~>), so the
# CSV reads run under the parallel mode configured by the pragmas above.
stats = paths.[~> read_stats]
print("⇒ Parallel stats per region")
# Print one summary line per region from the collected results.
for s in stats {
print(f" {s['region']} rows: {s['rows']} | mean temp: {s['mean_temp']}")
}
# ND-map again: load the frames eagerly in parallel, then concatenate in one pass.
frames = paths.[~>(path) => { pl.read_csv(path) }]
all_df = pl.concat(frames)
print()
print(f"⇒ Total rows: {all_df.height}")
# Lazy variant: build the scan plans in parallel; actual I/O and the
# concatenation are deferred until collect() executes the query.
lazy_frames = paths.[~>(path) => { pl.scan_csv(path) }]
lazy_all = pl.concat(lazy_frames).collect()
print()
print(f"⇒ Lazy total rows: {lazy_all.height}")