From e37a4b435f0db130f63e4ca532110bb5c854be05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vinh=20Tr=E1=BB=8Bnh?= <108657096+vinhtc27@users.noreply.github.com> Date: Wed, 9 Jul 2025 12:35:02 +0700 Subject: [PATCH] ci: optimize workflow with parallel jobs and better caching (#42) * ci: optimize performance with better caching and parallel execution * ci: hybrid pipeline with strategic build->test sequencing * ci: try other way * ci: seperate lint too, try ubuntu-latest-arm64 and some cargo optimize keys * ci: remove ubuntu-latest-arm64 * ci: back to manual cache * ci: add needs: [build-stable-release] for lint * ci: fix CI lint and simplify command for lint and doc CI jobs * chore: alot of clippy warning fix because of new Rust version * ci: make all the task parallel * chore: rerun CI workflow --- .github/workflows/ci.yml | 140 +++++++++------------------------------ ds/src/waku_actor.rs | 18 ++--- src/action_handlers.rs | 9 +-- src/group.rs | 4 +- src/main.rs | 18 ++--- src/state_machine.rs | 2 +- src/user.rs | 18 ++--- src/user_app_instance.rs | 52 ++++----------- tests/user_test.rs | 36 ++++------ 9 files changed, 85 insertions(+), 212 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5fb9008..a2cad66 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ on: name: "CI" env: - PROTOC_VERSION: '3.25.3' + PROTOC_VERSION: "3.25.3" jobs: lint: @@ -16,148 +16,70 @@ jobs: steps: - name: Checkout code uses: actions/checkout@v4 - with: - submodules: 'recursive' - name: Install stable toolchain uses: dtolnay/rust-toolchain@stable with: components: rustfmt, clippy - - name: Cache protoc - uses: actions/cache@v4 + - uses: Swatinem/rust-cache@v2 with: - path: ~/.local/bin - key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.os }} - restore-keys: | - protoc-${{ env.PROTOC_VERSION }}- - protoc- - - name: install protoc + shared-key: "stable" + - name: Install protoc uses: taiki-e/install-action@v2 with: tool: protoc@${{ env.PROTOC_VERSION }} - - name: Cache Rust dependencies - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - rust-${{ runner.os }}- - name: Check formatting run: cargo fmt --all --check - name: Run clippy run: cargo clippy --all-features --tests -- -D warnings - unused_dependencies: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - with: - submodules: 'recursive' - - uses: dtolnay/rust-toolchain@nightly - - name: Cache protoc - uses: actions/cache@v4 - with: - path: ~/.local/bin - key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.os }} - restore-keys: | - protoc-${{ env.PROTOC_VERSION }}- - protoc- - - name: install protoc - uses: taiki-e/install-action@v2 - with: - tool: protoc@${{ env.PROTOC_VERSION }} - - name: Cache cargo-udeps - uses: actions/cache@v4 - with: - path: ~/.cargo/bin - key: cargo-udeps-${{ runner.os }} - - name: install cargo-udeps - uses: taiki-e/install-action@cargo-udeps - - name: Cache Rust dependencies - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - rust-${{ runner.os }}- - - name: cargo udeps - run: cargo +nightly udeps - - name: cargo udeps all-targets - run: cargo +nightly udeps --all-targets - docs: runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v4 - with: - submodules: 'recursive' - - name: Cache protoc - uses: actions/cache@v4 - with: - path: ~/.local/bin - key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.os }} - restore-keys: | - protoc-${{ env.PROTOC_VERSION }}- - protoc- - - name: install protoc + - name: Install protoc uses: taiki-e/install-action@v2 with: tool: protoc@${{ env.PROTOC_VERSION }} - uses: dtolnay/rust-toolchain@stable - - name: Cache Rust dependencies - uses: actions/cache@v4 + - uses: Swatinem/rust-cache@v2 with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - rust-${{ runner.os }}- - - name: "doc --lib --all-features" + shared-key: "stable" + - name: Generate documentation run: | cargo doc --lib --no-deps --all-features --document-private-items env: - RUSTDOCFLAGS: -Dwarnings + RUSTDOCFLAGS: -Dwarnings test: - needs: [lint, unused_dependencies] runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 with: - submodules: 'recursive' - - name: Cache protoc - uses: actions/cache@v4 - with: - path: ~/.local/bin - key: protoc-${{ env.PROTOC_VERSION }}-${{ runner.os }} - restore-keys: | - protoc-${{ env.PROTOC_VERSION }}- - protoc- - - name: install protoc + shared-key: "stable" + - name: Install protoc uses: taiki-e/install-action@v2 with: tool: protoc@${{ env.PROTOC_VERSION }} - - uses: dtolnay/rust-toolchain@stable - - name: Cache Rust dependencies - uses: actions/cache@v4 + - name: Test + run: cargo test --release + + unused_dependencies: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@nightly + - uses: Swatinem/rust-cache@v2 with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - rust-${{ runner.os }}- - - name: Build and test - run: | - cargo build --release - cargo test --release \ No newline at end of file + shared-key: "nightly" + - name: Install protoc + uses: taiki-e/install-action@v2 + with: + tool: protoc@${{ env.PROTOC_VERSION }} + - name: Install cargo-udeps + uses: taiki-e/install-action@cargo-udeps + - name: Check for unused dependencies + run: cargo +nightly udeps --all-targets diff --git a/ds/src/waku_actor.rs b/ds/src/waku_actor.rs index e54448a..6a789a6 100644 --- a/ds/src/waku_actor.rs +++ b/ds/src/waku_actor.rs @@ -49,12 +49,12 @@ impl WakuNode { .expect("Failed to send message to waku"); } WakuEvent::RelayTopicHealthChange(evt) => { - info!("Relay topic change evt: {:?}", evt); + info!("Relay topic change evt: {evt:?}"); } WakuEvent::ConnectionChange(evt) => { - info!("Conn change evt: {:?}", evt); + info!("Conn change evt: {evt:?}"); } - WakuEvent::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), + WakuEvent::Unrecognized(e) => panic!("Unrecognized waku event: {e:?}"), _ => panic!("event case not expected"), }; } @@ -65,7 +65,7 @@ impl WakuNode { .expect("set event call back working"); let waku = self.node.start().await.map_err(|e| { - debug!("Failed to start the Waku Node: {:?}", e); + debug!("Failed to start the Waku Node: {e:?}"); DeliveryServiceError::WakuNodeAlreadyInitialized(e.to_string()) })?; @@ -94,7 +94,7 @@ impl WakuNode { .relay_publish_message(&waku_message, &pubsub_topic(), None) .await .map_err(|e| { - error!("Failed to relay publish the message: {:?}", e); + error!("Failed to relay publish the message: {e:?}"); DeliveryServiceError::WakuPublishMessageError(e) })?; @@ -106,19 +106,19 @@ impl WakuNode { peer_addresses: Vec, ) -> Result<(), DeliveryServiceError> { for peer_address in peer_addresses { - info!("Connecting to peer: {:?}", peer_address); + info!("Connecting to peer: {peer_address:?}"); self.node .connect(&peer_address, None) .await .map_err(|e| DeliveryServiceError::WakuConnectPeerError(e.to_string()))?; - info!("Connected to peer: {:?}", peer_address); + info!("Connected to peer: {peer_address:?}"); } Ok(()) } pub async fn listen_addresses(&self) -> Result, DeliveryServiceError> { let addresses = self.node.listen_addresses().await.map_err(|e| { - debug!("Failed to get the listen addresses: {:?}", e); + debug!("Failed to get the listen addresses: {e:?}"); DeliveryServiceError::WakuGetListenAddressesError(e) })?; @@ -194,7 +194,7 @@ pub async fn run_waku_node( while let Some(msg) = reciever.recv().await { info!("Received message to send to waku"); let id = waku_node.send_message(msg).await?; - info!("Successfully publish message with id: {:?}", id); + info!("Successfully publish message with id: {id:?}"); } Ok(()) diff --git a/src/action_handlers.rs b/src/action_handlers.rs index f1d8763..fb78dc4 100644 --- a/src/action_handlers.rs +++ b/src/action_handlers.rs @@ -44,7 +44,7 @@ pub async fn handle_user_actions( .retain(|topic| topic.application_name != group_name); info!("Leave group: {:?}", &group_name); let app_message = wrap_conversation_message_into_application_msg( - format!("You're removed from the group {}", group_name).into_bytes(), + format!("You're removed from the group {group_name}").into_bytes(), "system".to_string(), group_name.clone(), ); @@ -93,11 +93,8 @@ pub async fn handle_ws_action( .await?; let app_message = wrap_conversation_message_into_application_msg( - format!( - "Remove proposal for user {} added to steward queue", - user_to_ban - ) - .into_bytes(), + format!("Remove proposal for user {user_to_ban} added to steward queue") + .into_bytes(), "system".to_string(), group_name.clone(), ); diff --git a/src/group.rs b/src/group.rs index 2fe7d7c..b74468d 100644 --- a/src/group.rs +++ b/src/group.rs @@ -325,14 +325,14 @@ impl Group { /// Get the number of pending proposals for the current epoch pub async fn get_pending_proposals_count(&self) -> usize { let count = self.state_machine.get_current_epoch_proposals_count().await; - info!("State machine reports {} current epoch proposals", count); + info!("State machine reports {count} current epoch proposals"); count } /// Get the number of pending proposals for the voting epoch pub async fn get_voting_proposals_count(&self) -> usize { let count = self.state_machine.get_voting_epoch_proposals_count().await; - info!("State machine reports {} voting proposals", count); + info!("State machine reports {count} voting proposals"); count } diff --git a/src/main.rs b/src/main.rs index d03ef9d..ed0bd04 100644 --- a/src/main.rs +++ b/src/main.rs @@ -83,12 +83,12 @@ async fn main() -> Result<(), Box> { tokio::select! { result = recv_messages => { if let Err(w) = result { - error!("Error receiving messages from waku: {}", w); + error!("Error receiving messages from waku: {w}"); } } result = server_task => { if let Err(e) = result { - error!("Error hosting server: {}", e); + error!("Error hosting server: {e}"); } } } @@ -110,7 +110,7 @@ async fn run_server( .with_state(app_state) .layer(cors); - info!("Hosted on {:?}", addr); + info!("Hosted on {addr:?}"); axum::Server::bind(&addr) .serve(app.into_make_service()) @@ -142,14 +142,14 @@ async fn handle_socket(socket: WebSocket, state: Arc) { if !rooms.contains(&connect.group_id.clone()) { rooms.insert(connect.group_id.clone()); } - info!("Prepare info for main loop: {:?}", main_loop_connection); + info!("Prepare info for main loop: {main_loop_connection:?}"); break; } Ok(_) => { info!("Got chat message for non-existent user"); } - Err(e) => error!("Error handling message: {}", e), + Err(e) => error!("Error handling message: {e}"), } } @@ -169,10 +169,10 @@ async fn handle_socket(socket: WebSocket, state: Arc) { let content_topic = msg.content_topic.clone(); // Check if message belongs to a relevant topic if !match_content_topic(&state_clone.content_topics, &content_topic) { - error!("Content topic not match: {:?}", content_topic); + error!("Content topic not match: {content_topic:?}"); return; }; - info!("Received message from waku that matches content topic",); + info!("Received message from waku that matches content topic"); let res = handle_user_actions( msg, state_clone.waku_node.clone(), @@ -183,7 +183,7 @@ async fn handle_socket(socket: WebSocket, state: Arc) { ) .await; if let Err(e) = res { - error!("Error handling waku message: {}", e); + error!("Error handling waku message: {e}"); } } }); @@ -201,7 +201,7 @@ async fn handle_socket(socket: WebSocket, state: Arc) { ) .await; if let Err(e) = res { - error!("Error handling websocket message: {}", e); + error!("Error handling websocket message: {e}"); } } }) diff --git a/src/state_machine.rs b/src/state_machine.rs index ee9c7fe..e702662 100644 --- a/src/state_machine.rs +++ b/src/state_machine.rs @@ -44,7 +44,7 @@ impl Display for GroupState { GroupState::Waiting => "Waiting - Steward epoch active", GroupState::Voting => "Voting - Vote in progress", }; - write!(f, "{}", state) + write!(f, "{state}") } } diff --git a/src/user.rs b/src/user.rs index 1a4bfdb..3fe64fc 100644 --- a/src/user.rs +++ b/src/user.rs @@ -328,7 +328,7 @@ impl User { return Ok(UserAction::DoNothing); } let ct_name = msg.content_topic.content_topic_name.to_string(); - info!("Processing waku message from content topic: {:?}", ct_name); + info!("Processing waku message from content topic: {ct_name:?}"); match ct_name.as_str() { WELCOME_SUBTOPIC => self.process_welcome_subtopic(msg, group_name).await, APP_MSG_SUBTOPIC => self.process_app_subtopic(msg, group_name).await, @@ -469,7 +469,7 @@ impl User { } // There are proposals, start the steward epoch - info!("Starting steward epoch with {} proposals", proposal_count); + info!("Starting steward epoch with {proposal_count} proposals"); group.start_steward_epoch().await?; Ok(proposal_count) } @@ -491,10 +491,7 @@ impl User { .map(alloy::hex::encode) .collect::>() ); - println!( - "User: start_voting - current user identity: {}", - current_identity - ); + println!("User: start_voting - current user identity: {current_identity}"); let vote_id = uuid::Uuid::new_v4().as_bytes().to_vec(); Ok(vote_id) @@ -522,11 +519,8 @@ impl User { /// Submit a vote for the given vote ID. pub async fn submit_vote(&mut self, vote_id: Vec, vote: bool) -> Result<(), UserError> { let current_identity = self.identity.identity_string(); - info!("User: submit_vote - vote_id: {:?}, vote: {}", vote_id, vote); - info!( - "User: submit_vote - current user identity: {}", - current_identity - ); + info!("User: submit_vote - vote_id: {vote_id:?}, vote: {vote}"); + info!("User: submit_vote - current user identity: {current_identity}"); Ok(()) } @@ -541,7 +535,7 @@ impl User { None => return Err(UserError::GroupNotFoundError(group_name)), }; let identity_bytes = alloy::hex::decode(&identity) - .map_err(|e| UserError::ApplyProposalsError(format!("Invalid hex string: {}", e)))?; + .map_err(|e| UserError::ApplyProposalsError(format!("Invalid hex string: {e}")))?; group.store_remove_proposal(identity_bytes).await?; Ok(()) } diff --git a/src/user_app_instance.rs b/src/user_app_instance.rs index 6627c78..bc45682 100644 --- a/src/user_app_instance.rs +++ b/src/user_app_instance.rs @@ -33,7 +33,7 @@ pub async fn create_user_instance( .map_err(|e| UserError::KameoCreateGroupError(e.to_string()))?; let mut content_topics = build_content_topics(&group_name); - info!("Building content topics: {:?}", content_topics); + info!("Building content topics: {content_topics:?}"); app_state .content_topics .lock() @@ -41,10 +41,7 @@ pub async fn create_user_instance( .append(&mut content_topics); if connection.should_create_group { - info!( - "User {:?} start sending steward message for group {:?}", - user_address, group_name - ); + info!("User {user_address:?} start sending steward message for group {group_name:?}"); let user_clone = user_ref.clone(); let group_name_clone = group_name.clone(); tokio::spawn(async move { @@ -62,7 +59,7 @@ pub async fn create_user_instance( Ok::<(), UserError>(()) } .await - .inspect_err(|e| error!("Error sending steward message to waku: {}", e)); + .inspect_err(|e| error!("Error sending steward message to waku: {e}")); } }); }; @@ -81,7 +78,7 @@ pub async fn handle_steward_flow_per_epoch( group_name: String, app_state: Arc, ) -> Result<(), UserError> { - info!("Starting steward epoch for group: {}", group_name); + info!("Starting steward epoch for group: {group_name}"); // Step 1: Start steward epoch - check for proposals and start epoch if needed let proposals_count = user @@ -101,22 +98,12 @@ pub async fn handle_steward_flow_per_epoch( app_state.waku_node.send(msg).await?; if proposals_count == 0 { - info!( - "No proposals to vote on for group: {}, completing epoch without voting", - group_name - ); - - info!( - "Steward epoch completed for group: {} (no proposals)", - group_name - ); + info!("No proposals to vote on for group: {group_name}, completing epoch without voting"); + info!("Steward epoch completed for group: {group_name} (no proposals)"); return Ok(()); } - info!( - "Found {} proposals to vote on for group: {}", - proposals_count, group_name - ); + info!("Found {proposals_count} proposals to vote on for group: {group_name}"); // Step 3: Start voting process let vote_id = user @@ -126,10 +113,7 @@ pub async fn handle_steward_flow_per_epoch( .await .map_err(|e| UserError::ProcessProposalsError(e.to_string()))?; - info!( - "Started voting with vote_id: {:?} for group: {}", - vote_id, group_name - ); + info!("Started voting with vote_id: {vote_id:?} for group: {group_name}"); // Step 4: Complete voting (in a real implementation, this would wait for actual votes) // For now, we'll simulate the voting process @@ -141,10 +125,7 @@ pub async fn handle_steward_flow_per_epoch( .await .map_err(|e| UserError::ApplyProposalsError(e.to_string()))?; - info!( - "Voting completed with result: {} for group: {}", - vote_result, group_name - ); + info!("Voting completed with result: {vote_result} for group: {group_name}"); // Step 5: If vote passed, apply proposals and complete if vote_result { @@ -160,15 +141,9 @@ pub async fn handle_steward_flow_per_epoch( app_state.waku_node.send(msg).await?; } - info!( - "Proposals applied and steward epoch completed for group: {}", - group_name - ); + info!("Proposals applied and steward epoch completed for group: {group_name}"); } else { - info!( - "Vote failed, returning to working state for group: {}", - group_name - ); + info!("Vote failed, returning to working state for group: {group_name}"); } user.ask(RemoveProposalsAndCompleteRequest { @@ -177,9 +152,6 @@ pub async fn handle_steward_flow_per_epoch( .await .map_err(|e| UserError::ApplyProposalsError(e.to_string()))?; - info!( - "Removing proposals and completing steward epoch for group: {}", - group_name - ); + info!("Removing proposals and completing steward epoch for group: {group_name}"); Ok(()) } diff --git a/tests/user_test.rs b/tests/user_test.rs index 0208104..4dffaa9 100644 --- a/tests/user_test.rs +++ b/tests/user_test.rs @@ -81,10 +81,7 @@ async fn test_invite_users_flow() { .get_pending_proposals_count(group_name.clone()) .await .expect("Failed to get proposal count"); - println!( - "Debug: Proposal count before steward epoch: {}", - proposal_count_before - ); + println!("Debug: Proposal count before steward epoch: {proposal_count_before}"); // Add Bob and Carol to the group initially using steward epoch flow // State machine: start steward epoch, voting, complete voting @@ -93,10 +90,7 @@ async fn test_invite_users_flow() { .await .expect("Failed to start steward epoch"); - println!( - "Debug: Steward epoch returned {} proposals", - steward_epoch_proposals - ); + println!("Debug: Steward epoch returned {steward_epoch_proposals} proposals"); let vote_id = alice .start_voting(group_name.clone()) @@ -146,7 +140,7 @@ async fn test_invite_users_flow() { .process_waku_message(bob_res_waku_message.clone()) .await .expect("Failed to process waku message"); - println!("Alice result: {:?}", res_alice); + println!("Alice result: {res_alice:?}"); let res_alice_msg = match res_alice { UserAction::SendToApp(msg) => msg, _ => panic!("User action is not SendToApp"), @@ -170,7 +164,7 @@ async fn test_invite_users_flow() { ) .await .expect("Failed to process waku message"); - println!("Carol result: {:?}", res_carol); + println!("Carol result: {res_carol:?}"); let carol_group = carol .get_group(group_name.clone()) @@ -298,7 +292,7 @@ async fn test_add_user_in_different_epoch() { ); // Submit a vote (Alice votes yes for her own proposals) - println!("Test: Submitting vote with ID: {:?}", vote_id); + println!("Test: Submitting vote with ID: {vote_id:?}"); println!("Test: Alice's identity: {}", alice.identity_string()); alice .submit_vote(vote_id.clone(), true) @@ -397,7 +391,7 @@ async fn test_add_user_in_different_epoch() { ); // Submit a vote (Alice votes yes for her own proposals) - println!("Test: Submitting vote with ID: {:?}", vote_id); + println!("Test: Submitting vote with ID: {vote_id:?}"); println!("Test: Alice's identity: {}", alice.identity_string()); alice .submit_vote(vote_id.clone(), true) @@ -446,7 +440,7 @@ async fn test_add_user_in_different_epoch() { .expect("Failed to process waku message apply commit to the Bob") { UserAction::SendToWaku(msg) => { - println!("Bob action is SendToWaku: {:?}", msg); + println!("Bob action is SendToWaku: {msg:?}"); } UserAction::DoNothing => { println!("Bob action is DoNothing"); @@ -581,10 +575,7 @@ async fn test_remove_user_flow() { .get_pending_proposals_count(group_name.clone()) .await .expect("Failed to get proposal count"); - println!( - "Debug: Proposal count before steward epoch: {}", - proposal_count_before - ); + println!("Debug: Proposal count before steward epoch: {proposal_count_before}"); // Add Bob and Carol to the group initially using steward epoch flow // State machine: start steward epoch, voting, complete voting @@ -593,10 +584,7 @@ async fn test_remove_user_flow() { .await .expect("Failed to start steward epoch"); - println!( - "Debug: Steward epoch returned {} proposals", - steward_epoch_proposals - ); + println!("Debug: Steward epoch returned {steward_epoch_proposals} proposals"); let vote_id = alice .start_voting(group_name.clone()) @@ -646,7 +634,7 @@ async fn test_remove_user_flow() { .process_waku_message(bob_res_waku_message.clone()) .await .expect("Failed to process waku message"); - println!("Alice result: {:?}", res_alice); + println!("Alice result: {res_alice:?}"); let res_alice_msg = match res_alice { UserAction::SendToApp(msg) => msg, _ => panic!("User action is not SendToApp"), @@ -670,7 +658,7 @@ async fn test_remove_user_flow() { ) .await .expect("Failed to process waku message"); - println!("Carol result: {:?}", res_carol); + println!("Carol result: {res_carol:?}"); let carol_group = carol .get_group(group_name.clone()) @@ -778,7 +766,7 @@ async fn test_remove_user_flow() { .expect("Failed to start voting (removal)"); // Submit a vote (Alice votes yes for the removal) - println!("Test: Submitting vote with ID: {:?}", vote_id); + println!("Test: Submitting vote with ID: {vote_id:?}"); println!("Test: Alice's identity: {}", alice.identity_string()); alice .submit_vote(vote_id.clone(), true)