Hook into our actual Kubernetes

Benjamin Arntzen
2025-04-02 12:58:53 +01:00
parent ea9a8c09ef
commit 59c7859ca6
5 changed files with 560 additions and 265 deletions

lars/Cargo.lock (generated, 349 changed lines)

@@ -132,10 +132,10 @@ dependencies = [
"base64 0.22.1",
"bytes",
"futures-util",
"http",
"http-body",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper 1.6.0",
"hyper-util",
"itoa",
"matchit",
@@ -149,7 +149,7 @@ dependencies = [
"serde_path_to_error",
"serde_urlencoded",
"sha1",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tokio",
"tokio-tungstenite",
"tower",
@@ -167,13 +167,13 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tower-layer",
"tower-service",
"tracing",
@@ -510,6 +510,15 @@ dependencies = [
"serde",
]
[[package]]
name = "encoding_rs"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3"
dependencies = [
"cfg-if",
]
[[package]]
name = "enum-ordinalize"
version = "4.3.0"
@@ -619,6 +628,21 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
@@ -788,6 +812,25 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "h2"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http 0.2.12",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.15.2"
@@ -817,7 +860,7 @@ dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http",
"http 1.3.1",
"httpdate",
"mime",
"sha1",
@@ -829,7 +872,7 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
dependencies = [
"http",
"http 1.3.1",
]
[[package]]
@@ -882,6 +925,17 @@ dependencies = [
"windows",
]
[[package]]
name = "http"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http"
version = "1.3.1"
@@ -893,6 +947,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http 0.2.12",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.1"
@@ -900,7 +965,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http",
"http 1.3.1",
]
[[package]]
@@ -911,8 +976,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http 1.3.1",
"http-body 1.0.1",
"pin-project-lite",
]
@@ -934,6 +999,30 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "0.14.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper"
version = "1.6.0"
@@ -943,8 +1032,8 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http",
"http-body",
"http 1.3.1",
"http-body 1.0.1",
"httparse",
"httpdate",
"itoa",
@@ -963,8 +1052,8 @@ dependencies = [
"bytes",
"futures-util",
"headers",
"http",
"hyper",
"http 1.3.1",
"hyper 1.6.0",
"hyper-rustls",
"hyper-util",
"pin-project-lite",
@@ -981,8 +1070,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2"
dependencies = [
"futures-util",
"http",
"hyper",
"http 1.3.1",
"hyper 1.6.0",
"hyper-util",
"log",
"rustls",
@@ -999,13 +1088,26 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper",
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper 0.14.32",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-util"
version = "0.1.11"
@@ -1015,9 +1117,9 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http",
"http-body",
"hyper",
"http 1.3.1",
"http-body 1.0.1",
"hyper 1.6.0",
"libc",
"pin-project-lite",
"socket2",
@@ -1225,6 +1327,12 @@ dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "itoa"
version = "1.0.15"
@@ -1334,10 +1442,10 @@ dependencies = [
"either",
"futures",
"home",
"http",
"http-body",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper 1.6.0",
"hyper-http-proxy",
"hyper-rustls",
"hyper-timeout",
@@ -1367,7 +1475,7 @@ checksum = "ff0d0793db58e70ca6d689489183816cb3aa481673e7433dc618cf7e8007c675"
dependencies = [
"chrono",
"form_urlencoded",
"http",
"http 1.3.1",
"json-patch",
"k8s-openapi",
"schemars",
@@ -1434,6 +1542,7 @@ dependencies = [
"minijinja-autoreload",
"prometheus-client",
"rand",
"reqwest",
"serde",
"serde_json",
"sqlx",
@@ -1624,6 +1733,23 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "native-tls"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
dependencies = [
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework 2.11.1",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "notify"
version = "5.2.0"
@@ -1713,12 +1839,50 @@ version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "openssl"
version = "0.10.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e14130c6a98cd258fdcb0fb6d744152343ff729cbfcb28c656a9d12b999fbcd"
dependencies = [
"bitflags 2.9.0",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "openssl-probe"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "openssl-sys"
version = "0.9.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "ordered-float"
version = "2.10.1"
@@ -2031,6 +2195,46 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "reqwest"
version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.32",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper 0.1.2",
"system-configuration",
"tokio",
"tokio-native-tls",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "ring"
version = "0.17.14"
@@ -2106,7 +2310,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"schannel",
"security-framework 2.11.1",
@@ -2124,6 +2328,15 @@ dependencies = [
"security-framework 3.2.0",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [
"base64 0.21.7",
]
[[package]]
name = "rustls-pemfile"
version = "2.2.0"
@@ -2687,6 +2900,12 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sync_wrapper"
version = "1.0.2"
@@ -2704,6 +2923,27 @@ dependencies = [
"syn",
]
[[package]]
name = "system-configuration"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation 0.9.4",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "tempfile"
version = "3.19.1"
@@ -2821,6 +3061,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.2"
@@ -2877,7 +3127,7 @@ dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tokio",
"tokio-util",
"tower-layer",
@@ -2895,8 +3145,8 @@ dependencies = [
"bitflags 2.9.0",
"bytes",
"futures-util",
"http",
"http-body",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"http-range-header",
"httpdate",
@@ -2924,8 +3174,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa6b29b17d4540f2bd9ec304ad39d280c4bdf291d0ea6c4123eeba10939af84"
dependencies = [
"bytes",
"http",
"http-body",
"http 1.3.1",
"http-body 1.0.1",
"pin-project-lite",
"tokio",
"tower",
@@ -3014,7 +3264,7 @@ dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"http 1.3.1",
"httparse",
"log",
"rand",
@@ -3203,6 +3453,19 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61"
dependencies = [
"cfg-if",
"js-sys",
"once_cell",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.100"
@@ -3235,6 +3498,16 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "web-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "whoami"
version = "1.6.0"
@@ -3568,6 +3841,16 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "winreg"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"

View File

@@ -23,6 +23,7 @@ uuid = { version = "1.7", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "macros", "chrono", "uuid", "migrate"] }
dotenvy = "0.15"
reqwest = { version = "0.11", features = ["json"] }
# Optional, but often useful for config management
# config = { version = "0.14", features = ["yaml", "json", "toml", "env"] }

View File

@@ -24,12 +24,7 @@ use dotenvy::dotenv;
use std::env;
use axum::debug_handler;
use chrono::{DateTime, Utc};
use kube::{
api::{Api, ListParams, ResourceExt},
Client,
};
use k8s_openapi::api::core::v1::{Node, Pod};
use futures::TryStreamExt;
use reqwest;
// --- Data Structures ---
@@ -211,6 +206,26 @@ impl Default for KubernetesMetrics {
}
}
// Structure to hold Prometheus metrics response
#[derive(Deserialize, Debug, Clone)]
struct PrometheusResponse {
status: String,
data: PrometheusData,
}
#[derive(Deserialize, Debug, Clone)]
struct PrometheusData {
#[serde(rename = "resultType")]
result_type: String,
result: Vec<PrometheusResult>,
}
#[derive(Deserialize, Debug, Clone)]
struct PrometheusResult {
metric: serde_json::Value,
value: (f64, String), // Timestamp and value
}
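// Illustrative sketch, not code from this commit: the three structs above match the
// standard Prometheus instant-query response shape, e.g.
//   {"status":"success","data":{"resultType":"vector",
//    "result":[{"metric":{"job":"kubelet"},"value":[1712055533.0,"42"]}]}}
// A minimal sanity check of that assumption, relying on serde_json (already a
// dependency, per the lock file above); the sample labels and values are made up.
#[cfg(test)]
mod prometheus_shape_tests {
    use super::*;

    #[test]
    fn parses_instant_query_vector() {
        let body = r#"{"status":"success","data":{"resultType":"vector",
            "result":[{"metric":{"job":"kubelet"},"value":[1712055533.0,"42"]}]}}"#;
        let parsed: PrometheusResponse = serde_json::from_str(body).expect("valid shape");
        assert_eq!(parsed.status, "success");
        assert_eq!(parsed.data.result_type, "vector");
        assert_eq!(parsed.data.result[0].value.1, "42");
    }
}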
// --- Application State ---
#[derive(Clone)]
@@ -484,140 +499,140 @@ async fn api_history_handler(State(state): State<AppState>) -> impl IntoResponse
}
}
// Function to fetch metrics from Kubernetes
async fn fetch_kubernetes_metrics(namespace: &str) -> Result<KubernetesMetrics, kube::Error> {
// Initialize Kubernetes client
let client = Client::try_default().await?;
// Fetch metrics from Prometheus
async fn fetch_prometheus_metrics(namespace: &str) -> Result<KubernetesMetrics, Box<dyn std::error::Error + Send + Sync>> {
let prometheus_url = "https://metrics.riff.cc/select/0/prometheus/api/v1/";
let http_client = reqwest::Client::new();
// Get all nodes to calculate cluster capacity
let nodes_api: Api<Node> = Api::all(client.clone());
let nodes = nodes_api.list(&ListParams::default()).await?;
// Get pods in the specified namespace
let pods_api: Api<Pod> = Api::namespaced(client, namespace);
let pods = pods_api.list(&ListParams::default()).await?;
// Initialize metrics with defaults
// Initialize metrics with default values
let mut metrics = KubernetesMetrics::default();
metrics.namespace = namespace.to_string();
// Calculate cluster metrics from nodes
for node in nodes.items {
if let Some(status) = node.status {
if let Some(allocatable) = status.allocatable {
// Extract CPU and memory
if let Some(cpu_str) = allocatable.get("cpu") {
if let Ok(cpu) = cpu_str.0.parse::<f32>() {
metrics.cluster_total_cpu += cpu;
}
}
if let Some(memory_str) = allocatable.get("memory") {
// Memory is typically in Ki, Mi, Gi format
// For simplicity, assume it's in GB, would need proper parsing
if memory_str.0.ends_with("Ki") {
if let Ok(mem) = memory_str.0.trim_end_matches("Ki").parse::<f32>() {
metrics.cluster_total_memory_gb += mem / (1024.0 * 1024.0);
}
} else if memory_str.0.ends_with("Mi") {
if let Ok(mem) = memory_str.0.trim_end_matches("Mi").parse::<f32>() {
metrics.cluster_total_memory_gb += mem / 1024.0;
}
} else if memory_str.0.ends_with("Gi") {
if let Ok(mem) = memory_str.0.trim_end_matches("Gi").parse::<f32>() {
metrics.cluster_total_memory_gb += mem;
}
}
}
}
// Fetch total cluster CPU capacity
let cluster_cpu_query = format!("{}query?query={}", prometheus_url,
"sum(kube_node_status_capacity{resource=\"cpu\"})");
let response = http_client.get(&cluster_cpu_query).send().await?;
let prom_response: PrometheusResponse = response.json().await?;
if prom_response.status == "success" && !prom_response.data.result.is_empty() {
if let Ok(value) = prom_response.data.result[0].value.1.parse::<f32>() {
metrics.cluster_total_cpu = value;
tracing::info!("Prometheus: Cluster CPU capacity: {}", value);
}
}
// Calculate namespace metrics from pods
for pod in pods.items {
if let Some(status) = pod.status {
if status.phase.as_deref() == Some("Running") {
if let Some(spec) = pod.spec {
for container in &spec.containers {
if let Some(resources) = &container.resources {
// Calculate CPU requests
if let Some(requests) = &resources.requests {
if let Some(cpu_str) = requests.get("cpu") {
// CPU can be in cores or millicores
let cpu_val = if cpu_str.0.ends_with("m") {
if let Ok(cpu) = cpu_str.0.trim_end_matches("m").parse::<f32>() {
cpu / 1000.0
} else {
0.0
}
} else {
cpu_str.0.parse::<f32>().unwrap_or(0.0)
};
metrics.namespace_cpu_requests += cpu_val;
// For simplicity, use requests as "used" since we can't get actual usage without metrics server
metrics.namespace_used_cpu += cpu_val;
}
if let Some(memory_str) = requests.get("memory") {
// Memory parsing, similar to above
if memory_str.0.ends_with("Ki") {
if let Ok(mem) = memory_str.0.trim_end_matches("Ki").parse::<f32>() {
let mem_gb = mem / (1024.0 * 1024.0);
metrics.namespace_memory_requests_gb += mem_gb;
metrics.namespace_used_memory_gb += mem_gb;
}
} else if memory_str.0.ends_with("Mi") {
if let Ok(mem) = memory_str.0.trim_end_matches("Mi").parse::<f32>() {
let mem_gb = mem / 1024.0;
metrics.namespace_memory_requests_gb += mem_gb;
metrics.namespace_used_memory_gb += mem_gb;
}
} else if memory_str.0.ends_with("Gi") {
if let Ok(mem) = memory_str.0.trim_end_matches("Gi").parse::<f32>() {
metrics.namespace_memory_requests_gb += mem;
metrics.namespace_used_memory_gb += mem;
}
}
}
}
}
}
}
}
// Fetch total cluster memory capacity (in GB)
let cluster_mem_query = format!("{}query?query={}", prometheus_url,
"sum(kube_node_status_capacity{resource=\"memory\"}) / 1024 / 1024 / 1024");
let response = http_client.get(&cluster_mem_query).send().await?;
let prom_response: PrometheusResponse = response.json().await?;
if prom_response.status == "success" && !prom_response.data.result.is_empty() {
if let Ok(value) = prom_response.data.result[0].value.1.parse::<f32>() {
metrics.cluster_total_memory_gb = value;
tracing::info!("Prometheus: Cluster memory capacity: {} GB", value);
}
}
// For cluster used metrics, we'll just simulate for demo purposes
// In a real system, you'd get this from metrics-server or prometheus
metrics.cluster_used_cpu = metrics.namespace_used_cpu + (metrics.cluster_total_cpu * 0.25); // 25% base + namespace
metrics.cluster_used_memory_gb = metrics.namespace_used_memory_gb + (metrics.cluster_total_memory_gb * 0.3); // 30% base + namespace
// Fetch cluster CPU usage
let cluster_cpu_usage_query = format!("{}query?query={}", prometheus_url,
"sum(rate(container_cpu_usage_seconds_total[5m]))");
let response = http_client.get(&cluster_cpu_usage_query).send().await?;
let prom_response: PrometheusResponse = response.json().await?;
if prom_response.status == "success" && !prom_response.data.result.is_empty() {
if let Ok(value) = prom_response.data.result[0].value.1.parse::<f32>() {
metrics.cluster_used_cpu = value;
tracing::info!("Prometheus: Cluster CPU usage: {}", value);
}
}
// Fetch cluster memory usage (in GB)
let cluster_mem_usage_query = format!("{}query?query={}", prometheus_url,
"sum(container_memory_working_set_bytes) / 1024 / 1024 / 1024");
let response = http_client.get(&cluster_mem_usage_query).send().await?;
let prom_response: PrometheusResponse = response.json().await?;
if prom_response.status == "success" && !prom_response.data.result.is_empty() {
if let Ok(value) = prom_response.data.result[0].value.1.parse::<f32>() {
metrics.cluster_used_memory_gb = value;
tracing::info!("Prometheus: Cluster memory usage: {} GB", value);
}
}
// Fetch namespace CPU usage
let namespace_cpu_query = format!("{}query?query={}", prometheus_url,
format!("sum(rate(container_cpu_usage_seconds_total{{namespace=\"{}\"}}[5m]))", namespace));
let response = http_client.get(&namespace_cpu_query).send().await?;
let prom_response: PrometheusResponse = response.json().await?;
if prom_response.status == "success" && !prom_response.data.result.is_empty() {
if let Ok(value) = prom_response.data.result[0].value.1.parse::<f32>() {
metrics.namespace_used_cpu = value;
tracing::info!("Prometheus: Namespace {} CPU usage: {}", namespace, value);
}
}
// Fetch namespace memory usage (in GB)
let namespace_mem_query = format!("{}query?query={}", prometheus_url,
format!("sum(container_memory_working_set_bytes{{namespace=\"{}\"}}) / 1024 / 1024 / 1024", namespace));
let response = http_client.get(&namespace_mem_query).send().await?;
let prom_response: PrometheusResponse = response.json().await?;
if prom_response.status == "success" && !prom_response.data.result.is_empty() {
if let Ok(value) = prom_response.data.result[0].value.1.parse::<f32>() {
metrics.namespace_used_memory_gb = value;
tracing::info!("Prometheus: Namespace {} memory usage: {} GB", namespace, value);
}
}
Ok(metrics)
}
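// Illustrative sketch, not code from this commit: the five query blocks above repeat the
// same "GET {base}query?query=<expr>, parse the JSON, take result[0].value.1 as f32"
// pattern. A hypothetical helper along these lines could collapse them; `query_scalar`
// is an invented name, and letting reqwest encode the PromQL expression as a query
// parameter is a suggested alternative to splicing it into the URL string.
async fn query_scalar(
    http_client: &reqwest::Client,
    prometheus_url: &str,
    expr: &str,
) -> Result<Option<f32>, Box<dyn std::error::Error + Send + Sync>> {
    let response = http_client
        .get(format!("{}query", prometheus_url))
        .query(&[("query", expr)])
        .send()
        .await?;
    let prom_response: PrometheusResponse = response.json().await?;
    if prom_response.status == "success" && !prom_response.data.result.is_empty() {
        // A missing or unparsable sample becomes None rather than silently staying 0.0.
        Ok(prom_response.data.result[0].value.1.parse::<f32>().ok())
    } else {
        Ok(None)
    }
}
// Example use, mirroring the first query above:
//   if let Some(v) = query_scalar(&http_client, prometheus_url,
//           "sum(kube_node_status_capacity{resource=\"cpu\"})").await? {
//       metrics.cluster_total_cpu = v;
//   }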
// Function to update cluster and namespace utilization from Kubernetes metrics
async fn update_utilization_from_k8s(state: AppState) -> Result<(), kube::Error> {
// Update the existing function to use Prometheus data
async fn update_utilization_from_k8s(state: AppState) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Specify the simulation namespace
let namespace = "larstesting";
// Fetch metrics from Kubernetes
match fetch_kubernetes_metrics(namespace).await {
// Fetch metrics from Prometheus instead of Kubernetes API
match fetch_prometheus_metrics(namespace).await {
Ok(metrics) => {
// Update cluster utilization
// Add the active simulations to these base values
let active_sims = state.active_simulations.lock().await;
let sim_cpu: f32 = active_sims.values().map(|sim| sim.actual_cost.cpu_cores).sum();
let sim_memory: f32 = active_sims.values().map(|sim| sim.actual_cost.memory_gb).sum();
// Update cluster utilization - Include simulations in cluster usage
let mut cluster_util = state.cluster_utilization.lock().await;
cluster_util.total_cpu_cores = metrics.cluster_total_cpu;
cluster_util.total_memory_gb = metrics.cluster_total_memory_gb;
cluster_util.used_cpu_cores = metrics.cluster_used_cpu;
cluster_util.used_memory_gb = metrics.cluster_used_memory_gb;
cluster_util.cpu_percent = (metrics.cluster_used_cpu / metrics.cluster_total_cpu) * 100.0;
cluster_util.memory_percent = (metrics.cluster_used_memory_gb / metrics.cluster_total_memory_gb) * 100.0;
// Update namespace utilization
// Add simulation resource usage to cluster metrics
cluster_util.used_cpu_cores = metrics.cluster_used_cpu + sim_cpu;
cluster_util.used_memory_gb = metrics.cluster_used_memory_gb + sim_memory;
// Recalculate percentages with simulation usage included
cluster_util.cpu_percent = (cluster_util.used_cpu_cores / cluster_util.total_cpu_cores) * 100.0;
cluster_util.memory_percent = (cluster_util.used_memory_gb / cluster_util.total_memory_gb) * 100.0;
// Update namespace utilization with the real metrics plus our simulations
let mut namespace_util = state.namespace_utilization.lock().await;
namespace_util.allocated_cpu_cores = metrics.namespace_cpu_limits;
namespace_util.allocated_memory_gb = metrics.namespace_memory_limits_gb;
namespace_util.used_cpu_cores = metrics.namespace_used_cpu;
namespace_util.used_memory_gb = metrics.namespace_used_memory_gb;
// Base namespace values from Prometheus
let base_namespace_cpu = metrics.namespace_used_cpu;
let base_namespace_memory = metrics.namespace_used_memory_gb;
// Total namespace usage = real metrics + simulation usage
namespace_util.used_cpu_cores = base_namespace_cpu + sim_cpu;
namespace_util.used_memory_gb = base_namespace_memory + sim_memory;
namespace_util.allocated_cpu_cores = metrics.namespace_cpu_limits.max(256.0); // Use at least 256 cores
namespace_util.allocated_memory_gb = metrics.namespace_memory_limits_gb.max(1024.0); // Use at least 1024 GB
namespace_util.cpu_percent = (namespace_util.used_cpu_cores / namespace_util.allocated_cpu_cores) * 100.0;
namespace_util.memory_percent = (namespace_util.used_memory_gb / namespace_util.allocated_memory_gb) * 100.0;
namespace_util.cluster_total_cpu = metrics.cluster_total_cpu;
@@ -627,12 +642,12 @@ async fn update_utilization_from_k8s(state: AppState) -> Result<(), kube::Error>
let _ = state.event_sender.send(AppEvent::ClusterUtilizationUpdated(cluster_util.clone()));
let _ = state.event_sender.send(AppEvent::NamespaceUtilizationUpdated(namespace_util.clone()));
tracing::info!("Updated utilization metrics from Kubernetes for namespace '{}'", namespace);
tracing::info!("Updated utilization metrics from Prometheus for namespace '{}'", namespace);
Ok(())
},
Err(e) => {
tracing::error!("Failed to fetch Kubernetes metrics for namespace '{}': {}", namespace, e);
Err(e)
tracing::error!("Failed to fetch Prometheus metrics: {}", e);
Err(e.into())
}
}
}
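// Illustrative sketch, not code from this commit: the percentage updates above divide by
// `total_*` values that stay at their 0.0 defaults whenever the corresponding Prometheus
// query returns nothing, which would make the percentages NaN or infinite. A hypothetical
// guard (`percent_of` is an invented name):
fn percent_of(used: f32, total: f32) -> f32 {
    if total > 0.0 {
        (used / total) * 100.0
    } else {
        0.0
    }
}
// e.g. cluster_util.cpu_percent =
//     percent_of(cluster_util.used_cpu_cores, cluster_util.total_cpu_cores);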
@@ -702,80 +717,80 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// --- Mock Monitoring Task ---
let monitor_state = state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1)); // Update frequently
let mut interval = tokio::time::interval(Duration::from_secs(5)); // Update every 5 seconds
loop {
interval.tick().await;
let mut active_sims = monitor_state.active_simulations.lock().await;
let mut updated = false;
if active_sims.len() > 0 {
// Get resource limits
let mut cluster_util = monitor_state.cluster_utilization.lock().await;
let mut namespace_util = monitor_state.namespace_utilization.lock().await;
// Update actual costs for all active simulations first
{
let mut active_sims = monitor_state.active_simulations.lock().await;
let cluster_util = monitor_state.cluster_utilization.lock().await;
// Extract limits
let total_cluster_cpu = cluster_util.total_cpu_cores;
let total_cluster_memory = cluster_util.total_memory_gb;
let allocated_namespace_cpu = namespace_util.allocated_cpu_cores;
let allocated_namespace_memory = namespace_util.allocated_memory_gb;
// Mock overall cluster utilization (independent of our namespace)
// This would come from external monitoring in a real system
// For this mock, we'll simulate cluster having base load plus our namespace
let base_cluster_cpu_usage = total_cluster_cpu * 0.25; // 25% base load from other namespaces
let base_cluster_memory_usage = total_cluster_memory * 0.30; // 30% base load from other namespaces
// Calculate total resource usage from our simulations
let mut namespace_cpu = 0.0;
let mut namespace_memory = 0.0;
// Simulate cost changes for active simulations
for sim in active_sims.values_mut() {
// Apply realistic variation with small jitter
let cpu_jitter = 0.85 + rand::random::<f32>() * 0.3; // 85% to 115% variation
sim.actual_cost.cpu_cores = sim.predicted_cost.cpu_cores * cpu_jitter;
sim.actual_cost.memory_gb = sim.predicted_cost.memory_gb * (0.9 + rand::random::<f32>() * 0.2);
let mem_jitter = 0.9 + rand::random::<f32>() * 0.2; // 90% to 110% variation
// Add to namespace totals
namespace_cpu += sim.actual_cost.cpu_cores;
namespace_memory += sim.actual_cost.memory_gb;
updated = true;
// Calculate actual cost based on predicted cost + jitter
// Removed the efficiency multiplier which was causing exponential growth
sim.actual_cost.cpu_cores = sim.predicted_cost.cpu_cores * cpu_jitter;
sim.actual_cost.memory_gb = sim.predicted_cost.memory_gb * mem_jitter;
}
// Update namespace utilization metrics
namespace_util.used_cpu_cores = namespace_cpu;
namespace_util.used_memory_gb = namespace_memory;
namespace_util.cpu_percent = (namespace_cpu / allocated_namespace_cpu) * 100.0;
namespace_util.memory_percent = (namespace_memory / allocated_namespace_memory) * 100.0;
namespace_util.cluster_total_cpu = total_cluster_cpu;
namespace_util.cluster_total_memory = total_cluster_memory;
// Update cluster utilization metrics
// (total of our namespace plus the base load from other namespaces)
cluster_util.used_cpu_cores = namespace_cpu + base_cluster_cpu_usage;
cluster_util.used_memory_gb = namespace_memory + base_cluster_memory_usage;
cluster_util.cpu_percent = (cluster_util.used_cpu_cores / total_cluster_cpu) * 100.0;
cluster_util.memory_percent = (cluster_util.used_memory_gb / total_cluster_memory) * 100.0;
// Send utilization updates
let _ = monitor_state.event_sender.send(AppEvent::ClusterUtilizationUpdated(cluster_util.clone()));
let _ = monitor_state.event_sender.send(AppEvent::NamespaceUtilizationUpdated(namespace_util.clone()));
tracing::debug!(
"Resource update - Namespace: CPU {:.1}% ({:.1}/{:.1}), Memory {:.1}% ({:.1}/{:.1}GB) | Cluster: CPU {:.1}% ({:.1}/{:.1}), Memory {:.1}% ({:.1}/{:.1}GB)",
namespace_util.cpu_percent, namespace_util.used_cpu_cores, namespace_util.allocated_cpu_cores,
namespace_util.memory_percent, namespace_util.used_memory_gb, namespace_util.allocated_memory_gb,
cluster_util.cpu_percent, cluster_util.used_cpu_cores, cluster_util.total_cpu_cores,
cluster_util.memory_percent, cluster_util.used_memory_gb, cluster_util.total_memory_gb
);
// If there are active simulations, broadcast an update
if !active_sims.is_empty() {
let active_list: Vec<ActiveSimulation> = active_sims.values().cloned().collect();
let _ = monitor_state.event_sender.send(AppEvent::ActiveUpdated(active_list));
}
}
if updated {
// Collect current active sims to send full state
let active_list: Vec<ActiveSimulation> = active_sims.values().cloned().collect();
// Send update event - ignore error if no receivers yet
let _ = monitor_state.event_sender.send(AppEvent::ActiveUpdated(active_list));
tracing::debug!("Sent ActiveUpdated event via broadcast");
// Try to get real metrics from Prometheus
if let Err(e) = update_utilization_from_k8s(monitor_state.clone()).await {
tracing::warn!("Failed to fetch Prometheus metrics: {:?}", e);
// If Prometheus fetch fails, fall back to calculating metrics from active simulations
let active_sims = monitor_state.active_simulations.lock().await;
if active_sims.len() > 0 {
// Calculate total namespace usage from active simulations
let sim_cpu: f32 = active_sims.values().map(|sim| sim.actual_cost.cpu_cores).sum();
let sim_memory: f32 = active_sims.values().map(|sim| sim.actual_cost.memory_gb).sum();
tracing::debug!("Active simulations resource usage: CPU {:.2} cores, Memory {:.2} GB", sim_cpu, sim_memory);
// Update namespace utilization with simulation data
let mut namespace_util = monitor_state.namespace_utilization.lock().await;
namespace_util.used_cpu_cores = sim_cpu;
namespace_util.used_memory_gb = sim_memory;
namespace_util.cpu_percent = (sim_cpu / namespace_util.allocated_cpu_cores) * 100.0;
namespace_util.memory_percent = (sim_memory / namespace_util.allocated_memory_gb) * 100.0;
// Get current cluster state
let mut cluster_util = monitor_state.cluster_utilization.lock().await;
// Update namespace with cluster totals
namespace_util.cluster_total_cpu = cluster_util.total_cpu_cores;
namespace_util.cluster_total_memory = cluster_util.total_memory_gb;
// Update cluster metrics using our simulations plus base load
// This is only a fallback if Prometheus fails
let base_cluster_cpu = cluster_util.total_cpu_cores * 0.25; // 25% base load
let base_cluster_memory = cluster_util.total_memory_gb * 0.3; // 30% base load
cluster_util.used_cpu_cores = sim_cpu + base_cluster_cpu;
cluster_util.used_memory_gb = sim_memory + base_cluster_memory;
cluster_util.cpu_percent = (cluster_util.used_cpu_cores / cluster_util.total_cpu_cores) * 100.0;
cluster_util.memory_percent = (cluster_util.used_memory_gb / cluster_util.total_memory_gb) * 100.0;
// Send updates
let _ = monitor_state.event_sender.send(AppEvent::NamespaceUtilizationUpdated(namespace_util.clone()));
let _ = monitor_state.event_sender.send(AppEvent::ClusterUtilizationUpdated(cluster_util.clone()));
tracing::debug!(
"Resource update (simulations only) - Namespace: CPU {:.1} cores, Memory {:.1} GB",
sim_cpu, sim_memory
);
}
}
}
});
@@ -784,7 +799,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let scheduler_state = state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
let max_concurrent_sims = 5;
let mut mock_completion_timers: HashMap<Uuid, tokio::time::Instant> = HashMap::new();
loop {
@@ -861,7 +875,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// --------------------------------------------------------
// 2. Try to schedule new simulations
while scheduler_state.active_simulations.lock().await.len() < max_concurrent_sims && !queue.is_empty() {
while !queue.is_empty() {
// Check current CPU usage - only admit if usage is below 80%
let cluster_util = scheduler_state.cluster_utilization.lock().await;
if cluster_util.cpu_percent >= 80.0 {
tracing::info!("Current CPU usage at {}%, pausing admissions", cluster_util.cpu_percent);
break; // Stop admitting new simulations
}
// Also check if admitting would exceed 100% (prevent overcommit)
if let Some(next_sim) = queue.front() {
let predicted_cpu = next_sim.predicted_cost.cpu_cores;
let total_cpu_if_admitted = cluster_util.used_cpu_cores + predicted_cpu;
let percent_if_admitted = (total_cpu_if_admitted / cluster_util.total_cpu_cores) * 100.0;
if percent_if_admitted > 99.0 {
tracing::info!("Admitting next simulation would exceed CPU capacity ({}%), pausing", percent_if_admitted);
break;
}
}
// Release the lock before continuing
drop(cluster_util);
if let Some(queued_sim) = queue.pop_front() {
queue_updated = true;
active_updated = true; // Need to broadcast active update too
@@ -902,20 +938,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});
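// Illustrative sketch, not code from this commit: the admission checks in the scheduler
// loop above (pause new admissions at >= 80% CPU, refuse anything that would push usage
// past ~99%) could be factored into a single predicate. `has_cpu_headroom` is an invented
// name, and the struct name `ClusterUtilization` is assumed from the event variants used
// elsewhere; the fields match those read in the loop.
fn has_cpu_headroom(cluster: &ClusterUtilization, predicted_cpu: f32) -> bool {
    let percent_if_admitted =
        ((cluster.used_cpu_cores + predicted_cpu) / cluster.total_cpu_cores) * 100.0;
    cluster.cpu_percent < 80.0 && percent_if_admitted <= 99.0
}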
// Add a task to periodically fetch Kubernetes metrics
let k8s_state = state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
interval.tick().await;
// Try to update from K8s, but fallback to mock data if it fails
if let Err(e) = update_utilization_from_k8s(k8s_state.clone()).await {
tracing::warn!("Using mock utilization data: {}", e);
// We'll continue using the mock data from the monitoring task
}
}
});
// Build our application router
let mut app = Router::new()
// Conditionally register the correct root handler

View File

@@ -221,21 +221,12 @@
}
}
@layer utilities {
.absolute {
position: absolute;
}
.relative {
position: relative;
}
.static {
position: static;
}
.inset-y-0 {
inset-block: calc(var(--spacing) * 0);
}
.right-0 {
right: calc(var(--spacing) * 0);
}
.isolate {
isolation: isolate;
}
@@ -320,9 +311,6 @@
.h-10 {
height: calc(var(--spacing) * 10);
}
.h-full {
height: 100%;
}
.min-h-screen {
min-height: 100vh;
}
@@ -502,9 +490,6 @@
.p-6 {
padding: calc(var(--spacing) * 6);
}
.px-3 {
padding-inline: calc(var(--spacing) * 3);
}
.px-4 {
padding-inline: calc(var(--spacing) * 4);
}
@@ -651,13 +636,6 @@
}
}
}
.hover\:text-white {
&:hover {
@media (hover: hover) {
color: var(--color-white);
}
}
}
.focus\:border-blue-500 {
&:focus {
border-color: var(--color-blue-500);

View File

@@ -96,29 +96,33 @@
// Auto-submit functionality
const autoSubmitButton = document.getElementById('auto-submit-button');
let autoSubmitInterval = null;
let autoSubmitEnabled = false;
let queueLength = 0;
// Function to check queue size and add more simulations if needed
function checkQueueAndSubmit() {
if (autoSubmitEnabled && queueLength < 3) {
console.log("Queue low, automatically submitting more simulations");
submitMockSimulations();
}
}
autoSubmitButton.addEventListener('click', () => {
if (autoSubmitInterval) {
if (autoSubmitEnabled) {
// Stop auto-submission
clearInterval(autoSubmitInterval);
autoSubmitInterval = null;
autoSubmitEnabled = false;
autoSubmitButton.textContent = 'Auto Submit';
autoSubmitButton.classList.remove('bg-red-600', 'hover:bg-red-700');
autoSubmitButton.classList.add('bg-green-600', 'hover:bg-green-700');
} else {
// Start auto-submission every 15 seconds
// Start auto-submission
autoSubmitEnabled = true;
autoSubmitButton.textContent = 'Stop Auto';
autoSubmitButton.classList.remove('bg-green-600', 'hover:bg-green-700');
autoSubmitButton.classList.add('bg-red-600', 'hover:bg-red-700');
// Submit immediately
submitMockSimulations();
// Then set up interval
autoSubmitInterval = setInterval(() => {
submitMockSimulations();
}, 15000); // 15 seconds
}
});
@@ -277,11 +281,14 @@
const activeClass = resourceType === 'cpu' ? 'bg-blue-500' : 'bg-purple-500';
const inactiveClass = 'bg-gray-300 dark:bg-gray-600';
console.log(`Updating ${resourceType} blocks with value: ${usageValue}`);
// Clear existing blocks
container.innerHTML = '';
if (usageValue <= 0) {
// No usage, no blocks
console.log(`No ${resourceType} usage to display`);
return;
}
@@ -384,6 +391,10 @@
// Update the queued title with count
queuedTitle.textContent = `Queued Simulations (${queueItems.length})`;
// Update queue length and check if we need to add more
queueLength = queueItems.length;
checkQueueAndSubmit();
} else if (data.type === "ActiveUpdated") {
// Update active simulations display
const activeItems = data.data;