codex/files-formats/parquet_pyarrow.cat
# Lecture/écriture Parquet avec PyArrow
# Parquet est le format columnar standard pour le big data
#
# Exécuter: catnip -m pyarrow -m tempfile -m pathlib -m shutil parquet_pyarrow.cat

pyarrow = import("pyarrow")
tempfile = import("tempfile")
pathlib = import("pathlib")
shutil = import("shutil")

# Création de données

print("⇒ Création de données")
data = dict(
    id=list(1, 2, 3, 4, 5),
    name=list("Alice", "Bob", "Charlie", "Diana", "Eve"),
    age=list(28, 35, 42, 31, 27),
    salary=list(75000.0, 85000.0, 95000.0, 72000.0, 68000.0),
    department=list("Engineering", "Sales", "Engineering", "Marketing", "Engineering")
)

# Créer une Table Arrow

table = pyarrow.table(data)
print("  Colonnes:", table.column_names)
print("  Lignes:", table.num_rows)
print("  Schema:")
for field in table.schema {
    print("    ", field.name, ":", field.type)
}

# Écriture Parquet

temp_dir = pathlib.Path(tempfile.mkdtemp())
parquet_file = temp_dir / "employees.parquet"

print("\n⇒ Écriture Parquet")
pyarrow.parquet.write_table(table, str(parquet_file))
print("  Fichier créé:", parquet_file)
print("  Taille:", parquet_file.stat().st_size, "bytes")

# Lecture Parquet

print("\n⇒ Lecture Parquet")
table2 = pyarrow.parquet.read_table(str(parquet_file))
print("  Lignes lues:", table2.num_rows)

# Accès aux colonnes

print("\n⇒ Accès aux colonnes")
names = table2.column("name").to_pylist()
print("  Noms:", names)

ages = table2.column("age").to_pylist()
print("  Ages:", ages)

# Lecture partielle (colonnes spécifiques)

print("\n⇒ Lecture partielle")
partial = pyarrow.parquet.read_table(str(parquet_file), columns=list("name", "salary"))
print("  Colonnes lues:", partial.column_names)
print("  Données:")
i = 0
while i < partial.num_rows {
    print("    ", partial.column("name")[i].as_py(), "-", partial.column("salary")[i].as_py(), "€")
    i = i + 1
}

# Conversion vers dictionnaire Python

print("\n⇒ Conversion vers dict")
py_dict = table2.to_pydict()
print("  Clés:", py_dict.keys())
print("  Premier employé:")
print("    Name:", py_dict["name"][0])
print("    Age:", py_dict["age"][0])
print("    Salary:", py_dict["salary"][0])

# Métadonnées du fichier

print("\n⇒ Métadonnées Parquet")
parquet_meta = pyarrow.parquet.read_metadata(str(parquet_file))
print("  Nombre de row groups:", parquet_meta.num_row_groups)
print("  Nombre de colonnes:", parquet_meta.num_columns)
print("  Lignes totales:", parquet_meta.num_rows)

# Statistiques par colonne

print("\n⇒ Statistiques colonnes")
schema_meta = pyarrow.parquet.read_schema(str(parquet_file))
print("  Schema Parquet:")
i = 0
while i < len(schema_meta) {
    print("    ", schema_meta.field(i))
    i = i + 1
}

# Écriture avec compression

compressed_file = temp_dir / "employees_compressed.parquet"

print("\n⇒ Écriture avec compression")
pyarrow.parquet.write_table(table, str(compressed_file), compression="snappy")
print("  Fichier original:", parquet_file.stat().st_size, "bytes")
print("  Fichier compressé:", compressed_file.stat().st_size, "bytes")

# Partitionnement

print("\n⇒ Écriture partitionnée par département")
partitioned_dir = temp_dir / "partitioned"
pyarrow.parquet.write_to_dataset(table, root_path=str(partitioned_dir), partition_cols=list("department"))

# Lister les partitions
for item in partitioned_dir.iterdir() {
    if item.is_dir() {
        print("  Partition:", item.name)
        for f in item.glob("*.parquet") {
            print("    -", f.name)
        }
    }
}

# Lecture du dataset partitionné

print("\n⇒ Lecture dataset partitionné")
dataset = pyarrow.parquet.read_table(str(partitioned_dir))
print("  Lignes totales:", dataset.num_rows)

# Nettoyage

shutil.rmtree(str(temp_dir))
print("\n⇒ Nettoyage effectué")