codex/visualization/hvplot_dashboard.cat
#!/usr/bin/env catnip
# Dashboard monitoring capteurs bâtiment - hvPlot
#
# Simulation 7 jours de 3 capteurs (température, humidité, CO2),
# détection d'anomalies par écart au signal attendu, export HTML.
#
# DEPS : numpy, pandas, hvplot, holoviews, math, random

pd = import('pandas')
import('hvplot.pandas')
hv = import('holoviews')
math = import('math')
random = import('random')

# Fixed seed so that every run draws the same simulated noise and spikes.
random.seed(42)

# Sensor configuration

# Simulation parameters for one sensor:
#   name              - sensor identifier, copied onto every Reading
#   unit              - unit label (not read elsewhere in this script)
#   base              - centre value of the daily sinusoid
#   amplitude         - swing of the daily sinusoid around base
#   noise_std         - standard deviation passed to random.gauss
#   anomaly_threshold - reference deviation; injected spikes are 1.5x this
struct SensorConfig {
    name; unit; base; amplitude; noise_std; anomaly_threshold;
}

# The simulated sensors, in the order their series are generated.
# Numeric values are expressed in each sensor's own unit.
sensors = list(
    SensorConfig(
        name='temperature', unit="°C",
        base=21.0, amplitude=4.0, noise_std=0.5,
        anomaly_threshold=3.0,
    ),
    SensorConfig(
        name='humidity', unit="%",
        base=45.0, amplitude=10.0, noise_std=2.0,
        anomaly_threshold=8.0,
    ),
    SensorConfig(
        name='co2', unit="ppm",
        base=600.0, amplitude=200.0, noise_std=30.0,
        anomaly_threshold=120.0,
    ),
)

# Data structures

# One simulated sample:
#   timestamp - hour index since the start of the simulation (0-based)
#   sensor    - name of the SensorConfig that produced the sample
#   value     - simulated measurement (expected + noise + optional spike)
#   expected  - noise-free value of the daily sinusoid at that hour
#   deviation - value - expected
struct Reading {
    timestamp; sensor; value; expected; deviation;
}

# A Reading flagged by the anomaly detection: the reading's timestamp,
# sensor, value and deviation, plus the severity label assigned to it.
struct Anomaly {
    timestamp; sensor; value; deviation; severity;
}

# Time-series generation
# Daily sinusoid + Gaussian noise + random spikes (3% of samples);
# the per-sample computation lives in generate_series.

# Simulation length: 168 hourly samples, i.e. 7 days of 24 hours.
total_hours = 168
# Build the list of hour indices 0 .. total_hours - 1.
hours = list()
i = 0
while i < total_hours {
    hours = hours + list(i)
    i = i + 1
}

# Simulate one Reading per entry of `hours` for the sensor described by cfg
# and return them (result of broadcasting over `hours`).
# Random draws happen in a fixed order for every sample: Gaussian noise,
# spike test, then the spike direction only when a spike occurs.
generate_series = (cfg) => {
    hours.[(tick) => {
            # Position inside the 24 h cycle, expressed as an angle.
            phase = (tick % 24) / 24.0 * 2.0 * math.pi
            # The -pi/2 shift puts the sinusoid at its minimum at hour 0.
            baseline = cfg.base + cfg.amplitude * math.sin(phase - math.pi / 2.0)
            jitter = random.gauss(0, cfg.noise_std)
            # 3% of the samples receive a spike of 1.5x the threshold,
            # upwards or downwards with equal probability.
            is_spike = random.random() < 0.03
            outlier = if is_spike {
                cfg.anomaly_threshold * 1.5 * (if random.random() < 0.5 { 1 } else { -1 })
            } else { 0.0 }
            measured = baseline + jitter + outlier
            Reading(
                timestamp=tick,
                sensor=cfg.name,
                value=measured,
                expected=baseline,
                deviation=measured - baseline,
            )
        }]
}

print("⇒ Dashboard monitoring capteurs bâtiment (7 jours)")

# Generate the readings of every configured sensor, concatenated in the
# order of `sensors`.

all_readings = list()
for cfg in sensors {
    all_readings = all_readings + generate_series(cfg)
}

# The counts are derived from the data instead of a hard-coded 3, so the
# message stays correct when a sensor is added to or removed from `sensors`.
print(f"  {len(all_readings)} lectures générées ({total_hours}h x {len(sensors)} capteurs)")

# Anomaly detection

# Label a deviation by its magnitude relative to threshold:
#   >= 2.0x -> 'critical', >= 1.5x -> 'warning', >= 1.0x -> 'minor',
#   anything smaller -> 'normal'. The sign of dev is ignored.
classify_severity = (dev, threshold) => {
    magnitude = if dev >= 0 { dev } else { 0 - dev }
    scaled = magnitude / threshold
    if scaled >= 2.0 {
        'critical'
    } else {
        if scaled >= 1.5 {
            'warning'
        } else {
            if scaled >= 1.0 { 'minor' } else { 'normal' }
        }
    }
}

# Scan every reading and keep those whose deviation reaches the anomaly
# threshold of their sensor. The threshold is read from the matching
# SensorConfig rather than repeated as literals, so detection cannot drift
# from the configuration. 10.0 remains the fallback for a reading whose
# sensor has no configuration entry.
anomalies = list()
for r in all_readings {
    cfg_threshold = 10.0
    for cfg in sensors {
        if cfg.name == r.sensor {
            cfg_threshold = cfg.anomaly_threshold
        }
    }
    sev = classify_severity(r.deviation, cfg_threshold)
    if sev != 'normal' {
        anomalies = anomalies + list(Anomaly(
                timestamp=r.timestamp,
                sensor=r.sensor,
                value=r.value,
                deviation=r.deviation,
                severity=sev,
            ))
    }
}

print(f"  {len(anomalies)} anomalies détectées")

# Anomaly table: prints the first max_shown anomalies, then a count of the
# remaining ones.

print()
print("⇒ Anomalies détectées (extrait)")
print()
print("  Jour  Heure  Capteur       Déviation  Sévérité")
print("  ----  -----  -------       ---------  --------")

# Single source for the row limit, used by both the loop and the footer.
max_shown = 15
shown = 0
for a in anomalies {
    if shown < max_shown {
        # timestamp is an hour index: split it into a 1-based day and an
        # hour of the day.
        day = math.floor(a.timestamp / 24) + 1
        hour = a.timestamp % 24
        hh = if hour < 10 { f"0{hour}" } else { f"{hour}" }
        sign = if a.deviation >= 0 { "+" } else { "-" }
        abs_dev = if a.deviation < 0 { 0 - a.deviation } else { a.deviation }
        # The deviation field is 7 characters wide while its header is 9:
        # four spaces (not two) keep the severity under its column header.
        print(f"  J{day}    {hh}:00  {a.sensor:13s} {sign}{abs_dev:6.1f}    {a.severity}")
        shown = shown + 1
    }
}

if len(anomalies) > max_shown {
    print(f"  ... et {len(anomalies) - max_shown} autres")
}

# Building the DataFrames for hvPlot
# Columns are extracted by broadcasting over the structs (see make_df).

# Split the readings into one list per sensor, keeping their order.
temp_readings = list()
hum_readings = list()
co2_readings = list()

for r in all_readings {
    match r.sensor {
        'temperature' => { temp_readings = temp_readings + list(r) }
        'humidity'    => { hum_readings = hum_readings + list(r) }
        'co2'         => { co2_readings = co2_readings + list(r) }
        # Readings from any other sensor are ignored.
        _             => {}
    }
}

# Turn a list of Reading structs into a pandas DataFrame with the columns
# hour, value and expected (one row per reading, in list order).
make_df = (readings) => {
    hour_col = readings.[(rd) => { rd.timestamp }]
    value_col = readings.[(rd) => { rd.value }]
    expected_col = readings.[(rd) => { rd.expected }]
    pd.DataFrame(dict(hour=hour_col, value=value_col, expected=expected_col))
}

# One DataFrame per sensor, each built by make_df from that sensor's readings.
df_temp = make_df(temp_readings)
df_hum = make_df(hum_readings)
df_co2 = make_df(co2_readings)

# Anomalies DataFrame: one row per detected anomaly, with the columns
# hour, sensor, value, deviation and severity.

anomaly_hours = anomalies.[(an) => { an.timestamp }]
anomaly_sensors = anomalies.[(an) => { an.sensor }]
anomaly_values = anomalies.[(an) => { an.value }]
anomaly_deviations = anomalies.[(an) => { an.deviation }]
anomaly_severities = anomalies.[(an) => { an.severity }]

df_anomalies = pd.DataFrame(dict(
        hour=anomaly_hours,
        sensor=anomaly_sensors,
        value=anomaly_values,
        deviation=anomaly_deviations,
        severity=anomaly_severities,
    ))

# hvPlot panels

print()
print("⇒ Génération des graphiques")

# Temperature panel: measured curve overlaid with the expected profile.
temp_measured = df_temp.hvplot.line(
    x="hour", y="value", label="mesuré",
    width=800, height=250, ylim=tuple(10, 30),
    title="Température (°C)",
    xlabel="Heure", ylabel="°C",
)
temp_expected = df_temp.hvplot.line(
    x="hour", y="expected", label="attendu",
    line_dash="dashed", color="gray",
)
# axiswise / shared_axes keep this panel's axes independent of the others.
p_temp = (temp_measured * temp_expected).opts(axiswise=True, shared_axes=False)

# Humidity panel: measured curve overlaid with the expected profile.
hum_measured = df_hum.hvplot.line(
    x="hour", y="value", label="mesuré",
    width=800, height=250, ylim=tuple(20, 70),
    title="Humidité (%)",
    xlabel="Heure", ylabel="%",
)
hum_expected = df_hum.hvplot.line(
    x="hour", y="expected", label="attendu",
    line_dash="dashed", color="gray",
)
# axiswise / shared_axes keep this panel's axes independent of the others.
p_hum = (hum_measured * hum_expected).opts(axiswise=True, shared_axes=False)

# CO2 panel: measured curve overlaid with the expected profile.
# No ylim is passed here, unlike the temperature and humidity panels.
co2_measured = df_co2.hvplot.line(
    x="hour", y="value", label="mesuré",
    width=800, height=250,
    title="CO2 (ppm)",
    xlabel="Heure", ylabel="ppm",
)
co2_expected = df_co2.hvplot.line(
    x="hour", y="expected", label="attendu",
    line_dash="dashed", color="gray",
)
# axiswise / shared_axes keep this panel's axes independent of the others.
p_co2 = (co2_measured * co2_expected).opts(axiswise=True, shared_axes=False)

# Scatter of the anomalies (deviation against hour), one colour group per
# severity, with axes independent of the other panels.
p_anomalies = df_anomalies.hvplot.scatter(
    x="hour", y="deviation",
    by="severity",
    width=800, height=250,
    title="Anomalies par sévérité",
    xlabel="Heure", ylabel="Déviation",
    s=40,
).opts(axiswise=True, shared_axes=False)

# Stack the four panels in a single column, each keeping its own axes.
stacked_panels = p_temp + p_hum + p_co2 + p_anomalies
layout = stacked_panels.cols(1).opts(axiswise=True, shared_axes=False)

# Daily aggregation: per day, the mean value of each sensor and the number
# of anomalies whose timestamp falls inside that day.

print()
print("⇒ Résumé journalier")
print()
print("  Jour      Temp moy  Hum moy   CO2 moy  Anomalies")
print("  --------  --------  --------  -------  ---------")

day_idx = list(1, 2, 3, 4, 5, 6, 7)

for day_no in day_idx {
    # Weekday label; day 1 is labelled as Monday.
    day_label = match day_no {
        1 => { "Lun" }
        2 => { "Mar" }
        3 => { "Mer" }
        4 => { "Jeu" }
        5 => { "Ven" }
        6 => { "Sam" }
        7 => { "Dim" }
        _ => { "???" }
    }

    # Half-open hour window [win_start, win_end) covered by this day.
    win_start = (day_no - 1) * 24
    win_end = win_start + 24

    temp_total = 0.0
    temp_n = 0
    hum_total = 0.0
    hum_n = 0
    co2_total = 0.0
    co2_n = 0

    for r in all_readings {
        if r.timestamp < win_end {
            if r.timestamp >= win_start {
                match r.sensor {
                    'temperature' => { temp_total = temp_total + r.value; temp_n = temp_n + 1 }
                    'humidity'    => { hum_total = hum_total + r.value; hum_n = hum_n + 1 }
                    'co2'         => { co2_total = co2_total + r.value; co2_n = co2_n + 1 }
                    _             => {}
                }
            }
        }
    }

    anomaly_n = 0
    for a in anomalies {
        if a.timestamp < win_end {
            if a.timestamp >= win_start {
                anomaly_n = anomaly_n + 1
            }
        }
    }

    # A sensor without any reading in the window reports a mean of 0.0.
    temp_mean = if temp_n > 0 { temp_total / temp_n } else { 0.0 }
    hum_mean = if hum_n > 0 { hum_total / hum_n } else { 0.0 }
    co2_mean = if co2_n > 0 { co2_total / co2_n } else { 0.0 }

    print(f"  J{day_no} {day_label}   {temp_mean:5.1f}°C   {hum_mean:5.1f}%  {co2_mean:6.0f}ppm   {anomaly_n}")
}

# HTML export + opening in the browser
# One-shot HTTP server built on SimpleHTTPRequestHandler(directory=...).
# handle_request() handles exactly one request, so the script blocks until
# the browser opened by webbrowser.open (the system default, not
# necessarily Chrome) has fetched the page.
# NOTE(review): nothing guards the cleanup below; if handle_request() fails
# or is interrupted, the server and the temporary directory are left behind.

http_server  = import('http.server')
functools    = import('functools')
webbrowser   = import('webbrowser')
tempfile_mod = import('tempfile')
shutil       = import('shutil')

# Render the layout to index.html inside a fresh temporary directory.
temp_dir = tempfile_mod.mkdtemp(prefix="catnip_dashboard_")
hv.save(layout, temp_dir + "/index.html")

# Serve that directory on the loopback interface only; port 0 lets the OS
# pick a free port, which is then read back from server_address.
handler = functools.partial(http_server.SimpleHTTPRequestHandler, directory=temp_dir)
server = http_server.HTTPServer(tuple("127.0.0.1", 0), handler)
port = server.server_address[1]
webbrowser.open(f"http://127.0.0.1:{port}")
print(f"\n⇒ Dashboard : http://127.0.0.1:{port}")
# Serve the single page request, then shut down and remove the export.
server.handle_request()
server.server_close()
shutil.rmtree(temp_dir)