codex/files-formats/parquet_pyarrow.cat
# Lecture/écriture Parquet avec PyArrow
# Parquet est le format columnar standard pour le big data
#
# Exécuter: catnip -m pyarrow -m tempfile -m pathlib -m shutil parquet_pyarrow.cat
pyarrow = import("pyarrow")
tempfile = import("tempfile")
pathlib = import("pathlib")
shutil = import("shutil")
# Création de données
print("⇒ Création de données")
data = dict(
id=list(1, 2, 3, 4, 5),
name=list("Alice", "Bob", "Charlie", "Diana", "Eve"),
age=list(28, 35, 42, 31, 27),
salary=list(75000.0, 85000.0, 95000.0, 72000.0, 68000.0),
department=list("Engineering", "Sales", "Engineering", "Marketing", "Engineering")
)
# Créer une Table Arrow
table = pyarrow.table(data)
print(" Colonnes:", table.column_names)
print(" Lignes:", table.num_rows)
print(" Schema:")
for field in table.schema {
print(" ", field.name, ":", field.type)
}
# Écriture Parquet
temp_dir = pathlib.Path(tempfile.mkdtemp())
parquet_file = temp_dir / "employees.parquet"
print("\n⇒ Écriture Parquet")
pyarrow.parquet.write_table(table, str(parquet_file))
print(" Fichier créé:", parquet_file)
print(" Taille:", parquet_file.stat().st_size, "bytes")
# Lecture Parquet
print("\n⇒ Lecture Parquet")
table2 = pyarrow.parquet.read_table(str(parquet_file))
print(" Lignes lues:", table2.num_rows)
# Accès aux colonnes
print("\n⇒ Accès aux colonnes")
names = table2.column("name").to_pylist()
print(" Noms:", names)
ages = table2.column("age").to_pylist()
print(" Ages:", ages)
# Lecture partielle (colonnes spécifiques)
print("\n⇒ Lecture partielle")
partial = pyarrow.parquet.read_table(str(parquet_file), columns=list("name", "salary"))
print(" Colonnes lues:", partial.column_names)
print(" Données:")
i = 0
while i < partial.num_rows {
print(" ", partial.column("name")[i].as_py(), "-", partial.column("salary")[i].as_py(), "€")
i = i + 1
}
# Conversion vers dictionnaire Python
print("\n⇒ Conversion vers dict")
py_dict = table2.to_pydict()
print(" Clés:", py_dict.keys())
print(" Premier employé:")
print(" Name:", py_dict["name"][0])
print(" Age:", py_dict["age"][0])
print(" Salary:", py_dict["salary"][0])
# Métadonnées du fichier
print("\n⇒ Métadonnées Parquet")
parquet_meta = pyarrow.parquet.read_metadata(str(parquet_file))
print(" Nombre de row groups:", parquet_meta.num_row_groups)
print(" Nombre de colonnes:", parquet_meta.num_columns)
print(" Lignes totales:", parquet_meta.num_rows)
# Statistiques par colonne
print("\n⇒ Statistiques colonnes")
schema_meta = pyarrow.parquet.read_schema(str(parquet_file))
print(" Schema Parquet:")
i = 0
while i < len(schema_meta) {
print(" ", schema_meta.field(i))
i = i + 1
}
# Écriture avec compression
compressed_file = temp_dir / "employees_compressed.parquet"
print("\n⇒ Écriture avec compression")
pyarrow.parquet.write_table(table, str(compressed_file), compression="snappy")
print(" Fichier original:", parquet_file.stat().st_size, "bytes")
print(" Fichier compressé:", compressed_file.stat().st_size, "bytes")
# Partitionnement
print("\n⇒ Écriture partitionnée par département")
partitioned_dir = temp_dir / "partitioned"
pyarrow.parquet.write_to_dataset(table, root_path=str(partitioned_dir), partition_cols=list("department"))
# Lister les partitions
for item in partitioned_dir.iterdir() {
if item.is_dir() {
print(" Partition:", item.name)
for f in item.glob("*.parquet") {
print(" -", f.name)
}
}
}
# Lecture du dataset partitionné
print("\n⇒ Lecture dataset partitionné")
dataset = pyarrow.parquet.read_table(str(partitioned_dir))
print(" Lignes totales:", dataset.num_rows)
# Nettoyage
shutil.rmtree(str(temp_dir))
print("\n⇒ Nettoyage effectué")