mirror of
https://github.com/vacp2p/dst-prefect-workflows.git
synced 2026-01-09 13:28:12 -05:00
Fixes for various issues
This commit is contained in:
114
lars/src/main.rs
114
lars/src/main.rs
@@ -1522,46 +1522,81 @@ async fn fetch_simulation_usage(namespace: &str, release_name: &str) -> Result<S
|
||||
let prometheus_url = "https://metrics.riff.cc/select/0/prometheus/api/v1/";
|
||||
let http_client = reqwest::Client::new();
|
||||
|
||||
// Escape special characters in release name for the Prometheus query
|
||||
let release_name_escaped = release_name.replace("-", "\\-");
|
||||
// Log the original release name for debugging
|
||||
tracing::debug!("Fetching resource usage for: namespace={}, release_name={}", namespace, release_name);
|
||||
|
||||
// More robust release name escaping for Prometheus regex
|
||||
// Escape special regex characters: ., +, *, ?, ^, $, (, ), [, ], {, }, |, \
|
||||
let release_pattern = release_name
|
||||
.replace(".", "\\\\.")
|
||||
.replace("+", "\\\\+")
|
||||
.replace("*", "\\\\*")
|
||||
.replace("?", "\\\\?")
|
||||
.replace("^", "\\\\^")
|
||||
.replace("$", "\\\\$")
|
||||
.replace("(", "\\\\(")
|
||||
.replace(")", "\\\\)")
|
||||
.replace("[", "\\\\[")
|
||||
.replace("]", "\\\\]")
|
||||
.replace("{", "\\\\{")
|
||||
.replace("}", "\\\\}")
|
||||
.replace("|", "\\\\|")
|
||||
.replace("\\", "\\\\\\\\");
|
||||
|
||||
// Use a more precise regex pattern to match only the pods for this release
|
||||
// This pattern matches pods that start with the release name followed by a dash
|
||||
// and then anything, e.g., "release-name-*"
|
||||
let pod_pattern = format!("^{}(-|$)", release_pattern);
|
||||
|
||||
// CPU query: Get the sum of CPU usage for all pods with this release name in the namespace
|
||||
let cpu_query = format!(
|
||||
"sum(rate(container_cpu_usage_seconds_total{{namespace=\"{}\", pod=~\"{}.*\"}}[1m]))",
|
||||
namespace, release_name_escaped
|
||||
namespace, pod_pattern
|
||||
);
|
||||
|
||||
// Log the constructed Prometheus query for debugging
|
||||
tracing::debug!("Prometheus CPU query: {}", cpu_query);
|
||||
|
||||
let cpu_query_url = format!("{}query?query={}", prometheus_url, urlencoding::encode(&cpu_query));
|
||||
|
||||
let response = http_client.get(&cpu_query_url).send().await?;
|
||||
if !response.status().is_success() {
|
||||
return Err(format!("Prometheus request failed for CPU: {}", response.status()).into());
|
||||
let status = response.status();
|
||||
let error_text = response.text().await.unwrap_or_else(|_| "Could not extract error text".to_string());
|
||||
return Err(format!("Prometheus request failed for CPU: {} - {}", status, error_text).into());
|
||||
}
|
||||
|
||||
let prom_response: PrometheusResponse = response.json().await?;
|
||||
let cpu_cores = if prom_response.status == "success" && !prom_response.data.result.is_empty() {
|
||||
prom_response.data.result[0].value.1.parse::<f32>().unwrap_or(0.0)
|
||||
} else {
|
||||
tracing::warn!("No CPU data found for namespace={}, release_name={}", namespace, release_name);
|
||||
0.0
|
||||
};
|
||||
|
||||
// Memory query: Get the sum of memory usage for all pods with this release name in the namespace
|
||||
let memory_query = format!(
|
||||
"sum(container_memory_working_set_bytes{{namespace=\"{}\", pod=~\"{}.*\"}}) / (1024*1024*1024)",
|
||||
namespace, release_name_escaped
|
||||
namespace, pod_pattern
|
||||
);
|
||||
|
||||
// Log the constructed Prometheus query for debugging
|
||||
tracing::debug!("Prometheus Memory query: {}", memory_query);
|
||||
|
||||
let memory_query_url = format!("{}query?query={}", prometheus_url, urlencoding::encode(&memory_query));
|
||||
|
||||
let response = http_client.get(&memory_query_url).send().await?;
|
||||
if !response.status().is_success() {
|
||||
return Err(format!("Prometheus request failed for Memory: {}", response.status()).into());
|
||||
let status = response.status();
|
||||
let error_text = response.text().await.unwrap_or_else(|_| "Could not extract error text".to_string());
|
||||
return Err(format!("Prometheus request failed for Memory: {} - {}", status, error_text).into());
|
||||
}
|
||||
|
||||
let prom_response: PrometheusResponse = response.json().await?;
|
||||
let memory_gb = if prom_response.status == "success" && !prom_response.data.result.is_empty() {
|
||||
prom_response.data.result[0].value.1.parse::<f32>().unwrap_or(0.0)
|
||||
} else {
|
||||
tracing::warn!("No Memory data found for namespace={}, release_name={}", namespace, release_name);
|
||||
0.0
|
||||
};
|
||||
|
||||
@@ -1688,7 +1723,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
|
||||
// Get active simulations and measure their actual resource usage
|
||||
let mut active_sims_updated = false;
|
||||
{
|
||||
let active_guard = match state.active_simulations.try_lock() {
|
||||
Ok(guard) => guard,
|
||||
@@ -1715,23 +1749,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.collect();
|
||||
drop(active_guard);
|
||||
|
||||
// Whether any updates occurred during this monitoring cycle
|
||||
let mut updated_any = false;
|
||||
// Keep track of whether any simulations were updated
|
||||
let mut any_updated = false;
|
||||
|
||||
// Measure each simulation's resource usage (outside the lock)
|
||||
for (sim_id, _chart, release_name) in sim_data {
|
||||
// Skip mock simulations or those with invalid/placeholder release names
|
||||
if release_name.starts_with("mock-") || release_name.contains("mock") {
|
||||
tracing::debug!("Skipping resource measurement for mock simulation {}", sim_id);
|
||||
tracing::debug!("Skipping resource measurement for mock simulation {}: release_name='{}'", sim_id, release_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip if release name doesn't look like a valid Kubernetes resource name
|
||||
if !is_valid_release_name(&release_name) {
|
||||
tracing::debug!("Skipping resource measurement for simulation {} with invalid release name: {}", sim_id, release_name);
|
||||
tracing::debug!("Skipping resource measurement for simulation {} with invalid release name: '{}'", sim_id, release_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Additional check: release name should be for a real deployment
|
||||
// Most real Kubernetes deployments will have nodes-X, statefulset-X, or deployment-X patterns
|
||||
if !release_name.contains("-nodes") &&
|
||||
!release_name.contains("-deployment") &&
|
||||
!release_name.contains("-statefulset") &&
|
||||
!release_name.contains("-pod") {
|
||||
// This is probably not a real deployment but we'll try anyway with a warning
|
||||
tracing::warn!("Release name '{}' doesn't follow expected naming pattern for deployments, measurement may fail", release_name);
|
||||
}
|
||||
|
||||
// Use the stored release_name from the report_start request
|
||||
match fetch_simulation_usage("larstesting", &release_name).await {
|
||||
Ok(usage) => {
|
||||
@@ -1756,7 +1800,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
sim.params.duration_secs
|
||||
));
|
||||
|
||||
updated_any = true;
|
||||
any_updated = true;
|
||||
tracing::info!(
|
||||
%sim_id,
|
||||
"Updated actual resource usage. CPU: {:.2} cores, Memory: {:.2} GB",
|
||||
@@ -1771,16 +1815,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
}
|
||||
|
||||
// Set the flag for broadcasting if we updated anything
|
||||
active_sims_updated = updated_any;
|
||||
}
|
||||
|
||||
// Broadcast an update if we modified any simulations
|
||||
if active_sims_updated {
|
||||
if let Ok(active_guard) = state.active_simulations.try_lock() {
|
||||
let active_sims = active_guard.values().cloned().collect::<Vec<ActiveSimulation>>();
|
||||
drop(active_guard);
|
||||
let _ = state.event_sender.send(AppEvent::ActiveUpdated(active_sims));
|
||||
// Broadcast an update if we modified any simulations
|
||||
if any_updated {
|
||||
if let Ok(active_guard) = state.active_simulations.try_lock() {
|
||||
let active_sims = active_guard.values().cloned().collect::<Vec<ActiveSimulation>>();
|
||||
drop(active_guard);
|
||||
let _ = state.event_sender.send(AppEvent::ActiveUpdated(active_sims));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1857,6 +1898,20 @@ fn is_valid_release_name(name: &str) -> bool {
|
||||
if name.len() < 3 {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Print the chart name/release name for debugging
|
||||
tracing::debug!("Checking release name: {}", name);
|
||||
// For waku-specific charts, make sure they follow the expected format
|
||||
if name.contains("waku") {
|
||||
// Match the expected format pattern like "waku-100-node"
|
||||
let is_expected_format = name.starts_with("waku-") &&
|
||||
name.split('-').count() >= 2 &&
|
||||
name.split('-').nth(1).map_or(false, |s| s.parse::<u32>().is_ok());
|
||||
|
||||
if !is_expected_format {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Must start with a letter or number (common for real releases)
|
||||
if !name.chars().next().unwrap().is_alphanumeric() {
|
||||
@@ -1868,5 +1923,18 @@ fn is_valid_release_name(name: &str) -> bool {
|
||||
c.is_alphanumeric() || c == '-' || c == '.'
|
||||
});
|
||||
|
||||
valid_chars
|
||||
// Make sure we don't have high-risk characters that could cause Prometheus errors
|
||||
let has_risky_chars = name.contains("\\") ||
|
||||
name.contains("[") ||
|
||||
name.contains("]") ||
|
||||
name.contains("^") ||
|
||||
name.contains("$") ||
|
||||
name.contains("+") ||
|
||||
name.contains("+") ||
|
||||
name.contains("*") ||
|
||||
name.contains("?") ||
|
||||
name.contains("{") ||
|
||||
name.contains("}");
|
||||
|
||||
valid_chars && !has_risky_chars
|
||||
}
|
||||
@@ -16,7 +16,7 @@ load_dotenv()
|
||||
|
||||
AUTHORIZED_USERS = ["zorlin", "AlbertoSoutullo", "michatinkers"]
|
||||
|
||||
LARS_API_URL = os.getenv("LARS_API_URL", "http://localhost:9930")
|
||||
LARS_API_URL = os.getenv("LARS_API_URL", "http://10.1.20.78:9930")
|
||||
|
||||
@task
|
||||
def find_valid_issue(repo_name: str, github_token: str):
|
||||
@@ -336,7 +336,7 @@ def deploy_config(config: dict):
|
||||
# Generate descriptive release name
|
||||
if chart == "waku":
|
||||
nodecount = config.get("nodecount", 50)
|
||||
message_rate = 1000 // config.get("publisher_delay", 10) # messages per second
|
||||
message_rate = config.get("publisher_delay", 10) # messages per second
|
||||
message_size = config.get("publisher_message_size", 1)
|
||||
k_value = nodecount/1000
|
||||
k_str = f"{int(k_value)}K" if k_value >= 1 else f"{int(nodecount)}"
|
||||
|
||||
Reference in New Issue
Block a user