mirror of https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00

websocket server running well now
85  autogpt_platform/backend/websocket/.github/workflows/ci.yml  (vendored, new file)
@@ -0,0 +1,85 @@
name: CI

on:
  push:
    branches: [ main, master ]
  pull_request:
    branches: [ main, master ]

env:
  CARGO_TERM_COLOR: always
  RUSTFLAGS: "-D warnings"

jobs:
  test:
    name: Test
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v3
      - uses: dtolnay/rust-toolchain@stable
      - uses: Swatinem/rust-cache@v2
      - name: Run tests
        run: cargo test
        env:
          REDIS_URL: redis://localhost:6379

  clippy:
    name: Clippy
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: dtolnay/rust-toolchain@stable
        with:
          components: clippy
      - uses: Swatinem/rust-cache@v2
      - name: Run clippy
        run: |
          cargo clippy -- \
            -D warnings \
            -D clippy::unwrap_used \
            -D clippy::panic \
            -D clippy::unimplemented \
            -D clippy::todo

  fmt:
    name: Format
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: dtolnay/rust-toolchain@stable
        with:
          components: rustfmt
      - name: Check formatting
        run: cargo fmt -- --check

  bench:
    name: Benchmarks
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v3
      - uses: dtolnay/rust-toolchain@stable
      - uses: Swatinem/rust-cache@v2
      - name: Build benchmarks
        run: cargo bench --no-run
        env:
          REDIS_URL: redis://localhost:6379
1542  autogpt_platform/backend/websocket/Cargo.lock  (generated; diff suppressed because it is too large)
@@ -5,6 +5,14 @@ description = "WebSocket server for AutoGPT Platform"
version = "0.1.0"
edition = "2021"

[lib]
name = "websocket"
path = "src/lib.rs"

[[bin]]
name = "websocket"
path = "src/main.rs"

[dependencies]
axum = { version = "0.7.5", features = ["ws"] }
jsonwebtoken = "9.3.0"
@@ -19,3 +27,34 @@ futures = "0.3"
dotenvy = "0.15"
clap = { version = "4.5.4", features = ["derive"] }
toml = "0.8"

[dev-dependencies]
# Load testing and profiling
tokio-console = "0.1"
criterion = { version = "0.5", features = ["async_tokio"] }
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
# Dependencies for benchmarks
tokio-tungstenite = "0.24"
futures-util = "0.3"
chrono = "0.4"

[[bench]]
name = "websocket_bench"
harness = false

[[example]]
name = "ws_client_example"
required-features = []

[profile.release]
opt-level = 3     # Maximum optimization
lto = true        # Enable link-time optimization
codegen-units = 1 # Reduce parallel code generation units to increase optimization
panic = "abort"   # Remove panic unwinding to reduce binary size
strip = true      # Strip symbols from binary

[profile.bench]
opt-level = 3     # Maximum optimization
lto = true        # Enable link-time optimization
codegen-units = 1 # Reduce parallel code generation units to increase optimization
debug = true      # Keep debug symbols for profiling
93  autogpt_platform/backend/websocket/benches/README.md  (new file)

@@ -0,0 +1,93 @@
# WebSocket Server Benchmarks

This directory contains performance benchmarks for the AutoGPT WebSocket server.

## Prerequisites

1. Redis must be running locally, or set the `REDIS_URL` environment variable:
   ```bash
   docker run -d -p 6379:6379 redis:latest
   ```

2. Build the project in release mode:
   ```bash
   cargo build --release
   ```

## Running Benchmarks

Run all benchmarks:
```bash
cargo bench
```

Run a specific benchmark group:
```bash
cargo bench connection_establishment
cargo bench subscriptions
cargo bench message_throughput
cargo bench concurrent_connections
cargo bench message_parsing
cargo bench redis_event_processing
```

## Benchmark Categories

### Connection Establishment
Tests the performance of establishing WebSocket connections under different authentication scenarios:
- No authentication
- Valid JWT authentication
- Invalid JWT authentication (connection rejection)
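
As a rough illustration of what these cases exercise: the authenticated variant opens the WebSocket with the JWT passed as a `token` query parameter. A minimal sketch using `tokio-tungstenite` (the address and token literal are placeholders; the benchmarks bind a random port and mint a real HS256 token instead):

```rust
use tokio_tungstenite::connect_async;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder address and token; the benchmark harness binds 127.0.0.1:0
    // and signs a real token with the test secret instead.
    let token = "<jwt-here>";
    let url = format!("ws://127.0.0.1:8001/ws?token={token}");

    // With a bad token the server either fails the handshake or sends a
    // close frame immediately after the upgrade.
    let (_ws_stream, _response) = connect_async(&url).await?;
    println!("connected");
    Ok(())
}
```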

### Subscriptions
Measures the performance of subscription operations:
- Subscribing to graph execution events
- Unsubscribing from channels

### Message Throughput
Tests how many messages the server can process per second with varying message counts (10, 100, 1000).

### Concurrent Connections
Benchmarks the server's ability to handle multiple simultaneous connections (100, 500, and 1000 clients, matching the parameters in `websocket_bench.rs`).

### Message Parsing
Tests JSON parsing performance with different message sizes (100 B to 10 KB).

### Redis Event Processing
Benchmarks the parsing of execution events received from Redis.
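
These events arrive on Redis channels of the form `execution_event/{user_id}/{graph_id}/{graph_exec_id}` and are fanned out to client channel keys such as `{user_id}|graph_exec#{graph_exec_id}` (see `connection_manager.rs` in this commit). A stripped-down sketch of that channel parse, with the bus name taken from the source comment:

```rust
// Sketch of the channel parsing done in connection_manager.rs; the bus
// name literal below comes from the comment in that file.
fn parse_channel(channel: &str, bus_name: &str) -> Option<(String, String, String)> {
    // Expected format: {bus_name}/{user_id}/{graph_id}/{graph_exec_id}
    let parts: Vec<&str> = channel.split('/').collect();
    if parts.len() != 4 || parts[0] != bus_name {
        return None; // not an execution event channel
    }
    Some((parts[1].into(), parts[2].into(), parts[3].into()))
}

fn main() {
    let channel = "execution_event/user_789/graph_456/exec_123";
    if let Some((user_id, graph_id, exec_id)) = parse_channel(channel, "execution_event") {
        // Client-facing channel keys derived from the event:
        println!("{user_id}|graph_exec#{exec_id}");
        println!("{user_id}|graph#{graph_id}|executions");
    }
}
```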

## Profiling

To generate flamegraphs for CPU profiling:

1. Install flamegraph tools:
   ```bash
   cargo install flamegraph
   ```

2. Run benchmarks with profiling:
   ```bash
   cargo bench --bench websocket_bench -- --profile-time=10
   ```
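
Flamegraph output from `--profile-time` requires the harness to register a profiler; `pprof`, pulled in as a dev-dependency with the `criterion` feature, provides one. A hedged sketch of that wiring on a toy target (the commit's `websocket_bench.rs` itself uses the plain `criterion_group!` form):

```rust
use criterion::{criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};

fn bench_parse(c: &mut Criterion) {
    c.bench_function("parse_small_json", |b| {
        b.iter(|| serde_json::from_str::<serde_json::Value>(r#"{"method":"heartbeat"}"#))
    });
}

criterion_group! {
    name = benches;
    // Sample at 100 Hz; with `cargo bench -- --profile-time=10` this writes
    // flamegraph.svg under target/criterion/<bench>/profile/.
    config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
    targets = bench_parse
}
criterion_main!(benches);
```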

## Interpreting Results

- **Throughput**: Higher is better (operations/second or elements/second)
- **Time**: Lower is better (nanoseconds per operation)
- **Error margins**: Look for stable results with a low standard deviation

## Optimizing Performance

Based on benchmark results, consider:

1. **Connection pooling** for Redis connections
2. **Message batching** for high-throughput scenarios
3. **Async task tuning** for concurrent connection handling
4. **JSON parsing optimization** using simd-json or other fast parsers (see the sketch below)
5. **Memory allocation** optimization using arena allocators
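
To make item 4 concrete, here is a hedged sketch of swapping `serde_json` for `simd-json` on the hot parse path. Note that `simd-json` is not a dependency of this commit, the version below is a guess, and the message type is a stand-in for the crate's `models::WSMessage`:

```rust
use serde::Deserialize;

// Stand-in for the crate's models::WSMessage; the field set is illustrative.
// Requires serde with the "derive" feature.
#[derive(Deserialize)]
struct WsMessage {
    method: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let text = r#"{"method":"heartbeat","data":"ping"}"#;

    // serde_json path, as used by the benchmarks in this commit:
    let _msg: WsMessage = serde_json::from_str(text)?;

    // simd-json path (hypothetical; add `simd-json = "0.13"` to try it).
    // simd-json parses in place, so it needs an owned, mutable buffer.
    let mut bytes = text.as_bytes().to_vec();
    let _msg: WsMessage = simd_json::from_slice(&mut bytes)?;

    Ok(())
}
```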

## Notes

- Benchmarks create actual WebSocket servers on random ports
- Each benchmark iteration properly cleans up resources
- Results may vary based on system resources and Redis performance
406  autogpt_platform/backend/websocket/benches/websocket_bench.rs  (new file)

@@ -0,0 +1,406 @@
#![allow(clippy::unwrap_used)] // Benchmarks can panic on setup errors

use axum::{routing::get, Router};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio_tungstenite::{connect_async, tungstenite::Message};

// Import the actual websocket server components
use websocket::{models, ws_handler, AppState, Config, ConnectionManager, Stats};

// Helper to create a test server
async fn create_test_server(enable_auth: bool) -> (String, tokio::task::JoinHandle<()>) {
    // Set environment variables for test config
    std::env::set_var("WEBSOCKET_SERVER_HOST", "127.0.0.1");
    std::env::set_var("WEBSOCKET_SERVER_PORT", "0");
    std::env::set_var("ENABLE_AUTH", enable_auth.to_string());
    std::env::set_var("SUPABASE_JWT_SECRET", "test_secret");
    std::env::set_var("DEFAULT_USER_ID", "test_user");
    if std::env::var("REDIS_URL").is_err() {
        std::env::set_var("REDIS_URL", "redis://localhost:6379");
    }

    let mut config = Config::load(None);
    config.port = 0; // Force OS to assign port

    let redis_client =
        redis::Client::open(config.redis_url.clone()).expect("Failed to connect to Redis");
    let stats = Arc::new(Stats::default());
    let mgr = Arc::new(ConnectionManager::new(
        redis_client,
        config.execution_event_bus_name.clone(),
        stats.clone(),
    ));

    // Start broadcaster
    let mgr_clone = mgr.clone();
    tokio::spawn(async move {
        mgr_clone.run_broadcaster().await;
    });

    let state = AppState {
        mgr,
        config: Arc::new(config),
        stats,
    };

    let app = Router::new()
        .route("/ws", get(ws_handler))
        .layer(axum::Extension(state));

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    let server_url = format!("ws://{addr}");

    let server_handle = tokio::spawn(async move {
        axum::serve(listener, app.into_make_service())
            .await
            .unwrap();
    });

    // Give server time to start
    tokio::time::sleep(Duration::from_millis(100)).await;

    (server_url, server_handle)
}

// Helper to create a valid JWT token
fn create_jwt_token(user_id: &str) -> String {
    use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
    use serde::Serialize;

    #[derive(Serialize)]
    struct Claims {
        sub: String,
        aud: Vec<String>,
        exp: usize,
    }

    let claims = Claims {
        sub: user_id.to_string(),
        aud: vec!["authenticated".to_string()],
        exp: (chrono::Utc::now() + chrono::Duration::hours(1)).timestamp() as usize,
    };

    encode(
        &Header::new(Algorithm::HS256),
        &claims,
        &EncodingKey::from_secret(b"test_secret"),
    )
    .unwrap()
}

// Benchmark connection establishment
fn benchmark_connection_establishment(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();

    let mut group = c.benchmark_group("connection_establishment");
    group.measurement_time(Duration::from_secs(30));

    // Test without auth
    group.bench_function("no_auth", |b| {
        b.to_async(&rt).iter_with_large_drop(|| async {
            let (server_url, server_handle) = create_test_server(false).await;
            let url = format!("{server_url}/ws");
            let (ws_stream, _) = connect_async(&url).await.unwrap();
            drop(ws_stream);
            server_handle.abort();
        });
    });

    // Test with valid auth
    group.bench_function("valid_auth", |b| {
        b.to_async(&rt).iter_with_large_drop(|| async {
            let (server_url, server_handle) = create_test_server(true).await;
            let token = create_jwt_token("test_user");
            let url = format!("{server_url}/ws?token={token}");
            let (ws_stream, _) = connect_async(&url).await.unwrap();
            drop(ws_stream);
            server_handle.abort();
        });
    });

    // Test with invalid auth
    group.bench_function("invalid_auth", |b| {
        b.to_async(&rt).iter_with_large_drop(|| async {
            let (server_url, server_handle) = create_test_server(true).await;
            let url = format!("{server_url}/ws?token=invalid");
            let result = connect_async(&url).await;
            assert!(
                result.is_err() || {
                    if let Ok((mut ws_stream, _)) = result {
                        // Should receive close frame
                        matches!(ws_stream.next().await, Some(Ok(Message::Close(_))))
                    } else {
                        false
                    }
                }
            );
            server_handle.abort();
        });
    });

    group.finish();
}

// Benchmark subscription operations
fn benchmark_subscriptions(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();

    let mut group = c.benchmark_group("subscriptions");
    group.measurement_time(Duration::from_secs(20));

    group.bench_function("subscribe_graph_execution", |b| {
        b.to_async(&rt).iter_with_large_drop(|| async {
            let (server_url, server_handle) = create_test_server(false).await;
            let url = format!("{server_url}/ws");
            let (mut ws_stream, _) = connect_async(&url).await.unwrap();
            let msg = json!({
                "method": "subscribe_graph_execution",
                "data": {
                    "graph_exec_id": "test_exec_123"
                }
            });

            ws_stream
                .send(Message::Text(msg.to_string()))
                .await
                .unwrap();

            // Wait for response
            if let Some(Ok(Message::Text(response))) = ws_stream.next().await {
                let resp: serde_json::Value = serde_json::from_str(&response).unwrap();
                assert_eq!(resp["success"], true);
            }

            server_handle.abort();
        });
    });

    group.bench_function("unsubscribe", |b| {
        b.to_async(&rt).iter_with_large_drop(|| async {
            let (server_url, server_handle) = create_test_server(false).await;
            let url = format!("{server_url}/ws");
            let (mut ws_stream, _) = connect_async(&url).await.unwrap();

            // First subscribe
            let msg = json!({
                "method": "subscribe_graph_execution",
                "data": {
                    "graph_exec_id": "test_exec_123"
                }
            });
            ws_stream
                .send(Message::Text(msg.to_string()))
                .await
                .unwrap();
            ws_stream.next().await; // Consume response

            let msg = json!({
                "method": "unsubscribe",
                "data": {
                    "channel": "test_user|graph_exec#test_exec_123"
                }
            });

            ws_stream
                .send(Message::Text(msg.to_string()))
                .await
                .unwrap();

            // Wait for response
            if let Some(Ok(Message::Text(response))) = ws_stream.next().await {
                let resp: serde_json::Value = serde_json::from_str(&response).unwrap();
                assert_eq!(resp["success"], true);
            }

            server_handle.abort();
        });
    });

    group.finish();
}

// Benchmark message throughput
fn benchmark_message_throughput(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();

    let mut group = c.benchmark_group("message_throughput");
    group.measurement_time(Duration::from_secs(30));

    for msg_count in [10, 100, 1000].iter() {
        group.throughput(Throughput::Elements(*msg_count as u64));
        group.bench_with_input(
            BenchmarkId::from_parameter(msg_count),
            msg_count,
            |b, &msg_count| {
                b.to_async(&rt).iter_with_large_drop(|| async {
                    let (server_url, server_handle) = create_test_server(false).await;
                    let url = format!("{server_url}/ws");
                    let (mut ws_stream, _) = connect_async(&url).await.unwrap();

                    // Send multiple heartbeat messages
                    for _ in 0..msg_count {
                        let msg = json!({
                            "method": "heartbeat",
                            "data": "ping"
                        });
                        ws_stream
                            .send(Message::Text(msg.to_string()))
                            .await
                            .unwrap();
                    }

                    // Receive all responses
                    for _ in 0..msg_count {
                        ws_stream.next().await;
                    }

                    server_handle.abort();
                });
            },
        );
    }

    group.finish();
}

// Benchmark concurrent connections
fn benchmark_concurrent_connections(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();

    let mut group = c.benchmark_group("concurrent_connections");
    group.measurement_time(Duration::from_secs(60));
    group.sample_size(10);

    for num_clients in [100, 500, 1000].iter() {
        group.throughput(Throughput::Elements(*num_clients as u64));
        group.bench_with_input(
            BenchmarkId::from_parameter(num_clients),
            num_clients,
            |b, &num_clients| {
                b.to_async(&rt).iter_with_large_drop(|| async {
                    let (server_url, server_handle) = create_test_server(false).await;
                    let url = format!("{server_url}/ws");

                    // Create multiple concurrent connections
                    let mut handles = vec![];
                    for i in 0..num_clients {
                        let url = url.clone();
                        let handle = tokio::spawn(async move {
                            let (mut ws_stream, _) = connect_async(&url).await.unwrap();

                            // Subscribe to a unique channel
                            let msg = json!({
                                "method": "subscribe_graph_execution",
                                "data": {
                                    "graph_exec_id": format!("exec_{}", i)
                                }
                            });
                            ws_stream
                                .send(Message::Text(msg.to_string()))
                                .await
                                .unwrap();
                            ws_stream.next().await; // Wait for response

                            // Send a heartbeat
                            let msg = json!({
                                "method": "heartbeat",
                                "data": "ping"
                            });
                            ws_stream
                                .send(Message::Text(msg.to_string()))
                                .await
                                .unwrap();
                            ws_stream.next().await; // Wait for response

                            ws_stream
                        });
                        handles.push(handle);
                    }

                    // Wait for all connections to complete
                    for handle in handles {
                        let _ = handle.await;
                    }

                    server_handle.abort();
                });
            },
        );
    }

    group.finish();
}

// Benchmark message parsing
fn benchmark_message_parsing(c: &mut Criterion) {
    let mut group = c.benchmark_group("message_parsing");

    // Test different message sizes
    for msg_size in [100, 1000, 10000].iter() {
        group.throughput(Throughput::Bytes(*msg_size as u64));
        group.bench_with_input(
            BenchmarkId::new("parse_json", msg_size),
            msg_size,
            |b, &msg_size| {
                let data_str = "x".repeat(msg_size);
                let json_msg = json!({
                    "method": "subscribe_graph_execution",
                    "data": {
                        "graph_exec_id": data_str
                    }
                });
                let json_str = json_msg.to_string();

                b.iter(|| {
                    let _: models::WSMessage = serde_json::from_str(&json_str).unwrap();
                });
            },
        );
    }

    group.finish();
}

// Benchmark Redis event processing
fn benchmark_redis_event_processing(c: &mut Criterion) {
    let mut group = c.benchmark_group("redis_event_processing");

    group.bench_function("parse_execution_event", |b| {
        let event = json!({
            "payload": {
                "event_type": "graph_execution_update",
                "id": "exec_123",
                "graph_id": "graph_456",
                "graph_version": 1,
                "user_id": "user_789",
                "status": "RUNNING",
                "started_at": "2024-01-01T00:00:00Z",
                "inputs": {"test": "data"},
                "outputs": {}
            }
        });
        let event_str = event.to_string();

        b.iter(|| {
            let _: models::RedisEventWrapper = serde_json::from_str(&event_str).unwrap();
        });
    });

    group.finish();
}

criterion_group!(
    benches,
    benchmark_connection_establishment,
    benchmark_subscriptions,
    benchmark_message_throughput,
    benchmark_concurrent_connections,
    benchmark_message_parsing,
    benchmark_redis_event_processing
);
criterion_main!(benches);
10  autogpt_platform/backend/websocket/clippy.toml  (new file)

@@ -0,0 +1,10 @@
# Clippy configuration for robust error handling

# Set the maximum cognitive complexity allowed
cognitive-complexity-threshold = 30

# Disallow dbg! macros in tests
allow-dbg-in-tests = false

# Enforce documentation
missing-docs-in-crate-items = true
@@ -0,0 +1,75 @@
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "ws://localhost:8001/ws";

    println!("Connecting to {url}");
    let (mut ws_stream, _) = connect_async(url).await?;
    println!("Connected!");

    // Subscribe to a graph execution
    let subscribe_msg = json!({
        "method": "subscribe_graph_execution",
        "data": {
            "graph_exec_id": "test_exec_123"
        }
    });

    println!("Sending subscription request...");
    ws_stream
        .send(Message::Text(subscribe_msg.to_string()))
        .await?;

    // Wait for response
    if let Some(msg) = ws_stream.next().await {
        if let Message::Text(text) = msg? {
            println!("Received: {text}");
        }
    }

    // Send heartbeat
    let heartbeat_msg = json!({
        "method": "heartbeat",
        "data": "ping"
    });

    println!("Sending heartbeat...");
    ws_stream
        .send(Message::Text(heartbeat_msg.to_string()))
        .await?;

    // Wait for pong
    if let Some(msg) = ws_stream.next().await {
        if let Message::Text(text) = msg? {
            println!("Received: {text}");
        }
    }

    // Unsubscribe
    let unsubscribe_msg = json!({
        "method": "unsubscribe",
        "data": {
            "channel": "default|graph_exec#test_exec_123"
        }
    });

    println!("Sending unsubscribe request...");
    ws_stream
        .send(Message::Text(unsubscribe_msg.to_string()))
        .await?;

    // Wait for response
    if let Some(msg) = ws_stream.next().await {
        if let Message::Text(text) = msg? {
            println!("Received: {text}");
        }
    }

    println!("Closing connection...");
    ws_stream.close(None).await?;

    Ok(())
}
@@ -4,7 +4,6 @@ use std::env;
use std::fs;
use std::path::Path;
use std::str::FromStr;
use toml;

#[derive(Clone, Debug, Deserialize)]
pub struct Config {
@@ -1,12 +1,12 @@
use futures::StreamExt;
use redis::Client as RedisClient;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info, warn, debug};
use futures::StreamExt;
use tracing::{debug, error, info, warn};

use crate::models::{WSMessage, RedisEventWrapper, ExecutionEvent};
use crate::models::{ExecutionEvent, RedisEventWrapper, WSMessage};
use crate::stats::Stats;

pub struct ConnectionManager {
@@ -33,106 +33,159 @@ impl ConnectionManager {
    }

    pub async fn run_broadcaster(self: Arc<Self>) {
        info!("Starting Redis event broadcaster");
        info!("🚀 Starting Redis event broadcaster");

        loop {
            match self.run_broadcaster_inner().await {
                Ok(_) => {
                    warn!("Event broadcaster stopped unexpectedly, restarting in 5 seconds");
                    warn!("⚠️ Event broadcaster stopped unexpectedly, restarting in 5 seconds");
                    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                }
                Err(e) => {
                    error!("Event broadcaster error: {}, restarting in 5 seconds", e);
                    error!("❌ Event broadcaster error: {}, restarting in 5 seconds", e);
                    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                }
            }
        }
    }

    async fn run_broadcaster_inner(self: &Arc<Self>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    async fn run_broadcaster_inner(
        self: &Arc<Self>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut pubsub = self.redis_client.get_async_pubsub().await?;
        pubsub.psubscribe("*").await?;
        debug!("Listening to all Redis events, filtering for bus: {}", self.bus_name);
        info!(
            "📡 Listening to all Redis events, filtering for bus: {}",
            self.bus_name
        );

        let mut pubsub_stream = pubsub.on_message();

        loop {
            let msg = pubsub_stream.next().await;
            match msg {
                Some(msg) => {
                    let channel: String = msg.get_channel_name().to_string();
                    debug!("Received message on Redis channel: {}", channel);
                    self.stats.redis_messages_received.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                    debug!("📨 Received message on Redis channel: {}", channel);
                    self.stats
                        .redis_messages_received
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                    let payload: String = match msg.get_payload() {
                        Ok(p) => p,
                        Err(e) => {
                            warn!("Failed to get payload from Redis message: {}", e);
                            self.stats.errors_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            warn!("⚠️ Failed to get payload from Redis message: {}", e);
                            self.stats
                                .errors_total
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            continue;
                        }
                    };

                    // Parse the channel format: execution_event/{user_id}/{graph_id}/{graph_exec_id}
                    let parts: Vec<&str> = channel.split('/').collect();

                    // Check if this is an execution event channel
                    if parts.len() != 4 || parts[0] != &self.bus_name {
                        debug!("Ignoring non-execution event channel: {} (parts: {:?}, bus_name: {})", channel, parts, self.bus_name);
                        self.stats.redis_messages_ignored.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    if parts.len() != 4 || parts[0] != self.bus_name {
                        debug!(
                            "🚫 Ignoring non-execution event channel: {} (parts: {:?}, bus_name: {})",
                            channel, parts, self.bus_name
                        );
                        self.stats
                            .redis_messages_ignored
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        continue;
                    }

                    let user_id = parts[1];
                    let graph_id = parts[2];
                    let graph_exec_id = parts[3];

                    debug!("Received event - user: {}, graph: {}, exec: {}", user_id, graph_id, graph_exec_id);

                    debug!(
                        "📥 Received event - user: {}, graph: {}, exec: {}",
                        user_id, graph_id, graph_exec_id
                    );

                    // Parse the wrapped event
                    let wrapped_event = match RedisEventWrapper::parse(&payload) {
                        Ok(e) => e,
                        Err(e) => {
                            warn!("Failed to parse event JSON: {}, payload: {}", e, payload);
                            self.stats.errors_json_parse.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            self.stats.errors_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            warn!("⚠️ Failed to parse event JSON: {}, payload: {}", e, payload);
                            self.stats
                                .errors_json_parse
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            self.stats
                                .errors_total
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            continue;
                        }
                    };

                    let event = wrapped_event.payload;
                    debug!("Event received: {:?}", event);

                    debug!("📦 Event received: {:?}", event);

                    let (method, event_json) = match &event {
                        ExecutionEvent::GraphExecutionUpdate(graph_event) => {
                            self.stats.graph_execution_events.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            self.stats.events_received_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            ("graph_execution_event", serde_json::to_value(graph_event).unwrap())
                        },
                            self.stats
                                .graph_execution_events
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            self.stats
                                .events_received_total
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            (
                                "graph_execution_event",
                                match serde_json::to_value(graph_event) {
                                    Ok(v) => v,
                                    Err(e) => {
                                        error!("❌ Failed to serialize graph event: {}", e);
                                        continue;
                                    }
                                },
                            )
                        }
                        ExecutionEvent::NodeExecutionUpdate(node_event) => {
                            self.stats.node_execution_events.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            self.stats.events_received_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            ("node_execution_event", serde_json::to_value(node_event).unwrap())
                        },
                            self.stats
                                .node_execution_events
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            self.stats
                                .events_received_total
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            (
                                "node_execution_event",
                                match serde_json::to_value(node_event) {
                                    Ok(v) => v,
                                    Err(e) => {
                                        error!("❌ Failed to serialize node event: {}", e);
                                        continue;
                                    }
                                },
                            )
                        }
                    };

                    // Create the channel keys in the format expected by WebSocket clients
                    let mut channels_to_notify = Vec::new();

                    // For both event types, notify the specific execution channel
                    let exec_channel = format!("{}|graph_exec#{}", user_id, graph_exec_id);
                    let exec_channel = format!("{user_id}|graph_exec#{graph_exec_id}");
                    channels_to_notify.push(exec_channel.clone());

                    // For graph execution events, also notify the graph executions channel
                    if matches!(&event, ExecutionEvent::GraphExecutionUpdate(_)) {
                        let graph_channel = format!("{}|graph#{}|executions", user_id, graph_id);
                        let graph_channel = format!("{user_id}|graph#{graph_id}|executions");
                        channels_to_notify.push(graph_channel);
                    }

                    debug!("Broadcasting {} event to channels: {:?}", method, channels_to_notify);

                    debug!(
                        "📢 Broadcasting {} event to channels: {:?}",
                        method, channels_to_notify
                    );

                    let subs = self.subscribers.read().await;

                    // Log current subscriber state
                    debug!("📊 Current subscribers count: {}", subs.len());

                    for channel_key in channels_to_notify {
                        let ws_msg = WSMessage {
                            method: method.to_string(),
@@ -142,51 +195,81 @@ impl ConnectionManager {
                        };
                        let json_msg = match serde_json::to_string(&ws_msg) {
                            Ok(j) => {
                                debug!("Sending WebSocket message: {}", j);
                                debug!("📤 Sending WebSocket message: {}", j);
                                j
                            },
                            }
                            Err(e) => {
                                error!("Failed to serialize WebSocket message: {}", e);
                                error!("❌ Failed to serialize WebSocket message: {}", e);
                                continue;
                            }
                        };

                        if let Some(client_ids) = subs.get(&channel_key) {
                            let clients = self.clients.read().await;
                            let client_count = client_ids.len();
                            debug!("Broadcasting to {} clients on channel: {}", client_count, channel_key);

                            for &cid in client_ids {
                                if let Some((user_id, tx)) = clients.get(&cid) {
                                    match tx.try_send(json_msg.clone()) {
                                        Ok(_) => {
                                            debug!("Message sent immediately to client {} (user: {})", cid, user_id);
                                        }
                                        Err(mpsc::error::TrySendError::Full(_)) => {
                                            // Channel is full, try with a small timeout
                                            let tx_clone = tx.clone();
                                            let msg_clone = json_msg.clone();
                                            tokio::spawn(async move {
                                                let _ = tokio::time::timeout(
                                                    std::time::Duration::from_millis(100),
                                                    tx_clone.send(msg_clone)
                                                ).await;
                                            });
                                            warn!("Channel full for client {} (user: {}), sending async", cid, user_id);
                                        }
                                        Err(mpsc::error::TrySendError::Closed(_)) => {
                                            warn!("Channel closed for client {} (user: {})", cid, user_id);
                            let clients = self.clients.read().await;
                            let client_count = client_ids.len();
                            debug!(
                                "📣 Broadcasting to {} clients on channel: {}",
                                client_count, channel_key
                            );

                            for &cid in client_ids {
                                if let Some((user_id, tx)) = clients.get(&cid) {
                                    match tx.try_send(json_msg.clone()) {
                                        Ok(_) => {
                                            debug!(
                                                "✅ Message sent immediately to client {} (user: {})",
                                                cid, user_id
                                            );
                                            self.stats
                                                .messages_sent_total
                                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                        }
                                        Err(mpsc::error::TrySendError::Full(_)) => {
                                            // Channel is full, try with a small timeout
                                            let tx_clone = tx.clone();
                                            let msg_clone = json_msg.clone();
                                            let stats_clone = self.stats.clone();
                                            tokio::spawn(async move {
                                                match tokio::time::timeout(
                                                    std::time::Duration::from_millis(100),
                                                    tx_clone.send(msg_clone),
                                                )
                                                .await {
                                                    Ok(Ok(_)) => {
                                                        stats_clone
                                                            .messages_sent_total
                                                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                                    }
                                                    _ => {
                                                        stats_clone
                                                            .messages_failed_total
                                                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                                    }
                                                }
                                            });
                                            warn!("⚠️ Channel full for client {} (user: {}), sending async", cid, user_id);
                                        }
                                        Err(mpsc::error::TrySendError::Closed(_)) => {
                                            warn!(
                                                "⚠️ Channel closed for client {} (user: {})",
                                                cid, user_id
                                            );
                                            self.stats
                                                .messages_failed_total
                                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                        }
                                    }
                                } else {
                                    warn!("⚠️ Client {} not found in clients map", cid);
                                }
                            }
                        } else {
                            debug!("No subscribers for channel: {}", channel_key);
                            info!("📭 No subscribers for channel: {}", channel_key);
                        }
                    }
                }
                None => {
                    return Err("Redis pubsub stream ended".into());
                    return Err("❌ Redis pubsub stream ended".into());
                }
            }
        }
@@ -1,20 +1,29 @@
use axum::extract::ws::{CloseFrame, Message, WebSocket};
use axum::{
    extract::{Query, WebSocketUpgrade},
    http::HeaderMap,
    response::IntoResponse,
    Extension,
};
use axum::extract::ws::{CloseFrame, Message, WebSocket};
use jsonwebtoken::{decode, Validation, DecodingKey};
use jsonwebtoken::{decode, DecodingKey, Validation};
use serde_json::{json, Value};
use std::collections::HashMap;
use tokio::sync::mpsc;
use tracing::{error, info, warn, debug};
use tracing::{debug, error, info, warn};

use crate::connection_manager::ConnectionManager;
use crate::models::{Claims, WSMessage};
use crate::AppState;

// Helper function to safely serialize messages
fn serialize_message(msg: &WSMessage) -> String {
    serde_json::to_string(msg).unwrap_or_else(|e| {
        error!("❌ Failed to serialize WebSocket message: {}", e);
        json!({"method": "error", "success": false, "error": "Internal serialization error"})
            .to_string()
    })
}

pub async fn ws_handler(
    ws: WebSocketUpgrade,
    query: Query<HashMap<String, String>>,
@@ -28,36 +37,44 @@ pub async fn ws_handler(
    if state.config.enable_auth {
        match token {
            Some(token_str) => {
                debug!("Authenticating WebSocket connection");
                let mut validation = Validation::new(state.config.jwt_algorithm);
                validation.set_audience(&["authenticated"]);
                debug!("🔐 Authenticating WebSocket connection");
                let mut validation = Validation::new(state.config.jwt_algorithm);
                validation.set_audience(&["authenticated"]);

                let key = DecodingKey::from_secret(state.config.jwt_secret.as_bytes());
                let key = DecodingKey::from_secret(state.config.jwt_secret.as_bytes());

                match decode::<Claims>(&token_str, &key, &validation) {
                    Ok(token_data) => {
                        user_id = token_data.claims.sub.clone();
                        debug!("WebSocket authenticated for user: {}", user_id);
                        debug!("✅ WebSocket authenticated for user: {}", user_id);
                    }
                    Err(e) => {
                        warn!("JWT validation failed: {}", e);
                        warn!("⚠️ JWT validation failed: {}", e);
                        auth_error_code = Some(4003);
                    }
                }
            }
            None => {
                warn!("Missing authentication token in WebSocket connection");
                warn!("⚠️ Missing authentication token in WebSocket connection");
                auth_error_code = Some(4001);
            }
        }
    } else {
        debug!("WebSocket connection without auth (auth disabled)");
        debug!("🔓 WebSocket connection without auth (auth disabled)");
    }

    if let Some(code) = auth_error_code {
        error!("WebSocket authentication failed with code: {}", code);
        state.mgr.stats.connections_failed_auth.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        state.mgr.stats.connections_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        error!("❌ WebSocket authentication failed with code: {}", code);
        state
            .mgr
            .stats
            .connections_failed_auth
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        state
            .mgr
            .stats
            .connections_total
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        return ws
            .on_upgrade(move |mut socket: WebSocket| async move {
                let close_frame = Some(CloseFrame {
@@ -70,24 +87,37 @@ pub async fn ws_handler(
            .into_response();
    }

    debug!("WebSocket connection established for user: {}", user_id);
    debug!("✅ WebSocket connection established for user: {}", user_id);
    ws.on_upgrade(move |socket| {
        handle_socket(socket, user_id, state.mgr.clone(), state.config.max_message_size_limit)
        handle_socket(
            socket,
            user_id,
            state.mgr.clone(),
            state.config.max_message_size_limit,
        )
    })
}

async fn update_subscription_stats(mgr: &ConnectionManager, channel: &str, add: bool) {
    if add {
        mgr.stats.subscriptions_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        mgr.stats.subscriptions_active.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        mgr.stats
            .subscriptions_total
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        mgr.stats
            .subscriptions_active
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let mut channel_stats = mgr.stats.channels_active.write().await;
        let count = channel_stats.entry(channel.to_string()).or_insert(0);
        *count += 1;
    } else {
        mgr.stats.unsubscriptions_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        mgr.stats.subscriptions_active.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

        mgr.stats
            .unsubscriptions_total
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        mgr.stats
            .subscriptions_active
            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

        let mut channel_stats = mgr.stats.channels_active.write().await;
        if let Some(count) = channel_stats.get_mut(channel) {
            *count = count.saturating_sub(1);
@@ -104,14 +134,20 @@ pub async fn handle_socket(
    mgr: std::sync::Arc<ConnectionManager>,
    max_size: usize,
) {
    let client_id = mgr.next_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let client_id = mgr
        .next_id
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let (tx, mut rx) = mpsc::channel::<String>(10);
    info!("New WebSocket client {} for user: {}", client_id, user_id);

    info!("👋 New WebSocket client {} for user: {}", client_id, user_id);

    // Update connection stats
    mgr.stats.connections_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    mgr.stats.connections_active.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    mgr.stats
        .connections_total
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    mgr.stats
        .connections_active
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    // Update active users
    {
        let mut active_users = mgr.stats.active_users.write().await;
@@ -148,13 +184,13 @@ pub async fn handle_socket(
            match msg {
                Message::Text(text) => {
                    if text.len() > max_size {
                        warn!("Message from client {} exceeds size limit: {} > {}", client_id, text.len(), max_size);
                        let err_resp = serde_json::to_string(&WSMessage {
                        warn!("⚠️ Message from client {} exceeds size limit: {} > {}", client_id, text.len(), max_size);
                        let err_resp = serialize_message(&WSMessage {
                            method: "error".to_string(),
                            success: Some(false),
                            error: Some("Message exceeds size limit".to_string()),
                            ..Default::default()
                        }).unwrap();
                        });
                        if socket.send(Message::Text(err_resp)).await.is_err() {
                            break;
                        }
@@ -162,28 +198,28 @@ pub async fn handle_socket(
                    }

                    mgr.stats.messages_received_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                    let ws_msg: WSMessage = match serde_json::from_str(&text) {
                        Ok(m) => m,
                        Err(e) => {
                            warn!("Invalid message format from client {}: {}", client_id, e);
                            warn!("⚠️ Invalid message format from client {}: {}", client_id, e);
                            mgr.stats.errors_json_parse.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            mgr.stats.errors_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            let err_resp = serde_json::to_string(&WSMessage {
                            let err_resp = serialize_message(&WSMessage {
                                method: "error".to_string(),
                                success: Some(false),
                                error: Some("Invalid message format. Review the schema and retry".to_string()),
                                ..Default::default()
                            }).unwrap();
                            });
                            if socket.send(Message::Text(err_resp)).await.is_err() {
                                break;
                            }
                            continue;
                        }
                    };

                    debug!("Received {} message from client {}", ws_msg.method, client_id);

                    debug!("📥 Received {} message from client {}", ws_msg.method, client_id);

                    match ws_msg.method.as_str() {
                        "subscribe_graph_execution" => {
                            let graph_exec_id = match &ws_msg.data {
@@ -191,15 +227,15 @@ pub async fn handle_socket(
                                _ => None,
                            };
                            let Some(graph_exec_id) = graph_exec_id else {
                                warn!("Missing graph_exec_id in subscribe_graph_execution from client {}", client_id);
                                warn!("⚠️ Missing graph_exec_id in subscribe_graph_execution from client {}", client_id);
                                let err_resp = json!({"method": "error", "success": false, "error": "Missing graph_exec_id"});
                                if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
                                    break;
                                }
                                continue;
                            };
                            let channel = format!("{}|graph_exec#{}", user_id, graph_exec_id);
                            debug!("Client {} subscribing to channel: {}", client_id, channel);
                            let channel = format!("{user_id}|graph_exec#{graph_exec_id}");
                            debug!("📌 Client {} subscribing to channel: {}", client_id, channel);

                            {
                                let mut subs = mgr.subscribers.write().await;
@@ -211,7 +247,7 @@ pub async fn handle_socket(
                                    set.insert(channel.clone());
                                }
                            }

                            // Update subscription stats
                            update_subscription_stats(&mgr, &channel, true).await;

@@ -221,7 +257,7 @@ pub async fn handle_socket(
                                channel: Some(channel),
                                ..Default::default()
                            };
                            if socket.send(Message::Text(serde_json::to_string(&resp).unwrap())).await.is_err() {
                            if socket.send(Message::Text(serialize_message(&resp))).await.is_err() {
                                break;
                            }
                        }
@@ -237,7 +273,7 @@ pub async fn handle_socket(
                                }
                                continue;
                            };
                            let channel = format!("{}|graph#{}|executions", user_id, graph_id);
                            let channel = format!("{user_id}|graph#{graph_id}|executions");

                            {
                                let mut subs = mgr.subscribers.write().await;
@@ -249,7 +285,7 @@ pub async fn handle_socket(
                                    set.insert(channel.clone());
                                }
                            }

                            debug!("📌 Client {} subscribing to channel: {}", client_id, channel);
                            // Update subscription stats
                            update_subscription_stats(&mgr, &channel, true).await;

@@ -259,7 +295,7 @@ pub async fn handle_socket(
                                channel: Some(channel),
                                ..Default::default()
                            };
                            if socket.send(Message::Text(serde_json::to_string(&resp).unwrap())).await.is_err() {
                            if socket.send(Message::Text(serialize_message(&resp))).await.is_err() {
                                break;
                            }
                        }
@@ -278,7 +314,7 @@ pub async fn handle_socket(
                            };
                            let channel = channel.to_string();

                            if !channel.starts_with(&format!("{}|", user_id)) {
                            if !channel.starts_with(&format!("{user_id}|")) {
                                let err_resp = json!({"method": "error", "success": false, "error": "Unauthorized channel"});
                                if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
                                    break;
@@ -301,7 +337,7 @@ pub async fn handle_socket(
                                    set.remove(&channel);
                                }
                            }

                            // Update subscription stats
                            update_subscription_stats(&mgr, &channel, false).await;

@@ -311,7 +347,7 @@ pub async fn handle_socket(
                                channel: Some(channel),
                                ..Default::default()
                            };
                            if socket.send(Message::Text(serde_json::to_string(&resp).unwrap())).await.is_err() {
                            if socket.send(Message::Text(serialize_message(&resp))).await.is_err() {
                                break;
                            }
                        }
@@ -323,7 +359,7 @@ pub async fn handle_socket(
                                success: Some(true),
                                ..Default::default()
                            };
                            if socket.send(Message::Text(serde_json::to_string(&resp).unwrap())).await.is_err() {
                            if socket.send(Message::Text(serialize_message(&resp))).await.is_err() {
                                break;
                            }
                        } else {
@@ -334,7 +370,7 @@ pub async fn handle_socket(
                            }
                        }
                        _ => {
                            warn!("Unknown method '{}' from client {}", ws_msg.method, client_id);
                            warn!("❓ Unknown method '{}' from client {}", ws_msg.method, client_id);
                            let err_resp = json!({"method": "error", "success": false, "error": "Unknown method"});
                            if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
                                break;
@@ -357,11 +393,13 @@ pub async fn handle_socket(
    }

    // Cleanup
    debug!("WebSocket client {} disconnected, cleaning up", client_id);

    debug!("👋 WebSocket client {} disconnected, cleaning up", client_id);

    // Update connection stats
    mgr.stats.connections_active.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

    mgr.stats
        .connections_active
        .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

    // Update active users
    {
        let mut active_users = mgr.stats.active_users.write().await;
@@ -372,7 +410,7 @@ pub async fn handle_socket(
            }
        }
    }

    let channels = {
        let mut client_channels = mgr.client_channels.write().await;
        client_channels.remove(&client_id).unwrap_or_default()
@@ -389,7 +427,7 @@ pub async fn handle_socket(
            }
        }
    }

    // Update subscription stats for all channels the client was subscribed to
    for channel in &channels {
        update_subscription_stats(&mgr, channel, false).await;
@@ -399,6 +437,6 @@ pub async fn handle_socket(
        let mut clients = mgr.clients.write().await;
        clients.remove(&client_id);
    }

    debug!("Cleanup completed for client {}", client_id);

    debug!("✨ Cleanup completed for client {}", client_id);
}
26  autogpt_platform/backend/websocket/src/lib.rs  (new file)

@@ -0,0 +1,26 @@
#![deny(warnings)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::panic)]
#![deny(clippy::unimplemented)]
#![deny(clippy::todo)]

pub mod config;
pub mod connection_manager;
pub mod handlers;
pub mod models;
pub mod stats;

pub use config::Config;
pub use connection_manager::ConnectionManager;
pub use handlers::ws_handler;
pub use stats::Stats;

use std::sync::Arc;

#[derive(Clone)]
pub struct AppState {
    pub mgr: Arc<ConnectionManager>,
    pub config: Arc<Config>,
    pub stats: Arc<Stats>,
}
@@ -1,16 +1,16 @@
use axum::{
    body::Body,
    http::{header, StatusCode},
    response::Response,
    routing::get,
    Router,
    response::Response,
    http::{header, StatusCode},
    body::Body,
};
use clap::Parser;
use std::sync::Arc;
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};
use tracing::{debug, error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use clap::Parser;

use crate::config::Config;
use crate::connection_manager::ConnectionManager;
@@ -28,12 +28,12 @@ async fn prometheus_handler(
) -> Result<Response, StatusCode> {
    let snapshot = state.stats.snapshot().await;
    let prometheus_text = state.stats.to_prometheus_format(&snapshot);

    Ok(Response::builder()

    Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
        .body(Body::from(prometheus_text))
        .unwrap())
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}

mod config;
@@ -63,34 +63,44 @@ async fn main() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "ws_api=info,tower_http=debug".into()),
                .unwrap_or_else(|_| "websocket=info,tower_http=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    info!("Starting WebSocket API server");

    info!("🚀 Starting WebSocket API server");

    let cli = Cli::parse();
    let config = Arc::new(Config::load(cli.config.as_deref()));
    debug!("Configuration loaded - host: {}, port: {}, auth: {}", config.host, config.port, config.enable_auth);

    info!(
        "⚙️ Configuration loaded - host: {}, port: {}, auth: {}",
        config.host, config.port, config.enable_auth
    );

    let redis_client = match redis::Client::open(config.redis_url.clone()) {
        Ok(client) => {
            debug!("Redis client created successfully");
            debug!("✅ Redis client created successfully");
            client
        }
        Err(e) => {
            error!("Failed to create Redis client: {}. Please check REDIS_URL environment variable", e);
            error!(
                "❌ Failed to create Redis client: {}. Please check REDIS_URL environment variable",
                e
            );
            std::process::exit(1);
        }
    };

    let stats = Arc::new(stats::Stats::default());
    let mgr = Arc::new(ConnectionManager::new(redis_client, config.execution_event_bus_name.clone(), stats.clone()));
    let mgr = Arc::new(ConnectionManager::new(
        redis_client,
        config.execution_event_bus_name.clone(),
        stats.clone(),
    ));

    let mgr_clone = mgr.clone();
    tokio::spawn(async move {
        debug!("Starting event broadcaster task");
        debug!("📡 Starting event broadcaster task");
        mgr_clone.run_broadcaster().await;
    });

@@ -141,19 +151,22 @@ async fn main() {
    let addr = format!("{}:{}", config.host, config.port);
    let listener = match TcpListener::bind(&addr).await {
        Ok(listener) => {
            info!("WebSocket server listening on: {}", addr);
            info!("🎧 WebSocket server listening on: {}", addr);
            listener
        }
        Err(e) => {
            error!("Failed to bind to {}: {}. Please check if the port is already in use", addr, e);
            error!(
                "❌ Failed to bind to {}: {}. Please check if the port is already in use",
                addr, e
            );
            std::process::exit(1);
        }
    };

    info!("WebSocket API server ready to accept connections");

    info!("✨ WebSocket API server ready to accept connections");

    if let Err(e) = axum::serve(listener, app.into_make_service()).await {
        error!("Server error: {}", e);
        error!("💥 Server error: {}", e);
        std::process::exit(1);
    }
}
@@ -41,7 +41,7 @@ pub struct GraphExecutionEvent {
    pub ended_at: Option<String>,
    pub preset_id: Option<String>,
    pub stats: Option<ExecutionStats>,

    // Keep these as JSON since they vary by graph
    pub inputs: Value,
    pub outputs: Value,
@@ -61,7 +61,7 @@ pub struct NodeExecutionEvent {
    pub queue_time: Option<String>,
    pub start_time: Option<String>,
    pub end_time: Option<String>,

    // Keep these as JSON since they vary by node type
    pub input_data: Value,
    pub output_data: Value,
@@ -87,6 +87,7 @@ pub enum ExecutionStatus {
    Completed,
    Failed,
    Incomplete,
    Terminated,
}

// Wrapper for the Redis event that includes the payload
@@ -1,7 +1,7 @@
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use tokio::sync::RwLock;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Stats {
|
||||
@@ -9,32 +9,32 @@ pub struct Stats {
|
||||
pub connections_total: AtomicU64,
|
||||
pub connections_active: AtomicU64,
|
||||
pub connections_failed_auth: AtomicU64,
|
||||
|
||||
|
||||
// Message metrics
|
||||
pub messages_received_total: AtomicU64,
|
||||
pub messages_sent_total: AtomicU64,
|
||||
pub messages_failed_total: AtomicU64,
|
||||
|
||||
|
||||
// Subscription metrics
|
||||
pub subscriptions_total: AtomicU64,
|
||||
pub subscriptions_active: AtomicU64,
|
||||
pub unsubscriptions_total: AtomicU64,
|
||||
|
||||
|
||||
// Event metrics by type
|
||||
pub events_received_total: AtomicU64,
|
||||
pub graph_execution_events: AtomicU64,
|
||||
pub node_execution_events: AtomicU64,
|
||||
|
||||
|
||||
// Redis metrics
|
||||
pub redis_messages_received: AtomicU64,
|
||||
pub redis_messages_ignored: AtomicU64,
|
||||
|
||||
|
||||
// Channel metrics
|
||||
pub channels_active: RwLock<HashMap<String, usize>>, // channel -> subscriber count
|
||||
|
||||
|
||||
// User metrics
|
||||
pub active_users: RwLock<HashMap<String, usize>>, // user_id -> connection count
|
||||
|
||||
|
||||
// Error metrics
|
||||
pub errors_total: AtomicU64,
|
||||
pub errors_json_parse: AtomicU64,
|
||||
@@ -47,33 +47,33 @@ pub struct StatsSnapshot {
     pub connections_total: u64,
     pub connections_active: u64,
     pub connections_failed_auth: u64,
 
     // Message metrics
     pub messages_received_total: u64,
     pub messages_sent_total: u64,
     pub messages_failed_total: u64,
 
     // Subscription metrics
     pub subscriptions_total: u64,
     pub subscriptions_active: u64,
     pub unsubscriptions_total: u64,
 
     // Event metrics
     pub events_received_total: u64,
     pub graph_execution_events: u64,
     pub node_execution_events: u64,
 
     // Redis metrics
     pub redis_messages_received: u64,
     pub redis_messages_ignored: u64,
 
     // Channel metrics
     pub channels_active_count: usize,
     pub total_subscribers: usize,
 
     // User metrics
     pub active_users_count: usize,
 
     // Error metrics
     pub errors_total: u64,
     pub errors_json_parse: u64,
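`StatsSnapshot` mirrors `Stats` field-for-field but holds plain values, so once built it can be formatted or serialized without ever touching the live counters. A sketch of that property, assuming the struct derives `Serialize` (the derive attribute sits outside the visible hunk, so this is an assumption):

use serde::Serialize;

// Reduced stand-in for StatsSnapshot; the real struct above carries the
// full field set. `derive(Serialize)` is an assumption here.
#[derive(Serialize)]
struct SnapshotLite {
    connections_active: u64,
    active_users_count: usize,
}

fn main() {
    let snap = SnapshotLite { connections_active: 12, active_users_count: 3 };
    // A plain-data snapshot serializes with no locks held.
    println!("{}", serde_json::to_string_pretty(&snap).unwrap());
}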
@@ -87,103 +87,152 @@ impl Stats {
         let total_subscribers: usize = channels.values().sum();
         let channels_active_count = channels.len();
         drop(channels); // Release lock early
 
         let users = self.active_users.read().await;
         let active_users_count = users.len();
         drop(users); // Release lock early
 
         StatsSnapshot {
             connections_total: self.connections_total.load(Ordering::Relaxed),
             connections_active: self.connections_active.load(Ordering::Relaxed),
             connections_failed_auth: self.connections_failed_auth.load(Ordering::Relaxed),
 
             messages_received_total: self.messages_received_total.load(Ordering::Relaxed),
             messages_sent_total: self.messages_sent_total.load(Ordering::Relaxed),
             messages_failed_total: self.messages_failed_total.load(Ordering::Relaxed),
 
             subscriptions_total: self.subscriptions_total.load(Ordering::Relaxed),
             subscriptions_active: self.subscriptions_active.load(Ordering::Relaxed),
             unsubscriptions_total: self.unsubscriptions_total.load(Ordering::Relaxed),
 
             events_received_total: self.events_received_total.load(Ordering::Relaxed),
             graph_execution_events: self.graph_execution_events.load(Ordering::Relaxed),
             node_execution_events: self.node_execution_events.load(Ordering::Relaxed),
 
             redis_messages_received: self.redis_messages_received.load(Ordering::Relaxed),
             redis_messages_ignored: self.redis_messages_ignored.load(Ordering::Relaxed),
 
             channels_active_count,
             total_subscribers,
             active_users_count,
 
             errors_total: self.errors_total.load(Ordering::Relaxed),
             errors_json_parse: self.errors_json_parse.load(Ordering::Relaxed),
             errors_message_size: self.errors_message_size.load(Ordering::Relaxed),
         }
     }
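The explicit `drop(...)` calls above are worth keeping: `snapshot()` only needs two aggregates out of each map, and releasing the read guard before building the large struct literal keeps writers (the connect/subscribe paths) from stalling behind a metrics scrape. The same pattern in isolation — a minimal sketch assuming a tokio runtime; the channel name is illustrative:

use std::collections::HashMap;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let channels: RwLock<HashMap<String, usize>> = RwLock::new(HashMap::new());
    channels.write().await.insert("graph_executions:abc".into(), 3);

    // Take the read guard, copy out the aggregates, then drop it
    // immediately so writers are blocked for as short a time as possible.
    let guard = channels.read().await;
    let total_subscribers: usize = guard.values().sum();
    let channels_active = guard.len();
    drop(guard);

    println!("{channels_active} channels, {total_subscribers} subscribers");
}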
 
     pub fn to_prometheus_format(&self, snapshot: &StatsSnapshot) -> String {
         let mut output = String::new();
 
         // Connection metrics
         output.push_str("# HELP ws_connections_total Total number of WebSocket connections\n");
         output.push_str("# TYPE ws_connections_total counter\n");
-        output.push_str(&format!("ws_connections_total {}\n\n", snapshot.connections_total));
-
-        output.push_str("# HELP ws_connections_active Current number of active WebSocket connections\n");
+        output.push_str(&format!(
+            "ws_connections_total {}\n\n",
+            snapshot.connections_total
+        ));
+
+        output.push_str(
+            "# HELP ws_connections_active Current number of active WebSocket connections\n",
+        );
         output.push_str("# TYPE ws_connections_active gauge\n");
-        output.push_str(&format!("ws_connections_active {}\n\n", snapshot.connections_active));
-
-        output.push_str("# HELP ws_connections_failed_auth Total number of failed authentications\n");
+        output.push_str(&format!(
+            "ws_connections_active {}\n\n",
+            snapshot.connections_active
+        ));
+
+        output
+            .push_str("# HELP ws_connections_failed_auth Total number of failed authentications\n");
         output.push_str("# TYPE ws_connections_failed_auth counter\n");
-        output.push_str(&format!("ws_connections_failed_auth {}\n\n", snapshot.connections_failed_auth));
+        output.push_str(&format!(
+            "ws_connections_failed_auth {}\n\n",
+            snapshot.connections_failed_auth
+        ));
 
         // Message metrics
-        output.push_str("# HELP ws_messages_received_total Total number of messages received from clients\n");
+        output.push_str(
+            "# HELP ws_messages_received_total Total number of messages received from clients\n",
+        );
         output.push_str("# TYPE ws_messages_received_total counter\n");
-        output.push_str(&format!("ws_messages_received_total {}\n\n", snapshot.messages_received_total));
+        output.push_str(&format!(
+            "ws_messages_received_total {}\n\n",
+            snapshot.messages_received_total
+        ));
 
         output.push_str("# HELP ws_messages_sent_total Total number of messages sent to clients\n");
         output.push_str("# TYPE ws_messages_sent_total counter\n");
-        output.push_str(&format!("ws_messages_sent_total {}\n\n", snapshot.messages_sent_total));
+        output.push_str(&format!(
+            "ws_messages_sent_total {}\n\n",
+            snapshot.messages_sent_total
+        ));
 
         // Subscription metrics
         output.push_str("# HELP ws_subscriptions_active Current number of active subscriptions\n");
         output.push_str("# TYPE ws_subscriptions_active gauge\n");
-        output.push_str(&format!("ws_subscriptions_active {}\n\n", snapshot.subscriptions_active));
+        output.push_str(&format!(
+            "ws_subscriptions_active {}\n\n",
+            snapshot.subscriptions_active
+        ));
 
         // Event metrics
-        output.push_str("# HELP ws_events_received_total Total number of events received from Redis\n");
+        output.push_str(
+            "# HELP ws_events_received_total Total number of events received from Redis\n",
+        );
         output.push_str("# TYPE ws_events_received_total counter\n");
-        output.push_str(&format!("ws_events_received_total {}\n\n", snapshot.events_received_total));
-
-        output.push_str("# HELP ws_graph_execution_events_total Total number of graph execution events\n");
+        output.push_str(&format!(
+            "ws_events_received_total {}\n\n",
+            snapshot.events_received_total
+        ));
+
+        output.push_str(
+            "# HELP ws_graph_execution_events_total Total number of graph execution events\n",
+        );
         output.push_str("# TYPE ws_graph_execution_events_total counter\n");
-        output.push_str(&format!("ws_graph_execution_events_total {}\n\n", snapshot.graph_execution_events));
-
-        output.push_str("# HELP ws_node_execution_events_total Total number of node execution events\n");
+        output.push_str(&format!(
+            "ws_graph_execution_events_total {}\n\n",
+            snapshot.graph_execution_events
+        ));
+
+        output.push_str(
+            "# HELP ws_node_execution_events_total Total number of node execution events\n",
+        );
         output.push_str("# TYPE ws_node_execution_events_total counter\n");
-        output.push_str(&format!("ws_node_execution_events_total {}\n\n", snapshot.node_execution_events));
+        output.push_str(&format!(
+            "ws_node_execution_events_total {}\n\n",
+            snapshot.node_execution_events
+        ));
 
         // Channel metrics
         output.push_str("# HELP ws_channels_active Number of active channels\n");
         output.push_str("# TYPE ws_channels_active gauge\n");
-        output.push_str(&format!("ws_channels_active {}\n\n", snapshot.channels_active_count));
-
-        output.push_str("# HELP ws_total_subscribers Total number of subscribers across all channels\n");
+        output.push_str(&format!(
+            "ws_channels_active {}\n\n",
+            snapshot.channels_active_count
+        ));
+
+        output.push_str(
+            "# HELP ws_total_subscribers Total number of subscribers across all channels\n",
+        );
         output.push_str("# TYPE ws_total_subscribers gauge\n");
-        output.push_str(&format!("ws_total_subscribers {}\n\n", snapshot.total_subscribers));
+        output.push_str(&format!(
+            "ws_total_subscribers {}\n\n",
+            snapshot.total_subscribers
+        ));
 
         // User metrics
         output.push_str("# HELP ws_active_users Number of unique users with active connections\n");
         output.push_str("# TYPE ws_active_users gauge\n");
-        output.push_str(&format!("ws_active_users {}\n\n", snapshot.active_users_count));
+        output.push_str(&format!(
+            "ws_active_users {}\n\n",
+            snapshot.active_users_count
+        ));
 
         // Error metrics
         output.push_str("# HELP ws_errors_total Total number of errors\n");
         output.push_str("# TYPE ws_errors_total counter\n");
         output.push_str(&format!("ws_errors_total {}\n", snapshot.errors_total));
 
         output
     }
 }
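With `to_prometheus_format` producing standard exposition text (`# HELP`/`# TYPE` pairs followed by a sample line), the remaining step is an HTTP endpoint. Nothing in this diff shows the route wiring, so the following is a hypothetical sketch with a stubbed-out `Stats`; the route name, port, and state layout are all assumptions:

use std::sync::Arc;
use axum::{extract::State, routing::get, Router};

// Stub standing in for the real Stats/StatsSnapshot pair from this diff.
struct Stats;
impl Stats {
    async fn snapshot(&self) -> u64 {
        42 // the real snapshot() returns a StatsSnapshot
    }
    fn to_prometheus_format(&self, snap: &u64) -> String {
        format!("ws_connections_total {snap}\n")
    }
}

// Hypothetical handler: scrape-time work is one snapshot plus string formatting.
async fn metrics(State(stats): State<Arc<Stats>>) -> String {
    let snap = stats.snapshot().await;
    stats.to_prometheus_format(&snap)
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/metrics", get(metrics))
        .with_state(Arc::new(Stats));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:9100")
        .await
        .expect("bind metrics port");
    axum::serve(listener, app).await.expect("serve metrics");
}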
||||