#!/usr/bin/env catnip
# Polars: parallel ingestion with ND-map

pl = import('polars')
tempfile = import('tempfile')
import('pathlib', 'Path')

# ND mode to parallelize the map operator
pragma('nd_mode', ND.process)
pragma('nd_workers', 4)

# Create a small set of files on disk to force concurrent reads below
tmp_dir = Path(tempfile.mkdtemp(prefix="catnip-polars-"))

struct Dataset { name; content; }

# Three tiny CSV payloads, one per region; all share the year,temp schema
datasets = list(
    Dataset(name="north.csv", content="year,temp\n2020,1.2\n2021,1.6\n2022,1.1\n"),
    Dataset(name="south.csv", content="year,temp\n2020,2.4\n2021,2.1\n2022,2.8\n"),
    Dataset(name="east.csv", content="year,temp\n2020,0.6\n2021,0.9\n2022,1.0\n"),
)

# Materialize each dataset as a CSV file under tmp_dir
for d in datasets {
    (tmp_dir / d.name).write_text(d.content)
}

paths = datasets.[(d) => { str(tmp_dir / d.name) }]

# ND-map worker: reduce each file read to a small per-file summary so only
# compact dicts — not whole DataFrames — cross the worker boundary.
# Returns dict(region, rows, mean_temp) for one CSV file.
read_stats = (path) => {
    df = pl.read_csv(path)
    # Region name is the file stem: ".../north.csv" -> "north".
    # NOTE(review): splitting on "/" assumes POSIX-style separators — confirm
    # tmpdir paths never use "\" on the target platforms.
    region = path.split("/")[-1].split(".")[0]
    # rows = frame height; mean_temp = mean of the 'temp' column, 3 decimals
    dict(
        region=region,
        rows=df.height,
        mean_temp=round(df['temp'].mean(), 3),
    )
}

# Apply read_stats across paths with the ND-map operator (~>), i.e. in
# parallel across the worker processes configured by the pragmas above.
stats = paths.[~> read_stats]

print("⇒ Parallel stats per region")
for s in stats {
    print(f"  {s['region']} rows: {s['rows']} | mean temp: {s['mean_temp']}")
}

# ND-map again: load the frames in parallel, then concatenate in one pass
frames = paths.[~>(path) => { pl.read_csv(path) }]
all_df = pl.concat(frames)

print()
print(f"⇒ Total rows: {all_df.height}")

# Lazy variant: build the scan plans in parallel, defer all work to collect()
lazy_frames = paths.[~>(path) => { pl.scan_csv(path) }]
lazy_all = pl.concat(lazy_frames).collect()

print()
print(f"⇒ Lazy total rows: {lazy_all.height}")