Mirror of https://github.com/OffchainLabs/prysm.git, synced 2026-02-18 08:51:30 -05:00

Compare commits: feat/updat... ... metrics-fo... (1 commit)

Commit: f51ce3d699
@@ -196,12 +196,12 @@ var (
 		},
 		[]string{"topic"})
 	pubsubRPCPubSentSize = promauto.NewCounterVec(prometheus.CounterOpts{
-		Name: "gossipsub_pubsub_rpc_sent_pub_size_total",
+		Name: "gossipsub_topic_msg_sent_bytes",
 		Help: "The total size of publish messages sent via rpc for a particular topic",
 	},
-		[]string{"topic", "is_partial"})
+		[]string{"topic", "partial"})
 	pubsubMeshPeers = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "gossipsub_mesh_peers",
+		Name: "gossipsub_mesh_peer_counts",
 		Help: "The number of capable peers in mesh",
 	},
 		[]string{"topic", "supports_partial"})
@@ -94,25 +94,22 @@ const (
 )
 
 type request struct {
-	kind           requestKind
-	cellsValidated *cellsValidated
-	response       chan error
-	unsub          unsubscribe
-	incomingRPC    rpcWithFrom
-	sub            subscribe
-	publish        publish
-	getBlobsCalled getBlobsCalled
-	gossipForPeer  gossipForPeer
-}
-
-type getBlobsCalled struct {
-	groupID string
+	getBlobsCalled      bool
+	kind                requestKind
+	cellsValidated      *cellsValidated
+	response            chan error
+	getBlobsCalledGroup string
+	unsub               unsubscribe
+	incomingRPC         rpcWithFrom
+	gossipForPeer       gossipForPeer
+	gossipForPeerResp   chan gossipForPeerResponse
+	sub                 subscribe
+	publish             publish
 }
 
 type publish struct {
-	topic          string
-	c              blocks.PartialDataColumn
-	getBlobsCalled bool
+	topic string
+	c     blocks.PartialDataColumn
 }
 
 type subscribe struct {
@@ -137,11 +134,10 @@ type cellsValidated struct {
 }
 
 type gossipForPeer struct {
-	topic             string
-	groupID           string
-	remote            peer.ID
-	peerState         partialmessages.PeerState
-	gossipForPeerResp chan gossipForPeerResponse
+	topic     string
+	groupID   string
+	remote    peer.ID
+	peerState partialmessages.PeerState
 }
 
 type gossipForPeerResponse struct {
@@ -179,19 +175,15 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
 			p.incomingReq <- request{
 				kind: requestKindGossipForPeer,
 				gossipForPeer: gossipForPeer{
-					topic:             topic,
-					groupID:           groupID,
-					remote:            remote,
-					peerState:         peerState,
-					gossipForPeerResp: respCh,
+					topic:     topic,
+					groupID:   groupID,
+					remote:    remote,
+					peerState: peerState,
 				},
+				gossipForPeerResp: respCh,
 			}
-			select {
-			case resp := <-respCh:
-				return resp.nextPeerState, resp.encodedMsg, resp.partsMetadataToSend, resp.err
-			case <-p.stop:
-				return peerState, nil, nil, errors.New("broadcaster stopped")
-			}
+			resp := <-respCh
+			return resp.nextPeerState, resp.encodedMsg, resp.partsMetadataToSend, resp.err
 		},
 		OnIncomingRPC: func(from peer.ID, peerState partialmessages.PeerState, rpc *pubsub_pb.PartialMessagesExtension) (partialmessages.PeerState, error) {
 			if rpc == nil {
@@ -208,7 +200,7 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
 			}:
 			default:
 				p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
-				return nextPeerState, errors.New("incomingReq channel is full, dropping RPC")
+				return nextPeerState, nil
 			}
 			return nextPeerState, nil
 		},
@@ -281,14 +273,14 @@ func (p *PartialColumnBroadcaster) loop() {
 		case req := <-p.incomingReq:
 			switch req.kind {
 			case requestKindPublish:
-				req.response <- p.publish(req.publish.topic, req.publish.c, req.publish.getBlobsCalled)
+				req.response <- p.publish(req.publish.topic, req.publish.c, req.getBlobsCalled)
 			case requestKindSubscribe:
 				req.response <- p.subscribe(req.sub.t)
 			case requestKindUnsubscribe:
 				req.response <- p.unsubscribe(req.unsub.topic)
 			case requestKindGossipForPeer:
 				nextPeerState, encodedMsg, partsMetadataToSend, err := p.handleGossipForPeer(req.gossipForPeer)
-				req.gossipForPeer.gossipForPeerResp <- gossipForPeerResponse{
+				req.gossipForPeerResp <- gossipForPeerResponse{
 					nextPeerState: nextPeerState,
 					encodedMsg: encodedMsg,
 					partsMetadataToSend: partsMetadataToSend,
@@ -305,7 +297,7 @@ func (p *PartialColumnBroadcaster) loop() {
 					p.logger.Error("Failed to handle cells validated", "err", err)
 				}
 			case requestKindGetBlobsCalled:
-				p.handleGetBlobsCalled(req.getBlobsCalled.groupID)
+				p.handleGetBlobsCalled(req.getBlobsCalledGroup)
 			default:
 				p.logger.Error("Unknown request kind", "kind", req.kind)
 			}
@@ -334,7 +326,6 @@ func (p *PartialColumnBroadcaster) handleGossipForPeer(req gossipForPeer) (parti
 	if !ok || partialColumn == nil {
 		return req.peerState, nil, nil, errors.New("not tracking topic for group")
 	}
-	// we're not requesting a message here as this will be used to emit gossip. So, we pass requested message as false.
 	return partialColumn.ForPeer(req.remote, false, req.peerState)
 }
 
@@ -365,7 +356,7 @@ func updatePeerStateFromIncomingRPC(peerState partialmessages.PeerState, rpc *pu
 	if err := message.UnmarshalSSZ(rpc.PartialMessage); err != nil {
 		return peerState, errors.Wrap(err, "failed to unmarshal partial message data")
 	}
-	if len(message.CellsPresentBitmap) == 0 {
+	if message.CellsPresentBitmap == nil {
 		return nextPeerState, nil
 	}
 
@@ -537,12 +528,6 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
 		}
 	}
 
-	getBlobsCalled := p.getBlobsCalled[string(groupID)]
-	if !getBlobsCalled {
-		p.logger.Debug("GetBlobs not called, skipping republish", "topic", topicID, "group", groupID)
-		return nil
-	}
-
 	peerMeta := rpcWithFrom.PartsMetadata
 	myMeta, err := ourDataColumn.PartsMetadata()
 	if err != nil {
@@ -554,6 +539,12 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
 		logger.Debug("republishing due to parts metadata difference")
 	}
 
+	getBlobsCalled := p.getBlobsCalled[string(groupID)]
+	if !getBlobsCalled {
+		p.logger.Debug("GetBlobs not called, skipping republish", "topic", topicID, "group", groupID)
+		return nil
+	}
+
 	if shouldRepublish {
 		err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
 		if err != nil {
@@ -631,10 +622,8 @@ func (p *PartialColumnBroadcaster) GetBlobsCalled(groupID string) error {
 	}
 	select {
 	case p.incomingReq <- request{
-		kind: requestKindGetBlobsCalled,
-		getBlobsCalled: getBlobsCalled{
-			groupID: groupID,
-		},
+		kind:                requestKindGetBlobsCalled,
+		getBlobsCalledGroup: groupID,
 	}:
 	default:
 		return errors.Errorf("dropping getBlobs called message as incomingReq channel is full, groupID: %s", groupID)
@@ -649,35 +638,32 @@ func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataCol
 	}
 	respCh := make(chan error)
 	p.incomingReq <- request{
-		kind:     requestKindPublish,
-		response: respCh,
+		kind:           requestKindPublish,
+		response:       respCh,
+		getBlobsCalled: getBlobsCalled,
 		publish: publish{
-			topic:          topic,
-			c:              c,
-			getBlobsCalled: getBlobsCalled,
+			topic: topic,
+			c:     c,
 		},
 	}
 	return <-respCh
 }
 
 func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
-	groupId := string(c.GroupID())
 	topicStore, ok := p.partialMsgStore[topic]
 	if !ok {
 		topicStore = make(map[string]*blocks.PartialDataColumn)
 		p.partialMsgStore[topic] = topicStore
 	}
 
-	existing := topicStore[groupId]
-	var extended bool
+	existing := topicStore[string(c.GroupID())]
 	if existing != nil {
+		var extended bool
 		// Extend the existing column with cells being published here.
 		// The existing column may already contain cells received from peers. We must not overwrite it.
 		for i := range c.Included.Len() {
 			if c.Included.BitAt(i) {
-				if existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i]) {
-					extended = true
-				}
+				extended = existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
 			}
 		}
 		if extended {
@@ -689,15 +675,15 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
 			}
 		}
 	} else {
-		topicStore[groupId] = &c
+		topicStore[string(c.GroupID())] = &c
 		existing = &c
 	}
 
-	p.groupTTL[groupId] = TTLInSlots
+	p.groupTTL[string(c.GroupID())] = TTLInSlots
 
 	err := p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
 	if err == nil {
-		p.getBlobsCalled[groupId] = getBlobsCalled
+		p.getBlobsCalled[string(c.GroupID())] = getBlobsCalled
 	}
 	return err
 }
@@ -443,7 +443,6 @@ func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbr
 			err := s.processDataColumnSidecarsFromExecution(ctx, source)
 			if err != nil {
 				log.WithError(err).Error("Failed to process partial data column header")
-				return
 			}
 			if err := broadcaster.GetBlobsCalled(groupID); err != nil {
 				log.WithError(err).Error("Failed to call getBlobs called on broadcaster")
@@ -25,7 +25,7 @@ type CellProofBundle struct {
 }
 
 // PartialDataColumn is a partially populated DataColumnSidecar used for
-// exchanging cells with peers.
+// exchanging cells and metadata with peers.
 type PartialDataColumn struct {
 	*ethpb.DataColumnSidecar
 	root [fieldparams.RootLength]byte
@@ -44,9 +44,6 @@ func NewPartialDataColumn(
 	kzgCommitments [][]byte,
 	kzgInclusionProof [][]byte,
 ) (PartialDataColumn, error) {
-	if signedBlockHeader == nil {
-		return PartialDataColumn{}, errors.New("signedBlockHeader is nil")
-	}
 	root, err := signedBlockHeader.Header.HashTreeRoot()
 	if err != nil {
 		return PartialDataColumn{}, err
@@ -73,6 +70,12 @@ func NewPartialDataColumn(
 		Included: bitfield.NewBitlist(uint64(len(sidecar.KzgCommitments))),
 		eagerPushSent: make(map[peer.ID]struct{}),
 	}
+	for i := range len(c.KzgCommitments) {
+		if sidecar.Column[i] == nil {
+			continue
+		}
+		c.Included.SetBitAt(uint64(i), true)
+	}
 	return c, nil
 }
 
@@ -82,7 +85,7 @@ func (p *PartialDataColumn) GroupID() []byte {
 }
 
 // ClearEagerPushSent resets the set of peers that have received the eager push
-// header.
+// header, allowing the header to be re-sent on the next ForPeer call.
 func (p *PartialDataColumn) ClearEagerPushSent() {
 	p.eagerPushSent = make(map[peer.ID]struct{})
 }
@@ -127,8 +130,7 @@ func marshalPartsMetadata(meta *ethpb.PartialDataColumnPartsMetadata) (partialme
 	return partialmessages.PartsMetadata(b), nil
 }
 
-// NKzgCommitments returns the number of commitments in the block header for this column which
-// in turn will be equal to the number of cells in this column.
+// NKzgCommitments returns the number of commitments in the column.
 func (p *PartialDataColumn) NKzgCommitments() uint64 {
 	return p.Included.Len()
 }
@@ -136,7 +138,7 @@ func (p *PartialDataColumn) NKzgCommitments() uint64 {
 // ParsePartsMetadata SSZ-decodes bytes back to PartialDataColumnPartsMetadata.
 func ParsePartsMetadata(pm partialmessages.PartsMetadata, expectedLength uint64) (*ethpb.PartialDataColumnPartsMetadata, error) {
 	meta := &ethpb.PartialDataColumnPartsMetadata{}
-	if err := meta.UnmarshalSSZ(pm); err != nil {
+	if err := meta.UnmarshalSSZ([]byte(pm)); err != nil {
 		return nil, err
 	}
 	if meta.Available.Len() != expectedLength || meta.Requests.Len() != expectedLength {
@@ -206,7 +208,7 @@ func (p *PartialDataColumn) PartsMetadata() (partialmessages.PartsMetadata, erro
 	return marshalPartsMetadata(meta)
 }
 
-// MergeAvailableIntoPartsMetadata merges additional availabe cells into the base partsmetadata's available cells and
+// MergeAvailableIntoPartsMetadata merges additional availability into base and
 // returns the marshaled metadata.
 func MergeAvailableIntoPartsMetadata(base *ethpb.PartialDataColumnPartsMetadata, additionalAvailable bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
 	if base == nil {
@@ -228,8 +230,8 @@ func MergeAvailableIntoPartsMetadata(base *ethpb.PartialDataColumnPartsMetadata,
 
 // merge available, keep request unchanged, if my parts are different, simply over write with myparts
 func (p *PartialDataColumn) updateReceivedStateOutgoing(receivedMeta partialmessages.PartsMetadata, cellsSent bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
-	if len(receivedMeta) == 0 {
-		return nil, errors.New("receivedMeta is empty")
+	if receivedMeta == nil || len(receivedMeta) == 0 {
+		return nil, errors.New("recievedMeta is nil")
 	}
 	peerMeta, err := ParsePartsMetadata(receivedMeta, p.Included.Len())
 	if err != nil {
@@ -239,17 +241,16 @@ func (p *PartialDataColumn) updateReceivedStateOutgoing(receivedMeta partialmess
 }
 
 // ForPeer implements partialmessages.Message.
-func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte,
-	partialmessages.PartsMetadata, error) {
-	// Eager push header - we don't know what the peer has and message has been requested.
+func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
+	// Eager push - peer has never been seen (RecvdState nil) and message requested.
 	// Only send the header once per peer.
 	if requestedMessage && peerState.RecvdState == nil {
 		if _, sent := p.eagerPushSent[remote]; !sent {
-			p.eagerPushSent[remote] = struct{}{}
 			encoded, err := p.eagerPushBytes()
 			if err != nil {
 				return peerState, nil, nil, err
 			}
+			p.eagerPushSent[remote] = struct{}{}
 			return peerState, encoded, nil, nil
 		}
 		return peerState, nil, nil, nil
@@ -262,7 +263,6 @@ func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerS
 	if peerState.SentState != nil {
 		var ok bool
 		sentMeta, ok = peerState.SentState.(partialmessages.PartsMetadata)
-		// should never happen but checking this for safety
 		if !ok {
 			return peerState, nil, nil, errors.New("SentState is not PartsMetadata")
 		}
@@ -388,7 +388,7 @@ func (p *PartialDataColumn) ExtendFromVerifiedCells(cellIndices []uint64, cells
 	return extended
 }
 
-// Complete returns a verified read-only column if all cells are now present in this column.
+// Complete returns a verified read-only column if all cells are present.
 func (p *PartialDataColumn) Complete(logger *logrus.Logger) (VerifiedRODataColumn, bool) {
 	if uint64(len(p.KzgCommitments)) != p.Included.Count() {
 		return VerifiedRODataColumn{}, false