mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 16:08:26 -05:00
Add lock around reading eth1data (#10557)
* Add lock around reading eth1data * fix lock pattern * more locking * move lock inside loop Co-authored-by: Nishant Das <nishdas93@gmail.com>
This commit is contained in:
@@ -114,8 +114,10 @@ func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.Hea
|
||||
ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockByTimestamp")
|
||||
defer span.End()
|
||||
|
||||
s.latestEth1DataLock.RLock()
|
||||
latestBlkHeight := s.latestEth1Data.BlockHeight
|
||||
latestBlkTime := s.latestEth1Data.BlockTime
|
||||
s.latestEth1DataLock.RUnlock()
|
||||
|
||||
if time > latestBlkTime {
|
||||
return nil, errors.Errorf("provided time is later than the current eth1 head. %d > %d", time, latestBlkTime)
|
||||
|
||||
@@ -342,7 +342,9 @@ func (s *Service) processPastLogs(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
// set new block number after checking for chainstart for previous block.
|
||||
s.latestEth1DataLock.Lock()
|
||||
s.latestEth1Data.LastRequestedBlock = currentBlockNum
|
||||
s.latestEth1DataLock.Unlock()
|
||||
currentBlockNum = filterLog.BlockNumber
|
||||
}
|
||||
if err := s.ProcessLog(ctx, filterLog); err != nil {
|
||||
@@ -363,7 +365,9 @@ func (s *Service) processPastLogs(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
s.latestEth1DataLock.Lock()
|
||||
s.latestEth1Data.LastRequestedBlock = currentBlockNum
|
||||
s.latestEth1DataLock.Unlock()
|
||||
|
||||
c, err := s.cfg.beaconDB.FinalizedCheckpoint(ctx)
|
||||
if err != nil {
|
||||
@@ -421,7 +425,9 @@ func (s *Service) requestBatchedHeadersAndLogs(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.latestEth1DataLock.Lock()
|
||||
s.latestEth1Data.LastRequestedBlock = i
|
||||
s.latestEth1DataLock.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -148,6 +148,7 @@ type Service struct {
|
||||
connectedETH1 bool
|
||||
isRunning bool
|
||||
processingLock sync.RWMutex
|
||||
latestEth1DataLock sync.RWMutex
|
||||
cfg *config
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
@@ -440,9 +441,11 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*ethpb.DepositCo
|
||||
func (s *Service) processBlockHeader(header *gethTypes.Header) {
|
||||
defer safelyHandlePanic()
|
||||
blockNumberGauge.Set(float64(header.Number.Int64()))
|
||||
s.latestEth1DataLock.Lock()
|
||||
s.latestEth1Data.BlockHeight = header.Number.Uint64()
|
||||
s.latestEth1Data.BlockHash = header.Hash().Bytes()
|
||||
s.latestEth1Data.BlockTime = header.Time
|
||||
s.latestEth1DataLock.Unlock()
|
||||
log.WithFields(logrus.Fields{
|
||||
"blockNumber": s.latestEth1Data.BlockHeight,
|
||||
"blockHash": hexutil.Encode(s.latestEth1Data.BlockHash),
|
||||
@@ -569,9 +572,11 @@ func (s *Service) initPOWService() {
|
||||
continue
|
||||
}
|
||||
|
||||
s.latestEth1DataLock.Lock()
|
||||
s.latestEth1Data.BlockHeight = header.Number.Uint64()
|
||||
s.latestEth1Data.BlockHash = header.Hash().Bytes()
|
||||
s.latestEth1Data.BlockTime = header.Time
|
||||
s.latestEth1DataLock.Unlock()
|
||||
|
||||
if err := s.processPastLogs(ctx); err != nil {
|
||||
s.retryExecutionClientConnection(ctx, err)
|
||||
|
||||
Reference in New Issue
Block a user