#!/usr/bin/env catnip
# Lecture/écriture Parquet avec PyArrow
# Parquet est le format columnar standard pour le big data
pyarrow = import('pyarrow')
pq = import('pyarrow.parquet')
tempfile = import('tempfile')
pathlib = import('pathlib')
shutil = import('shutil')
# Employee record: five data fields plus two formatting helpers.
struct Employee {
id; name; age; salary; department;
# Columnar-friendly view: one key per field, suitable for feeding Arrow.
# NOTE(review): unused by the script below (it builds columns via projections) —
# kept as a public convenience accessor.
to_dict(self) => {
dict(id=self.id, name=self.name, age=self.age, salary=self.salary, department=self.department)
}
# One-line human-readable summary, e.g. "#1 Alice (28ans, Engineering) - 75000.0 EUR".
display(self) => {
f"#{self.id} {self.name} ({self.age}ans, {self.department}) - {self.salary} EUR"
}
}
# Shared Parquet writer: dumps an Arrow `table` to `path`, optionally compressed.
# `path` is coerced to str because pq.write_table expects a string/path-like.
write_parquet = (table, path, compression=None) => {
# When no codec is requested, omit the kwarg entirely — presumably so PyArrow
# applies its own default codec rather than being forced to compression=None
# (which PyArrow treats as "no compression"). TODO confirm against PyArrow docs.
if compression == None {
pq.write_table(table, str(path))
} else {
pq.write_table(table, str(path), compression=compression)
}
}
# --- Build sample data from structs --------------------------------------
print("⇒ Création de données")
employees = list(
Employee(1, "Alice", 28, 75000.0, "Engineering"),
Employee(2, "Bob", 35, 85000.0, "Sales"),
Employee(3, "Charlie", 42, 95000.0, "Engineering"),
Employee(4, "Diana", 31, 72000.0, "Marketing"),
Employee(5, "Eve", 27, 68000.0, "Engineering"),
)
for e in employees {
print(f" {e.display()}")
}
# Convert the row-oriented structs into a column-oriented dict for Arrow.
# NOTE(review): `xs.[(e) => { ... }]` appears to be this language's map/projection
# operator (producing one list per column) — confirm against the language reference.
data = dict(
id=employees.[(e) => { e.id }],
name=employees.[(e) => { e.name }],
age=employees.[(e) => { e.age }],
salary=employees.[(e) => { e.salary }],
department=employees.[(e) => { e.department }],
)
# Create an Arrow Table; column names and types are inferred from the dict.
table = pyarrow.table(data)
print(f" Colonnes: {table.column_names}")
print(f" Lignes: {table.num_rows}")
print(" Schema:")
for field in table.schema {
print(f" {field.name} : {field.type}")
}
# --- Write Parquet to a throwaway temp directory -------------------------
temp_dir = pathlib.Path(tempfile.mkdtemp())
parquet_file = temp_dir / "employees.parquet"
print()
print("⇒ Écriture Parquet")
write_parquet(table, parquet_file, None)
print(f" Fichier créé: {parquet_file}")
print(f" Taille: {parquet_file.stat().st_size} bytes")
# --- Read the file back --------------------------------------------------
print()
print("⇒ Lecture Parquet")
table2 = pq.read_table(str(parquet_file))
print(f" Lignes lues: {table2.num_rows}")
# --- Column access: materialize single columns as plain lists ------------
print()
print("⇒ Accès aux colonnes")
names = table2.column("name").to_pylist()
print(f" Noms: {names}")
ages = table2.column("age").to_pylist()
print(f" Ages: {ages}")
# --- Partial read: only the requested columns are deserialized -----------
# (the key advantage of a columnar format).
print()
print("⇒ Lecture partielle")
partial = pq.read_table(str(parquet_file), columns=list("name", "salary"))
print(f" Colonnes lues: {partial.column_names}")
print(" Données:")
for i in range(partial.num_rows) {
# .as_py() converts a single Arrow scalar to a native value.
n = partial.column("name")[i].as_py()
s = partial.column("salary")[i].as_py()
print(f" {n} - {s} EUR")
}
# --- Full conversion to a native dict (column name -> list of values) ----
print()
print("⇒ Conversion vers dict")
py_dict = table2.to_pydict()
print(f" Clés: {py_dict.keys()}")
print(" Premier employé:")
print(f" Name: {py_dict['name'][0]}")
print(f" Age: {py_dict['age'][0]}")
print(f" Salary: {py_dict['salary'][0]}")
# --- File-level Parquet metadata (read without loading the data) ---------
print()
print("⇒ Métadonnées Parquet")
parquet_meta = pq.read_metadata(str(parquet_file))
print(f" Nombre de row groups: {parquet_meta.num_row_groups}")
print(f" Nombre de colonnes: {parquet_meta.num_columns}")
print(f" Lignes totales: {parquet_meta.num_rows}")
# --- Schema as stored in the file ----------------------------------------
print()
print("⇒ Statistiques colonnes")
schema_meta = pq.read_schema(str(parquet_file))
print(" Schema Parquet:")
for i in range(len(schema_meta)) {
print(f" {schema_meta.field(i)}")
}
# --- Same table written with an explicit codec, for size comparison ------
compressed_file = temp_dir / "employees_compressed.parquet"
print()
print("⇒ Écriture avec compression")
write_parquet(table, compressed_file, "snappy")
print(f" Fichier original: {parquet_file.stat().st_size} bytes")
print(f" Fichier compressé: {compressed_file.stat().st_size} bytes")
# --- Partitioned dataset: one subdirectory per distinct department value -
print()
print("⇒ Écriture partitionnée par département")
partitioned_dir = temp_dir / "partitioned"
pq.write_to_dataset(table, root_path=str(partitioned_dir), partition_cols=list("department"))
# List the partition directories and the Parquet fragments inside each.
for item in partitioned_dir.iterdir() {
if item.is_dir() {
print(f" Partition: {item.name}")
for f in item.glob("*.parquet") {
print(f" - {f.name}")
}
}
}
# --- Reading the dataset root reassembles all partitions into one table --
print()
print("⇒ Lecture dataset partitionné")
dataset = pq.read_table(str(partitioned_dir))
print(f" Lignes totales: {dataset.num_rows}")
# --- Remove the temp directory and everything written into it ------------
shutil.rmtree(str(temp_dir))
print()
print("⇒ Nettoyage effectué")