Compare commits


1 Commit

Author SHA1 Message Date
Aarsh Shah  f51ce3d699  rename metrics  2026-02-18 10:55:41 +04:00
4 changed files with 68 additions and 83 deletions

View File

@@ -196,12 +196,12 @@ var (
},
[]string{"topic"})
pubsubRPCPubSentSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gossipsub_pubsub_rpc_sent_pub_size_total",
Name: "gossipsub_topic_msg_sent_bytes",
Help: "The total size of publish messages sent via rpc for a particular topic",
},
[]string{"topic", "is_partial"})
[]string{"topic", "partial"})
pubsubMeshPeers = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "gossipsub_mesh_peers",
Name: "gossipsub_mesh_peer_counts",
Help: "The number of capable peers in mesh",
},
[]string{"topic", "supports_partial"})

View File

@@ -94,25 +94,22 @@ const (
)
type request struct {
kind requestKind
cellsValidated *cellsValidated
response chan error
unsub unsubscribe
incomingRPC rpcWithFrom
sub subscribe
publish publish
getBlobsCalled getBlobsCalled
gossipForPeer gossipForPeer
}
type getBlobsCalled struct {
groupID string
getBlobsCalled bool
kind requestKind
cellsValidated *cellsValidated
response chan error
getBlobsCalledGroup string
unsub unsubscribe
incomingRPC rpcWithFrom
gossipForPeer gossipForPeer
gossipForPeerResp chan gossipForPeerResponse
sub subscribe
publish publish
}
type publish struct {
topic string
c blocks.PartialDataColumn
getBlobsCalled bool
topic string
c blocks.PartialDataColumn
}
type subscribe struct {
@@ -137,11 +134,10 @@ type cellsValidated struct {
}
type gossipForPeer struct {
topic string
groupID string
remote peer.ID
peerState partialmessages.PeerState
gossipForPeerResp chan gossipForPeerResponse
topic string
groupID string
remote peer.ID
peerState partialmessages.PeerState
}
type gossipForPeerResponse struct {
@@ -179,19 +175,15 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
p.incomingReq <- request{
kind: requestKindGossipForPeer,
gossipForPeer: gossipForPeer{
topic: topic,
groupID: groupID,
remote: remote,
peerState: peerState,
gossipForPeerResp: respCh,
topic: topic,
groupID: groupID,
remote: remote,
peerState: peerState,
},
gossipForPeerResp: respCh,
}
select {
case resp := <-respCh:
return resp.nextPeerState, resp.encodedMsg, resp.partsMetadataToSend, resp.err
case <-p.stop:
return peerState, nil, nil, errors.New("broadcaster stopped")
}
resp := <-respCh
return resp.nextPeerState, resp.encodedMsg, resp.partsMetadataToSend, resp.err
},
OnIncomingRPC: func(from peer.ID, peerState partialmessages.PeerState, rpc *pubsub_pb.PartialMessagesExtension) (partialmessages.PeerState, error) {
if rpc == nil {
@@ -208,7 +200,7 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
}:
default:
p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
return nextPeerState, errors.New("incomingReq channel is full, dropping RPC")
return nextPeerState, nil
}
return nextPeerState, nil
},
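
A minimal standalone sketch of the non-blocking enqueue pattern used in this callback (illustrative types and names only, not the repository's): the RPC is offered to the loop's request channel with select/default, and when the channel is full it is logged and dropped while the caller still receives a nil error, matching the change above.

package main

import "fmt"

// tryEnqueue offers an item to a buffered channel without blocking. A full
// channel means the item is dropped, but no error is surfaced to the caller.
func tryEnqueue(incoming chan<- string, rpc string) error {
	select {
	case incoming <- rpc:
	default:
		fmt.Println("dropping incoming partial RPC:", rpc)
	}
	return nil
}

func main() {
	incoming := make(chan string, 1)
	fmt.Println(tryEnqueue(incoming, "rpc-1")) // enqueued, prints <nil>
	fmt.Println(tryEnqueue(incoming, "rpc-2")) // dropped, still prints <nil>
}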
@@ -281,14 +273,14 @@ func (p *PartialColumnBroadcaster) loop() {
case req := <-p.incomingReq:
switch req.kind {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c, req.publish.getBlobsCalled)
req.response <- p.publish(req.publish.topic, req.publish.c, req.getBlobsCalled)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindGossipForPeer:
nextPeerState, encodedMsg, partsMetadataToSend, err := p.handleGossipForPeer(req.gossipForPeer)
req.gossipForPeer.gossipForPeerResp <- gossipForPeerResponse{
req.gossipForPeerResp <- gossipForPeerResponse{
nextPeerState: nextPeerState,
encodedMsg: encodedMsg,
partsMetadataToSend: partsMetadataToSend,
@@ -305,7 +297,7 @@ func (p *PartialColumnBroadcaster) loop() {
p.logger.Error("Failed to handle cells validated", "err", err)
}
case requestKindGetBlobsCalled:
p.handleGetBlobsCalled(req.getBlobsCalled.groupID)
p.handleGetBlobsCalled(req.getBlobsCalledGroup)
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
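
The dispatch above runs on a single loop goroutine that owns all broadcaster state. A simplified, self-contained sketch of that request/response pattern (a kind-tagged request struct with flattened per-kind fields, answered over a per-request channel; all names here are illustrative, not the repository's):

package main

import (
	"errors"
	"fmt"
)

type requestKind int

const (
	kindPublish requestKind = iota
	kindUnsubscribe
)

// request carries a kind tag plus flattened per-kind fields, echoing the
// flattened struct layout in the hunk above.
type request struct {
	kind     requestKind
	topic    string
	response chan error
}

// loop serializes all handling onto one goroutine so no other locking is needed.
func loop(incoming <-chan request, stop <-chan struct{}) {
	for {
		select {
		case req := <-incoming:
			switch req.kind {
			case kindPublish:
				req.response <- nil // handle publish with exclusive access to state
			case kindUnsubscribe:
				req.response <- nil
			default:
				req.response <- errors.New("unknown request kind")
			}
		case <-stop:
			return
		}
	}
}

func main() {
	incoming := make(chan request)
	stop := make(chan struct{})
	go loop(incoming, stop)

	resp := make(chan error)
	incoming <- request{kind: kindPublish, topic: "column-topic", response: resp}
	fmt.Println(<-resp) // <nil>
	close(stop)
}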
@@ -334,7 +326,6 @@ func (p *PartialColumnBroadcaster) handleGossipForPeer(req gossipForPeer) (parti
if !ok || partialColumn == nil {
return req.peerState, nil, nil, errors.New("not tracking topic for group")
}
// we're not requesting a message here as this will be used to emit gossip. So, we pass requested message as false.
return partialColumn.ForPeer(req.remote, false, req.peerState)
}
@@ -365,7 +356,7 @@ func updatePeerStateFromIncomingRPC(peerState partialmessages.PeerState, rpc *pu
if err := message.UnmarshalSSZ(rpc.PartialMessage); err != nil {
return peerState, errors.Wrap(err, "failed to unmarshal partial message data")
}
if len(message.CellsPresentBitmap) == 0 {
if message.CellsPresentBitmap == nil {
return nextPeerState, nil
}
@@ -537,12 +528,6 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
}
}
getBlobsCalled := p.getBlobsCalled[string(groupID)]
if !getBlobsCalled {
p.logger.Debug("GetBlobs not called, skipping republish", "topic", topicID, "group", groupID)
return nil
}
peerMeta := rpcWithFrom.PartsMetadata
myMeta, err := ourDataColumn.PartsMetadata()
if err != nil {
@@ -554,6 +539,12 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
logger.Debug("republishing due to parts metadata difference")
}
getBlobsCalled := p.getBlobsCalled[string(groupID)]
if !getBlobsCalled {
p.logger.Debug("GetBlobs not called, skipping republish", "topic", topicID, "group", groupID)
return nil
}
if shouldRepublish {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
@@ -631,10 +622,8 @@ func (p *PartialColumnBroadcaster) GetBlobsCalled(groupID string) error {
}
select {
case p.incomingReq <- request{
kind: requestKindGetBlobsCalled,
getBlobsCalled: getBlobsCalled{
groupID: groupID,
},
kind: requestKindGetBlobsCalled,
getBlobsCalledGroup: groupID,
}:
default:
return errors.Errorf("dropping getBlobs called message as incomingReq channel is full, groupID: %s", groupID)
@@ -649,35 +638,32 @@ func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataCol
}
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindPublish,
response: respCh,
kind: requestKindPublish,
response: respCh,
getBlobsCalled: getBlobsCalled,
publish: publish{
topic: topic,
c: c,
getBlobsCalled: getBlobsCalled,
topic: topic,
c: c,
},
}
return <-respCh
}
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
groupId := string(c.GroupID())
topicStore, ok := p.partialMsgStore[topic]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
existing := topicStore[groupId]
var extended bool
existing := topicStore[string(c.GroupID())]
if existing != nil {
var extended bool
// Extend the existing column with cells being published here.
// The existing column may already contain cells received from peers. We must not overwrite it.
for i := range c.Included.Len() {
if c.Included.BitAt(i) {
if existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i]) {
extended = true
}
extended = existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
}
}
if extended {
@@ -689,15 +675,15 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
}
}
} else {
topicStore[groupId] = &c
topicStore[string(c.GroupID())] = &c
existing = &c
}
p.groupTTL[groupId] = TTLInSlots
p.groupTTL[string(c.GroupID())] = TTLInSlots
err := p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
if err == nil {
p.getBlobsCalled[groupId] = getBlobsCalled
p.getBlobsCalled[string(c.GroupID())] = getBlobsCalled
}
return err
}

View File

@@ -443,7 +443,6 @@ func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbr
err := s.processDataColumnSidecarsFromExecution(ctx, source)
if err != nil {
log.WithError(err).Error("Failed to process partial data column header")
return
}
if err := broadcaster.GetBlobsCalled(groupID); err != nil {
log.WithError(err).Error("Failed to call getBlobs called on broadcaster")

View File

@@ -25,7 +25,7 @@ type CellProofBundle struct {
}
// PartialDataColumn is a partially populated DataColumnSidecar used for
// exchanging cells with peers.
// exchanging cells and metadata with peers.
type PartialDataColumn struct {
*ethpb.DataColumnSidecar
root [fieldparams.RootLength]byte
@@ -44,9 +44,6 @@ func NewPartialDataColumn(
kzgCommitments [][]byte,
kzgInclusionProof [][]byte,
) (PartialDataColumn, error) {
if signedBlockHeader == nil {
return PartialDataColumn{}, errors.New("signedBlockHeader is nil")
}
root, err := signedBlockHeader.Header.HashTreeRoot()
if err != nil {
return PartialDataColumn{}, err
@@ -73,6 +70,12 @@ func NewPartialDataColumn(
Included: bitfield.NewBitlist(uint64(len(sidecar.KzgCommitments))),
eagerPushSent: make(map[peer.ID]struct{}),
}
for i := range len(c.KzgCommitments) {
if sidecar.Column[i] == nil {
continue
}
c.Included.SetBitAt(uint64(i), true)
}
return c, nil
}
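
A small standalone sketch (assuming the github.com/prysmaticlabs/go-bitfield package; the sample data is hypothetical) of what the added loop does, marking every already-populated cell in the Included bitlist:

package main

import (
	"fmt"

	"github.com/prysmaticlabs/go-bitfield"
)

func main() {
	// Hypothetical column with four commitments; cells 0 and 2 are present.
	column := [][]byte{{0x01}, nil, {0x02}, nil}

	// Mark each non-nil cell as included, as in the loop added above.
	included := bitfield.NewBitlist(uint64(len(column)))
	for i, cell := range column {
		if cell == nil {
			continue
		}
		included.SetBitAt(uint64(i), true)
	}

	fmt.Println(included.Count())                     // 2
	fmt.Println(included.BitAt(0), included.BitAt(1)) // true false
}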
@@ -82,7 +85,7 @@ func (p *PartialDataColumn) GroupID() []byte {
}
// ClearEagerPushSent resets the set of peers that have received the eager push
// header.
// header, allowing the header to be re-sent on the next ForPeer call.
func (p *PartialDataColumn) ClearEagerPushSent() {
p.eagerPushSent = make(map[peer.ID]struct{})
}
@@ -127,8 +130,7 @@ func marshalPartsMetadata(meta *ethpb.PartialDataColumnPartsMetadata) (partialme
return partialmessages.PartsMetadata(b), nil
}
// NKzgCommitments returns the number of commitments in the block header for this column which
// in turn will be equal to the number of cells in this column.
// NKzgCommitments returns the number of commitments in the column.
func (p *PartialDataColumn) NKzgCommitments() uint64 {
return p.Included.Len()
}
@@ -136,7 +138,7 @@ func (p *PartialDataColumn) NKzgCommitments() uint64 {
// ParsePartsMetadata SSZ-decodes bytes back to PartialDataColumnPartsMetadata.
func ParsePartsMetadata(pm partialmessages.PartsMetadata, expectedLength uint64) (*ethpb.PartialDataColumnPartsMetadata, error) {
meta := &ethpb.PartialDataColumnPartsMetadata{}
if err := meta.UnmarshalSSZ(pm); err != nil {
if err := meta.UnmarshalSSZ([]byte(pm)); err != nil {
return nil, err
}
if meta.Available.Len() != expectedLength || meta.Requests.Len() != expectedLength {
@@ -206,7 +208,7 @@ func (p *PartialDataColumn) PartsMetadata() (partialmessages.PartsMetadata, erro
return marshalPartsMetadata(meta)
}
// MergeAvailableIntoPartsMetadata merges additional availabe cells into the base partsmetadata's available cells and
// MergeAvailableIntoPartsMetadata merges additional availability into base and
// returns the marshaled metadata.
func MergeAvailableIntoPartsMetadata(base *ethpb.PartialDataColumnPartsMetadata, additionalAvailable bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
if base == nil {
@@ -228,8 +230,8 @@ func MergeAvailableIntoPartsMetadata(base *ethpb.PartialDataColumnPartsMetadata,
// merge available, keep request unchanged; if my parts are different, simply overwrite with my parts
func (p *PartialDataColumn) updateReceivedStateOutgoing(receivedMeta partialmessages.PartsMetadata, cellsSent bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
if len(receivedMeta) == 0 {
return nil, errors.New("receivedMeta is empty")
if receivedMeta == nil || len(receivedMeta) == 0 {
return nil, errors.New("receivedMeta is nil")
}
peerMeta, err := ParsePartsMetadata(receivedMeta, p.Included.Len())
if err != nil {
@@ -239,17 +241,16 @@ func (p *PartialDataColumn) updateReceivedStateOutgoing(receivedMeta partialmess
}
// ForPeer implements partialmessages.Message.
func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte,
partialmessages.PartsMetadata, error) {
// Eager push header - we don't know what the peer has and message has been requested.
func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
// Eager push - peer has never been seen (RecvdState nil) and message requested.
// Only send the header once per peer.
if requestedMessage && peerState.RecvdState == nil {
if _, sent := p.eagerPushSent[remote]; !sent {
p.eagerPushSent[remote] = struct{}{}
encoded, err := p.eagerPushBytes()
if err != nil {
return peerState, nil, nil, err
}
p.eagerPushSent[remote] = struct{}{}
return peerState, encoded, nil, nil
}
return peerState, nil, nil, nil
@@ -262,7 +263,6 @@ func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerS
if peerState.SentState != nil {
var ok bool
sentMeta, ok = peerState.SentState.(partialmessages.PartsMetadata)
// should never happen but checking this for safety
if !ok {
return peerState, nil, nil, errors.New("SentState is not PartsMetadata")
}
@@ -388,7 +388,7 @@ func (p *PartialDataColumn) ExtendFromVerifiedCells(cellIndices []uint64, cells
return extended
}
// Complete returns a verified read-only column if all cells are now present in this column.
// Complete returns a verified read-only column if all cells are present.
func (p *PartialDataColumn) Complete(logger *logrus.Logger) (VerifiedRODataColumn, bool) {
if uint64(len(p.KzgCommitments)) != p.Included.Count() {
return VerifiedRODataColumn{}, false