ci: optimize workflow with parallel jobs and better caching (#42)

* ci: optimize performance with better caching and parallel execution

* ci: hybrid pipeline with strategic build->test sequencing

* ci: try other way

* ci: seperate lint too, try ubuntu-latest-arm64 and some cargo optimize keys

* ci: remove ubuntu-latest-arm64

* ci: back to manual cache

* ci: add needs: [build-stable-release] for lint

* ci: fix CI lint and simplify command for lint and doc CI jobs

* chore: alot of clippy warning fix because of new Rust version

* ci: make all the task parallel

* chore: rerun CI workflow
This commit is contained in:
Vinh Trịnh
2025-07-09 12:35:02 +07:00
committed by GitHub
parent 559cc856c0
commit e37a4b435f
9 changed files with 85 additions and 212 deletions

View File

@@ -8,7 +8,7 @@ on:
name: "CI"
env:
PROTOC_VERSION: '3.25.3'
PROTOC_VERSION: "3.25.3"
jobs:
lint:
@@ -16,148 +16,70 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: 'recursive'
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt, clippy
- name: Cache protoc
uses: actions/cache@v4
- uses: Swatinem/rust-cache@v2
with:
path: ~/.local/bin
key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.os }}
restore-keys: |
protoc-${{ env.PROTOC_VERSION }}-
protoc-
- name: install protoc
shared-key: "stable"
- name: Install protoc
uses: taiki-e/install-action@v2
with:
tool: protoc@${{ env.PROTOC_VERSION }}
- name: Cache Rust dependencies
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
rust-${{ runner.os }}-
- name: Check formatting
run: cargo fmt --all --check
- name: Run clippy
run: cargo clippy --all-features --tests -- -D warnings
unused_dependencies:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
submodules: 'recursive'
- uses: dtolnay/rust-toolchain@nightly
- name: Cache protoc
uses: actions/cache@v4
with:
path: ~/.local/bin
key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.os }}
restore-keys: |
protoc-${{ env.PROTOC_VERSION }}-
protoc-
- name: install protoc
uses: taiki-e/install-action@v2
with:
tool: protoc@${{ env.PROTOC_VERSION }}
- name: Cache cargo-udeps
uses: actions/cache@v4
with:
path: ~/.cargo/bin
key: cargo-udeps-${{ runner.os }}
- name: install cargo-udeps
uses: taiki-e/install-action@cargo-udeps
- name: Cache Rust dependencies
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
rust-${{ runner.os }}-
- name: cargo udeps
run: cargo +nightly udeps
- name: cargo udeps all-targets
run: cargo +nightly udeps --all-targets
docs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: 'recursive'
- name: Cache protoc
uses: actions/cache@v4
with:
path: ~/.local/bin
key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.os }}
restore-keys: |
protoc-${{ env.PROTOC_VERSION }}-
protoc-
- name: install protoc
- name: Install protoc
uses: taiki-e/install-action@v2
with:
tool: protoc@${{ env.PROTOC_VERSION }}
- uses: dtolnay/rust-toolchain@stable
- name: Cache Rust dependencies
uses: actions/cache@v4
- uses: Swatinem/rust-cache@v2
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
rust-${{ runner.os }}-
- name: "doc --lib --all-features"
shared-key: "stable"
- name: Generate documentation
run: |
cargo doc --lib --no-deps --all-features --document-private-items
env:
RUSTDOCFLAGS: -Dwarnings
RUSTDOCFLAGS: -Dwarnings
test:
needs: [lint, unused_dependencies]
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
submodules: 'recursive'
- name: Cache protoc
uses: actions/cache@v4
with:
path: ~/.local/bin
key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.os }}
restore-keys: |
protoc-${{ env.PROTOC_VERSION }}-
protoc-
- name: install protoc
shared-key: "stable"
- name: Install protoc
uses: taiki-e/install-action@v2
with:
tool: protoc@${{ env.PROTOC_VERSION }}
- uses: dtolnay/rust-toolchain@stable
- name: Cache Rust dependencies
uses: actions/cache@v4
- name: Test
run: cargo test --release
unused_dependencies:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
- uses: Swatinem/rust-cache@v2
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
rust-${{ runner.os }}-
- name: Build and test
run: |
cargo build --release
cargo test --release
shared-key: "nightly"
- name: Install protoc
uses: taiki-e/install-action@v2
with:
tool: protoc@${{ env.PROTOC_VERSION }}
- name: Install cargo-udeps
uses: taiki-e/install-action@cargo-udeps
- name: Check for unused dependencies
run: cargo +nightly udeps --all-targets

View File

@@ -49,12 +49,12 @@ impl WakuNode<Initialized> {
.expect("Failed to send message to waku");
}
WakuEvent::RelayTopicHealthChange(evt) => {
info!("Relay topic change evt: {:?}", evt);
info!("Relay topic change evt: {evt:?}");
}
WakuEvent::ConnectionChange(evt) => {
info!("Conn change evt: {:?}", evt);
info!("Conn change evt: {evt:?}");
}
WakuEvent::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
WakuEvent::Unrecognized(e) => panic!("Unrecognized waku event: {e:?}"),
_ => panic!("event case not expected"),
};
}
@@ -65,7 +65,7 @@ impl WakuNode<Initialized> {
.expect("set event call back working");
let waku = self.node.start().await.map_err(|e| {
debug!("Failed to start the Waku Node: {:?}", e);
debug!("Failed to start the Waku Node: {e:?}");
DeliveryServiceError::WakuNodeAlreadyInitialized(e.to_string())
})?;
@@ -94,7 +94,7 @@ impl WakuNode<Running> {
.relay_publish_message(&waku_message, &pubsub_topic(), None)
.await
.map_err(|e| {
error!("Failed to relay publish the message: {:?}", e);
error!("Failed to relay publish the message: {e:?}");
DeliveryServiceError::WakuPublishMessageError(e)
})?;
@@ -106,19 +106,19 @@ impl WakuNode<Running> {
peer_addresses: Vec<Multiaddr>,
) -> Result<(), DeliveryServiceError> {
for peer_address in peer_addresses {
info!("Connecting to peer: {:?}", peer_address);
info!("Connecting to peer: {peer_address:?}");
self.node
.connect(&peer_address, None)
.await
.map_err(|e| DeliveryServiceError::WakuConnectPeerError(e.to_string()))?;
info!("Connected to peer: {:?}", peer_address);
info!("Connected to peer: {peer_address:?}");
}
Ok(())
}
pub async fn listen_addresses(&self) -> Result<Vec<Multiaddr>, DeliveryServiceError> {
let addresses = self.node.listen_addresses().await.map_err(|e| {
debug!("Failed to get the listen addresses: {:?}", e);
debug!("Failed to get the listen addresses: {e:?}");
DeliveryServiceError::WakuGetListenAddressesError(e)
})?;
@@ -194,7 +194,7 @@ pub async fn run_waku_node(
while let Some(msg) = reciever.recv().await {
info!("Received message to send to waku");
let id = waku_node.send_message(msg).await?;
info!("Successfully publish message with id: {:?}", id);
info!("Successfully publish message with id: {id:?}");
}
Ok(())

View File

@@ -44,7 +44,7 @@ pub async fn handle_user_actions(
.retain(|topic| topic.application_name != group_name);
info!("Leave group: {:?}", &group_name);
let app_message = wrap_conversation_message_into_application_msg(
format!("You're removed from the group {}", group_name).into_bytes(),
format!("You're removed from the group {group_name}").into_bytes(),
"system".to_string(),
group_name.clone(),
);
@@ -93,11 +93,8 @@ pub async fn handle_ws_action(
.await?;
let app_message = wrap_conversation_message_into_application_msg(
format!(
"Remove proposal for user {} added to steward queue",
user_to_ban
)
.into_bytes(),
format!("Remove proposal for user {user_to_ban} added to steward queue")
.into_bytes(),
"system".to_string(),
group_name.clone(),
);

View File

@@ -325,14 +325,14 @@ impl Group {
/// Get the number of pending proposals for the current epoch
pub async fn get_pending_proposals_count(&self) -> usize {
let count = self.state_machine.get_current_epoch_proposals_count().await;
info!("State machine reports {} current epoch proposals", count);
info!("State machine reports {count} current epoch proposals");
count
}
/// Get the number of pending proposals for the voting epoch
pub async fn get_voting_proposals_count(&self) -> usize {
let count = self.state_machine.get_voting_epoch_proposals_count().await;
info!("State machine reports {} voting proposals", count);
info!("State machine reports {count} voting proposals");
count
}

View File

@@ -83,12 +83,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::select! {
result = recv_messages => {
if let Err(w) = result {
error!("Error receiving messages from waku: {}", w);
error!("Error receiving messages from waku: {w}");
}
}
result = server_task => {
if let Err(e) = result {
error!("Error hosting server: {}", e);
error!("Error hosting server: {e}");
}
}
}
@@ -110,7 +110,7 @@ async fn run_server(
.with_state(app_state)
.layer(cors);
info!("Hosted on {:?}", addr);
info!("Hosted on {addr:?}");
axum::Server::bind(&addr)
.serve(app.into_make_service())
@@ -142,14 +142,14 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
if !rooms.contains(&connect.group_id.clone()) {
rooms.insert(connect.group_id.clone());
}
info!("Prepare info for main loop: {:?}", main_loop_connection);
info!("Prepare info for main loop: {main_loop_connection:?}");
break;
}
Ok(_) => {
info!("Got chat message for non-existent user");
}
Err(e) => error!("Error handling message: {}", e),
Err(e) => error!("Error handling message: {e}"),
}
}
@@ -169,10 +169,10 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
let content_topic = msg.content_topic.clone();
// Check if message belongs to a relevant topic
if !match_content_topic(&state_clone.content_topics, &content_topic) {
error!("Content topic not match: {:?}", content_topic);
error!("Content topic not match: {content_topic:?}");
return;
};
info!("Received message from waku that matches content topic",);
info!("Received message from waku that matches content topic");
let res = handle_user_actions(
msg,
state_clone.waku_node.clone(),
@@ -183,7 +183,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
)
.await;
if let Err(e) = res {
error!("Error handling waku message: {}", e);
error!("Error handling waku message: {e}");
}
}
});
@@ -201,7 +201,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
)
.await;
if let Err(e) = res {
error!("Error handling websocket message: {}", e);
error!("Error handling websocket message: {e}");
}
}
})

View File

@@ -44,7 +44,7 @@ impl Display for GroupState {
GroupState::Waiting => "Waiting - Steward epoch active",
GroupState::Voting => "Voting - Vote in progress",
};
write!(f, "{}", state)
write!(f, "{state}")
}
}

View File

@@ -328,7 +328,7 @@ impl User {
return Ok(UserAction::DoNothing);
}
let ct_name = msg.content_topic.content_topic_name.to_string();
info!("Processing waku message from content topic: {:?}", ct_name);
info!("Processing waku message from content topic: {ct_name:?}");
match ct_name.as_str() {
WELCOME_SUBTOPIC => self.process_welcome_subtopic(msg, group_name).await,
APP_MSG_SUBTOPIC => self.process_app_subtopic(msg, group_name).await,
@@ -469,7 +469,7 @@ impl User {
}
// There are proposals, start the steward epoch
info!("Starting steward epoch with {} proposals", proposal_count);
info!("Starting steward epoch with {proposal_count} proposals");
group.start_steward_epoch().await?;
Ok(proposal_count)
}
@@ -491,10 +491,7 @@ impl User {
.map(alloy::hex::encode)
.collect::<Vec<_>>()
);
println!(
"User: start_voting - current user identity: {}",
current_identity
);
println!("User: start_voting - current user identity: {current_identity}");
let vote_id = uuid::Uuid::new_v4().as_bytes().to_vec();
Ok(vote_id)
@@ -522,11 +519,8 @@ impl User {
/// Submit a vote for the given vote ID.
pub async fn submit_vote(&mut self, vote_id: Vec<u8>, vote: bool) -> Result<(), UserError> {
let current_identity = self.identity.identity_string();
info!("User: submit_vote - vote_id: {:?}, vote: {}", vote_id, vote);
info!(
"User: submit_vote - current user identity: {}",
current_identity
);
info!("User: submit_vote - vote_id: {vote_id:?}, vote: {vote}");
info!("User: submit_vote - current user identity: {current_identity}");
Ok(())
}
@@ -541,7 +535,7 @@ impl User {
None => return Err(UserError::GroupNotFoundError(group_name)),
};
let identity_bytes = alloy::hex::decode(&identity)
.map_err(|e| UserError::ApplyProposalsError(format!("Invalid hex string: {}", e)))?;
.map_err(|e| UserError::ApplyProposalsError(format!("Invalid hex string: {e}")))?;
group.store_remove_proposal(identity_bytes).await?;
Ok(())
}

View File

@@ -33,7 +33,7 @@ pub async fn create_user_instance(
.map_err(|e| UserError::KameoCreateGroupError(e.to_string()))?;
let mut content_topics = build_content_topics(&group_name);
info!("Building content topics: {:?}", content_topics);
info!("Building content topics: {content_topics:?}");
app_state
.content_topics
.lock()
@@ -41,10 +41,7 @@ pub async fn create_user_instance(
.append(&mut content_topics);
if connection.should_create_group {
info!(
"User {:?} start sending steward message for group {:?}",
user_address, group_name
);
info!("User {user_address:?} start sending steward message for group {group_name:?}");
let user_clone = user_ref.clone();
let group_name_clone = group_name.clone();
tokio::spawn(async move {
@@ -62,7 +59,7 @@ pub async fn create_user_instance(
Ok::<(), UserError>(())
}
.await
.inspect_err(|e| error!("Error sending steward message to waku: {}", e));
.inspect_err(|e| error!("Error sending steward message to waku: {e}"));
}
});
};
@@ -81,7 +78,7 @@ pub async fn handle_steward_flow_per_epoch(
group_name: String,
app_state: Arc<AppState>,
) -> Result<(), UserError> {
info!("Starting steward epoch for group: {}", group_name);
info!("Starting steward epoch for group: {group_name}");
// Step 1: Start steward epoch - check for proposals and start epoch if needed
let proposals_count = user
@@ -101,22 +98,12 @@ pub async fn handle_steward_flow_per_epoch(
app_state.waku_node.send(msg).await?;
if proposals_count == 0 {
info!(
"No proposals to vote on for group: {}, completing epoch without voting",
group_name
);
info!(
"Steward epoch completed for group: {} (no proposals)",
group_name
);
info!("No proposals to vote on for group: {group_name}, completing epoch without voting");
info!("Steward epoch completed for group: {group_name} (no proposals)");
return Ok(());
}
info!(
"Found {} proposals to vote on for group: {}",
proposals_count, group_name
);
info!("Found {proposals_count} proposals to vote on for group: {group_name}");
// Step 3: Start voting process
let vote_id = user
@@ -126,10 +113,7 @@ pub async fn handle_steward_flow_per_epoch(
.await
.map_err(|e| UserError::ProcessProposalsError(e.to_string()))?;
info!(
"Started voting with vote_id: {:?} for group: {}",
vote_id, group_name
);
info!("Started voting with vote_id: {vote_id:?} for group: {group_name}");
// Step 4: Complete voting (in a real implementation, this would wait for actual votes)
// For now, we'll simulate the voting process
@@ -141,10 +125,7 @@ pub async fn handle_steward_flow_per_epoch(
.await
.map_err(|e| UserError::ApplyProposalsError(e.to_string()))?;
info!(
"Voting completed with result: {} for group: {}",
vote_result, group_name
);
info!("Voting completed with result: {vote_result} for group: {group_name}");
// Step 5: If vote passed, apply proposals and complete
if vote_result {
@@ -160,15 +141,9 @@ pub async fn handle_steward_flow_per_epoch(
app_state.waku_node.send(msg).await?;
}
info!(
"Proposals applied and steward epoch completed for group: {}",
group_name
);
info!("Proposals applied and steward epoch completed for group: {group_name}");
} else {
info!(
"Vote failed, returning to working state for group: {}",
group_name
);
info!("Vote failed, returning to working state for group: {group_name}");
}
user.ask(RemoveProposalsAndCompleteRequest {
@@ -177,9 +152,6 @@ pub async fn handle_steward_flow_per_epoch(
.await
.map_err(|e| UserError::ApplyProposalsError(e.to_string()))?;
info!(
"Removing proposals and completing steward epoch for group: {}",
group_name
);
info!("Removing proposals and completing steward epoch for group: {group_name}");
Ok(())
}

View File

@@ -81,10 +81,7 @@ async fn test_invite_users_flow() {
.get_pending_proposals_count(group_name.clone())
.await
.expect("Failed to get proposal count");
println!(
"Debug: Proposal count before steward epoch: {}",
proposal_count_before
);
println!("Debug: Proposal count before steward epoch: {proposal_count_before}");
// Add Bob and Carol to the group initially using steward epoch flow
// State machine: start steward epoch, voting, complete voting
@@ -93,10 +90,7 @@ async fn test_invite_users_flow() {
.await
.expect("Failed to start steward epoch");
println!(
"Debug: Steward epoch returned {} proposals",
steward_epoch_proposals
);
println!("Debug: Steward epoch returned {steward_epoch_proposals} proposals");
let vote_id = alice
.start_voting(group_name.clone())
@@ -146,7 +140,7 @@ async fn test_invite_users_flow() {
.process_waku_message(bob_res_waku_message.clone())
.await
.expect("Failed to process waku message");
println!("Alice result: {:?}", res_alice);
println!("Alice result: {res_alice:?}");
let res_alice_msg = match res_alice {
UserAction::SendToApp(msg) => msg,
_ => panic!("User action is not SendToApp"),
@@ -170,7 +164,7 @@ async fn test_invite_users_flow() {
)
.await
.expect("Failed to process waku message");
println!("Carol result: {:?}", res_carol);
println!("Carol result: {res_carol:?}");
let carol_group = carol
.get_group(group_name.clone())
@@ -298,7 +292,7 @@ async fn test_add_user_in_different_epoch() {
);
// Submit a vote (Alice votes yes for her own proposals)
println!("Test: Submitting vote with ID: {:?}", vote_id);
println!("Test: Submitting vote with ID: {vote_id:?}");
println!("Test: Alice's identity: {}", alice.identity_string());
alice
.submit_vote(vote_id.clone(), true)
@@ -397,7 +391,7 @@ async fn test_add_user_in_different_epoch() {
);
// Submit a vote (Alice votes yes for her own proposals)
println!("Test: Submitting vote with ID: {:?}", vote_id);
println!("Test: Submitting vote with ID: {vote_id:?}");
println!("Test: Alice's identity: {}", alice.identity_string());
alice
.submit_vote(vote_id.clone(), true)
@@ -446,7 +440,7 @@ async fn test_add_user_in_different_epoch() {
.expect("Failed to process waku message apply commit to the Bob")
{
UserAction::SendToWaku(msg) => {
println!("Bob action is SendToWaku: {:?}", msg);
println!("Bob action is SendToWaku: {msg:?}");
}
UserAction::DoNothing => {
println!("Bob action is DoNothing");
@@ -581,10 +575,7 @@ async fn test_remove_user_flow() {
.get_pending_proposals_count(group_name.clone())
.await
.expect("Failed to get proposal count");
println!(
"Debug: Proposal count before steward epoch: {}",
proposal_count_before
);
println!("Debug: Proposal count before steward epoch: {proposal_count_before}");
// Add Bob and Carol to the group initially using steward epoch flow
// State machine: start steward epoch, voting, complete voting
@@ -593,10 +584,7 @@ async fn test_remove_user_flow() {
.await
.expect("Failed to start steward epoch");
println!(
"Debug: Steward epoch returned {} proposals",
steward_epoch_proposals
);
println!("Debug: Steward epoch returned {steward_epoch_proposals} proposals");
let vote_id = alice
.start_voting(group_name.clone())
@@ -646,7 +634,7 @@ async fn test_remove_user_flow() {
.process_waku_message(bob_res_waku_message.clone())
.await
.expect("Failed to process waku message");
println!("Alice result: {:?}", res_alice);
println!("Alice result: {res_alice:?}");
let res_alice_msg = match res_alice {
UserAction::SendToApp(msg) => msg,
_ => panic!("User action is not SendToApp"),
@@ -670,7 +658,7 @@ async fn test_remove_user_flow() {
)
.await
.expect("Failed to process waku message");
println!("Carol result: {:?}", res_carol);
println!("Carol result: {res_carol:?}");
let carol_group = carol
.get_group(group_name.clone())
@@ -778,7 +766,7 @@ async fn test_remove_user_flow() {
.expect("Failed to start voting (removal)");
// Submit a vote (Alice votes yes for the removal)
println!("Test: Submitting vote with ID: {:?}", vote_id);
println!("Test: Submitting vote with ID: {vote_id:?}");
println!("Test: Alice's identity: {}", alice.identity_string());
alice
.submit_vote(vote_id.clone(), true)