Compare commits

...

9 Commits

Author SHA1 Message Date
georgehao
20c5e9855b perf(coordinator): use optimistic lock during batch/chunk assignment (#958)
Co-authored-by: georgehao <georgehao@users.noreply.github.com>
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
2023-09-21 17:03:42 +08:00
Steven
1f2fe74cbe fix(libzkp): set StorageTrace optional in ChunkProof (#953)
Co-authored-by: HAOYUatHZ <HAOYUatHZ@users.noreply.github.com>
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-09-21 12:39:41 +08:00
Xi Lin
0e12661fd5 docs(contracts): OZ-N01 Misleading Documentation (#955)
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-09-21 12:37:56 +08:00
Xi Lin
04e66231e5 fix(contracts): OZ-L02 Implicit Limitation of Withdrawal (#954) 2023-09-20 12:18:19 +02:00
Xi Lin
417a228523 fix(contracts): OZ-L03 Inconsistency of Allowing a Trusted Forwarder (#846)
Co-authored-by: Haichen Shen <shenhaichen@gmail.com>
2023-09-19 14:50:21 -07:00
georgehao
dcd85b2f56 feat(coordinator): bump version (#952)
Co-authored-by: georgehao <georgehao@users.noreply.github.com>
2023-09-19 17:50:25 +08:00
georgehao
afb6476823 feat: fix cleanup bug again (#951) 2023-09-19 17:45:45 +08:00
georgehao
d991d6b99d fix(coordinator): fix clean challenge bug (#950)
Co-authored-by: georgehao <georgehao@users.noreply.github.com>
2023-09-19 16:41:33 +08:00
Steven
f0920362c5 fix(libzkp): try to convert to string for panic errors (#949) 2023-09-19 12:47:37 +08:00
15 changed files with 148 additions and 123 deletions

View File

@@ -1,7 +1,8 @@
use crate::{
types::{CheckChunkProofsResponse, ProofResult},
utils::{
c_char_to_str, c_char_to_vec, file_exists, string_to_c_char, vec_to_c_char, OUTPUT_DIR,
c_char_to_str, c_char_to_vec, file_exists, panic_catch, string_to_c_char, vec_to_c_char,
OUTPUT_DIR,
},
};
use libc::c_char;
@@ -11,7 +12,7 @@ use prover::{
utils::{chunk_trace_to_witness_block, init_env_and_log},
BatchProof, BlockTrace, ChunkHash, ChunkProof,
};
use std::{cell::OnceCell, env, panic, ptr::null};
use std::{cell::OnceCell, env, ptr::null};
static mut PROVER: OnceCell<Prover> = OnceCell::new();
static mut VERIFIER: OnceCell<Verifier> = OnceCell::new();
@@ -55,7 +56,7 @@ pub unsafe extern "C" fn init_batch_verifier(params_dir: *const c_char, assets_d
/// # Safety
#[no_mangle]
pub unsafe extern "C" fn get_batch_vk() -> *const c_char {
let vk_result = panic::catch_unwind(|| PROVER.get_mut().unwrap().get_vk());
let vk_result = panic_catch(|| PROVER.get_mut().unwrap().get_vk());
vk_result
.ok()
@@ -66,7 +67,7 @@ pub unsafe extern "C" fn get_batch_vk() -> *const c_char {
/// # Safety
#[no_mangle]
pub unsafe extern "C" fn check_chunk_proofs(chunk_proofs: *const c_char) -> *const c_char {
let check_result: Result<bool, String> = panic::catch_unwind(|| {
let check_result: Result<bool, String> = panic_catch(|| {
let chunk_proofs = c_char_to_vec(chunk_proofs);
let chunk_proofs = serde_json::from_slice::<Vec<ChunkProof>>(&chunk_proofs)
.map_err(|e| format!("failed to deserialize chunk proofs: {e:?}"))?;
@@ -102,7 +103,7 @@ pub unsafe extern "C" fn gen_batch_proof(
chunk_hashes: *const c_char,
chunk_proofs: *const c_char,
) -> *const c_char {
let proof_result: Result<Vec<u8>, String> = panic::catch_unwind(|| {
let proof_result: Result<Vec<u8>, String> = panic_catch(|| {
let chunk_hashes = c_char_to_vec(chunk_hashes);
let chunk_proofs = c_char_to_vec(chunk_proofs);
@@ -151,7 +152,7 @@ pub unsafe extern "C" fn verify_batch_proof(proof: *const c_char) -> c_char {
let proof = c_char_to_vec(proof);
let proof = serde_json::from_slice::<BatchProof>(proof.as_slice()).unwrap();
let verified = panic::catch_unwind(|| VERIFIER.get().unwrap().verify_agg_evm_proof(proof));
let verified = panic_catch(|| VERIFIER.get().unwrap().verify_agg_evm_proof(proof));
verified.unwrap_or(false) as c_char
}

View File

@@ -1,7 +1,8 @@
use crate::{
types::ProofResult,
utils::{
c_char_to_str, c_char_to_vec, file_exists, string_to_c_char, vec_to_c_char, OUTPUT_DIR,
c_char_to_str, c_char_to_vec, file_exists, panic_catch, string_to_c_char, vec_to_c_char,
OUTPUT_DIR,
},
};
use libc::c_char;
@@ -11,7 +12,7 @@ use prover::{
zkevm::{Prover, Verifier},
BlockTrace, ChunkProof,
};
use std::{cell::OnceCell, env, panic, ptr::null};
use std::{cell::OnceCell, env, ptr::null};
static mut PROVER: OnceCell<Prover> = OnceCell::new();
static mut VERIFIER: OnceCell<Verifier> = OnceCell::new();
@@ -55,7 +56,7 @@ pub unsafe extern "C" fn init_chunk_verifier(params_dir: *const c_char, assets_d
/// # Safety
#[no_mangle]
pub unsafe extern "C" fn get_chunk_vk() -> *const c_char {
let vk_result = panic::catch_unwind(|| PROVER.get_mut().unwrap().get_vk());
let vk_result = panic_catch(|| PROVER.get_mut().unwrap().get_vk());
vk_result
.ok()
@@ -66,7 +67,7 @@ pub unsafe extern "C" fn get_chunk_vk() -> *const c_char {
/// # Safety
#[no_mangle]
pub unsafe extern "C" fn gen_chunk_proof(block_traces: *const c_char) -> *const c_char {
let proof_result: Result<Vec<u8>, String> = panic::catch_unwind(|| {
let proof_result: Result<Vec<u8>, String> = panic_catch(|| {
let block_traces = c_char_to_vec(block_traces);
let block_traces = serde_json::from_slice::<Vec<BlockTrace>>(&block_traces)
.map_err(|e| format!("failed to deserialize block traces: {e:?}"))?;
@@ -101,6 +102,6 @@ pub unsafe extern "C" fn verify_chunk_proof(proof: *const c_char) -> c_char {
let proof = c_char_to_vec(proof);
let proof = serde_json::from_slice::<ChunkProof>(proof.as_slice()).unwrap();
let verified = panic::catch_unwind(|| VERIFIER.get().unwrap().verify_chunk_proof(proof));
let verified = panic_catch(|| VERIFIER.get().unwrap().verify_chunk_proof(proof));
verified.unwrap_or(false) as c_char
}

View File

@@ -3,6 +3,7 @@ use std::{
env,
ffi::{CStr, CString},
os::raw::c_char,
panic::{catch_unwind, AssertUnwindSafe},
path::PathBuf,
};
@@ -34,3 +35,15 @@ pub(crate) fn file_exists(dir: &str, filename: &str) -> bool {
path.exists()
}
pub(crate) fn panic_catch<F: FnOnce() -> R, R>(f: F) -> Result<R, String> {
catch_unwind(AssertUnwindSafe(f)).map_err(|err| {
if let Some(s) = err.downcast_ref::<String>() {
s.to_string()
} else if let Some(s) = err.downcast_ref::<&str>() {
s.to_string()
} else {
format!("unable to get panic info {err:?}")
}
})
}

View File

@@ -263,7 +263,7 @@ type ChunkInfo struct {
// ChunkProof includes the proof info that are required for chunk verification and rollup.
type ChunkProof struct {
StorageTrace []byte `json:"storage_trace"`
StorageTrace []byte `json:"storage_trace,omitempty"`
Protocol []byte `json:"protocol"`
Proof []byte `json:"proof"`
Instances []byte `json:"instances"`

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v4.3.9"
var tag = "v4.3.13"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -64,7 +64,7 @@ contract L1MessageQueue is OwnableUpgradeable, IL1MessageQueue {
/// @notice The max gas limit of L1 transactions.
uint256 public maxGasLimit;
/// @dev The bitmap for skipped messages.
/// @dev The bitmap for dropped messages, where `droppedMessageBitmap[i]` keeps the bits from `[i*256, (i+1)*256)`.
BitMapsUpgradeable.BitMap private droppedMessageBitmap;
/// @dev The bitmap for skipped messages, where `skippedMessageBitmap[i]` keeps the bits from `[i*256, (i+1)*256)`.

View File

@@ -32,7 +32,7 @@ import {OwnableBase} from "../../libraries/common/OwnableBase.sol";
// solhint-disable reason-string
/// @title L2TxFeeVault
/// @notice The L2TxFeeVault contract contains the basic logic for the various different vault contracts
/// @notice The L2TxFeeVault contract contains the logic for the vault contracts
/// used to hold fee revenue generated by the L2 system.
contract L2TxFeeVault is OwnableBase {
/**********
@@ -110,6 +110,9 @@ contract L2TxFeeVault is OwnableBase {
"FeeVault: withdrawal amount must be greater than minimum withdrawal amount"
);
uint256 _balance = address(this).balance;
require(_value <= _balance, "FeeVault: insufficient balance to withdraw");
unchecked {
totalProcessed += _value;
}

View File

@@ -2,17 +2,17 @@
pragma solidity =0.8.16;
import {Ownable} from "@openzeppelin/contracts/access/Ownable.sol";
import {ERC2771Context} from "@openzeppelin/contracts/metatx/ERC2771Context.sol";
import {ReentrancyGuard} from "@openzeppelin/contracts/security/ReentrancyGuard.sol";
import {IERC20} from "@openzeppelin/contracts/token/ERC20/IERC20.sol";
import {SafeERC20} from "@openzeppelin/contracts/token/ERC20/utils/SafeERC20.sol";
import {IERC20Permit} from "@openzeppelin/contracts/token/ERC20/extensions/draft-IERC20Permit.sol";
import {OwnableBase} from "../libraries/common/OwnableBase.sol";
import {Context} from "@openzeppelin/contracts/utils/Context.sol";
// solhint-disable no-empty-blocks
contract GasSwap is ERC2771Context, ReentrancyGuard, OwnableBase {
contract GasSwap is ERC2771Context, Ownable, ReentrancyGuard {
using SafeERC20 for IERC20;
using SafeERC20 for IERC20Permit;
@@ -76,9 +76,7 @@ contract GasSwap is ERC2771Context, ReentrancyGuard, OwnableBase {
* Constructor *
***************/
constructor(address trustedForwarder) ERC2771Context(trustedForwarder) {
owner = msg.sender;
}
constructor(address trustedForwarder) ERC2771Context(trustedForwarder) {}
/*****************************
* Public Mutating Functions *
@@ -174,6 +172,16 @@ contract GasSwap is ERC2771Context, ReentrancyGuard, OwnableBase {
* Internal Functions *
**********************/
/// @inheritdoc Context
function _msgData() internal view virtual override(Context, ERC2771Context) returns (bytes calldata) {
return ERC2771Context._msgData();
}
/// @inheritdoc Context
function _msgSender() internal view virtual override(Context, ERC2771Context) returns (address) {
return ERC2771Context._msgSender();
}
/// @dev Internal function to concat two bytes array.
function concat(bytes memory a, bytes memory b) internal pure returns (bytes memory) {
return abi.encodePacked(a, b);

View File

@@ -33,7 +33,7 @@ contract L2TxFeeVaultTest is DSTestPlus {
function testCantWithdrawMoreThanBalance(uint256 amount) public {
hevm.assume(amount >= 10 ether);
hevm.deal(address(vault), amount - 1);
hevm.expectRevert(new bytes(0));
hevm.expectRevert("FeeVault: insufficient balance to withdraw");
vault.withdraw(amount);
}

View File

@@ -12,7 +12,7 @@ import (
func (c *Collector) cleanupChallenge() {
defer func() {
if err := recover(); err != nil {
nerr := fmt.Errorf("clean challenge panic error:%v", err)
nerr := fmt.Errorf("clean challenge panic error: %v", err)
log.Warn(nerr.Error())
}
}()
@@ -21,7 +21,7 @@ func (c *Collector) cleanupChallenge() {
for {
select {
case <-ticker.C:
expiredTime := utils.NowUTC().Add(time.Hour)
expiredTime := utils.NowUTC().Add(-time.Hour)
if err := c.challenge.DeleteExpireChallenge(c.ctx, expiredTime); err != nil {
log.Error("delete expired challenge failure", "error", err)
}

View File

@@ -47,6 +47,7 @@ func NewCollector(ctx context.Context, db *gorm.DB, cfg *config.Config, reg prom
proverTaskOrm: orm.NewProverTask(db),
chunkOrm: orm.NewChunk(db),
batchOrm: orm.NewBatch(db),
challenge: orm.NewChallenge(db),
timeoutBatchCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "coordinator_batch_timeout_checker_run_total",

View File

@@ -61,13 +61,34 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
batchTask, err := bp.batchOrm.UpdateBatchAttemptsReturning(ctx, maxActiveAttempts, maxTotalAttempts)
if err != nil {
log.Error("failed to get unassigned batch proving tasks", "err", err)
return nil, ErrCoordinatorInternalFailure
var batchTask *orm.Batch
for i := 0; i < 100; i++ {
unassignedBatch, getUnassignedErr := bp.batchOrm.GetUnassignedBatch(ctx, maxActiveAttempts, maxTotalAttempts)
if getUnassignedErr != nil {
log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getUnassignedErr)
return nil, ErrCoordinatorInternalFailure
}
if unassignedBatch == nil {
log.Debug("get empty unassigned batch", "height", getTaskParameter.ProverHeight)
return nil, nil
}
rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx, unassignedBatch.Index, unassignedBatch.ActiveAttempts, unassignedBatch.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
if rowsAffected == 0 {
continue
}
batchTask = unassignedBatch
break
}
if batchTask == nil {
log.Debug("get empty unassigned batch after retry 100 times", "height", getTaskParameter.ProverHeight)
return nil, nil
}

View File

@@ -64,13 +64,35 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
chunkTask, err := cp.chunkOrm.UpdateChunkAttemptsReturning(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if err != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", err)
return nil, ErrCoordinatorInternalFailure
var chunkTask *orm.Chunk
for i := 0; i < 100; i++ {
unassignedChunk, getUnsignedChunkErr := cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if getUnsignedChunkErr != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getUnsignedChunkErr)
return nil, ErrCoordinatorInternalFailure
}
if unassignedChunk == nil {
log.Debug("get empty unassigned chunk", "height", getTaskParameter.ProverHeight)
return nil, nil
}
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, unassignedChunk.Index, unassignedChunk.ActiveAttempts, unassignedChunk.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
if rowsAffected == 0 {
continue
}
chunkTask = unassignedChunk
break
}
if chunkTask == nil {
log.Debug("get empty unassigned chunk after retry 100 times", "height", getTaskParameter.ProverHeight)
return nil, nil
}

View File

@@ -10,7 +10,6 @@ import (
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
@@ -72,26 +71,26 @@ func (*Batch) TableName() string {
return "batch"
}
// GetUnassignedBatches retrieves unassigned batches based on the specified limit.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetUnassignedBatches(ctx context.Context, limit int) ([]*Batch, error) {
if limit < 0 {
return nil, errors.New("limit must not be smaller than zero")
}
if limit == 0 {
// GetUnassignedBatch retrieves unassigned batch based on the specified limit.
// The returned batch are sorted in ascending order by their index.
func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)})
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
db = db.Order("index ASC")
var batch Batch
err := db.First(&batch).Error
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
db := o.db.WithContext(ctx)
db = db.Where("proving_status = ? AND chunk_proofs_status = ?", types.ProvingTaskUnassigned, types.ChunkProofsStatusReady)
db = db.Order("index ASC")
db = db.Limit(limit)
var batches []*Batch
if err := db.Find(&batches).Error; err != nil {
if err != nil {
return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err)
}
return batches, nil
return &batch, nil
}
// GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready
@@ -303,22 +302,13 @@ func (o *Batch) UpdateProofAndProvingStatusByHash(ctx context.Context, hash stri
return nil
}
// UpdateBatchAttemptsReturning atomically increments the attempts count for the earliest available batch that meets the conditions.
func (o *Batch) UpdateBatchAttemptsReturning(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
// UpdateBatchAttempts atomically increments the attempts count for the earliest available batch that meets the conditions.
func (o *Batch) UpdateBatchAttempts(ctx context.Context, index uint64, curActiveAttempts, curTotalAttempts int16) (int64, error) {
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Batch{}).Select("index")
subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
subQueryDB = subQueryDB.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)})
subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts)
subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts)
subQueryDB = subQueryDB.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(1)
var updatedBatch Batch
db = db.Model(&updatedBatch).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
db = db.Model(&Batch{})
db = db.Where("index = ?", index)
db = db.Where("active_attempts = ?", curActiveAttempts)
db = db.Where("total_attempts = ?", curTotalAttempts)
result := db.Updates(map[string]interface{}{
"proving_status": types.ProvingTaskAssigned,
"total_attempts": gorm.Expr("total_attempts + 1"),
@@ -326,13 +316,9 @@ func (o *Batch) UpdateBatchAttemptsReturning(ctx context.Context, maxActiveAttem
})
if result.Error != nil {
return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
maxActiveAttempts, maxTotalAttempts, result.Error)
return 0, fmt.Errorf("failed to update batch, err:%w", result.Error)
}
if result.RowsAffected == 0 {
return nil, nil
}
return &updatedBatch, nil
return result.RowsAffected, nil
}
// DecreaseActiveAttemptsByHash decrements the active_attempts of a batch given its hash.

View File

@@ -9,7 +9,6 @@ import (
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
@@ -67,27 +66,27 @@ func (*Chunk) TableName() string {
return "chunk"
}
// GetUnassignedChunks retrieves unassigned chunks based on the specified limit.
// GetUnassignedChunk retrieves unassigned chunk based on the specified limit.
// The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetUnassignedChunks(ctx context.Context, limit int) ([]*Chunk, error) {
if limit < 0 {
return nil, errors.New("limit must not be smaller than zero")
}
if limit == 0 {
func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)})
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("end_block_number <= ?", height)
db = db.Order("index ASC")
var chunk Chunk
err := db.First(&chunk).Error
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
db = db.Order("index ASC")
db = db.Limit(limit)
var chunks []*Chunk
if err := db.Find(&chunks).Error; err != nil {
if err != nil {
return nil, fmt.Errorf("Chunk.GetUnassignedChunks error: %w", err)
}
return chunks, nil
return &chunk, nil
}
// GetChunksByBatchHash retrieves the chunks associated with a specific batch hash.
@@ -158,19 +157,6 @@ func (o *Chunk) GetProvingStatusByHash(ctx context.Context, hash string) (types.
return types.ProvingStatus(chunk.ProvingStatus), nil
}
// GetAssignedChunks retrieves all chunks whose proving_status is either types.ProvingTaskAssigned.
func (o *Chunk) GetAssignedChunks(ctx context.Context) ([]*Chunk, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
var chunks []*Chunk
if err := db.Find(&chunks).Error; err != nil {
return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)
}
return chunks, nil
}
// CheckIfBatchChunkProofsAreReady checks if all proofs for all chunks of a given batchHash are collected.
func (o *Chunk) CheckIfBatchChunkProofsAreReady(ctx context.Context, batchHash string) (bool, error) {
db := o.db.WithContext(ctx)
@@ -350,26 +336,13 @@ func (o *Chunk) UpdateBatchHashInRange(ctx context.Context, startIndex uint64, e
return nil
}
// UpdateChunkAttemptsReturning atomically increments the attempts count for the earliest available chunk that meets the conditions.
func (o *Chunk) UpdateChunkAttemptsReturning(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
if height <= 0 {
return nil, errors.New("Chunk.UpdateChunkAttemptsReturning error: height must be larger than zero")
}
// UpdateChunkAttempts atomically increments the attempts count for the earliest available chunk that meets the conditions.
func (o *Chunk) UpdateChunkAttempts(ctx context.Context, index uint64, curActiveAttempts, curTotalAttempts int16) (int64, error) {
db := o.db.WithContext(ctx)
subQueryDB := db.Model(&Chunk{}).Select("index")
subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
subQueryDB = subQueryDB.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)})
subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts)
subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts)
subQueryDB = subQueryDB.Where("end_block_number <= ?", height)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(1)
var updatedChunk Chunk
db = db.Model(&updatedChunk).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
db = db.Model(&Chunk{})
db = db.Where("index = ?", index)
db = db.Where("active_attempts = ?", curActiveAttempts)
db = db.Where("total_attempts = ?", curTotalAttempts)
result := db.Updates(map[string]interface{}{
"proving_status": types.ProvingTaskAssigned,
"total_attempts": gorm.Expr("total_attempts + 1"),
@@ -377,13 +350,9 @@ func (o *Chunk) UpdateChunkAttemptsReturning(ctx context.Context, height int, ma
})
if result.Error != nil {
return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
maxActiveAttempts, maxTotalAttempts, result.Error)
return 0, fmt.Errorf("failed to update chunk, err:%w", result.Error)
}
if result.RowsAffected == 0 {
return nil, nil
}
return &updatedChunk, nil
return result.RowsAffected, nil
}
// DecreaseActiveAttemptsByHash decrements the active_attempts of a chunk given its hash.