#!/usr/bin/env catnip
# Reading/writing Parquet with PyArrow
# Parquet is the standard columnar storage format for big data

pyarrow = import('pyarrow')
pq = import('pyarrow.parquet')
tempfile = import('tempfile')
pathlib = import('pathlib')
shutil = import('shutil')

# Struct Employee

struct Employee {
    id; name; age; salary; department;
    to_dict(self) => {
        dict(id=self.id, name=self.name, age=self.age, salary=self.salary, department=self.department)
    }
    display(self) => {
        f"#{self.id} {self.name} ({self.age}ans, {self.department}) - {self.salary} EUR"
    }
}

# Shared write helper

write_parquet = (table, path, compression=None) => {
    # pq.write_table defaults to snappy, so request "none" explicitly
    # when the caller asks for an uncompressed file.
    if compression == None {
        pq.write_table(table, str(path), compression="none")
    } else {
        pq.write_table(table, str(path), compression=compression)
    }
}

# Building the data from structs

print("⇒ Creating data")
employees = list(
    Employee(1, "Alice", 28, 75000.0, "Engineering"),
    Employee(2, "Bob", 35, 85000.0, "Sales"),
    Employee(3, "Charlie", 42, 95000.0, "Engineering"),
    Employee(4, "Diana", 31, 72000.0, "Marketing"),
    Employee(5, "Eve", 27, 68000.0, "Engineering"),
)

for e in employees {
    print(f"  {e.display()}")
}

# Convert the structs to a columnar dict for Arrow
data = dict(
    id=employees.[(e) => { e.id }],
    name=employees.[(e) => { e.name }],
    age=employees.[(e) => { e.age }],
    salary=employees.[(e) => { e.salary }],
    department=employees.[(e) => { e.department }],
)

# Create an Arrow Table

table = pyarrow.table(data)
print(f"  Colonnes: {table.column_names}")
print(f"  Lignes: {table.num_rows}")
print("  Schema:")
for field in table.schema {
    print(f"     {field.name} : {field.type}")
}
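
# Alternative construction — a minimal sketch assuming pyarrow.Table.from_pylist
# is available (pyarrow >= 7.0): build the same table directly from row dicts,
# reusing the struct's to_dict() method.
rows = employees.[(e) => { e.to_dict() }]
table_from_rows = pyarrow.Table.from_pylist(rows)
print(f"  from_pylist rows: {table_from_rows.num_rows}")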

# Writing Parquet

temp_dir = pathlib.Path(tempfile.mkdtemp())
parquet_file = temp_dir / "employees.parquet"

print()
print("⇒ Écriture Parquet")
write_parquet(table, parquet_file, None)
print(f"  Fichier créé: {parquet_file}")
print(f"  Taille: {parquet_file.stat().st_size} bytes")

# Reading Parquet

print()
print("⇒ Lecture Parquet")
table2 = pq.read_table(str(parquet_file))
print(f"  Lignes lues: {table2.num_rows}")

# Column access

print()
print("⇒ Accès aux colonnes")
names = table2.column("name").to_pylist()
print(f"  Noms: {names}")

ages = table2.column("age").to_pylist()
print(f"  Ages: {ages}")

# Partial read (specific columns)

print()
print("⇒ Lecture partielle")
partial = pq.read_table(str(parquet_file), columns=list("name", "salary"))
print(f"  Colonnes lues: {partial.column_names}")
print("  Données:")
for i in range(partial.num_rows) {
    n = partial.column("name")[i].as_py()
    s = partial.column("salary")[i].as_py()
    print(f"     {n} - {s} EUR")
}

# Converting to a Python dictionary

print()
print("⇒ Conversion vers dict")
py_dict = table2.to_pydict()
print(f"  Clés: {py_dict.keys()}")
print("  Premier employé:")
print(f"     Name: {py_dict['name'][0]}")
print(f"     Age: {py_dict['age'][0]}")
print(f"     Salary: {py_dict['salary'][0]}")

# File metadata

print()
print("⇒ Métadonnées Parquet")
parquet_meta = pq.read_metadata(str(parquet_file))
print(f"  Nombre de row groups: {parquet_meta.num_row_groups}")
print(f"  Nombre de colonnes: {parquet_meta.num_columns}")
print(f"  Lignes totales: {parquet_meta.num_rows}")

# Reading the schema from the file

print()
print("⇒ Statistiques colonnes")
schema_meta = pq.read_schema(str(parquet_file))
print("  Schema Parquet:")
for i in range(len(schema_meta)) {
    print(f"     {schema_meta.field(i)}")
}

# Writing with compression

compressed_file = temp_dir / "employees_compressed.parquet"

print()
print("⇒ Écriture avec compression")
write_parquet(table, compressed_file, "snappy")
print(f"  Fichier original: {parquet_file.stat().st_size} bytes")
print(f"  Fichier compressé: {compressed_file.stat().st_size} bytes")

# Partitioning

print()
print("⇒ Écriture partitionnée par département")
partitioned_dir = temp_dir / "partitioned"
pq.write_to_dataset(table, root_path=str(partitioned_dir), partition_cols=list("department"))

# List the partitions
for item in partitioned_dir.iterdir() {
    if item.is_dir() {
        print(f"  Partition: {item.name}")
        for f in item.glob("*.parquet") {
            print(f"    - {f.name}")
        }
    }
}

# Reading the partitioned dataset

print()
print("⇒ Lecture dataset partitionné")
dataset = pq.read_table(str(partitioned_dir))
print(f"  Lignes totales: {dataset.num_rows}")

# Cleanup

shutil.rmtree(str(temp_dir))
print()
print("⇒ Nettoyage effectué")