tests for origin blob fetching (#13667)

* tests for origin blob fetching

* Update beacon-chain/sync/initial-sync/service.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
This commit is contained in:
kasey
2024-03-02 09:11:22 -06:00
committed by GitHub
parent 68b78dd520
commit 4c3dbae3c0
4 changed files with 98 additions and 22 deletions

View File

@@ -279,34 +279,22 @@ func (s *Service) markSynced() {
close(s.cfg.InitialSyncComplete)
}
func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
return nil
}
blk, err := s.cfg.DB.Block(s.ctx, r)
if err != nil {
log.WithField("root", r).Error("Block for checkpoint sync origin root not found in db")
return err
}
func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
r := blk.Root()
if blk.Version() < version.Deneb {
return nil
return nil, nil
}
cmts, err := blk.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block")
return err
return nil, err
}
if len(cmts) == 0 {
return nil
return nil, nil
}
rob, err := blocks.NewROBlockWithRoot(blk, r)
onDisk, err := store.Indices(r)
if err != nil {
return err
}
onDisk, err := s.cfg.BlobStorage.Indices(r)
if err != nil {
return errors.Wrapf(err, "error checking existing blobs for checkpoint sync bloc root %#x", r)
return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", r)
}
req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
for i := range cmts {
@@ -315,8 +303,29 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
}
req = append(req, &eth.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
}
return req, nil
}
func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
return nil
}
blk, err := s.cfg.DB.Block(s.ctx, r)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", r)).Error("Block for checkpoint sync origin root not found in db")
return err
}
rob, err := blocks.NewROBlockWithRoot(blk, r)
if err != nil {
return err
}
req, err := missingBlobRequest(rob, s.cfg.BlobStorage)
if err != nil {
return err
}
if len(req) == 0 {
log.WithField("nBlobs", len(cmts)).WithField("root", fmt.Sprintf("%#x", r)).Debug("All checkpoint block blobs are present")
log.WithField("root", fmt.Sprintf("%#x", r)).Debug("All blobs for checkpoint block are present")
return nil
}
shufflePeers(pids)