Files
de-mls/tests/consensus_realtime_test.rs
Ekaterina Broslavskaya 4ea1136012 chore: implement consensus mechanism (#43)
* chore: implement consensus mechanism

- Updated `Cargo.lock`
- Refactored `Cargo.toml`
- Enhanced `action_handlers.rs` to introduce a ban request feature, allowing users to send ban requests through Waku.
- Implemented a new consensus module to manage proposal and voting processes, including state transitions for steward epochs.
- Updated error handling in `error.rs` to accommodate new consensus-related errors.
- Refactored `group.rs` and `user_actor.rs` to integrate the new consensus logic and improve state management.
- Added tests for the consensus mechanism to ensure reliability and correctness of the voting process.

* chore: update dependencies and refactor code for clarity

* refactor: update voting mechanism and clean up user actor logic

- Changed the return type of `complete_voting_for_steward` to return a vector of messages instead of a boolean.
- Removed unused request types and their implementations related to proposal handling.
- Updated the `handle_steward_flow_per_epoch` function to reflect changes in the voting process and improved logging.
- Refactored tests to align with the new voting mechanism and ensure proper message handling.
- Enhanced consensus logic to better handle vote counting and state transitions.

* feat: implement real voting for all users

- Improved logging messages for clarity during WebSocket message handling.
- Added new serverless functions and shims for better integration with the frontend.
- Introduced new manifest files for server configuration and routing.
- Implemented initial setup for handling user votes and proposals in the consensus mechanism.
- Updated error handling to accommodate new user vote actions and related messages.

* consensus: update test

* Enhance steward state management and consensus mechanism

- Added detailed documentation in README for steward state management, including state definitions, transitions, and flow scenarios.
- Updated `WakuNode` connection logic to include a timeout for peer connections.
- Refactored message handling in `action_handlers.rs` to utilize new `AppMessage` structures for improved clarity.
- Enhanced error handling in `error.rs` to cover new scenarios related to consensus and state transitions.
- Updated tests to reflect changes in the consensus mechanism and steward flow, ensuring robustness and reliability.
- Improved state machine logic to handle edge cases and guarantee proper state transitions during steward epochs.

* Refactor

- Updated `WakuMessageToSend` constructor to accept a slice for `app_id`, improving memory efficiency.
- Enhanced error handling in `Group` struct to provide more descriptive error messages for invalid states.
- Added detailed documentation for new methods in `Group` and `User` structs, clarifying their functionality and usage.
- Refactored state machine logic to ensure proper transitions during steward epochs and voting processes.
- Improved test coverage for group state management and message processing, ensuring robustness in various scenarios.

* Update README for improved clarity and formatting

* fix: fix lint issues and updates test flow

* test: update user test
2025-09-10 16:42:43 +03:00

600 lines
22 KiB
Rust

use alloy::signers::local::PrivateKeySigner;
use de_mls::consensus::{compute_vote_hash, ConsensusEvent, ConsensusService};
use de_mls::protos::messages::v1::consensus::v1::Vote;
use de_mls::LocalSigner;
use prost::Message;
use std::time::Duration;
use uuid::Uuid;
#[tokio::test]
async fn test_realtime_consensus_waiting() {
// Create consensus service
let consensus_service = ConsensusService::new();
let group_name = "test_group_realtime";
let expected_voters_count = 3;
let signer = PrivateKeySigner::random();
let proposal_owner = signer.address_bytes();
// Create a proposal
let proposal = consensus_service
.create_proposal(
group_name,
"Test Proposal".to_string(),
"Test payload".to_string(),
proposal_owner,
expected_voters_count,
300,
true,
)
.await
.expect("Failed to create proposal");
let proposal = consensus_service
.vote_on_proposal(group_name, proposal.proposal_id, true, signer)
.await
.expect("Failed to vote on proposal");
println!("Created proposal with ID: {}", proposal.proposal_id);
// Subscribe to consensus events
let mut consensus_events = consensus_service.subscribe_to_events();
let proposal_id = proposal.proposal_id;
// Start a background task that waits for consensus events
let group_name_clone = group_name;
let consensus_waiter = tokio::spawn(async move {
println!("Starting consensus event waiter for proposal {proposal_id:?}");
// Wait for consensus event with timeout
let timeout_duration = Duration::from_secs(10);
match tokio::time::timeout(timeout_duration, async {
while let Ok((event_group_name, event)) = consensus_events.recv().await {
if event_group_name == group_name_clone {
match event {
ConsensusEvent::ConsensusReached {
proposal_id: event_proposal_id,
result,
} => {
if event_proposal_id == proposal_id {
println!("Consensus reached for proposal {proposal_id}: {result}");
return Ok(result);
}
}
ConsensusEvent::ConsensusFailed {
proposal_id: event_proposal_id,
reason,
} => {
if event_proposal_id == proposal_id {
println!("Consensus failed for proposal {proposal_id}: {reason}");
return Err(format!("Consensus failed: {reason}"));
}
}
}
}
}
Err("Event channel closed".to_string())
})
.await
{
Ok(result) => {
println!("Consensus event waiter result: {result:?}");
result
}
Err(_) => {
println!("Consensus event waiter timed out");
Err("Timeout waiting for consensus".to_string())
}
}
});
// Wait a bit to ensure the waiter is running
tokio::time::sleep(Duration::from_millis(100)).await;
// Add votes to reach consensus
let mut previous_vote_hash = proposal.votes[0].vote_hash.clone(); // Start with steward's vote hash
for i in 1..expected_voters_count {
let signer = PrivateKeySigner::random();
let proposal_owner = signer.address_bytes();
let mut vote = Vote {
vote_id: Uuid::new_v4().as_u128() as u32,
vote_owner: proposal_owner,
proposal_id: proposal.proposal_id,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Failed to get current time")
.as_secs(),
vote: true,
parent_hash: Vec::new(),
received_hash: previous_vote_hash.clone(), // Reference previous vote's hash
vote_hash: Vec::new(),
signature: Vec::new(),
};
// Compute vote hash
vote.vote_hash = compute_vote_hash(&vote);
let vote_bytes = vote.encode_to_vec();
vote.signature = signer
.local_sign_message(&vote_bytes)
.await
.expect("Failed to sign vote");
println!("Adding vote {} for proposal {}", i, proposal.proposal_id);
consensus_service
.process_incoming_vote(group_name, vote.clone())
.await
.expect("Failed to process vote");
// Update previous vote hash for next iteration
previous_vote_hash = vote.vote_hash.clone();
// Small delay between votes
tokio::time::sleep(Duration::from_millis(50)).await;
}
// Wait for consensus result
let consensus_result = consensus_waiter
.await
.expect("Consensus waiter task failed");
// Verify consensus was reached
assert!(consensus_result.is_ok());
let result = consensus_result.unwrap();
assert!(result); // Should be true (yes votes)
println!("Test completed successfully - consensus reached!");
}
#[tokio::test]
async fn test_consensus_timeout() {
// Create consensus service
let consensus_service = ConsensusService::new();
let group_name = "test_group_timeout";
let expected_voters_count = 5;
let signer = PrivateKeySigner::random();
let proposal_owner = signer.address_bytes();
// Need 4 votes for consensus
// Create a proposal
let proposal = consensus_service
.create_proposal(
group_name,
"Test Proposal".to_string(),
"Test payload".to_string(),
proposal_owner,
expected_voters_count,
300,
true,
)
.await
.expect("Failed to create proposal");
let proposal = consensus_service
.vote_on_proposal(group_name, proposal.proposal_id, true, signer)
.await
.expect("Failed to vote on proposal");
println!("Created proposal with ID: {}", proposal.proposal_id);
// Subscribe to consensus events for timeout test
let mut consensus_events = consensus_service.subscribe_to_events();
let proposal_id = proposal.proposal_id;
// Start consensus event waiter with timeout
let group_name_clone = group_name;
let consensus_waiter = tokio::spawn(async move {
println!("Starting consensus event waiter with timeout for proposal {proposal_id:?}");
// Wait for consensus event - should timeout and trigger liveness criteria
let timeout_duration = Duration::from_secs(12); // Wait longer than consensus timeout (10s)
match tokio::time::timeout(timeout_duration, async {
while let Ok((event_group_name, event)) = consensus_events.recv().await {
if event_group_name == group_name_clone {
match event {
ConsensusEvent::ConsensusReached { proposal_id: event_proposal_id, result } => {
if event_proposal_id == proposal_id {
println!("Consensus reached for proposal {proposal_id}: {result} (via timeout/liveness criteria)");
return Ok(result);
}
}
ConsensusEvent::ConsensusFailed { proposal_id: event_proposal_id, reason } => {
if event_proposal_id == proposal_id {
println!("Consensus failed for proposal {proposal_id}: {reason}");
return Err(format!("Consensus failed: {reason}"));
}
}
}
}
}
Err("Event channel closed".to_string())
}).await {
Ok(result) => result,
Err(_) => Err("Test timeout waiting for consensus event".to_string())
}
});
// Don't add any additional votes - should timeout and apply liveness criteria
// Wait for consensus result
let consensus_result = consensus_waiter
.await
.expect("Consensus waiter task failed");
// Verify timeout occurred and liveness criteria was applied
// With liveness_criteria_yes = true, should return Ok(true)
assert!(consensus_result.is_ok());
let result = consensus_result.unwrap();
assert!(result); // Should be true due to liveness criteria
println!("Test completed successfully - timeout occurred and liveness criteria applied!");
}
#[tokio::test]
async fn test_consensus_with_mixed_votes() {
// Create consensus service
let consensus_service = ConsensusService::new();
let signer = PrivateKeySigner::random();
let proposal_owner = signer.address_bytes();
let group_name = "test_group_mixed";
let expected_voters_count = 3;
// Create a proposal
let proposal = consensus_service
.create_proposal(
group_name,
"Test Proposal".to_string(),
"Test payload".to_string(),
proposal_owner,
expected_voters_count,
300,
true,
)
.await
.expect("Failed to create proposal");
let proposal = consensus_service
.vote_on_proposal(group_name, proposal.proposal_id, true, signer)
.await
.expect("Failed to vote on proposal");
println!("Created proposal with ID: {}", proposal.proposal_id);
// Subscribe to consensus events
let mut consensus_events = consensus_service.subscribe_to_events();
let proposal_id = proposal.proposal_id;
// Start a background task that waits for consensus events
let group_name_clone = group_name;
let consensus_waiter = tokio::spawn(async move {
println!("Starting consensus event waiter for proposal {proposal_id:?}");
// Wait for consensus event with timeout
let timeout_duration = Duration::from_secs(15); // Allow time for votes to be processed
match tokio::time::timeout(timeout_duration, async {
while let Ok((event_group_name, event)) = consensus_events.recv().await {
if event_group_name == group_name_clone {
match event {
ConsensusEvent::ConsensusReached {
proposal_id: event_proposal_id,
result,
} => {
if event_proposal_id == proposal_id {
println!("Consensus reached for proposal {proposal_id}: {result}");
return Ok(result);
}
}
ConsensusEvent::ConsensusFailed {
proposal_id: event_proposal_id,
reason,
} => {
if event_proposal_id == proposal_id {
println!("Consensus failed for proposal {proposal_id}: {reason}");
return Err(format!("Consensus failed: {reason}"));
}
}
}
}
}
Err("Event channel closed".to_string())
})
.await
{
Ok(result) => {
println!("Consensus event waiter result: {result:?}");
result
}
Err(_) => {
println!("Consensus event waiter timed out");
Err("Timeout waiting for consensus".to_string())
}
}
});
// Wait a bit to ensure the waiter is running
tokio::time::sleep(Duration::from_millis(100)).await;
// Add mixed votes: one yes, one no
let votes = vec![(2, false), (3, false)];
let mut previous_vote_hash = proposal.votes[0].vote_hash.clone(); // Start with steward's vote hash
for (i, vote_value) in votes {
let signer = PrivateKeySigner::random();
let proposal_owner = signer.address_bytes();
let mut vote = Vote {
vote_id: Uuid::new_v4().as_u128() as u32,
vote_owner: proposal_owner,
proposal_id: proposal.proposal_id,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Failed to get current time")
.as_secs(),
vote: vote_value,
parent_hash: Vec::new(),
received_hash: previous_vote_hash.clone(), // Reference previous vote's hash
vote_hash: Vec::new(),
signature: Vec::new(),
};
// Compute vote hash
vote.vote_hash = compute_vote_hash(&vote);
let vote_bytes = vote.encode_to_vec();
vote.signature = signer
.local_sign_message(&vote_bytes)
.await
.expect("Failed to sign vote");
println!(
"Adding vote {} (value: {}) for proposal {}",
i, vote_value, proposal.proposal_id
);
consensus_service
.process_incoming_vote(group_name, vote.clone())
.await
.expect("Failed to process vote");
// Update previous vote hash for next iteration
previous_vote_hash = vote.vote_hash.clone();
// Small delay between votes
tokio::time::sleep(Duration::from_millis(50)).await;
}
// Wait for consensus result
let consensus_result = consensus_waiter
.await
.expect("Consensus waiter task failed");
// Verify consensus was reached
assert!(consensus_result.is_ok());
let result = consensus_result.unwrap();
// With 2 no votes and 1 yes vote, consensus should be no (false)
// However, if it times out, liveness criteria (true) will be applied
println!("Mixed votes test result: {result}");
// Don't assert specific result since it depends on timing vs. liveness criteria
println!("Test completed successfully - consensus reached with mixed votes!");
}
#[tokio::test]
async fn test_rfc_vote_chain_validation() {
use de_mls::consensus::compute_vote_hash;
use de_mls::LocalSigner;
// Create consensus service
let consensus_service = ConsensusService::new();
let group_name = "test_rfc_validation";
let expected_voters_count = 3;
let signer1 = PrivateKeySigner::random();
let signer2 = PrivateKeySigner::random();
let _signer3 = PrivateKeySigner::random();
// Create first proposal with steward vote
let proposal = consensus_service
.create_proposal(
group_name,
"Test Proposal".to_string(),
"Test payload".to_string(),
signer1.address_bytes(),
expected_voters_count,
300,
true,
)
.await
.expect("Failed to create proposal");
let proposal = consensus_service
.vote_on_proposal(group_name, proposal.proposal_id, true, signer1)
.await
.expect("Failed to vote on proposal");
println!("Created proposal with ID: {}", proposal.proposal_id);
// Create second vote from different voter
let mut vote2 = Vote {
vote_id: Uuid::new_v4().as_u128() as u32,
vote_owner: signer2.address_bytes(),
proposal_id: proposal.proposal_id,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Failed to get current time")
.as_secs(),
vote: true,
parent_hash: Vec::new(), // Different voter, no parent
received_hash: proposal.votes[0].vote_hash.clone(), // Should be hash of first vote
vote_hash: Vec::new(),
signature: Vec::new(),
};
// Compute vote hash and signature
vote2.vote_hash = compute_vote_hash(&vote2);
let vote2_bytes = vote2.encode_to_vec();
vote2.signature = signer2
.local_sign_message(&vote2_bytes)
.await
.expect("Failed to sign vote");
// Create proposal with two votes from different voters
let mut test_proposal = proposal.clone();
test_proposal.votes.push(vote2.clone());
// Validate the proposal - should pass RFC validation
let validation_result = consensus_service.validate_proposal(&test_proposal);
assert!(
validation_result.is_ok(),
"RFC validation should pass: {validation_result:?}"
);
// Test invalid vote chain (wrong received_hash)
let mut invalid_proposal = test_proposal.clone();
invalid_proposal.votes[1].received_hash = vec![0; 32]; // Wrong hash
let invalid_result = consensus_service.validate_proposal(&invalid_proposal);
assert!(
invalid_result.is_err(),
"Invalid vote chain should be rejected"
);
println!("RFC vote chain validation test completed successfully!");
}
#[tokio::test]
async fn test_event_driven_timeout() {
// Create consensus service
let consensus_service = ConsensusService::new();
let group_name = "test_group_event_timeout";
let expected_voters_count = 3;
let signer = PrivateKeySigner::random();
let proposal_owner = signer.address_bytes();
// Create a proposal with only one vote (steward vote) - should timeout and apply liveness criteria
let proposal = consensus_service
.create_proposal(
group_name,
"Test Proposal".to_string(),
"Test payload".to_string(),
proposal_owner,
expected_voters_count,
300,
true, // liveness criteria = true
)
.await
.expect("Failed to create proposal");
let proposal = consensus_service
.vote_on_proposal(group_name, proposal.proposal_id, true, signer)
.await
.expect("Failed to vote on proposal");
println!(
"Created proposal with ID: {} - waiting for timeout",
proposal.proposal_id
);
// Subscribe to consensus events
let mut consensus_events = consensus_service.subscribe_to_events();
let proposal_id = proposal.proposal_id;
let group_name_clone = group_name;
// Wait for consensus event (should timeout after 10 seconds and apply liveness criteria)
let timeout_duration = Duration::from_secs(12); // Wait longer than consensus timeout (10s)
let consensus_result = tokio::time::timeout(timeout_duration, async {
while let Ok((event_group_name, event)) = consensus_events.recv().await {
if event_group_name == group_name_clone {
match event {
ConsensusEvent::ConsensusReached {
proposal_id: event_proposal_id,
result,
} => {
if event_proposal_id == proposal_id {
println!("Consensus reached for proposal {proposal_id}: {result} (via timeout/liveness criteria)");
return result;
}
}
ConsensusEvent::ConsensusFailed {
proposal_id: event_proposal_id,
reason,
} => {
if event_proposal_id == proposal_id {
panic!("Consensus failed for proposal {proposal_id}: {reason}");
}
}
}
}
}
panic!("Event channel closed unexpectedly");
})
.await
.expect("Timeout waiting for consensus event");
// Should be true due to liveness criteria
assert!(consensus_result);
println!("Test completed successfully - event-driven timeout worked!");
}
#[tokio::test]
async fn test_liveness_criteria_functionality() {
// Create consensus service
let consensus_service = ConsensusService::new();
let group_name = "test_group_liveness";
let expected_voters_count = 3;
let signer = PrivateKeySigner::random();
let proposal_owner = signer.address_bytes();
// Test liveness criteria = false
let proposal_false = consensus_service
.create_proposal(
group_name,
"Test Proposal False".to_string(),
"Test payload".to_string(),
proposal_owner.clone(),
expected_voters_count,
300,
false, // liveness criteria = false
)
.await
.expect("Failed to create proposal with liveness_criteria_yes = false");
// Test liveness criteria getter
let liveness_false = consensus_service
.get_proposal_liveness_criteria(group_name, proposal_false.proposal_id)
.await;
assert_eq!(liveness_false, Some(false));
// Test liveness criteria = true
let proposal_true = consensus_service
.create_proposal(
group_name,
"Test Proposal True".to_string(),
"Test payload".to_string(),
proposal_owner,
expected_voters_count,
300,
true, // liveness criteria = true
)
.await
.expect("Failed to create proposal with liveness_criteria_yes = true");
// Test liveness criteria getter
let liveness_true = consensus_service
.get_proposal_liveness_criteria(group_name, proposal_true.proposal_id)
.await;
assert_eq!(liveness_true, Some(true));
// Test non-existent proposal
let liveness_none = consensus_service
.get_proposal_liveness_criteria("nonexistent", 99999)
.await;
assert_eq!(liveness_none, None);
println!("Test completed successfully - liveness criteria functionality verified!");
}