←
codex/visualization/hvplot_dashboard.cat
#!/usr/bin/env catnip
# Dashboard monitoring capteurs bâtiment - hvPlot
#
# Simulation 7 jours de 3 capteurs (température, humidité, CO2),
# détection d'anomalies par écart au signal attendu, export HTML.
#
# DEPS : numpy, pandas, hvplot, holoviews, math, random
pd = import('pandas')
import('hvplot.pandas')
hv = import('holoviews')
math = import('math')
random = import('random')
random.seed(42)
# Configuration capteurs
struct SensorConfig {
name; unit; base; amplitude; noise_std; anomaly_threshold;
}
sensors = list(
SensorConfig(
name='temperature', unit="°C",
base=21.0, amplitude=4.0, noise_std=0.5,
anomaly_threshold=3.0,
),
SensorConfig(
name='humidity', unit="%",
base=45.0, amplitude=10.0, noise_std=2.0,
anomaly_threshold=8.0,
),
SensorConfig(
name='co2', unit="ppm",
base=600.0, amplitude=200.0, noise_std=30.0,
anomaly_threshold=120.0,
),
)
# Structures de données
struct Reading {
timestamp; sensor; value; expected; deviation;
}
struct Anomaly {
timestamp; sensor; value; deviation; severity;
}
# Génération de séries temporelles
# Sinusoïde journalière + bruit gaussien + spikes aléatoires (3%).
total_hours = 168
hours = list()
i = 0
while i < total_hours {
hours = hours + list(i)
i = i + 1
}
generate_series = (cfg) => {
hours.[(h) => {
day_phase=(h % 24) / 24.0 * 2.0 * math.pi
expected=cfg.base + cfg.amplitude * math.sin(day_phase - math.pi / 2.0)
noise=random.gauss(0, cfg.noise_std)
spike=if random.random() < 0.03 {
cfg.anomaly_threshold * 1.5 * (if random.random() < 0.5 { 1 } else { -1 })
} else { 0.0 }
value=expected + noise + spike
Reading(
timestamp=h,
sensor=cfg.name,
value=value,
expected=expected,
deviation=value - expected,
)
}]
}
print("⇒ Dashboard monitoring capteurs bâtiment (7 jours)")
# Génération des lectures par capteur
all_readings = list()
for cfg in sensors {
series = generate_series(cfg)
all_readings = all_readings + series
}
print(f" {total_hours * 3} lectures générées ({total_hours}h x 3 capteurs)")
# Détection d'anomalies
classify_severity = (dev, threshold) => {
abs_dev = if dev < 0 { 0 - dev } else { dev }
ratio = abs_dev / threshold
match True {
_ if ratio >= 2.0 => { 'critical' }
_ if ratio >= 1.5 => { 'warning' }
_ if ratio >= 1.0 => { 'minor' }
_ => { 'normal' }
}
}
anomalies = list()
for r in all_readings {
cfg_threshold = match r.sensor {
'temperature' => { 3.0 }
'humidity' => { 8.0 }
'co2' => { 120.0 }
_ => { 10.0 }
}
sev = classify_severity(r.deviation, cfg_threshold)
if sev != 'normal' {
anomalies = anomalies + list(Anomaly(
timestamp=r.timestamp,
sensor=r.sensor,
value=r.value,
deviation=r.deviation,
severity=sev,
))
}
}
print(f" {len(anomalies)} anomalies détectées")
# Table des anomalies
print()
print("⇒ Anomalies détectées (extrait)")
print()
print(" Jour Heure Capteur Déviation Sévérité")
print(" ---- ----- ------- --------- --------")
shown = 0
for a in anomalies {
if shown < 15 {
day = math.floor(a.timestamp / 24) + 1
hour = a.timestamp % 24
hh = if hour < 10 { f"0{hour}" } else { f"{hour}" }
sign = if a.deviation >= 0 { "+" } else { "-" }
abs_dev = if a.deviation < 0 { 0 - a.deviation } else { a.deviation }
print(f" J{day} {hh}:00 {a.sensor:13s} {sign}{abs_dev:6.1f} {a.severity}")
shown = shown + 1
}
}
if len(anomalies) > 15 {
print(f" ... et {len(anomalies) - 15} autres")
}
# Construction DataFrames pour hvPlot
# Extraction de colonnes par broadcasting sur les structs.
temp_readings = list()
hum_readings = list()
co2_readings = list()
for r in all_readings {
match r.sensor {
'temperature' => { temp_readings = temp_readings + list(r) }
'humidity' => { hum_readings = hum_readings + list(r) }
'co2' => { co2_readings = co2_readings + list(r) }
_ => {}
}
}
make_df = (readings) => {
pd.DataFrame(dict(
hour=readings.[(r) => { r.timestamp }],
value=readings.[(r) => { r.value }],
expected=readings.[(r) => { r.expected }],
))
}
df_temp = make_df(temp_readings)
df_hum = make_df(hum_readings)
df_co2 = make_df(co2_readings)
# DataFrame anomalies
df_anomalies = pd.DataFrame(dict(
hour=anomalies.[(a) => { a.timestamp }],
sensor=anomalies.[(a) => { a.sensor }],
value=anomalies.[(a) => { a.value }],
deviation=anomalies.[(a) => { a.deviation }],
severity=anomalies.[(a) => { a.severity }],
))
# panneaux hvPlot
print()
print("⇒ Génération des graphiques")
p_temp = df_temp.hvplot.line(
x="hour", y="value", label="mesuré",
width=800, height=250, ylim=tuple(10, 30),
title="Température (°C)",
xlabel="Heure", ylabel="°C",
) * df_temp.hvplot.line(
x="hour", y="expected", label="attendu",
line_dash="dashed", color="gray",
)
p_temp = p_temp.opts(axiswise=True, shared_axes=False)
p_hum = df_hum.hvplot.line(
x="hour", y="value", label="mesuré",
width=800, height=250, ylim=tuple(20, 70),
title="Humidité (%)",
xlabel="Heure", ylabel="%",
) * df_hum.hvplot.line(
x="hour", y="expected", label="attendu",
line_dash="dashed", color="gray",
)
p_hum = p_hum.opts(axiswise=True, shared_axes=False)
p_co2 = df_co2.hvplot.line(
x="hour", y="value", label="mesuré",
width=800, height=250,
title="CO2 (ppm)",
xlabel="Heure", ylabel="ppm",
) * df_co2.hvplot.line(
x="hour", y="expected", label="attendu",
line_dash="dashed", color="gray",
)
p_co2 = p_co2.opts(axiswise=True, shared_axes=False)
# Scatter anomalies coloré par sévérité
p_anomalies = df_anomalies.hvplot.scatter(
x="hour", y="deviation",
by="severity",
width=800, height=250,
title="Anomalies par sévérité",
xlabel="Heure", ylabel="Déviation",
s=40,
)
p_anomalies = p_anomalies.opts(axiswise=True, shared_axes=False)
layout = (p_temp + p_hum + p_co2 + p_anomalies).cols(1)
layout = layout.opts(axiswise=True, shared_axes=False)
# Agrégation journalière
print()
print("⇒ Résumé journalier")
print()
print(" Jour Temp moy Hum moy CO2 moy Anomalies")
print(" -------- -------- -------- ------- ---------")
day_idx = list(1, 2, 3, 4, 5, 6, 7)
for d in day_idx {
start_h = (d - 1) * 24
end_h = d * 24
t_sum = 0.0
h_sum = 0.0
c_sum = 0.0
t_count = 0
h_count = 0
c_count = 0
a_count = 0
for r in all_readings {
if r.timestamp >= start_h {
if r.timestamp < end_h {
match r.sensor {
'temperature' => { t_sum = t_sum + r.value; t_count = t_count + 1 }
'humidity' => { h_sum = h_sum + r.value; h_count = h_count + 1 }
'co2' => { c_sum = c_sum + r.value; c_count = c_count + 1 }
_ => {}
}
}
}
}
for a in anomalies {
if a.timestamp >= start_h {
if a.timestamp < end_h {
a_count = a_count + 1
}
}
}
t_avg = if t_count > 0 { t_sum / t_count } else { 0.0 }
h_avg = if h_count > 0 { h_sum / h_count } else { 0.0 }
c_avg = if c_count > 0 { c_sum / c_count } else { 0.0 }
day_label = match d {
1 => { "Lun" }
2 => { "Mar" }
3 => { "Mer" }
4 => { "Jeu" }
5 => { "Ven" }
6 => { "Sam" }
7 => { "Dim" }
_ => { "???" }
}
print(f" J{d} {day_label} {t_avg:5.1f}°C {h_avg:5.1f}% {c_avg:6.0f}ppm {a_count}")
}
# Export HTML + ouverture navigateur
# One-shot HTTP via SimpleHTTPRequestHandler(directory=...).
# handle_request() bloque jusqu'a ce que Chrome ait recu le contenu.
http_server = import('http.server')
functools = import('functools')
webbrowser = import('webbrowser')
tempfile_mod = import('tempfile')
shutil = import('shutil')
temp_dir = tempfile_mod.mkdtemp(prefix="catnip_dashboard_")
hv.save(layout, temp_dir + "/index.html")
handler = functools.partial(http_server.SimpleHTTPRequestHandler, directory=temp_dir)
server = http_server.HTTPServer(tuple("127.0.0.1", 0), handler)
port = server.server_address[1]
webbrowser.open(f"http://127.0.0.1:{port}")
print(f"\n⇒ Dashboard : http://127.0.0.1:{port}")
server.handle_request()
server.server_close()
shutil.rmtree(temp_dir)