diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index cbc32e5f75..d399f6682c 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -10,26 +10,26 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/p2p" ) -func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error) { +func (s *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error) { if msg == nil || msg.TopicIDs == nil || len(msg.TopicIDs) == 0 { return nil, errors.New("nil pubsub message") } topic := msg.TopicIDs[0] - topic = strings.TrimSuffix(topic, r.p2p.Encoding().ProtocolSuffix()) - topic = r.replaceForkDigest(topic) + topic = strings.TrimSuffix(topic, s.p2p.Encoding().ProtocolSuffix()) + topic = s.replaceForkDigest(topic) base, ok := p2p.GossipTopicMappings[topic] if !ok { return nil, fmt.Errorf("no message mapped for topic %s", topic) } m := proto.Clone(base) - if err := r.p2p.Encoding().DecodeGossip(msg.Data, m); err != nil { + if err := s.p2p.Encoding().DecodeGossip(msg.Data, m); err != nil { return nil, err } return m, nil } // Replaces our fork digest with the formatter. -func (r *Service) replaceForkDigest(topic string) string { +func (s *Service) replaceForkDigest(topic string) string { subStrings := strings.Split(topic, "/") subStrings[2] = "%x" return strings.Join(subStrings, "/") diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 0aa812f090..02d713e4d6 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -22,12 +22,12 @@ var responseCodeSuccess = byte(0x00) var responseCodeInvalidRequest = byte(0x01) var responseCodeServerError = byte(0x02) -func (r *Service) generateErrorResponse(code byte, reason string) ([]byte, error) { +func (s *Service) generateErrorResponse(code byte, reason string) ([]byte, error) { buf := bytes.NewBuffer([]byte{code}) resp := &pb.ErrorResponse{ Message: []byte(reason), } - if _, err := r.p2p.Encoding().EncodeWithLength(buf, resp); err != nil { + if _, err := s.p2p.Encoding().EncodeWithLength(buf, resp); err != nil { return nil, err } diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 00415dba8d..b8277dea48 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -78,23 +78,23 @@ var ( ) ) -func (r *Service) updateMetrics() { +func (s *Service) updateMetrics() { // do not update metrics if genesis time // has not been initialized - if r.chain.GenesisTime().IsZero() { + if s.chain.GenesisTime().IsZero() { return } // We update the dynamic subnet topics. - digest, err := r.forkDigest() + digest, err := s.forkDigest() if err != nil { log.WithError(err).Errorf("Could not compute fork digest") } - indices := r.aggregatorSubnetIndices(r.chain.CurrentSlot()) + indices := s.aggregatorSubnetIndices(s.chain.CurrentSlot()) attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] - attTopic += r.p2p.Encoding().ProtocolSuffix() + attTopic += s.p2p.Encoding().ProtocolSuffix() for _, committeeIdx := range indices { formattedTopic := fmt.Sprintf(attTopic, digest, committeeIdx) - topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(r.p2p.PubSub().ListPeers(formattedTopic)))) + topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic)))) } // We update all other gossip topics. 
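For context on the gossip-topic convention the handlers above rely on, here is a minimal, self-contained sketch of the fork-digest formatting in both directions. The digest value and topic string are hypothetical; only replaceForkDigest is taken verbatim from decode_pubsub.go, and the Service plumbing is not reproduced.

package main

import (
	"fmt"
	"strings"
)

// replaceForkDigest mirrors the helper in decode_pubsub.go above: gossip
// topics carry the fork digest as their third path segment, and swapping
// that segment for "%x" recovers the format string keyed in
// p2p.GossipTopicMappings.
func replaceForkDigest(topic string) string {
	subStrings := strings.Split(topic, "/")
	subStrings[2] = "%x"
	return strings.Join(subStrings, "/")
}

func main() {
	// Hypothetical fork digest, for illustration only.
	digest := [4]byte{0xe7, 0xa7, 0x5d, 0x5a}

	// Formatting direction, as in updateMetrics.
	concrete := fmt.Sprintf("/eth2/%x/beacon_block", digest)
	fmt.Println(concrete) // /eth2/e7a75d5a/beacon_block

	// Decoding direction, as in decodePubsubMessage.
	fmt.Println(replaceForkDigest(concrete)) // /eth2/%x/beacon_block
}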
for topic := range p2p.GossipTopicMappings { @@ -102,12 +102,12 @@ func (r *Service) updateMetrics() { if strings.Contains(topic, "beacon_attestation") { continue } - topic += r.p2p.Encoding().ProtocolSuffix() + topic += s.p2p.Encoding().ProtocolSuffix() if !strings.Contains(topic, "%x") { - topicPeerCount.WithLabelValues(topic).Set(float64(len(r.p2p.PubSub().ListPeers(topic)))) + topicPeerCount.WithLabelValues(topic).Set(float64(len(s.p2p.PubSub().ListPeers(topic)))) continue } formattedTopic := fmt.Sprintf(topic, digest) - topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(r.p2p.PubSub().ListPeers(formattedTopic)))) + topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic)))) } } diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index efebebd2b5..8470b6f0f3 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -22,12 +22,12 @@ import ( var processPendingBlocksPeriod = slotutil.DivideSlotBy(3 /* times per slot */) // processes pending blocks queue on every processPendingBlocksPeriod -func (r *Service) processPendingBlocksQueue() { +func (s *Service) processPendingBlocksQueue() { ctx := context.Background() locker := new(sync.Mutex) - runutil.RunEvery(r.ctx, processPendingBlocksPeriod, func() { + runutil.RunEvery(s.ctx, processPendingBlocksPeriod, func() { locker.Lock() - if err := r.processPendingBlocks(ctx); err != nil { + if err := s.processPendingBlocks(ctx); err != nil { log.WithError(err).Error("Failed to process pending blocks") } locker.Unlock() @@ -35,37 +35,37 @@ func (r *Service) processPendingBlocksQueue() { } // processes the block tree inside the queue -func (r *Service) processPendingBlocks(ctx context.Context) error { +func (s *Service) processPendingBlocks(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "processPendingBlocks") defer span.End() - pids := r.p2p.Peers().Connected() - if err := r.validatePendingSlots(); err != nil { + pids := s.p2p.Peers().Connected() + if err := s.validatePendingSlots(); err != nil { return errors.Wrap(err, "could not validate pending slots") } - slots := r.sortedPendingSlots() + slots := s.sortedPendingSlots() span.AddAttributes( trace.Int64Attribute("numSlots", int64(len(slots))), trace.Int64Attribute("numPeers", int64(len(pids))), ) - for _, s := range slots { + for _, slot := range slots { ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop") - span.AddAttributes(trace.Int64Attribute("slot", int64(s))) + span.AddAttributes(trace.Int64Attribute("slot", int64(slot))) - r.pendingQueueLock.RLock() - b := r.slotToPendingBlocks[s] + s.pendingQueueLock.RLock() + b := s.slotToPendingBlocks[slot] // Skip if block does not exist. if b == nil || b.Block == nil { - r.pendingQueueLock.RUnlock() + s.pendingQueueLock.RUnlock() span.End() continue } - r.pendingQueueLock.RUnlock() - inPendingQueue := r.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)] + s.pendingQueueLock.RUnlock() + inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)] - inDB := r.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot)) + inDB := s.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot)) hasPeer := len(pids) != 0 // Only request for missing parent block if it's not in DB, not in pending cache @@ -81,17 +81,17 @@ func (r *Service) processPendingBlocks(ctx context.Context) error { // have a head slot newer than the block slot we are requesting. 
pid := pids[rand.Int()%len(pids)] for _, p := range pids { - cs, err := r.p2p.Peers().ChainState(p) + cs, err := s.p2p.Peers().ChainState(p) if err != nil { return errors.Wrap(err, "failed to read chain state for peer") } - if cs != nil && cs.HeadSlot >= s { + if cs != nil && cs.HeadSlot >= slot { pid = p break } } - if err := r.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil { + if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil { traceutil.AnnotateError(span, err) log.Errorf("Could not send recent block request: %v", err) } @@ -111,23 +111,23 @@ func (r *Service) processPendingBlocks(ctx context.Context) error { return err } - if err := r.chain.ReceiveBlockNoPubsub(ctx, b, blkRoot); err != nil { + if err := s.chain.ReceiveBlockNoPubsub(ctx, b, blkRoot); err != nil { log.Errorf("Could not process block from slot %d: %v", b.Block.Slot, err) traceutil.AnnotateError(span, err) } // Broadcasting the block again once a node is able to process it. - if err := r.p2p.Broadcast(ctx, b); err != nil { + if err := s.p2p.Broadcast(ctx, b); err != nil { log.WithError(err).Error("Failed to broadcast block") } - r.pendingQueueLock.Lock() - delete(r.slotToPendingBlocks, s) - delete(r.seenPendingBlocks, blkRoot) - r.pendingQueueLock.Unlock() + s.pendingQueueLock.Lock() + delete(s.slotToPendingBlocks, slot) + delete(s.seenPendingBlocks, blkRoot) + s.pendingQueueLock.Unlock() log.WithFields(logrus.Fields{ - "slot": s, + "slot": slot, "blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])), }).Debug("Processed pending block and cleared it in cache") @@ -137,13 +137,13 @@ func (r *Service) processPendingBlocks(ctx context.Context) error { return nil } -func (r *Service) sortedPendingSlots() []uint64 { - r.pendingQueueLock.RLock() - defer r.pendingQueueLock.RUnlock() +func (s *Service) sortedPendingSlots() []uint64 { + s.pendingQueueLock.RLock() + defer s.pendingQueueLock.RUnlock() - slots := make([]uint64, 0, len(r.slotToPendingBlocks)) - for s := range r.slotToPendingBlocks { - slots = append(slots, s) + slots := make([]uint64, 0, len(s.slotToPendingBlocks)) + for slot := range s.slotToPendingBlocks { + slots = append(slots, slot) } sort.Slice(slots, func(i, j int) bool { return slots[i] < slots[j] @@ -154,14 +154,14 @@ func (r *Service) sortedPendingSlots() []uint64 { // validatePendingSlots validates the pending blocks // by their slot. If they are before the current finalized // checkpoint, these blocks are removed from the queue. 
-func (r *Service) validatePendingSlots() error { - r.pendingQueueLock.Lock() - defer r.pendingQueueLock.Unlock() +func (s *Service) validatePendingSlots() error { + s.pendingQueueLock.Lock() + defer s.pendingQueueLock.Unlock() oldBlockRoots := make(map[[32]byte]bool) - finalizedEpoch := r.chain.FinalizedCheckpt().Epoch - for s, b := range r.slotToPendingBlocks { - epoch := helpers.SlotToEpoch(s) + finalizedEpoch := s.chain.FinalizedCheckpt().Epoch + for slot, b := range s.slotToPendingBlocks { + epoch := helpers.SlotToEpoch(slot) // remove all descendant blocks of old blocks if oldBlockRoots[bytesutil.ToBytes32(b.Block.ParentRoot)] { root, err := stateutil.BlockRoot(b.Block) @@ -169,8 +169,8 @@ func (r *Service) validatePendingSlots() error { return err } oldBlockRoots[root] = true - delete(r.slotToPendingBlocks, s) - delete(r.seenPendingBlocks, root) + delete(s.slotToPendingBlocks, slot) + delete(s.seenPendingBlocks, root) continue } // don't process old blocks @@ -180,16 +180,16 @@ func (r *Service) validatePendingSlots() error { return err } oldBlockRoots[blkRoot] = true - delete(r.slotToPendingBlocks, s) - delete(r.seenPendingBlocks, blkRoot) + delete(s.slotToPendingBlocks, slot) + delete(s.seenPendingBlocks, blkRoot) } } return nil } -func (r *Service) clearPendingSlots() { - r.pendingQueueLock.Lock() - defer r.pendingQueueLock.Unlock() - r.slotToPendingBlocks = make(map[uint64]*ethpb.SignedBeaconBlock) - r.seenPendingBlocks = make(map[[32]byte]bool) +func (s *Service) clearPendingSlots() { + s.pendingQueueLock.Lock() + defer s.pendingQueueLock.Unlock() + s.slotToPendingBlocks = make(map[uint64]*ethpb.SignedBeaconBlock) + s.seenPendingBlocks = make(map[[32]byte]bool) } diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index fb93525725..6345b1986a 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -31,44 +31,44 @@ var maxChunkSize = params.BeaconNetworkConfig().MaxChunkSize type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error // registerRPCHandlers for p2p RPC. -func (r *Service) registerRPCHandlers() { - r.registerRPC( +func (s *Service) registerRPCHandlers() { + s.registerRPC( p2p.RPCStatusTopic, &pb.Status{}, - r.statusRPCHandler, + s.statusRPCHandler, ) - r.registerRPC( + s.registerRPC( p2p.RPCGoodByeTopic, new(uint64), - r.goodbyeRPCHandler, + s.goodbyeRPCHandler, ) - r.registerRPC( + s.registerRPC( p2p.RPCBlocksByRangeTopic, &pb.BeaconBlocksByRangeRequest{}, - r.beaconBlocksByRangeRPCHandler, + s.beaconBlocksByRangeRPCHandler, ) - r.registerRPC( + s.registerRPC( p2p.RPCBlocksByRootTopic, &pb.BeaconBlocksByRootRequest{}, - r.beaconBlocksRootRPCHandler, + s.beaconBlocksRootRPCHandler, ) - r.registerRPC( + s.registerRPC( p2p.RPCPingTopic, new(uint64), - r.pingHandler, + s.pingHandler, ) - r.registerRPC( + s.registerRPC( p2p.RPCMetaDataTopic, new(interface{}), - r.metaDataHandler, + s.metaDataHandler, ) } // registerRPC for a given topic with an expected protobuf message type. 
-func (r *Service) registerRPC(topic string, base interface{}, handle rpcHandler) { - topic += r.p2p.Encoding().ProtocolSuffix() +func (s *Service) registerRPC(topic string, base interface{}, handle rpcHandler) { + topic += s.p2p.Encoding().ProtocolSuffix() log := log.WithField("topic", topic) - r.p2p.SetStreamHandler(topic, func(stream network.Stream) { + s.p2p.SetStreamHandler(topic, func(stream network.Stream) { ctx, cancel := context.WithTimeout(context.Background(), ttfbTimeout) defer cancel() defer func() { @@ -109,7 +109,7 @@ func (r *Service) registerRPC(topic string, base interface{}, handle rpcHandler) t := reflect.TypeOf(base) if t.Kind() == reflect.Ptr { msg := reflect.New(t.Elem()) - if err := r.p2p.Encoding().DecodeWithLength(stream, msg.Interface()); err != nil { + if err := s.p2p.Encoding().DecodeWithLength(stream, msg.Interface()); err != nil { // Debug logs for goodbye/status errors if strings.Contains(topic, p2p.RPCGoodByeTopic) || strings.Contains(topic, p2p.RPCStatusTopic) { log.WithError(err).Debug("Failed to decode goodbye stream message") @@ -129,7 +129,7 @@ func (r *Service) registerRPC(topic string, base interface{}, handle rpcHandler) } } else { msg := reflect.New(t) - if err := r.p2p.Encoding().DecodeWithLength(stream, msg.Interface()); err != nil { + if err := s.p2p.Encoding().DecodeWithLength(stream, msg.Interface()); err != nil { log.WithError(err).Warn("Failed to decode stream message") traceutil.AnnotateError(span, err) return diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index f3029c62ee..b5888376a7 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -17,7 +17,7 @@ import ( ) // beaconBlocksByRangeRPCHandler looks up the request blocks from the database from a given start block. -func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { +func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { ctx, span := trace.StartSpan(ctx, "sync.BeaconBlocksByRangeHandler") defer span.End() defer func() { @@ -52,7 +52,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa // The final requested slot from remote peer. 
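The registerRPC handler above decodes each incoming stream into a freshly allocated copy of the registered base message via reflection. A small sketch of that allocation step, under the assumption of a plain stand-in struct instead of a real protobuf type (Status and newMessage here are illustrative names, not part of the package):

package main

import (
	"fmt"
	"reflect"
)

// Status stands in for a protobuf request type such as pb.Status so the
// sketch stays self-contained.
type Status struct {
	HeadSlot uint64
}

// newMessage mirrors the reflection step in registerRPC: given a pointer
// base value, allocate a fresh instance of the underlying type so every
// incoming stream decodes into its own message rather than into the
// shared base.
func newMessage(base interface{}) interface{} {
	t := reflect.TypeOf(base)
	if t.Kind() == reflect.Ptr {
		return reflect.New(t.Elem()).Interface()
	}
	return reflect.New(t).Interface()
}

func main() {
	base := &Status{}
	msg := newMessage(base).(*Status)
	msg.HeadSlot = 42

	fmt.Println(msg.HeadSlot)  // 42
	fmt.Println(base.HeadSlot) // 0 -- the registered base value stays untouched
}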
endReqSlot := startSlot + (m.Step * (m.Count - 1)) - remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) + remainingBucketCapacity := s.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) span.AddAttributes( trace.Int64Attribute("start", int64(startSlot)), trace.Int64Attribute("end", int64(endReqSlot)), @@ -63,36 +63,36 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa ) maxRequestBlocks := params.BeaconNetworkConfig().MaxRequestBlocks for startSlot <= endReqSlot { - remainingBucketCapacity = r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) + remainingBucketCapacity = s.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) if int64(allowedBlocksPerSecond) > remainingBucketCapacity { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) - if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { + s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { log.Debug("Disconnecting bad peer") defer func() { - if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { + if err := s.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { log.WithError(err).Error("Failed to disconnect peer") } }() } - r.writeErrorResponseToStream(responseCodeInvalidRequest, rateLimitedError, stream) + s.writeErrorResponseToStream(responseCodeInvalidRequest, rateLimitedError, stream) return errors.New(rateLimitedError) } // TODO(3147): Update this with reasonable constraints. if endSlot-startSlot > rangeLimit || m.Step == 0 || m.Count > maxRequestBlocks { - r.writeErrorResponseToStream(responseCodeInvalidRequest, stepError, stream) + s.writeErrorResponseToStream(responseCodeInvalidRequest, stepError, stream) err := errors.New(stepError) traceutil.AnnotateError(span, err) return err } - if err := r.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, stream); err != nil { + if err := s.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, stream); err != nil { return err } // Decrease allowed blocks capacity by the number of streamed blocks. 
if startSlot <= endSlot { - r.blocksRateLimiter.Add( + s.blocksRateLimiter.Add( stream.Conn().RemotePeer().String(), int64(1+(endSlot-startSlot)/m.Step)) } @@ -114,31 +114,31 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa return nil } -func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot, step uint64, stream libp2pcore.Stream) error { +func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot, step uint64, stream libp2pcore.Stream) error { ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream") defer span.End() filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(step) - blks, err := r.db.Blocks(ctx, filter) + blks, err := s.db.Blocks(ctx, filter) if err != nil { log.WithError(err).Error("Failed to retrieve blocks") - r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) + s.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } - roots, err := r.db.BlockRoots(ctx, filter) + roots, err := s.db.BlockRoots(ctx, filter) if err != nil { log.WithError(err).Error("Failed to retrieve block roots") - r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) + s.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } // handle genesis case if startSlot == 0 { - genBlock, genRoot, err := r.retrieveGenesisBlock(ctx) + genBlock, genRoot, err := s.retrieveGenesisBlock(ctx) if err != nil { log.WithError(err).Error("Failed to retrieve genesis block") - r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) + s.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } @@ -147,8 +147,8 @@ func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo } // Filter and sort our retrieved blocks, so that // we only return valid sets of blocks. - blks, roots = r.dedupBlocksAndRoots(blks, roots) - blks, roots = r.sortBlocksAndRoots(blks, roots) + blks, roots = s.dedupBlocksAndRoots(blks, roots) + blks, roots = s.sortBlocksAndRoots(blks, roots) for i, b := range blks { if b == nil || b.Block == nil { continue @@ -158,19 +158,19 @@ func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo // Check that the block is valid according to the request and part of the canonical chain. 
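The by-range handler above charges the peer's block rate limiter for each streamed batch. A minimal sketch of that token accounting, with hypothetical slot numbers; blocksInRange is an illustrative helper, not a function in the package:

package main

import "fmt"

// blocksInRange mirrors the accounting in beaconBlocksByRangeRPCHandler:
// after streaming slots [startSlot, endSlot] at the requested step, the
// handler charges 1+(endSlot-startSlot)/step tokens against the peer's
// block rate limiter.
func blocksInRange(startSlot, endSlot, step uint64) int64 {
	if step == 0 || endSlot < startSlot {
		return 0 // such requests are rejected before any tokens are charged
	}
	return int64(1 + (endSlot-startSlot)/step)
}

func main() {
	fmt.Println(blocksInRange(100, 163, 1)) // 64 slots streamed, 64 tokens
	fmt.Println(blocksInRange(100, 163, 8)) // slots 100, 108, ..., 156 -> 8 tokens
}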
isRequestedSlotStep := (blk.Slot-startSlot)%step == 0 if isRequestedSlotStep { - canonical, err := r.chain.IsCanonical(ctx, roots[i]) + canonical, err := s.chain.IsCanonical(ctx, roots[i]) if err != nil { log.WithError(err).Error("Failed to determine canonical block") - r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) + s.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } if !canonical { continue } - if err := r.chunkWriter(stream, b); err != nil { + if err := s.chunkWriter(stream, b); err != nil { log.WithError(err).Error("Failed to send a chunked response") - r.writeErrorResponseToStream(responseCodeServerError, genericError, stream) + s.writeErrorResponseToStream(responseCodeServerError, genericError, stream) traceutil.AnnotateError(span, err) return err } @@ -179,8 +179,8 @@ func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo return nil } -func (r *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) { - resp, err := r.generateErrorResponse(responseCode, reason) +func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) { + resp, err := s.generateErrorResponse(responseCode, reason) if err != nil { log.WithError(err).Error("Failed to generate a response error") } else { @@ -190,8 +190,8 @@ func (r *Service) writeErrorResponseToStream(responseCode byte, reason string, s } } -func (r *Service) retrieveGenesisBlock(ctx context.Context) (*ethpb.SignedBeaconBlock, [32]byte, error) { - genBlock, err := r.db.GenesisBlock(ctx) +func (s *Service) retrieveGenesisBlock(ctx context.Context) (*ethpb.SignedBeaconBlock, [32]byte, error) { + genBlock, err := s.db.GenesisBlock(ctx) if err != nil { return nil, [32]byte{}, err } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index cc4be6d35e..20fef12db8 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -17,12 +17,12 @@ import ( // sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer to get // those corresponding blocks from that peer. 
-func (r *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots [][]byte, id peer.ID) error { +func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots [][]byte, id peer.ID) error { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() req := &pbp2p.BeaconBlocksByRootRequest{BlockRoots: blockRoots} - stream, err := r.p2p.Send(ctx, req, p2p.RPCBlocksByRootTopic, id) + stream, err := s.p2p.Send(ctx, req, p2p.RPCBlocksByRootTopic, id) if err != nil { return err } @@ -32,7 +32,7 @@ func (r *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots } }() for i := 0; i < len(blockRoots); i++ { - blk, err := ReadChunkedBlock(stream, r.p2p) + blk, err := ReadChunkedBlock(stream, s.p2p) if err == io.EOF { break } @@ -49,17 +49,17 @@ func (r *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots if err != nil { return err } - r.pendingQueueLock.Lock() - r.slotToPendingBlocks[blk.Block.Slot] = blk - r.seenPendingBlocks[blkRoot] = true - r.pendingQueueLock.Unlock() + s.pendingQueueLock.Lock() + s.slotToPendingBlocks[blk.Block.Slot] = blk + s.seenPendingBlocks[blkRoot] = true + s.pendingQueueLock.Unlock() } return nil } // beaconBlocksRootRPCHandler looks up the request blocks from the database from the given block roots. -func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { +func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { defer func() { if err := stream.Close(); err != nil { log.WithError(err).Error("Failed to close stream") @@ -75,7 +75,7 @@ func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ return errors.New("message is not type BeaconBlocksByRootRequest") } if len(req.BlockRoots) == 0 { - resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "no block roots provided in request") + resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "no block roots provided in request") if err != nil { log.WithError(err).Error("Failed to generate a response error") } else { @@ -86,17 +86,17 @@ func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ return errors.New("no block roots provided") } - if int64(len(req.BlockRoots)) > r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) - if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { + if int64(len(req.BlockRoots)) > s.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) { + s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { log.Debug("Disconnecting bad peer") defer func() { - if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { + if err := s.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { log.WithError(err).Error("Failed to disconnect peer") } }() } - resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError) + resp, err := s.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError) if err != nil { log.WithError(err).Error("Failed to generate a response error") } else { @@ -108,7 +108,7 @@ func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ } if uint64(len(req.BlockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks { - resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "requested more than the max block 
limit") + resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "requested more than the max block limit") if err != nil { log.WithError(err).Error("Failed to generate a response error") } else { @@ -119,13 +119,13 @@ func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ return errors.New("requested more than the max block limit") } - r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(len(req.BlockRoots))) + s.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(len(req.BlockRoots))) for _, root := range req.BlockRoots { - blk, err := r.db.Block(ctx, bytesutil.ToBytes32(root)) + blk, err := s.db.Block(ctx, bytesutil.ToBytes32(root)) if err != nil { log.WithError(err).Error("Failed to fetch block") - resp, err := r.generateErrorResponse(responseCodeServerError, genericError) + resp, err := s.generateErrorResponse(responseCodeServerError, genericError) if err != nil { log.WithError(err).Error("Failed to generate a response error") } else { @@ -138,7 +138,7 @@ func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ if blk == nil { continue } - if err := r.chunkWriter(stream, blk); err != nil { + if err := s.chunkWriter(stream, blk); err != nil { return err } } diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index 54e7bf9495..d7987d1409 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -13,9 +13,9 @@ import ( // chunkWriter writes the given message as a chunked response to the given network // stream. // response_chunk ::= | | | -func (r *Service) chunkWriter(stream libp2pcore.Stream, msg interface{}) error { +func (s *Service) chunkWriter(stream libp2pcore.Stream, msg interface{}) error { setStreamWriteDeadline(stream, defaultWriteDuration) - return WriteChunk(stream, r.p2p.Encoding(), msg) + return WriteChunk(stream, s.p2p.Encoding(), msg) } // WriteChunk object to stream. diff --git a/beacon-chain/sync/rpc_goodbye.go b/beacon-chain/sync/rpc_goodbye.go index e4be06c107..cfefe5d075 100644 --- a/beacon-chain/sync/rpc_goodbye.go +++ b/beacon-chain/sync/rpc_goodbye.go @@ -24,7 +24,7 @@ var goodByes = map[uint64]string{ } // goodbyeRPCHandler reads the incoming goodbye rpc message from the peer. 
-func (r *Service) goodbyeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { +func (s *Service) goodbyeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { defer func() { if err := stream.Close(); err != nil { log.WithError(err).Error("Failed to close stream") @@ -41,27 +41,27 @@ func (r *Service) goodbyeRPCHandler(ctx context.Context, msg interface{}, stream log := log.WithField("Reason", goodbyeMessage(*m)) log.WithField("peer", stream.Conn().RemotePeer()).Debug("Peer has sent a goodbye message") // closes all streams with the peer - return r.p2p.Disconnect(stream.Conn().RemotePeer()) + return s.p2p.Disconnect(stream.Conn().RemotePeer()) } -func (r *Service) sendGoodByeAndDisconnect(ctx context.Context, code uint64, id peer.ID) error { - if err := r.sendGoodByeMessage(ctx, code, id); err != nil { +func (s *Service) sendGoodByeAndDisconnect(ctx context.Context, code uint64, id peer.ID) error { + if err := s.sendGoodByeMessage(ctx, code, id); err != nil { log.WithFields(logrus.Fields{ "error": err, "peer": id, }).Debug("Could not send goodbye message to peer") } - if err := r.p2p.Disconnect(id); err != nil { + if err := s.p2p.Disconnect(id); err != nil { return err } return nil } -func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.ID) error { +func (s *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.ID) error { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - stream, err := r.p2p.Send(ctx, &code, p2p.RPCGoodByeTopic, id) + stream, err := s.p2p.Send(ctx, &code, p2p.RPCGoodByeTopic, id) if err != nil { return err } @@ -79,8 +79,8 @@ func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.I } // sends a goodbye message for a generic error -func (r *Service) sendGenericGoodbyeMessage(ctx context.Context, id peer.ID) error { - return r.sendGoodByeMessage(ctx, codeGenericError, id) +func (s *Service) sendGenericGoodbyeMessage(ctx context.Context, id peer.ID) error { + return s.sendGoodByeMessage(ctx, codeGenericError, id) } func goodbyeMessage(num uint64) string { diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index a174923c5f..786cadbff9 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -12,7 +12,7 @@ import ( ) // metaDataHandler reads the incoming metadata rpc request from the peer. 
-func (r *Service) metaDataHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { +func (s *Service) metaDataHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { defer func() { if err := stream.Close(); err != nil { log.WithError(err).Error("Failed to close stream") @@ -25,15 +25,15 @@ func (r *Service) metaDataHandler(ctx context.Context, msg interface{}, stream l if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { return err } - _, err := r.p2p.Encoding().EncodeWithLength(stream, r.p2p.Metadata()) + _, err := s.p2p.Encoding().EncodeWithLength(stream, s.p2p.Metadata()) return err } -func (r *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.MetaData, error) { +func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.MetaData, error) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - stream, err := r.p2p.Send(ctx, new(interface{}), p2p.RPCMetaDataTopic, id) + stream, err := s.p2p.Send(ctx, new(interface{}), p2p.RPCMetaDataTopic, id) if err != nil { return nil, err } @@ -45,16 +45,16 @@ func (r *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta log.WithError(err).Errorf("Failed to reset stream for protocol %s", stream.Protocol()) } }() - code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding()) + code, errMsg, err := ReadStatusCode(stream, s.p2p.Encoding()) if err != nil { return nil, err } if code != 0 { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) return nil, errors.New(errMsg) } msg := new(pb.MetaData) - if err := r.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { + if err := s.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { return nil, err } return msg, nil diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go index 03d269cc89..b25ac7e87f 100644 --- a/beacon-chain/sync/rpc_ping.go +++ b/beacon-chain/sync/rpc_ping.go @@ -12,7 +12,7 @@ import ( ) // pingHandler reads the incoming ping rpc message from the peer. -func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { +func (s *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { setRPCStreamDeadlines(stream) m, ok := msg.(*uint64) @@ -22,7 +22,7 @@ func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2 } return fmt.Errorf("wrong message type for ping, got %T, wanted *uint64", msg) } - valid, err := r.validateSequenceNum(*m, stream.Conn().RemotePeer()) + valid, err := s.validateSequenceNum(*m, stream.Conn().RemotePeer()) if err != nil { if err := stream.Close(); err != nil { log.WithError(err).Error("Failed to close stream") @@ -35,7 +35,7 @@ func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2 } return err } - if _, err := r.p2p.Encoding().EncodeWithLength(stream, r.p2p.MetadataSeq()); err != nil { + if _, err := s.p2p.Encoding().EncodeWithLength(stream, s.p2p.MetadataSeq()); err != nil { if err := stream.Close(); err != nil { log.WithError(err).Error("Failed to close stream") } @@ -60,24 +60,24 @@ func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2 // New context so the calling function doesn't cancel on us. 
ctx, cancel := context.WithTimeout(context.Background(), ttfbTimeout) defer cancel() - md, err := r.sendMetaDataRequest(ctx, stream.Conn().RemotePeer()) + md, err := s.sendMetaDataRequest(ctx, stream.Conn().RemotePeer()) if err != nil { log.WithField("peer", stream.Conn().RemotePeer()).WithError(err).Debug("Failed to send metadata request") return } // update metadata if there is no error - r.p2p.Peers().SetMetadata(stream.Conn().RemotePeer(), md) + s.p2p.Peers().SetMetadata(stream.Conn().RemotePeer(), md) }() return nil } -func (r *Service) sendPingRequest(ctx context.Context, id peer.ID) error { +func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - metadataSeq := r.p2p.MetadataSeq() - stream, err := r.p2p.Send(ctx, &metadataSeq, p2p.RPCPingTopic, id) + metadataSeq := s.p2p.MetadataSeq() + stream, err := s.p2p.Send(ctx, &metadataSeq, p2p.RPCPingTopic, id) if err != nil { return err } @@ -87,40 +87,40 @@ func (r *Service) sendPingRequest(ctx context.Context, id peer.ID) error { } }() - code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding()) + code, errMsg, err := ReadStatusCode(stream, s.p2p.Encoding()) if err != nil { return err } if code != 0 { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) return errors.New(errMsg) } msg := new(uint64) - if err := r.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { + if err := s.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { return err } - valid, err := r.validateSequenceNum(*msg, stream.Conn().RemotePeer()) + valid, err := s.validateSequenceNum(*msg, stream.Conn().RemotePeer()) if err != nil { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) return err } if valid { return nil } - md, err := r.sendMetaDataRequest(ctx, stream.Conn().RemotePeer()) + md, err := s.sendMetaDataRequest(ctx, stream.Conn().RemotePeer()) if err != nil { // do not increment bad responses, as its // already done in the request method. return err } - r.p2p.Peers().SetMetadata(stream.Conn().RemotePeer(), md) + s.p2p.Peers().SetMetadata(stream.Conn().RemotePeer(), md) return nil } // validates the peer's sequence number. -func (r *Service) validateSequenceNum(seq uint64, id peer.ID) (bool, error) { - md, err := r.p2p.Peers().Metadata(id) +func (s *Service) validateSequenceNum(seq uint64, id peer.ID) (bool, error) { + md, err := s.p2p.Peers().Metadata(id) if err != nil { return false, err } diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index fee76ebe1e..5fcfb821f3 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -21,28 +21,28 @@ import ( ) // maintainPeerStatuses by infrequently polling peers for their latest status. -func (r *Service) maintainPeerStatuses() { +func (s *Service) maintainPeerStatuses() { // Run twice per epoch. 
interval := time.Duration(params.BeaconConfig().SecondsPerSlot*params.BeaconConfig().SlotsPerEpoch/2) * time.Second - runutil.RunEvery(r.ctx, interval, func() { - for _, pid := range r.p2p.Peers().Connected() { + runutil.RunEvery(s.ctx, interval, func() { + for _, pid := range s.p2p.Peers().Connected() { go func(id peer.ID) { - if r.p2p.Peers().IsBad(id) { - if err := r.sendGoodByeAndDisconnect(r.ctx, codeGenericError, id); err != nil { + if s.p2p.Peers().IsBad(id) { + if err := s.sendGoodByeAndDisconnect(s.ctx, codeGenericError, id); err != nil { log.Errorf("Error when disconnecting with bad peer: %v", err) } return } // If the status hasn't been updated in the recent interval time. - lastUpdated, err := r.p2p.Peers().ChainStateLastUpdated(id) + lastUpdated, err := s.p2p.Peers().ChainStateLastUpdated(id) if err != nil { // Peer has vanished; nothing to do. return } if roughtime.Now().After(lastUpdated.Add(interval)) { - if err := r.reValidatePeer(r.ctx, id); err != nil { + if err := s.reValidatePeer(s.ctx, id); err != nil { log.WithField("peer", id).WithError(err).Error("Failed to revalidate peer") - r.p2p.Peers().IncrementBadResponses(id) + s.p2p.Peers().IncrementBadResponses(id) } } }(pid) @@ -52,23 +52,23 @@ func (r *Service) maintainPeerStatuses() { // resyncIfBehind checks periodically to see if we are in normal sync but have fallen behind our peers by more than an epoch, // in which case we attempt a resync using the initial sync method to catch up. -func (r *Service) resyncIfBehind() { +func (s *Service) resyncIfBehind() { millisecondsPerEpoch := params.BeaconConfig().SecondsPerSlot * params.BeaconConfig().SlotsPerEpoch * 1000 // Run sixteen times per epoch. interval := time.Duration(int64(millisecondsPerEpoch)/16) * time.Millisecond - runutil.RunEvery(r.ctx, interval, func() { - if r.shouldReSync() { - syncedEpoch := helpers.SlotToEpoch(r.chain.HeadSlot()) - _, highestEpoch, _ := r.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, syncedEpoch) - if helpers.StartSlot(highestEpoch) > r.chain.HeadSlot() { + runutil.RunEvery(s.ctx, interval, func() { + if s.shouldReSync() { + syncedEpoch := helpers.SlotToEpoch(s.chain.HeadSlot()) + _, highestEpoch, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, syncedEpoch) + if helpers.StartSlot(highestEpoch) > s.chain.HeadSlot() { log.WithFields(logrus.Fields{ - "currentEpoch": helpers.SlotToEpoch(r.chain.CurrentSlot()), + "currentEpoch": helpers.SlotToEpoch(s.chain.CurrentSlot()), "syncedEpoch": syncedEpoch, "peersEpoch": highestEpoch, }).Info("Fallen behind peers; reverting to initial sync to catch up") numberOfTimesResyncedCounter.Inc() - r.clearPendingSlots() - if err := r.initialSync.Resync(); err != nil { + s.clearPendingSlots() + if err := s.initialSync.Resync(); err != nil { log.Errorf("Could not resync chain: %v", err) } } @@ -77,38 +77,38 @@ func (r *Service) resyncIfBehind() { } // shouldReSync returns true if the node is not syncing and falls behind two epochs. 
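To make the polling cadence above concrete, a small sketch of the interval arithmetic used by maintainPeerStatuses (twice per epoch) and resyncIfBehind (sixteen times per epoch). Mainnet-style constants are stated explicitly here rather than read from params.BeaconConfig(), purely for illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed mainnet-style values.
	const secondsPerSlot = 12
	const slotsPerEpoch = 32

	// maintainPeerStatuses: run twice per epoch.
	statusInterval := time.Duration(secondsPerSlot*slotsPerEpoch/2) * time.Second

	// resyncIfBehind: run sixteen times per epoch.
	millisecondsPerEpoch := secondsPerSlot * slotsPerEpoch * 1000
	resyncInterval := time.Duration(int64(millisecondsPerEpoch)/16) * time.Millisecond

	fmt.Println(statusInterval) // 3m12s
	fmt.Println(resyncInterval) // 24s
}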
-func (r *Service) shouldReSync() bool { - syncedEpoch := helpers.SlotToEpoch(r.chain.HeadSlot()) - currentEpoch := helpers.SlotToEpoch(r.chain.CurrentSlot()) +func (s *Service) shouldReSync() bool { + syncedEpoch := helpers.SlotToEpoch(s.chain.HeadSlot()) + currentEpoch := helpers.SlotToEpoch(s.chain.CurrentSlot()) prevEpoch := uint64(0) if currentEpoch > 1 { prevEpoch = currentEpoch - 1 } - return r.initialSync != nil && !r.initialSync.Syncing() && syncedEpoch < prevEpoch + return s.initialSync != nil && !s.initialSync.Syncing() && syncedEpoch < prevEpoch } // sendRPCStatusRequest for a given topic with an expected protobuf message type. -func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { +func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - headRoot, err := r.chain.HeadRoot(ctx) + headRoot, err := s.chain.HeadRoot(ctx) if err != nil { return err } - forkDigest, err := r.forkDigest() + forkDigest, err := s.forkDigest() if err != nil { return err } resp := &pb.Status{ ForkDigest: forkDigest[:], - FinalizedRoot: r.chain.FinalizedCheckpt().Root, - FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch, + FinalizedRoot: s.chain.FinalizedCheckpt().Root, + FinalizedEpoch: s.chain.FinalizedCheckpt().Epoch, HeadRoot: headRoot, - HeadSlot: r.chain.HeadSlot(), + HeadSlot: s.chain.HeadSlot(), } - stream, err := r.p2p.Send(ctx, resp, p2p.RPCStatusTopic, id) + stream, err := s.p2p.Send(ctx, resp, p2p.RPCStatusTopic, id) if err != nil { return err } @@ -118,28 +118,28 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { } }() - code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding()) + code, errMsg, err := ReadStatusCode(stream, s.p2p.Encoding()) if err != nil { return err } if code != 0 { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) return errors.New(errMsg) } msg := &pb.Status{} - if err := r.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { + if err := s.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { return err } - r.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), msg) + s.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), msg) - err = r.validateStatusMessage(ctx, msg) + err = s.validateStatusMessage(ctx, msg) if err != nil { - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) // Disconnect if on a wrong fork. if err == errWrongForkDigestVersion { - if err := r.sendGoodByeAndDisconnect(ctx, codeWrongNetwork, stream.Conn().RemotePeer()); err != nil { + if err := s.sendGoodByeAndDisconnect(ctx, codeWrongNetwork, stream.Conn().RemotePeer()); err != nil { return err } } @@ -147,24 +147,24 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { return err } -func (r *Service) reValidatePeer(ctx context.Context, id peer.ID) error { - if err := r.sendRPCStatusRequest(ctx, id); err != nil { +func (s *Service) reValidatePeer(ctx context.Context, id peer.ID) error { + if err := s.sendRPCStatusRequest(ctx, id); err != nil { return err } // Do not return an error for ping requests. 
- if err := r.sendPingRequest(ctx, id); err != nil { + if err := s.sendPingRequest(ctx, id); err != nil { log.WithError(err).Debug("Could not ping peer") } return nil } -func (r *Service) removeDisconnectedPeerStatus(ctx context.Context, pid peer.ID) error { +func (s *Service) removeDisconnectedPeerStatus(ctx context.Context, pid peer.ID) error { return nil } // statusRPCHandler reads the incoming Status RPC from the peer and responds with our version of a status message. // This handler will disconnect any peer that does not match our fork version. -func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { +func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { defer func() { if err := stream.Close(); err != nil { log.WithError(err).Error("Failed to close stream") @@ -179,7 +179,7 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream return errors.New("message is not type *pb.Status") } - if err := r.validateStatusMessage(ctx, m); err != nil { + if err := s.validateStatusMessage(ctx, m); err != nil { log.WithFields(logrus.Fields{ "peer": stream.Conn().RemotePeer(), "error": err}).Debug("Invalid status message from peer") @@ -190,24 +190,24 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream respCode = responseCodeServerError case errWrongForkDigestVersion: // Respond with our status and disconnect with the peer. - r.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), m) - if err := r.respondWithStatus(ctx, stream); err != nil { + s.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), m) + if err := s.respondWithStatus(ctx, stream); err != nil { return err } if err := stream.Close(); err != nil { // Close before disconnecting. log.WithError(err).Error("Failed to close stream") } - if err := r.sendGoodByeAndDisconnect(ctx, codeWrongNetwork, stream.Conn().RemotePeer()); err != nil { + if err := s.sendGoodByeAndDisconnect(ctx, codeWrongNetwork, stream.Conn().RemotePeer()); err != nil { return err } return nil default: respCode = responseCodeInvalidRequest - r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) + s.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer()) } originalErr := err - resp, err := r.generateErrorResponse(respCode, err.Error()) + resp, err := s.generateErrorResponse(respCode, err.Error()) if err != nil { log.WithError(err).Error("Failed to generate a response error") } else { @@ -222,51 +222,51 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream // Add a short delay to allow the stream to flush before closing the connection. // There is still a chance that the peer won't receive the message. 
time.Sleep(50 * time.Millisecond) - if err := r.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { + if err := s.p2p.Disconnect(stream.Conn().RemotePeer()); err != nil { log.WithError(err).Error("Failed to disconnect from peer") } return originalErr } - r.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), m) + s.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), m) - return r.respondWithStatus(ctx, stream) + return s.respondWithStatus(ctx, stream) } -func (r *Service) respondWithStatus(ctx context.Context, stream network.Stream) error { - headRoot, err := r.chain.HeadRoot(ctx) +func (s *Service) respondWithStatus(ctx context.Context, stream network.Stream) error { + headRoot, err := s.chain.HeadRoot(ctx) if err != nil { return err } - forkDigest, err := r.forkDigest() + forkDigest, err := s.forkDigest() if err != nil { return err } resp := &pb.Status{ ForkDigest: forkDigest[:], - FinalizedRoot: r.chain.FinalizedCheckpt().Root, - FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch, + FinalizedRoot: s.chain.FinalizedCheckpt().Root, + FinalizedEpoch: s.chain.FinalizedCheckpt().Epoch, HeadRoot: headRoot, - HeadSlot: r.chain.HeadSlot(), + HeadSlot: s.chain.HeadSlot(), } if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { log.WithError(err).Error("Failed to write to stream") } - _, err = r.p2p.Encoding().EncodeWithLength(stream, resp) + _, err = s.p2p.Encoding().EncodeWithLength(stream, resp) return err } -func (r *Service) validateStatusMessage(ctx context.Context, msg *pb.Status) error { - forkDigest, err := r.forkDigest() +func (s *Service) validateStatusMessage(ctx context.Context, msg *pb.Status) error { + forkDigest, err := s.forkDigest() if err != nil { return err } if !bytes.Equal(forkDigest[:], msg.ForkDigest) { return errWrongForkDigestVersion } - genesis := r.chain.GenesisTime() - finalizedEpoch := r.chain.FinalizedCheckpt().Epoch + genesis := s.chain.GenesisTime() + finalizedEpoch := s.chain.FinalizedCheckpt().Epoch maxEpoch := slotutil.EpochsSinceGenesis(genesis) // It would take a minimum of 2 epochs to finalize a // previous epoch @@ -288,10 +288,10 @@ func (r *Service) validateStatusMessage(ctx context.Context, msg *pb.Status) err if finalizedAtGenesis && rootIsEqual { return nil } - if !r.db.IsFinalizedBlock(context.Background(), bytesutil.ToBytes32(msg.FinalizedRoot)) { + if !s.db.IsFinalizedBlock(context.Background(), bytesutil.ToBytes32(msg.FinalizedRoot)) { return errInvalidFinalizedRoot } - blk, err := r.db.Block(ctx, bytesutil.ToBytes32(msg.FinalizedRoot)) + blk, err := s.db.Block(ctx, bytesutil.ToBytes32(msg.FinalizedRoot)) if err != nil { return errGeneric } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 1122f7ff3d..382422757b 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -140,45 +140,45 @@ func NewRegularSync(cfg *Config) *Service { } // Start the regular sync service. 
-func (r *Service) Start() { - if err := r.initCaches(); err != nil { +func (s *Service) Start() { + if err := s.initCaches(); err != nil { panic(err) } - r.p2p.AddConnectionHandler(r.reValidatePeer, r.sendGenericGoodbyeMessage) - r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus) - r.p2p.AddPingMethod(r.sendPingRequest) - r.processPendingBlocksQueue() - r.processPendingAttsQueue() - r.maintainPeerStatuses() - r.resyncIfBehind() + s.p2p.AddConnectionHandler(s.reValidatePeer, s.sendGenericGoodbyeMessage) + s.p2p.AddDisconnectionHandler(s.removeDisconnectedPeerStatus) + s.p2p.AddPingMethod(s.sendPingRequest) + s.processPendingBlocksQueue() + s.processPendingAttsQueue() + s.maintainPeerStatuses() + s.resyncIfBehind() // Update sync metrics. - runutil.RunEvery(r.ctx, time.Second*10, r.updateMetrics) + runutil.RunEvery(s.ctx, time.Second*10, s.updateMetrics) } // Stop the regular sync service. -func (r *Service) Stop() error { +func (s *Service) Stop() error { defer func() { - if r.blocksRateLimiter != nil { - r.blocksRateLimiter.Free() - r.blocksRateLimiter = nil + if s.blocksRateLimiter != nil { + s.blocksRateLimiter.Free() + s.blocksRateLimiter = nil } }() - defer r.cancel() + defer s.cancel() return nil } // Status of the currently running regular sync service. -func (r *Service) Status() error { - if r.chainStarted { - if r.initialSync.Syncing() { +func (s *Service) Status() error { + if s.chainStarted { + if s.initialSync.Syncing() { return errors.New("waiting for initial sync") } // If our head slot is on a previous epoch and our peers are reporting their head block are // in the most recent epoch, then we might be out of sync. - if headEpoch := helpers.SlotToEpoch(r.chain.HeadSlot()); headEpoch+1 < helpers.SlotToEpoch(r.chain.CurrentSlot()) && - headEpoch+1 < r.p2p.Peers().CurrentEpoch() { + if headEpoch := helpers.SlotToEpoch(s.chain.HeadSlot()); headEpoch+1 < helpers.SlotToEpoch(s.chain.CurrentSlot()) && + headEpoch+1 < s.p2p.Peers().CurrentEpoch() { return errors.New("out of sync") } } @@ -187,7 +187,7 @@ func (r *Service) Status() error { // This initializes the caches to update seen beacon objects coming in from the wire // and prevent DoS. -func (r *Service) initCaches() error { +func (s *Service) initCaches() error { blkCache, err := lru.New(seenBlockSize) if err != nil { return err @@ -208,21 +208,21 @@ func (r *Service) initCaches() error { if err != nil { return err } - r.seenBlockCache = blkCache - r.seenAttestationCache = attCache - r.seenExitCache = exitCache - r.seenAttesterSlashingCache = attesterSlashingCache - r.seenProposerSlashingCache = proposerSlashingCache + s.seenBlockCache = blkCache + s.seenAttestationCache = attCache + s.seenExitCache = exitCache + s.seenAttesterSlashingCache = attesterSlashingCache + s.seenProposerSlashingCache = proposerSlashingCache return nil } -func (r *Service) registerHandlers() { +func (s *Service) registerHandlers() { // Wait until chain start. stateChannel := make(chan *feed.Event, 1) - stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel) + stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) defer stateSub.Unsubscribe() - for r.chainStarted == false { + for s.chainStarted == false { select { case event := <-stateChannel: if event.Type == statefeed.Initialized { @@ -234,16 +234,16 @@ func (r *Service) registerHandlers() { log.WithField("starttime", data.StartTime).Debug("Received state initialized event") // Register respective rpc and pubsub handlers at state initialized event. 
- r.registerRPCHandlers() - r.registerSubscribers() + s.registerRPCHandlers() + s.registerSubscribers() if data.StartTime.After(roughtime.Now()) { stateSub.Unsubscribe() time.Sleep(roughtime.Until(data.StartTime)) } - r.chainStarted = true + s.chainStarted = true } - case <-r.ctx.Done(): + case <-s.ctx.Done(): log.Debug("Context closed, exiting goroutine") return case err := <-stateSub.Err(): diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 9c9a76a148..a83253b6f7 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -29,8 +29,8 @@ const pubsubMessageTimeout = 30 * time.Second type subHandler func(context.Context, proto.Message) error // noopValidator is a no-op that only decodes the message, but does not check its contents. -func (r *Service) noopValidator(ctx context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult { - m, err := r.decodePubsubMessage(msg) +func (s *Service) noopValidator(ctx context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + m, err := s.decodePubsubMessage(msg) if err != nil { log.WithError(err).Error("Failed to decode message") return pubsub.ValidationReject @@ -40,68 +40,68 @@ func (r *Service) noopValidator(ctx context.Context, _ peer.ID, msg *pubsub.Mess } // Register PubSub subscribers -func (r *Service) registerSubscribers() { - r.subscribe( +func (s *Service) registerSubscribers() { + s.subscribe( "/eth2/%x/beacon_block", - r.validateBeaconBlockPubSub, - r.beaconBlockSubscriber, + s.validateBeaconBlockPubSub, + s.beaconBlockSubscriber, ) - r.subscribe( + s.subscribe( "/eth2/%x/beacon_aggregate_and_proof", - r.validateAggregateAndProof, - r.beaconAggregateProofSubscriber, + s.validateAggregateAndProof, + s.beaconAggregateProofSubscriber, ) - r.subscribe( + s.subscribe( "/eth2/%x/voluntary_exit", - r.validateVoluntaryExit, - r.voluntaryExitSubscriber, + s.validateVoluntaryExit, + s.voluntaryExitSubscriber, ) - r.subscribe( + s.subscribe( "/eth2/%x/proposer_slashing", - r.validateProposerSlashing, - r.proposerSlashingSubscriber, + s.validateProposerSlashing, + s.proposerSlashingSubscriber, ) - r.subscribe( + s.subscribe( "/eth2/%x/attester_slashing", - r.validateAttesterSlashing, - r.attesterSlashingSubscriber, + s.validateAttesterSlashing, + s.attesterSlashingSubscriber, ) if featureconfig.Get().DisableDynamicCommitteeSubnets { for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { - r.subscribe( + s.subscribe( fmt.Sprintf("/eth2/%%x/beacon_attestation_%d", i), - r.validateCommitteeIndexBeaconAttestation, /* validator */ - r.committeeIndexBeaconAttestationSubscriber, /* message handler */ + s.validateCommitteeIndexBeaconAttestation, /* validator */ + s.committeeIndexBeaconAttestationSubscriber, /* message handler */ ) } } else { - r.subscribeDynamicWithSubnets( + s.subscribeDynamicWithSubnets( "/eth2/%x/beacon_attestation_%d", - r.validateCommitteeIndexBeaconAttestation, /* validator */ - r.committeeIndexBeaconAttestationSubscriber, /* message handler */ + s.validateCommitteeIndexBeaconAttestation, /* validator */ + s.committeeIndexBeaconAttestationSubscriber, /* message handler */ ) } } // subscribe to a given topic with a given validator and subscription handler. // The base protobuf message is used to initialize new messages for decoding. 
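The static attestation-subnet registration above escapes the fork-digest placeholder as %%x so only the subnet index is filled in at subscription time; the digest is substituted later. A short sketch of that two-stage formatting, with a hypothetical digest and subnet index:

package main

import "fmt"

func main() {
	// Stage 1: bake in the subnet index, keep %x as a placeholder.
	i := uint64(5)
	topicFormat := fmt.Sprintf("/eth2/%%x/beacon_attestation_%d", i)
	fmt.Println(topicFormat) // /eth2/%x/beacon_attestation_5

	// Stage 2: apply the fork digest (hypothetical value), as the
	// subscription and metrics code does with the real digest.
	digest := [4]byte{0xe7, 0xa7, 0x5d, 0x5a}
	fmt.Println(fmt.Sprintf(topicFormat, digest)) // /eth2/e7a75d5a/beacon_attestation_5
}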
-func (r *Service) subscribe(topic string, validator pubsub.ValidatorEx, handle subHandler) *pubsub.Subscription { +func (s *Service) subscribe(topic string, validator pubsub.ValidatorEx, handle subHandler) *pubsub.Subscription { base := p2p.GossipTopicMappings[topic] if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) } - return r.subscribeWithBase(base, r.addDigestToTopic(topic), validator, handle) + return s.subscribeWithBase(base, s.addDigestToTopic(topic), validator, handle) } -func (r *Service) subscribeWithBase(base proto.Message, topic string, validator pubsub.ValidatorEx, handle subHandler) *pubsub.Subscription { - topic += r.p2p.Encoding().ProtocolSuffix() +func (s *Service) subscribeWithBase(base proto.Message, topic string, validator pubsub.ValidatorEx, handle subHandler) *pubsub.Subscription { + topic += s.p2p.Encoding().ProtocolSuffix() log := log.WithField("topic", topic) - if err := r.p2p.PubSub().RegisterTopicValidator(wrapAndReportValidation(topic, validator)); err != nil { + if err := s.p2p.PubSub().RegisterTopicValidator(wrapAndReportValidation(topic, validator)); err != nil { log.WithError(err).Error("Failed to register validator") } - sub, err := r.p2p.PubSub().Subscribe(topic) + sub, err := s.p2p.PubSub().Subscribe(topic) if err != nil { // Any error subscribing to a PubSub topic would be the result of a misconfiguration of // libp2p PubSub library. This should not happen at normal runtime, unless the config @@ -144,14 +144,14 @@ func (r *Service) subscribeWithBase(base proto.Message, topic string, validator // The main message loop for receiving incoming messages from this subscription. messageLoop := func() { for { - msg, err := sub.Next(r.ctx) + msg, err := sub.Next(s.ctx) if err != nil { // This should only happen when the context is cancelled or subscription is cancelled. log.WithError(err).Warn("Subscription next failed") return } - if msg.ReceivedFrom == r.p2p.PeerID() { + if msg.ReceivedFrom == s.p2p.PeerID() { continue } @@ -182,7 +182,7 @@ func wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (string, pubsub // subscribe to a dynamically changing list of subnets. This method expects a fmt compatible // string for the topic name and the list of subnets for subscribed topics that should be // maintained. 
-func (r *Service) subscribeDynamicWithSubnets( +func (s *Service) subscribeDynamicWithSubnets( topicFormat string, validate pubsub.ValidatorEx, handle subHandler, @@ -191,43 +191,43 @@ func (r *Service) subscribeDynamicWithSubnets( if base == nil { log.Fatalf("%s is not mapped to any message in GossipTopicMappings", topicFormat) } - digest, err := r.forkDigest() + digest, err := s.forkDigest() if err != nil { log.WithError(err).Fatal("Could not compute fork digest") } subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot) - genesis := r.chain.GenesisTime() + genesis := s.chain.GenesisTime() ticker := slotutil.GetSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) go func() { for { select { - case <-r.ctx.Done(): + case <-s.ctx.Done(): ticker.Done() return case currentSlot := <-ticker.C(): - if r.chainStarted && r.initialSync.Syncing() { + if s.chainStarted && s.initialSync.Syncing() { continue } // Persistent subscriptions from validators - persistentSubs := r.persistentSubnetIndices() + persistentSubs := s.persistentSubnetIndices() // Update desired topic indices for aggregator - wantedSubs := r.aggregatorSubnetIndices(currentSlot) + wantedSubs := s.aggregatorSubnetIndices(currentSlot) // Combine subscriptions to get all requested subscriptions wantedSubs = sliceutil.SetUint64(append(persistentSubs, wantedSubs...)) // Resize as appropriate. - r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) + s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) // subscribe desired aggregator subnets. for _, idx := range wantedSubs { - r.subscribeAggregatorSubnet(subscriptions, idx, base, digest, validate, handle) + s.subscribeAggregatorSubnet(subscriptions, idx, base, digest, validate, handle) } // find desired subs for attesters - attesterSubs := r.attesterSubnetIndices(currentSlot) + attesterSubs := s.attesterSubnetIndices(currentSlot) for _, idx := range attesterSubs { - r.lookupAttesterSubnets(digest, idx) + s.lookupAttesterSubnets(digest, idx) } } } @@ -235,7 +235,7 @@ func (r *Service) subscribeDynamicWithSubnets( } // revalidate that our currently connected subnets are valid. -func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription, +func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription, wantedSubs []uint64, topicFormat string, digest [4]byte) { for k, v := range subscriptions { var wanted bool @@ -247,8 +247,8 @@ func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc } if !wanted && v != nil { v.Cancel() - fullTopic := fmt.Sprintf(topicFormat, digest, k) + r.p2p.Encoding().ProtocolSuffix() - if err := r.p2p.PubSub().UnregisterTopicValidator(fullTopic); err != nil { + fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.p2p.Encoding().ProtocolSuffix() + if err := s.p2p.PubSub().UnregisterTopicValidator(fullTopic); err != nil { log.WithError(err).Error("Failed to unregister topic validator") } delete(subscriptions, k) @@ -257,7 +257,7 @@ func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc } // subscribe missing subnets for our aggregators. 
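The pruning step above can be pictured with plain maps and slices: any subnet we are subscribed to that is no longer in the wanted set gets cancelled, its topic validator unregistered, and its map entry dropped. A rough stdlib-only sketch, with a stub type standing in for the pubsub subscription:

package main

import "fmt"

// sub is a stand-in for *pubsub.Subscription; only Cancel matters here.
type sub struct{ topic string }

func (s *sub) Cancel() { fmt.Println("cancelled", s.topic) }

// pruneSubscriptions drops every subscribed subnet that is not in wantedSubs,
// mirroring the shape of reValidateSubscriptions.
func pruneSubscriptions(subscriptions map[uint64]*sub, wantedSubs []uint64) {
	wanted := make(map[uint64]bool, len(wantedSubs))
	for _, idx := range wantedSubs {
		wanted[idx] = true
	}
	for idx, s := range subscriptions {
		if !wanted[idx] && s != nil {
			s.Cancel() // the real code also unregisters the topic validator here
			delete(subscriptions, idx)
		}
	}
}

func main() {
	subscriptions := map[uint64]*sub{
		3:  {topic: "/eth2/%x/beacon_attestation_3"},
		12: {topic: "/eth2/%x/beacon_attestation_12"},
	}
	pruneSubscriptions(subscriptions, []uint64{3}) // keeps 3, cancels 12
	fmt.Println("remaining:", len(subscriptions))
}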
-func (r *Service) subscribeAggregatorSubnet(subscriptions map[uint64]*pubsub.Subscription, idx uint64, +func (s *Service) subscribeAggregatorSubnet(subscriptions map[uint64]*pubsub.Subscription, idx uint64, base proto.Message, digest [4]byte, validate pubsub.ValidatorEx, handle subHandler) { // do not subscribe if we have no peers in the same // subnet @@ -265,13 +265,13 @@ func (r *Service) subscribeAggregatorSubnet(subscriptions map[uint64]*pubsub.Sub subnetTopic := fmt.Sprintf(topic, digest, idx) // check if subscription exists and if not subscribe the relevant subnet. if _, exists := subscriptions[idx]; !exists { - subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle) + subscriptions[idx] = s.subscribeWithBase(base, subnetTopic, validate, handle) } - if !r.validPeersExist(subnetTopic, idx) { + if !s.validPeersExist(subnetTopic, idx) { log.Debugf("No peers found subscribed to attestation gossip subnet with "+ "committee index %d. Searching network for peers subscribed to the subnet.", idx) go func(idx uint64) { - _, err := r.p2p.FindPeersWithSubnet(idx) + _, err := s.p2p.FindPeersWithSubnet(idx) if err != nil { log.Errorf("Could not search for peers: %v", err) return @@ -282,15 +282,15 @@ func (r *Service) subscribeAggregatorSubnet(subscriptions map[uint64]*pubsub.Sub } // lookup peers for attester specific subnets. -func (r *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { +func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] subnetTopic := fmt.Sprintf(topic, digest, idx) - if !r.validPeersExist(subnetTopic, idx) { + if !s.validPeersExist(subnetTopic, idx) { log.Debugf("No peers found subscribed to attestation gossip subnet with "+ "committee index %d. Searching network for peers subscribed to the subnet.", idx) go func(idx uint64) { // perform a search for peers with the desired committee index. - _, err := r.p2p.FindPeersWithSubnet(idx) + _, err := s.p2p.FindPeersWithSubnet(idx) if err != nil { log.Errorf("Could not search for peers: %v", err) return @@ -300,24 +300,24 @@ func (r *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { } // find if we have peers who are subscribed to the same subnet -func (r *Service) validPeersExist(subnetTopic string, idx uint64) bool { - numOfPeers := r.p2p.PubSub().ListPeers(subnetTopic + r.p2p.Encoding().ProtocolSuffix()) - return len(r.p2p.Peers().SubscribedToSubnet(idx)) > 0 || len(numOfPeers) > 0 +func (s *Service) validPeersExist(subnetTopic string, idx uint64) bool { + numOfPeers := s.p2p.PubSub().ListPeers(subnetTopic + s.p2p.Encoding().ProtocolSuffix()) + return len(s.p2p.Peers().SubscribedToSubnet(idx)) > 0 || len(numOfPeers) > 0 } // Add fork digest to topic. 
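subscribeAggregatorSubnet and lookupAttesterSubnets both build the concrete subnet topic by substituting the fork digest and the committee index into the attestation topic format; only when validPeersExist finds nobody on that topic is FindPeersWithSubnet kicked off in the background. The formatting step alone, with made-up values:

package main

import "fmt"

func main() {
	// Hypothetical digest; the real one comes from forkDigest().
	digest := [4]byte{0xe7, 0xa7, 0x5d, 0x5a}
	topicFormat := "/eth2/%x/beacon_attestation_%d"
	for _, committeeIdx := range []uint64{0, 21, 63} {
		subnetTopic := fmt.Sprintf(topicFormat, digest, committeeIdx)
		fmt.Println(subnetTopic) // e.g. /eth2/e7a75d5a/beacon_attestation_21
	}
}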
-func (r *Service) addDigestToTopic(topic string) string { +func (s *Service) addDigestToTopic(topic string) string { if !strings.Contains(topic, "%x") { log.Fatal("Topic does not have appropriate formatter for digest") } - digest, err := r.forkDigest() + digest, err := s.forkDigest() if err != nil { log.WithError(err).Fatal("Could not compute fork digest") } return fmt.Sprintf(topic, digest) } -func (r *Service) forkDigest() ([4]byte, error) { - genRoot := r.chain.GenesisValidatorRoot() - return p2putils.CreateForkDigest(r.chain.GenesisTime(), genRoot[:]) +func (s *Service) forkDigest() ([4]byte, error) { + genRoot := s.chain.GenesisValidatorRoot() + return p2putils.CreateForkDigest(s.chain.GenesisTime(), genRoot[:]) } diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go index 20bf83a536..79dd8b3f46 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -14,7 +14,7 @@ import ( // beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the // attestation pool for processing. -func (r *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto.Message) error { +func (s *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto.Message) error { a, ok := msg.(*ethpb.SignedAggregateAttestationAndProof) if !ok { return fmt.Errorf("message was not type *eth.SignedAggregateAttestationAndProof, type=%T", msg) @@ -26,7 +26,7 @@ func (r *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto. // Broadcast the aggregated attestation on a feed to notify other services in the beacon node // of a received aggregated attestation. - r.attestationNotifier.OperationFeed().Send(&feed.Event{ + s.attestationNotifier.OperationFeed().Send(&feed.Event{ Type: operation.AggregatedAttReceived, Data: &operation.AggregatedAttReceivedData{ Attestation: a.Message, @@ -35,8 +35,8 @@ func (r *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto. // An unaggregated attestation can make it here. It’s valid, the aggregator it just itself, although it means poor performance for the subnet. 
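forkDigest delegates to p2putils.CreateForkDigest. Per my reading of the eth2 spec, the digest is the first four bytes of the hash tree root of a ForkData container holding the current fork version and the genesis validators root; for a two-field container of fixed 32-byte chunks that root reduces to a single sha256 over both chunks. A spec-level sketch under that assumption, not Prysm's implementation:

package main

import (
	"crypto/sha256"
	"fmt"
)

// computeForkDigest follows compute_fork_digest from the eth2 spec, assuming the
// hash tree root of ForkData(current_version, genesis_validators_root) is
// sha256(version padded to 32 bytes || genesis_validators_root).
func computeForkDigest(currentVersion [4]byte, genesisValidatorsRoot [32]byte) [4]byte {
	var versionChunk [32]byte
	copy(versionChunk[:], currentVersion[:])
	root := sha256.Sum256(append(versionChunk[:], genesisValidatorsRoot[:]...))
	var digest [4]byte
	copy(digest[:], root[:4])
	return digest
}

func main() {
	var genesisRoot [32]byte // all zeroes, purely illustrative
	digest := computeForkDigest([4]byte{0, 0, 0, 0}, genesisRoot)
	fmt.Printf("fork digest: %x\n", digest)
}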
if !helpers.IsAggregated(a.Message.Aggregate) { - return r.attPool.SaveUnaggregatedAttestation(a.Message.Aggregate) + return s.attPool.SaveUnaggregatedAttestation(a.Message.Aggregate) } - return r.attPool.SaveAggregatedAttestation(a.Message.Aggregate) + return s.attPool.SaveAggregatedAttestation(a.Message.Aggregate) } diff --git a/beacon-chain/sync/subscriber_beacon_attestation.go b/beacon-chain/sync/subscriber_beacon_attestation.go index e0b8be89aa..98341f7c9f 100644 --- a/beacon-chain/sync/subscriber_beacon_attestation.go +++ b/beacon-chain/sync/subscriber_beacon_attestation.go @@ -15,7 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/sliceutil" ) -func (r *Service) committeeIndexBeaconAttestationSubscriber(ctx context.Context, msg proto.Message) error { +func (s *Service) committeeIndexBeaconAttestationSubscriber(ctx context.Context, msg proto.Message) error { a, ok := msg.(*eth.Attestation) if !ok { return fmt.Errorf("message was not type *eth.Attestation, type=%T", msg) @@ -24,9 +24,9 @@ func (r *Service) committeeIndexBeaconAttestationSubscriber(ctx context.Context, if a.Data == nil { return errors.New("nil attestation") } - r.setSeenCommitteeIndicesSlot(a.Data.Slot, a.Data.CommitteeIndex, a.AggregationBits) + s.setSeenCommitteeIndicesSlot(a.Data.Slot, a.Data.CommitteeIndex, a.AggregationBits) - exists, err := r.attPool.HasAggregatedAttestation(a) + exists, err := s.attPool.HasAggregatedAttestation(a) if err != nil { return errors.Wrap(err, "failed to determine if attestation pool has this atttestation") } @@ -36,25 +36,25 @@ func (r *Service) committeeIndexBeaconAttestationSubscriber(ctx context.Context, // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node // of a received unaggregated attestation. 
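The branch above hinges on helpers.IsAggregated, which, as I recall, simply asks whether more than one participation bit is set in the attestation's aggregation bitfield; the real type is a go-bitfield Bitlist with a trailing length bit, which this rough sketch glosses over.

package main

import (
	"fmt"
	"math/bits"
)

// isAggregated reports whether more than one participation bit is set.
// Simplification: a real Bitlist carries a trailing length bit that is not handled here.
func isAggregated(aggregationBits []byte) bool {
	count := 0
	for _, b := range aggregationBits {
		count += bits.OnesCount8(b)
	}
	return count > 1
}

func main() {
	fmt.Println(isAggregated([]byte{0b00000100}))             // false: a single attester
	fmt.Println(isAggregated([]byte{0b00101100, 0b00000001})) // true: several attesters
}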
- r.attestationNotifier.OperationFeed().Send(&feed.Event{ + s.attestationNotifier.OperationFeed().Send(&feed.Event{ Type: operation.UnaggregatedAttReceived, Data: &operation.UnAggregatedAttReceivedData{ Attestation: a, }, }) - return r.attPool.SaveUnaggregatedAttestation(a) + return s.attPool.SaveUnaggregatedAttestation(a) } -func (r *Service) subnetCount() int { +func (s *Service) subnetCount() int { return int(params.BeaconNetworkConfig().AttestationSubnetCount) } -func (r *Service) persistentSubnetIndices() []uint64 { +func (s *Service) persistentSubnetIndices() []uint64 { return cache.SubnetIDs.GetAllSubnets() } -func (r *Service) aggregatorSubnetIndices(currentSlot uint64) []uint64 { +func (s *Service) aggregatorSubnetIndices(currentSlot uint64) []uint64 { endEpoch := helpers.SlotToEpoch(currentSlot) + 1 endSlot := endEpoch * params.BeaconConfig().SlotsPerEpoch commIds := []uint64{} @@ -64,7 +64,7 @@ func (r *Service) aggregatorSubnetIndices(currentSlot uint64) []uint64 { return sliceutil.SetUint64(commIds) } -func (r *Service) attesterSubnetIndices(currentSlot uint64) []uint64 { +func (s *Service) attesterSubnetIndices(currentSlot uint64) []uint64 { endEpoch := helpers.SlotToEpoch(currentSlot) + 1 endSlot := endEpoch * params.BeaconConfig().SlotsPerEpoch commIds := []uint64{} diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 18ef600db9..21f5c53456 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -11,7 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" ) -func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) error { +func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) error { signed, ok := msg.(*ethpb.SignedBeaconBlock) if !ok { return errors.New("message is not type *ethpb.SignedBeaconBlock") @@ -21,7 +21,7 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) return errors.New("nil block") } - r.setSeenBlockIndexSlot(signed.Block.Slot, signed.Block.ProposerIndex) + s.setSeenBlockIndexSlot(signed.Block.Slot, signed.Block.ProposerIndex) block := signed.Block @@ -30,12 +30,12 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) return err } - if err := r.chain.ReceiveBlockNoPubsub(ctx, signed, root); err != nil { + if err := s.chain.ReceiveBlockNoPubsub(ctx, signed, root); err != nil { interop.WriteBlockToDisk(signed, true /*failed*/) } // Delete attestations from the block in the pool to avoid inclusion in future block. - if err := r.deleteAttsInPool(block.Body.Attestations); err != nil { + if err := s.deleteAttsInPool(block.Body.Attestations); err != nil { log.Errorf("Could not delete attestations in pool: %v", err) return nil } @@ -45,15 +45,15 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) // The input attestations are seen by the network, this deletes them from pool // so proposers don't include them in a block for the future. -func (r *Service) deleteAttsInPool(atts []*ethpb.Attestation) error { +func (s *Service) deleteAttsInPool(atts []*ethpb.Attestation) error { for _, att := range atts { if helpers.IsAggregated(att) { - if err := r.attPool.DeleteAggregatedAttestation(att); err != nil { + if err := s.attPool.DeleteAggregatedAttestation(att); err != nil { return err } } else { // Ideally there's shouldn't be any unaggregated attestation in the block. 
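aggregatorSubnetIndices and attesterSubnetIndices both walk the slots from the current slot through the end of the next epoch and collect committee IDs recorded in the subnet cache (the cache lookups sit outside the hunk above). The window arithmetic itself, assuming mainnet's 32 slots per epoch and glossing over the exact loop bounds:

package main

import "fmt"

const slotsPerEpoch = 32 // mainnet value, assumed

func slotToEpoch(slot uint64) uint64 { return slot / slotsPerEpoch }

func main() {
	currentSlot := uint64(75)                // epoch 2
	endEpoch := slotToEpoch(currentSlot) + 1 // epoch 3
	endSlot := endEpoch * slotsPerEpoch      // slot 96
	fmt.Printf("scan slots %d through %d for cached committee IDs\n", currentSlot, endSlot)
}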
- if err := r.attPool.DeleteUnaggregatedAttestation(att); err != nil { + if err := s.attPool.DeleteUnaggregatedAttestation(att); err != nil { return err } } diff --git a/beacon-chain/sync/subscriber_handlers.go b/beacon-chain/sync/subscriber_handlers.go index cc54f0ade6..5f5685f4f0 100644 --- a/beacon-chain/sync/subscriber_handlers.go +++ b/beacon-chain/sync/subscriber_handlers.go @@ -9,7 +9,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" ) -func (r *Service) voluntaryExitSubscriber(ctx context.Context, msg proto.Message) error { +func (s *Service) voluntaryExitSubscriber(ctx context.Context, msg proto.Message) error { ve, ok := msg.(*ethpb.SignedVoluntaryExit) if !ok { return fmt.Errorf("wrong type, expected: *ethpb.SignedVoluntaryExit got: %T", msg) @@ -18,17 +18,17 @@ func (r *Service) voluntaryExitSubscriber(ctx context.Context, msg proto.Message if ve.Exit == nil { return errors.New("exit can't be nil") } - r.setExitIndexSeen(ve.Exit.ValidatorIndex) + s.setExitIndexSeen(ve.Exit.ValidatorIndex) - s, err := r.chain.HeadState(ctx) + headState, err := s.chain.HeadState(ctx) if err != nil { return err } - r.exitPool.InsertVoluntaryExit(ctx, s, ve) + s.exitPool.InsertVoluntaryExit(ctx, headState, ve) return nil } -func (r *Service) attesterSlashingSubscriber(ctx context.Context, msg proto.Message) error { +func (s *Service) attesterSlashingSubscriber(ctx context.Context, msg proto.Message) error { aSlashing, ok := msg.(*ethpb.AttesterSlashing) if !ok { return fmt.Errorf("wrong type, expected: *ethpb.AttesterSlashing got: %T", msg) @@ -37,19 +37,19 @@ func (r *Service) attesterSlashingSubscriber(ctx context.Context, msg proto.Mess aSlashing1IsNil := aSlashing == nil || aSlashing.Attestation_1 == nil || aSlashing.Attestation_1.AttestingIndices == nil aSlashing2IsNil := aSlashing == nil || aSlashing.Attestation_2 == nil || aSlashing.Attestation_2.AttestingIndices == nil if !aSlashing1IsNil && !aSlashing2IsNil { - headState, err := r.chain.HeadState(ctx) + headState, err := s.chain.HeadState(ctx) if err != nil { return err } - if err := r.slashingPool.InsertAttesterSlashing(ctx, headState, aSlashing); err != nil { + if err := s.slashingPool.InsertAttesterSlashing(ctx, headState, aSlashing); err != nil { return errors.Wrap(err, "could not insert attester slashing into pool") } - r.setAttesterSlashingIndicesSeen(aSlashing.Attestation_1.AttestingIndices, aSlashing.Attestation_2.AttestingIndices) + s.setAttesterSlashingIndicesSeen(aSlashing.Attestation_1.AttestingIndices, aSlashing.Attestation_2.AttestingIndices) } return nil } -func (r *Service) proposerSlashingSubscriber(ctx context.Context, msg proto.Message) error { +func (s *Service) proposerSlashingSubscriber(ctx context.Context, msg proto.Message) error { pSlashing, ok := msg.(*ethpb.ProposerSlashing) if !ok { return fmt.Errorf("wrong type, expected: *ethpb.ProposerSlashing got: %T", msg) @@ -58,14 +58,14 @@ func (r *Service) proposerSlashingSubscriber(ctx context.Context, msg proto.Mess header1IsNil := pSlashing == nil || pSlashing.Header_1 == nil || pSlashing.Header_1.Header == nil header2IsNil := pSlashing == nil || pSlashing.Header_2 == nil || pSlashing.Header_2.Header == nil if !header1IsNil && !header2IsNil { - headState, err := r.chain.HeadState(ctx) + headState, err := s.chain.HeadState(ctx) if err != nil { return err } - if err := r.slashingPool.InsertProposerSlashing(ctx, headState, pSlashing); err != nil { + if err := s.slashingPool.InsertProposerSlashing(ctx, headState, pSlashing); err != nil { return 
errors.Wrap(err, "could not insert proposer slashing into pool") } - r.setProposerSlashingIndexSeen(pSlashing.Header_1.Header.ProposerIndex) + s.setProposerSlashingIndexSeen(pSlashing.Header_1.Header.ProposerIndex) } return nil } diff --git a/beacon-chain/sync/utils.go b/beacon-chain/sync/utils.go index d167889d7d..712081f13e 100644 --- a/beacon-chain/sync/utils.go +++ b/beacon-chain/sync/utils.go @@ -28,7 +28,7 @@ func (s sortedObj) Len() int { } // removes duplicates from provided blocks and roots. -func (r *Service) dedupBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) { +func (s *Service) dedupBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) { // Remove duplicate blocks received rootMap := make(map[[32]byte]bool) newBlks := make([]*ethpb.SignedBeaconBlock, 0, len(blks)) @@ -46,7 +46,7 @@ func (r *Service) dedupBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][ // sort the provided blocks and roots in ascending order. This method assumes that the size of // block slice and root slice is equal. -func (r *Service) sortBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) { +func (s *Service) sortBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) { obj := sortedObj{ blks: blks, roots: roots, diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index 819064c9b9..0100b57b03 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -24,8 +24,8 @@ import ( // validateAggregateAndProof verifies the aggregated signature and the selection proof is valid before forwarding to the // network and downstream services. -func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { - if pid == r.p2p.PeerID() { +func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + if pid == s.p2p.PeerID() { return pubsub.ValidationAccept } @@ -34,11 +34,11 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms // To process the following it requires the recent blocks to be present in the database, so we'll skip // validating or processing aggregated attestations until fully synced. - if r.initialSync.Syncing() { + if s.initialSync.Syncing() { return pubsub.ValidationIgnore } - raw, err := r.decodePubsubMessage(msg) + raw, err := s.decodePubsubMessage(msg) if err != nil { log.WithError(err).Error("Failed to decode message") traceutil.AnnotateError(span, err) @@ -53,12 +53,12 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms return pubsub.ValidationReject } // Verify this is the first aggregate received from the aggregator with index and slot. - if r.hasSeenAggregatorIndexEpoch(m.Message.Aggregate.Data.Target.Epoch, m.Message.AggregatorIndex) { + if s.hasSeenAggregatorIndexEpoch(m.Message.Aggregate.Data.Target.Epoch, m.Message.AggregatorIndex) { return pubsub.ValidationIgnore } // Verify aggregate attestation has not already been seen via aggregate gossip, within a block, or through the creation locally. 
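dedupBlocksAndRoots (its loop body falls outside the hunk above) keeps the first occurrence of every block root while holding the block and root slices in lockstep. A stdlib-only sketch of that pattern, with a placeholder struct standing in for *ethpb.SignedBeaconBlock:

package main

import "fmt"

type block struct{ slot uint64 } // stand-in for *ethpb.SignedBeaconBlock

// dedupByRoot drops any block whose root was already seen, keeping blocks and
// roots aligned, in the spirit of dedupBlocksAndRoots.
func dedupByRoot(blks []block, roots [][32]byte) ([]block, [][32]byte) {
	seen := make(map[[32]byte]bool, len(blks))
	newBlks := make([]block, 0, len(blks))
	newRoots := make([][32]byte, 0, len(roots))
	for i, r := range roots {
		if seen[r] {
			continue
		}
		seen[r] = true
		newBlks = append(newBlks, blks[i])
		newRoots = append(newRoots, r)
	}
	return newBlks, newRoots
}

func main() {
	rootA, rootB := [32]byte{1}, [32]byte{2}
	blks := []block{{slot: 10}, {slot: 10}, {slot: 11}}
	roots := [][32]byte{rootA, rootA, rootB}
	blks, roots = dedupByRoot(blks, roots)
	fmt.Println(len(blks), len(roots)) // 2 2
}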
- seen, err := r.attPool.HasAggregatedAttestation(m.Message.Aggregate) + seen, err := s.attPool.HasAggregatedAttestation(m.Message.Aggregate) if err != nil { traceutil.AnnotateError(span, err) return pubsub.ValidationIgnore @@ -66,41 +66,41 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms if seen { return pubsub.ValidationIgnore } - if !r.validateBlockInAttestation(ctx, m) { + if !s.validateBlockInAttestation(ctx, m) { return pubsub.ValidationIgnore } - validationRes := r.validateAggregatedAtt(ctx, m) + validationRes := s.validateAggregatedAtt(ctx, m) if validationRes != pubsub.ValidationAccept { return validationRes } - r.setAggregatorIndexEpochSeen(m.Message.Aggregate.Data.Target.Epoch, m.Message.AggregatorIndex) + s.setAggregatorIndexEpochSeen(m.Message.Aggregate.Data.Target.Epoch, m.Message.AggregatorIndex) msg.ValidatorData = m return pubsub.ValidationAccept } -func (r *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.SignedAggregateAttestationAndProof) pubsub.ValidationResult { +func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.SignedAggregateAttestationAndProof) pubsub.ValidationResult { ctx, span := trace.StartSpan(ctx, "sync.validateAggregatedAtt") defer span.End() attSlot := signed.Message.Aggregate.Data.Slot - if err := validateAggregateAttTime(attSlot, r.chain.GenesisTime()); err != nil { + if err := validateAggregateAttTime(attSlot, s.chain.GenesisTime()); err != nil { traceutil.AnnotateError(span, err) return pubsub.ValidationIgnore } - s, err := r.chain.AttestationPreState(ctx, signed.Message.Aggregate) + bs, err := s.chain.AttestationPreState(ctx, signed.Message.Aggregate) if err != nil { traceutil.AnnotateError(span, err) return pubsub.ValidationIgnore } // Only advance state if different epoch as the committee can only change on an epoch transition. - if helpers.SlotToEpoch(attSlot) > helpers.SlotToEpoch(s.Slot()) { - s, err = state.ProcessSlots(ctx, s, helpers.StartSlot(helpers.SlotToEpoch(attSlot))) + if helpers.SlotToEpoch(attSlot) > helpers.SlotToEpoch(bs.Slot()) { + bs, err = state.ProcessSlots(ctx, bs, helpers.StartSlot(helpers.SlotToEpoch(attSlot))) if err != nil { traceutil.AnnotateError(span, err) return pubsub.ValidationIgnore @@ -108,26 +108,26 @@ func (r *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe } // Verify validator index is within the beacon committee. - if err := validateIndexInCommittee(ctx, s, signed.Message.Aggregate, signed.Message.AggregatorIndex); err != nil { + if err := validateIndexInCommittee(ctx, bs, signed.Message.Aggregate, signed.Message.AggregatorIndex); err != nil { traceutil.AnnotateError(span, errors.Wrapf(err, "Could not validate index in committee")) return pubsub.ValidationReject } // Verify selection proof reflects to the right validator and signature is valid. - if err := validateSelection(ctx, s, signed.Message.Aggregate.Data, signed.Message.AggregatorIndex, signed.Message.SelectionProof); err != nil { + if err := validateSelection(ctx, bs, signed.Message.Aggregate.Data, signed.Message.AggregatorIndex, signed.Message.SelectionProof); err != nil { traceutil.AnnotateError(span, errors.Wrapf(err, "Could not validate selection for validator %d", signed.Message.AggregatorIndex)) return pubsub.ValidationReject } // Verify the aggregator's signature is valid. 
- if err := validateAggregatorSignature(s, signed); err != nil { + if err := validateAggregatorSignature(bs, signed); err != nil { traceutil.AnnotateError(span, errors.Wrapf(err, "Could not verify aggregator signature %d", signed.Message.AggregatorIndex)) return pubsub.ValidationReject } // Verify aggregated attestation has a valid signature. if !featureconfig.Get().DisableStrictAttestationPubsubVerification { - if err := blocks.VerifyAttestation(ctx, s, signed.Message.Aggregate); err != nil { + if err := blocks.VerifyAttestation(ctx, bs, signed.Message.Aggregate); err != nil { traceutil.AnnotateError(span, err) return pubsub.ValidationReject } @@ -136,44 +136,44 @@ func (r *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe return pubsub.ValidationAccept } -func (r *Service) validateBlockInAttestation(ctx context.Context, s *ethpb.SignedAggregateAttestationAndProof) bool { - a := s.Message +func (s *Service) validateBlockInAttestation(ctx context.Context, satt *ethpb.SignedAggregateAttestationAndProof) bool { + a := satt.Message // Verify the block being voted and the processed state is in DB. The block should have passed validation if it's in the DB. blockRoot := bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot) - hasStateSummary := featureconfig.Get().NewStateMgmt && r.db.HasStateSummary(ctx, blockRoot) || r.stateSummaryCache.Has(blockRoot) - hasState := r.db.HasState(ctx, blockRoot) || hasStateSummary - hasBlock := r.db.HasBlock(ctx, blockRoot) + hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, blockRoot) || s.stateSummaryCache.Has(blockRoot) + hasState := s.db.HasState(ctx, blockRoot) || hasStateSummary + hasBlock := s.db.HasBlock(ctx, blockRoot) if !(hasState && hasBlock) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. - r.savePendingAtt(s) + s.savePendingAtt(satt) return false } return true } // Returns true if the node has received aggregate for the aggregator with index and target epoch. -func (r *Service) hasSeenAggregatorIndexEpoch(epoch uint64, aggregatorIndex uint64) bool { - r.seenAttestationLock.RLock() - defer r.seenAttestationLock.RUnlock() +func (s *Service) hasSeenAggregatorIndexEpoch(epoch uint64, aggregatorIndex uint64) bool { + s.seenAttestationLock.RLock() + defer s.seenAttestationLock.RUnlock() b := append(bytesutil.Bytes32(epoch), bytesutil.Bytes32(aggregatorIndex)...) - _, seen := r.seenAttestationCache.Get(string(b)) + _, seen := s.seenAttestationCache.Get(string(b)) return seen } // Set aggregate's aggregator index target epoch as seen. -func (r *Service) setAggregatorIndexEpochSeen(epoch uint64, aggregatorIndex uint64) { - r.seenAttestationLock.Lock() - defer r.seenAttestationLock.Unlock() +func (s *Service) setAggregatorIndexEpochSeen(epoch uint64, aggregatorIndex uint64) { + s.seenAttestationLock.Lock() + defer s.seenAttestationLock.Unlock() b := append(bytesutil.Bytes32(epoch), bytesutil.Bytes32(aggregatorIndex)...) - r.seenAttestationCache.Add(string(b), true) + s.seenAttestationCache.Add(string(b), true) } // This validates the aggregator's index in state is within the beacon committee. 
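hasSeenAggregatorIndexEpoch and setAggregatorIndexEpochSeen key the seen cache on the concatenation of fixed-width encodings of the target epoch and the aggregator index (bytesutil.Bytes32 above; I am assuming a little-endian uint64 zero-padded to 32 bytes). A sketch of that key construction against a plain map instead of the real LRU cache:

package main

import (
	"encoding/binary"
	"fmt"
)

// bytes32 is an assumed stand-in for bytesutil.Bytes32: the uint64 encoded
// little-endian and zero-padded to 32 bytes.
func bytes32(x uint64) []byte {
	b := make([]byte, 32)
	binary.LittleEndian.PutUint64(b, x)
	return b
}

// seenKey mirrors the cache key built in hasSeenAggregatorIndexEpoch:
// target epoch bytes followed by aggregator index bytes.
func seenKey(targetEpoch, aggregatorIndex uint64) string {
	return string(append(bytes32(targetEpoch), bytes32(aggregatorIndex)...))
}

func main() {
	seen := map[string]bool{}
	k := seenKey(11, 4096)
	fmt.Println("already seen:", seen[k]) // false: first aggregate is processed
	seen[k] = true
	fmt.Println("already seen:", seen[k]) // true: a second aggregate would be ignored
}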
-func validateIndexInCommittee(ctx context.Context, s *stateTrie.BeaconState, a *ethpb.Attestation, validatorIndex uint64) error { +func validateIndexInCommittee(ctx context.Context, bs *stateTrie.BeaconState, a *ethpb.Attestation, validatorIndex uint64) error { ctx, span := trace.StartSpan(ctx, "sync.validateIndexInCommittee") defer span.End() - committee, err := helpers.BeaconCommitteeFromState(s, a.Data.Slot, a.Data.CommitteeIndex) + committee, err := helpers.BeaconCommitteeFromState(bs, a.Data.Slot, a.Data.CommitteeIndex) if err != nil { return err } @@ -211,11 +211,11 @@ func validateAggregateAttTime(attSlot uint64, genesisTime time.Time) error { // This validates selection proof by validating it's from the correct validator index of the slot and selection // proof is a valid signature. -func validateSelection(ctx context.Context, s *stateTrie.BeaconState, data *ethpb.AttestationData, validatorIndex uint64, proof []byte) error { +func validateSelection(ctx context.Context, bs *stateTrie.BeaconState, data *ethpb.AttestationData, validatorIndex uint64, proof []byte) error { _, span := trace.StartSpan(ctx, "sync.validateSelection") defer span.End() - committee, err := helpers.BeaconCommitteeFromState(s, data.Slot, data.CommitteeIndex) + committee, err := helpers.BeaconCommitteeFromState(bs, data.Slot, data.CommitteeIndex) if err != nil { return err } @@ -227,7 +227,7 @@ func validateSelection(ctx context.Context, s *stateTrie.BeaconState, data *ethp return fmt.Errorf("validator is not an aggregator for slot %d", data.Slot) } - domain, err := helpers.Domain(s.Fork(), helpers.SlotToEpoch(data.Slot), params.BeaconConfig().DomainSelectionProof, s.GenesisValidatorRoot()) + domain, err := helpers.Domain(bs.Fork(), helpers.SlotToEpoch(data.Slot), params.BeaconConfig().DomainSelectionProof, bs.GenesisValidatorRoot()) if err != nil { return err } @@ -235,7 +235,7 @@ func validateSelection(ctx context.Context, s *stateTrie.BeaconState, data *ethp if err != nil { return err } - pubkeyState := s.PubkeyAtIndex(validatorIndex) + pubkeyState := bs.PubkeyAtIndex(validatorIndex) pubKey, err := bls.PublicKeyFromBytes(pubkeyState[:]) if err != nil { return err diff --git a/beacon-chain/sync/validate_attester_slashing.go b/beacon-chain/sync/validate_attester_slashing.go index c93ddb0e5f..97591707da 100644 --- a/beacon-chain/sync/validate_attester_slashing.go +++ b/beacon-chain/sync/validate_attester_slashing.go @@ -18,22 +18,22 @@ import ( // Clients who receive an attester slashing on this topic MUST validate the conditions within VerifyAttesterSlashing before // forwarding it across the network. -func (r *Service) validateAttesterSlashing(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { +func (s *Service) validateAttesterSlashing(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { // Validation runs on publish (not just subscriptions), so we should approve any message from // ourselves. - if pid == r.p2p.PeerID() { + if pid == s.p2p.PeerID() { return pubsub.ValidationAccept } // The head state will be too far away to validate any slashing. 
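The elided middle of validateSelection checks that the validator is actually an aggregator for the slot before the selection-proof signature is verified. Per the eth2 spec as I understand it, a validator is an aggregator when the low eight bytes of the hash of its slot signature, taken modulo max(1, committee_size / TARGET_AGGREGATORS_PER_COMMITTEE), are zero. A spec-level sketch; Prysm's helper may differ in details:

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

const targetAggregatorsPerCommittee = 16 // spec constant, assumed here

// isAggregator follows the spec's is_aggregator predicate: hash the slot
// signature and test the low 8 bytes against the committee's aggregator quota.
func isAggregator(committeeSize uint64, slotSignature []byte) bool {
	modulo := committeeSize / targetAggregatorsPerCommittee
	if modulo < 1 {
		modulo = 1
	}
	h := sha256.Sum256(slotSignature)
	return binary.LittleEndian.Uint64(h[:8])%modulo == 0
}

func main() {
	fakeSig := make([]byte, 96) // a BLS signature is 96 bytes; zeroes for illustration
	fmt.Println(isAggregator(128, fakeSig))
}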
- if r.initialSync.Syncing() { + if s.initialSync.Syncing() { return pubsub.ValidationIgnore } ctx, span := trace.StartSpan(ctx, "sync.validateAttesterSlashing") defer span.End() - m, err := r.decodePubsubMessage(msg) + m, err := s.decodePubsubMessage(msg) if err != nil { log.WithError(err).Error("Failed to decode message") traceutil.AnnotateError(span, err) @@ -47,29 +47,29 @@ func (r *Service) validateAttesterSlashing(ctx context.Context, pid peer.ID, msg if slashing == nil || slashing.Attestation_1 == nil || slashing.Attestation_2 == nil { return pubsub.ValidationReject } - if r.hasSeenAttesterSlashingIndices(slashing.Attestation_1.AttestingIndices, slashing.Attestation_2.AttestingIndices) { + if s.hasSeenAttesterSlashingIndices(slashing.Attestation_1.AttestingIndices, slashing.Attestation_2.AttestingIndices) { return pubsub.ValidationIgnore } // Retrieve head state, advance state to the epoch slot used specified in slashing message. - s, err := r.chain.HeadState(ctx) + headState, err := s.chain.HeadState(ctx) if err != nil { return pubsub.ValidationIgnore } slashSlot := slashing.Attestation_1.Data.Target.Epoch * params.BeaconConfig().SlotsPerEpoch - if s.Slot() < slashSlot { + if headState.Slot() < slashSlot { if ctx.Err() != nil { return pubsub.ValidationIgnore } var err error - s, err = state.ProcessSlots(ctx, s, slashSlot) + headState, err = state.ProcessSlots(ctx, headState, slashSlot) if err != nil { return pubsub.ValidationIgnore } } - if err := blocks.VerifyAttesterSlashing(ctx, s, slashing); err != nil { + if err := blocks.VerifyAttesterSlashing(ctx, headState, slashing); err != nil { return pubsub.ValidationReject } @@ -78,9 +78,9 @@ func (r *Service) validateAttesterSlashing(ctx context.Context, pid peer.ID, msg } // Returns true if the node has already received a valid attester slashing with the attesting indices. -func (r *Service) hasSeenAttesterSlashingIndices(indices1 []uint64, indices2 []uint64) bool { - r.seenAttesterSlashingLock.RLock() - defer r.seenAttesterSlashingLock.RUnlock() +func (s *Service) hasSeenAttesterSlashingIndices(indices1 []uint64, indices2 []uint64) bool { + s.seenAttesterSlashingLock.RLock() + defer s.seenAttesterSlashingLock.RUnlock() slashableIndices := sliceutil.IntersectionUint64(indices1, indices2) sort.SliceStable(slashableIndices, func(i, j int) bool { @@ -92,14 +92,14 @@ func (r *Service) hasSeenAttesterSlashingIndices(indices1 []uint64, indices2 []u } b := hashutil.FastSum256(IndicesInBytes) - _, seen := r.seenAttesterSlashingCache.Get(b) + _, seen := s.seenAttesterSlashingCache.Get(b) return seen } // Set attester slashing indices in attester slashing cache. 
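The slashing dedup above keys its cache on the validators who are actually slashable, i.e. the intersection of the two attestations' attesting indices, sorted and hashed (the serialization loop sits outside the hunk; I am assuming 8-byte little-endian indices and that hashutil.FastSum256 is semantically sha256). A sketch of building that key:

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
)

// slashingSeenKey intersects the two index sets, sorts the result and hashes it,
// in the spirit of hasSeenAttesterSlashingIndices / setAttesterSlashingIndicesSeen.
// Duplicate indices within one input are ignored for brevity.
func slashingSeenKey(indices1, indices2 []uint64) [32]byte {
	in1 := make(map[uint64]bool, len(indices1))
	for _, i := range indices1 {
		in1[i] = true
	}
	var slashable []uint64
	for _, i := range indices2 {
		if in1[i] {
			slashable = append(slashable, i)
		}
	}
	sort.Slice(slashable, func(i, j int) bool { return slashable[i] < slashable[j] })

	buf := make([]byte, 0, 8*len(slashable))
	for _, idx := range slashable {
		var b [8]byte
		binary.LittleEndian.PutUint64(b[:], idx)
		buf = append(buf, b[:]...)
	}
	return sha256.Sum256(buf)
}

func main() {
	k := slashingSeenKey([]uint64{3, 7, 9}, []uint64{7, 9, 42})
	fmt.Printf("cache key: %x...\n", k[:8])
}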
-func (r *Service) setAttesterSlashingIndicesSeen(indices1 []uint64, indices2 []uint64) { - r.seenAttesterSlashingLock.Lock() - defer r.seenAttesterSlashingLock.Unlock() +func (s *Service) setAttesterSlashingIndicesSeen(indices1 []uint64, indices2 []uint64) { + s.seenAttesterSlashingLock.Lock() + defer s.seenAttesterSlashingLock.Unlock() slashableIndices := sliceutil.IntersectionUint64(indices1, indices2) sort.SliceStable(slashableIndices, func(i, j int) bool { @@ -111,5 +111,5 @@ func (r *Service) setAttesterSlashingIndicesSeen(indices1 []uint64, indices2 []u } b := hashutil.FastSum256(IndicesInBytes) - r.seenAttesterSlashingCache.Add(b, true) + s.seenAttesterSlashingCache.Add(b, true) } diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index 195485f105..436e386d68 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -24,30 +24,30 @@ import ( // validateBeaconBlockPubSub checks that the incoming block has a valid BLS signature. // Blocks that have already been seen are ignored. If the BLS signature is any valid signature, // this method rebroadcasts the message. -func (r *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { +func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { // Validation runs on publish (not just subscriptions), so we should approve any message from // ourselves. - if pid == r.p2p.PeerID() { + if pid == s.p2p.PeerID() { return pubsub.ValidationAccept } // We should not attempt to process blocks until fully synced, but propagation is OK. - if r.initialSync.Syncing() { + if s.initialSync.Syncing() { return pubsub.ValidationIgnore } ctx, span := trace.StartSpan(ctx, "sync.validateBeaconBlockPubSub") defer span.End() - m, err := r.decodePubsubMessage(msg) + m, err := s.decodePubsubMessage(msg) if err != nil { log.WithError(err).Error("Failed to decode message") traceutil.AnnotateError(span, err) return pubsub.ValidationReject } - r.validateBlockLock.Lock() - defer r.validateBlockLock.Unlock() + s.validateBlockLock.Lock() + defer s.validateBlockLock.Unlock() blk, ok := m.(*ethpb.SignedBeaconBlock) if !ok { @@ -60,7 +60,7 @@ func (r *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms // Broadcast the block on a feed to notify other services in the beacon node // of a received block (even if it does not process correctly through a state transition). - r.blockNotifier.BlockFeed().Send(&feed.Event{ + s.blockNotifier.BlockFeed().Send(&feed.Event{ Type: blockfeed.ReceivedBlock, Data: &blockfeed.ReceivedBlockData{ SignedBlock: blk, @@ -68,7 +68,7 @@ func (r *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms }) // Verify the block is the first block received for the proposer for the slot. 
- if r.hasSeenBlockIndexSlot(blk.Block.Slot, blk.Block.ProposerIndex) { + if s.hasSeenBlockIndexSlot(blk.Block.Slot, blk.Block.ProposerIndex) { return pubsub.ValidationIgnore } @@ -76,49 +76,49 @@ func (r *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms if err != nil { return pubsub.ValidationIgnore } - if r.db.HasBlock(ctx, blockRoot) { + if s.db.HasBlock(ctx, blockRoot) { return pubsub.ValidationIgnore } - r.pendingQueueLock.RLock() - if r.seenPendingBlocks[blockRoot] { - r.pendingQueueLock.RUnlock() + s.pendingQueueLock.RLock() + if s.seenPendingBlocks[blockRoot] { + s.pendingQueueLock.RUnlock() return pubsub.ValidationIgnore } - r.pendingQueueLock.RUnlock() + s.pendingQueueLock.RUnlock() // Add metrics for block arrival time subtracts slot start time. - if captureArrivalTimeMetric(uint64(r.chain.GenesisTime().Unix()), blk.Block.Slot) != nil { + if captureArrivalTimeMetric(uint64(s.chain.GenesisTime().Unix()), blk.Block.Slot) != nil { return pubsub.ValidationIgnore } - if err := helpers.VerifySlotTime(uint64(r.chain.GenesisTime().Unix()), blk.Block.Slot, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil { + if err := helpers.VerifySlotTime(uint64(s.chain.GenesisTime().Unix()), blk.Block.Slot, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil { log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("Rejecting incoming block.") return pubsub.ValidationIgnore } - if helpers.StartSlot(r.chain.FinalizedCheckpt().Epoch) >= blk.Block.Slot { + if helpers.StartSlot(s.chain.FinalizedCheckpt().Epoch) >= blk.Block.Slot { log.Debug("Block slot older/equal than last finalized epoch start slot, rejecting it") return pubsub.ValidationIgnore } // Handle block when the parent is unknown. - if !r.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) { - r.pendingQueueLock.Lock() - r.slotToPendingBlocks[blk.Block.Slot] = blk - r.seenPendingBlocks[blockRoot] = true - r.pendingQueueLock.Unlock() + if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) { + s.pendingQueueLock.Lock() + s.slotToPendingBlocks[blk.Block.Slot] = blk + s.seenPendingBlocks[blockRoot] = true + s.pendingQueueLock.Unlock() return pubsub.ValidationIgnore } if featureconfig.Get().NewStateMgmt { - hasStateSummaryDB := r.db.HasStateSummary(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) - hasStateSummaryCache := r.stateSummaryCache.Has(bytesutil.ToBytes32(blk.Block.ParentRoot)) + hasStateSummaryDB := s.db.HasStateSummary(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) + hasStateSummaryCache := s.stateSummaryCache.Has(bytesutil.ToBytes32(blk.Block.ParentRoot)) if !hasStateSummaryDB && !hasStateSummaryCache { log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("No access to parent state") return pubsub.ValidationIgnore } - parentState, err := r.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) + parentState, err := s.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) if err != nil { log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("Could not get parent state") return pubsub.ValidationIgnore @@ -150,20 +150,20 @@ func (r *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms } // Returns true if the block is not the first block proposed for the proposer for the slot. 
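VerifySlotTime above rejects blocks whose slot start time lies in the future beyond the allowed gossip clock disparity. With mainnet's 12-second slots and the spec's 500 ms MAXIMUM_GOSSIP_CLOCK_DISPARITY (both assumed here, along with the helper name), the check amounts to roughly:

package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	secondsPerSlot       = 12                     // mainnet value, assumed
	maxGossipClockDispar = 500 * time.Millisecond // spec's MAXIMUM_GOSSIP_CLOCK_DISPARITY, assumed
)

// verifySlotTime errors when the block's slot has not started yet, allowing a
// small clock disparity, roughly what helpers.VerifySlotTime enforces.
func verifySlotTime(genesis time.Time, slot uint64, now time.Time) error {
	slotStart := genesis.Add(time.Duration(slot*secondsPerSlot) * time.Second)
	if slotStart.After(now.Add(maxGossipClockDispar)) {
		return errors.New("block slot is in the future")
	}
	return nil
}

func main() {
	genesis := time.Now().Add(-10 * time.Minute)
	fmt.Println(verifySlotTime(genesis, 40, time.Now()))  // nil: slot 40 started two minutes ago
	fmt.Println(verifySlotTime(genesis, 500, time.Now())) // error: slot 500 is far in the future
}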
-func (r *Service) hasSeenBlockIndexSlot(slot uint64, proposerIdx uint64) bool { - r.seenBlockLock.RLock() - defer r.seenBlockLock.RUnlock() +func (s *Service) hasSeenBlockIndexSlot(slot uint64, proposerIdx uint64) bool { + s.seenBlockLock.RLock() + defer s.seenBlockLock.RUnlock() b := append(bytesutil.Bytes32(slot), bytesutil.Bytes32(proposerIdx)...) - _, seen := r.seenBlockCache.Get(string(b)) + _, seen := s.seenBlockCache.Get(string(b)) return seen } // Set block proposer index and slot as seen for incoming blocks. -func (r *Service) setSeenBlockIndexSlot(slot uint64, proposerIdx uint64) { - r.seenBlockLock.Lock() - defer r.seenBlockLock.Unlock() +func (s *Service) setSeenBlockIndexSlot(slot uint64, proposerIdx uint64) { + s.seenBlockLock.Lock() + defer s.seenBlockLock.Unlock() b := append(bytesutil.Bytes32(slot), bytesutil.Bytes32(proposerIdx)...) - r.seenBlockCache.Add(string(b), true) + s.seenBlockCache.Add(string(b), true) } // This captures metrics for block arrival time by subtracts slot start time. diff --git a/beacon-chain/sync/validate_proposer_slashing.go b/beacon-chain/sync/validate_proposer_slashing.go index e3c47c188d..29a6c7286d 100644 --- a/beacon-chain/sync/validate_proposer_slashing.go +++ b/beacon-chain/sync/validate_proposer_slashing.go @@ -14,22 +14,22 @@ import ( // Clients who receive a proposer slashing on this topic MUST validate the conditions within VerifyProposerSlashing before // forwarding it across the network. -func (r *Service) validateProposerSlashing(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { +func (s *Service) validateProposerSlashing(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { // Validation runs on publish (not just subscriptions), so we should approve any message from // ourselves. - if pid == r.p2p.PeerID() { + if pid == s.p2p.PeerID() { return pubsub.ValidationAccept } // The head state will be too far away to validate any slashing. - if r.initialSync.Syncing() { + if s.initialSync.Syncing() { return pubsub.ValidationIgnore } ctx, span := trace.StartSpan(ctx, "sync.validateProposerSlashing") defer span.End() - m, err := r.decodePubsubMessage(msg) + m, err := s.decodePubsubMessage(msg) if err != nil { log.WithError(err).Error("Failed to decode message") traceutil.AnnotateError(span, err) @@ -44,28 +44,28 @@ func (r *Service) validateProposerSlashing(ctx context.Context, pid peer.ID, msg if slashing.Header_1 == nil || slashing.Header_1.Header == nil { return pubsub.ValidationReject } - if r.hasSeenProposerSlashingIndex(slashing.Header_1.Header.ProposerIndex) { + if s.hasSeenProposerSlashingIndex(slashing.Header_1.Header.ProposerIndex) { return pubsub.ValidationIgnore } // Retrieve head state, advance state to the epoch slot used specified in slashing message. 
- s, err := r.chain.HeadState(ctx) + headState, err := s.chain.HeadState(ctx) if err != nil { return pubsub.ValidationIgnore } slashSlot := slashing.Header_1.Header.Slot - if s.Slot() < slashSlot { + if headState.Slot() < slashSlot { if ctx.Err() != nil { return pubsub.ValidationIgnore } var err error - s, err = state.ProcessSlots(ctx, s, slashSlot) + headState, err = state.ProcessSlots(ctx, headState, slashSlot) if err != nil { return pubsub.ValidationIgnore } } - if err := blocks.VerifyProposerSlashing(s, slashing); err != nil { + if err := blocks.VerifyProposerSlashing(headState, slashing); err != nil { return pubsub.ValidationReject } @@ -74,16 +74,16 @@ func (r *Service) validateProposerSlashing(ctx context.Context, pid peer.ID, msg } // Returns true if the node has already received a valid proposer slashing received for the proposer with index -func (r *Service) hasSeenProposerSlashingIndex(i uint64) bool { - r.seenProposerSlashingLock.RLock() - defer r.seenProposerSlashingLock.RUnlock() - _, seen := r.seenProposerSlashingCache.Get(i) +func (s *Service) hasSeenProposerSlashingIndex(i uint64) bool { + s.seenProposerSlashingLock.RLock() + defer s.seenProposerSlashingLock.RUnlock() + _, seen := s.seenProposerSlashingCache.Get(i) return seen } // Set proposer slashing index in proposer slashing cache. -func (r *Service) setProposerSlashingIndexSeen(i uint64) { - r.seenProposerSlashingLock.Lock() - defer r.seenProposerSlashingLock.Unlock() - r.seenProposerSlashingCache.Add(i, true) +func (s *Service) setProposerSlashingIndexSeen(i uint64) { + s.seenProposerSlashingLock.Lock() + defer s.seenProposerSlashingLock.Unlock() + s.seenProposerSlashingCache.Add(i, true) } diff --git a/beacon-chain/sync/validate_voluntary_exit.go b/beacon-chain/sync/validate_voluntary_exit.go index e49288388a..d400a97c38 100644 --- a/beacon-chain/sync/validate_voluntary_exit.go +++ b/beacon-chain/sync/validate_voluntary_exit.go @@ -14,22 +14,22 @@ import ( // Clients who receive a voluntary exit on this topic MUST validate the conditions within process_voluntary_exit before // forwarding it across the network. -func (r *Service) validateVoluntaryExit(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { +func (s *Service) validateVoluntaryExit(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { // Validation runs on publish (not just subscriptions), so we should approve any message from // ourselves. - if pid == r.p2p.PeerID() { + if pid == s.p2p.PeerID() { return pubsub.ValidationAccept } // The head state will be too far away to validate any voluntary exit. 
- if r.initialSync.Syncing() { + if s.initialSync.Syncing() { return pubsub.ValidationIgnore } ctx, span := trace.StartSpan(ctx, "sync.validateVoluntaryExit") defer span.End() - m, err := r.decodePubsubMessage(msg) + m, err := s.decodePubsubMessage(msg) if err != nil { log.WithError(err).Error("Failed to decode message") traceutil.AnnotateError(span, err) @@ -44,24 +44,24 @@ func (r *Service) validateVoluntaryExit(ctx context.Context, pid peer.ID, msg *p if exit.Exit == nil { return pubsub.ValidationReject } - if r.hasSeenExitIndex(exit.Exit.ValidatorIndex) { + if s.hasSeenExitIndex(exit.Exit.ValidatorIndex) { return pubsub.ValidationIgnore } - s, err := r.chain.HeadState(ctx) + headState, err := s.chain.HeadState(ctx) if err != nil { return pubsub.ValidationIgnore } exitedEpochSlot := exit.Exit.Epoch * params.BeaconConfig().SlotsPerEpoch - if int(exit.Exit.ValidatorIndex) >= s.NumValidators() { + if int(exit.Exit.ValidatorIndex) >= headState.NumValidators() { return pubsub.ValidationReject } - val, err := s.ValidatorAtIndexReadOnly(exit.Exit.ValidatorIndex) + val, err := headState.ValidatorAtIndexReadOnly(exit.Exit.ValidatorIndex) if err != nil { return pubsub.ValidationIgnore } - if err := blocks.VerifyExit(val, exitedEpochSlot, s.Fork(), exit, s.GenesisValidatorRoot()); err != nil { + if err := blocks.VerifyExit(val, exitedEpochSlot, headState.Fork(), exit, headState.GenesisValidatorRoot()); err != nil { return pubsub.ValidationReject } @@ -71,16 +71,16 @@ func (r *Service) validateVoluntaryExit(ctx context.Context, pid peer.ID, msg *p } // Returns true if the node has already received a valid exit request for the validator with index `i`. -func (r *Service) hasSeenExitIndex(i uint64) bool { - r.seenExitLock.RLock() - defer r.seenExitLock.RUnlock() - _, seen := r.seenExitCache.Get(i) +func (s *Service) hasSeenExitIndex(i uint64) bool { + s.seenExitLock.RLock() + defer s.seenExitLock.RUnlock() + _, seen := s.seenExitCache.Get(i) return seen } // Set exit request index `i` in seen exit request cache. -func (r *Service) setExitIndexSeen(i uint64) { - r.seenExitLock.Lock() - defer r.seenExitLock.Unlock() - r.seenExitCache.Add(i, true) +func (s *Service) setExitIndexSeen(i uint64) { + s.seenExitLock.Lock() + defer s.seenExitLock.Unlock() + s.seenExitCache.Add(i, true) }
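Across the validators in this patch (blocks, aggregates, exits, both slashing types) the same first-seen pattern repeats: derive a key from the message, return ValidationIgnore if the key is already cached, and record the key only after the message validates so an invalid message cannot shadow a later valid one. The real service uses size-bounded LRU caches guarded by RWMutexes; a minimal map-based model of the idea, not the actual cache type:

package main

import (
	"fmt"
	"sync"
)

// seenCache is a simplified stand-in for the LRU caches used by the sync service.
type seenCache struct {
	mu   sync.RWMutex
	seen map[string]bool
}

func newSeenCache() *seenCache { return &seenCache{seen: make(map[string]bool)} }

func (c *seenCache) has(key string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.seen[key]
}

func (c *seenCache) add(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.seen[key] = true
}

func main() {
	exits := newSeenCache()
	validatorIndex := "12345"

	// First gossip message for this validator: validate, then mark as seen.
	if !exits.has(validatorIndex) {
		fmt.Println("first exit for validator, validating and accepting")
		exits.add(validatorIndex)
	}
	// A duplicate arrives later and is ignored without re-validation.
	if exits.has(validatorIndex) {
		fmt.Println("duplicate exit, ValidationIgnore")
	}
}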