Compare commits


3 Commits

Author      SHA1        Message                                                                                             Date
Aarsh Shah  a0630aec02  final changes                                                                                       2026-02-18 15:18:02 +04:00
Aarsh Shah  9c0fc75cd2  Merge branch 'feat/process-eager-partial-header' into feat/update-gossipsub-update-new-partial-API  2026-02-18 13:17:39 +04:00
Aarsh Shah  0f3f3124af  final changes                                                                                       2026-02-18 13:07:34 +04:00
4 changed files with 83 additions and 68 deletions

View File

@@ -196,12 +196,12 @@ var (
 		},
 		[]string{"topic"})
 	pubsubRPCPubSentSize = promauto.NewCounterVec(prometheus.CounterOpts{
-		Name: "gossipsub_topic_msg_sent_bytes",
+		Name: "gossipsub_pubsub_rpc_sent_pub_size_total",
 		Help: "The total size of publish messages sent via rpc for a particular topic",
 	},
-		[]string{"topic", "partial"})
+		[]string{"topic", "is_partial"})
 	pubsubMeshPeers = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "gossipsub_mesh_peer_counts",
+		Name: "gossipsub_mesh_peers",
 		Help: "The number of capable peers in mesh",
 	},
 		[]string{"topic", "supports_partial"})

View File

@@ -94,22 +94,25 @@ const (
 )
 
 type request struct {
-	getBlobsCalled      bool
-	kind                requestKind
-	cellsValidated      *cellsValidated
-	response            chan error
-	getBlobsCalledGroup string
-	unsub               unsubscribe
-	incomingRPC         rpcWithFrom
-	gossipForPeer       gossipForPeer
-	gossipForPeerResp   chan gossipForPeerResponse
-	sub                 subscribe
-	publish             publish
+	kind           requestKind
+	cellsValidated *cellsValidated
+	response       chan error
+	unsub          unsubscribe
+	incomingRPC    rpcWithFrom
+	sub            subscribe
+	publish        publish
+	getBlobsCalled getBlobsCalled
+	gossipForPeer  gossipForPeer
 }
 
+type getBlobsCalled struct {
+	groupID string
+}
+
 type publish struct {
-	topic string
-	c     blocks.PartialDataColumn
+	topic          string
+	c              blocks.PartialDataColumn
+	getBlobsCalled bool
 }
 
 type subscribe struct {
@@ -134,10 +137,11 @@ type cellsValidated struct {
 }
 
 type gossipForPeer struct {
-	topic     string
-	groupID   string
-	remote    peer.ID
-	peerState partialmessages.PeerState
+	topic             string
+	groupID           string
+	remote            peer.ID
+	peerState         partialmessages.PeerState
+	gossipForPeerResp chan gossipForPeerResponse
 }
 
 type gossipForPeerResponse struct {
@@ -175,15 +179,19 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
 		p.incomingReq <- request{
 			kind: requestKindGossipForPeer,
 			gossipForPeer: gossipForPeer{
-				topic:     topic,
-				groupID:   groupID,
-				remote:    remote,
-				peerState: peerState,
+				topic:             topic,
+				groupID:           groupID,
+				remote:            remote,
+				peerState:         peerState,
+				gossipForPeerResp: respCh,
 			},
-			gossipForPeerResp: respCh,
 		}
-		resp := <-respCh
-		return resp.nextPeerState, resp.encodedMsg, resp.partsMetadataToSend, resp.err
+		select {
+		case resp := <-respCh:
+			return resp.nextPeerState, resp.encodedMsg, resp.partsMetadataToSend, resp.err
+		case <-p.stop:
+			return peerState, nil, nil, errors.New("broadcaster stopped")
+		}
 	},
 	OnIncomingRPC: func(from peer.ID, peerState partialmessages.PeerState, rpc *pubsub_pb.PartialMessagesExtension) (partialmessages.PeerState, error) {
 		if rpc == nil {
@@ -200,7 +208,7 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
 			}:
 		default:
 			p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
-			return nextPeerState, nil
+			return nextPeerState, errors.New("incomingReq channel is full, dropping RPC")
 		}
 		return nextPeerState, nil
 	},
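
Both changes in these two hunks harden the callbacks against a stuck or saturated request loop: the reply wait now also selects on the stop channel, and a full incomingReq channel now surfaces an error instead of silently keeping the old peer state. A self-contained sketch of the two patterns; worker, ask, and tryEnqueue are illustrative names, not the broadcaster's API:

```go
package main

import (
	"errors"
	"fmt"
)

type request struct {
	resp chan string
}

// worker is the single goroutine servicing requests until stopped.
func worker(reqs chan request, stop chan struct{}) {
	for {
		select {
		case r := <-reqs:
			r.resp <- "done"
		case <-stop:
			return
		}
	}
}

// ask waits for the reply but also watches stop, so a caller cannot block
// forever once the worker has exited -- the shape of the select added above.
func ask(reqs chan request, stop chan struct{}) (string, error) {
	respCh := make(chan string)
	reqs <- request{resp: respCh}
	select {
	case resp := <-respCh:
		return resp, nil
	case <-stop:
		return "", errors.New("worker stopped")
	}
}

// tryEnqueue never blocks: a full queue is reported to the caller, mirroring
// the OnIncomingRPC change from a silent nil return to an explicit error.
func tryEnqueue(reqs chan request, r request) error {
	select {
	case reqs <- r:
		return nil
	default:
		return errors.New("request channel is full, dropping")
	}
}

func main() {
	reqs := make(chan request)
	stop := make(chan struct{})
	go worker(reqs, stop)
	out, err := ask(reqs, stop)
	fmt.Println(out, err)
}
```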
@@ -273,14 +281,14 @@ func (p *PartialColumnBroadcaster) loop() {
 		case req := <-p.incomingReq:
 			switch req.kind {
 			case requestKindPublish:
-				req.response <- p.publish(req.publish.topic, req.publish.c, req.getBlobsCalled)
+				req.response <- p.publish(req.publish.topic, req.publish.c, req.publish.getBlobsCalled)
 			case requestKindSubscribe:
 				req.response <- p.subscribe(req.sub.t)
 			case requestKindUnsubscribe:
 				req.response <- p.unsubscribe(req.unsub.topic)
 			case requestKindGossipForPeer:
 				nextPeerState, encodedMsg, partsMetadataToSend, err := p.handleGossipForPeer(req.gossipForPeer)
-				req.gossipForPeerResp <- gossipForPeerResponse{
+				req.gossipForPeer.gossipForPeerResp <- gossipForPeerResponse{
 					nextPeerState:       nextPeerState,
 					encodedMsg:          encodedMsg,
 					partsMetadataToSend: partsMetadataToSend,
@@ -297,7 +305,7 @@ func (p *PartialColumnBroadcaster) loop() {
 					p.logger.Error("Failed to handle cells validated", "err", err)
 				}
 			case requestKindGetBlobsCalled:
-				p.handleGetBlobsCalled(req.getBlobsCalledGroup)
+				p.handleGetBlobsCalled(req.getBlobsCalled.groupID)
 			default:
 				p.logger.Error("Unknown request kind", "kind", req.kind)
 			}
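
After this refactor, every dispatch arm reads its data, including the response channel, from the payload struct that matches req.kind. A distilled sketch of that tagged-request shape with hypothetical kinds, showing why each arm can only touch its own fields:

```go
package main

import "fmt"

type requestKind int

const (
	requestKindPublish requestKind = iota
	requestKindGetBlobsCalled
)

// Per-kind payloads: each request kind owns exactly the fields it needs,
// so a handler cannot accidentally read another kind's data.
type publishReq struct {
	topic    string
	response chan error
}

type getBlobsCalledReq struct {
	groupID string
}

type request struct {
	kind           requestKind
	publish        publishReq
	getBlobsCalled getBlobsCalledReq
}

func dispatch(req request) {
	switch req.kind {
	case requestKindPublish:
		fmt.Println("publish to", req.publish.topic)
		req.publish.response <- nil
	case requestKindGetBlobsCalled:
		fmt.Println("getBlobs called for group", req.getBlobsCalled.groupID)
	default:
		fmt.Println("unknown request kind", req.kind)
	}
}

func main() {
	respCh := make(chan error, 1)
	dispatch(request{
		kind:    requestKindPublish,
		publish: publishReq{topic: "col_0", response: respCh},
	})
	fmt.Println(<-respCh)
}
```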
@@ -326,6 +334,7 @@ func (p *PartialColumnBroadcaster) handleGossipForPeer(req gossipForPeer) (parti
 	if !ok || partialColumn == nil {
 		return req.peerState, nil, nil, errors.New("not tracking topic for group")
 	}
+	// We are not requesting a message here, as this call is only used to emit gossip, so requestedMessage is false.
 	return partialColumn.ForPeer(req.remote, false, req.peerState)
 }
@@ -356,7 +365,7 @@ func updatePeerStateFromIncomingRPC(peerState partialmessages.PeerState, rpc *pu
 	if err := message.UnmarshalSSZ(rpc.PartialMessage); err != nil {
 		return peerState, errors.Wrap(err, "failed to unmarshal partial message data")
 	}
-	if message.CellsPresentBitmap == nil {
+	if len(message.CellsPresentBitmap) == 0 {
 		return nextPeerState, nil
 	}
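
The new check also treats an empty-but-non-nil bitmap as absent: in Go, len of a nil slice is 0, so a single length test covers both the nil case and the decoded-but-empty case. A two-case illustration:

```go
package main

import "fmt"

func main() {
	var nilBitmap []byte    // nil slice
	emptyBitmap := []byte{} // non-nil, zero length

	// The old nil check misses the second case:
	fmt.Println(nilBitmap == nil, emptyBitmap == nil) // true false

	// len covers both, which is why the diff prefers it:
	fmt.Println(len(nilBitmap) == 0, len(emptyBitmap) == 0) // true true
}
```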
@@ -528,6 +537,12 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
 		}
 	}
+
+	getBlobsCalled := p.getBlobsCalled[string(groupID)]
+	if !getBlobsCalled {
+		p.logger.Debug("GetBlobs not called, skipping republish", "topic", topicID, "group", groupID)
+		return nil
+	}
 	peerMeta := rpcWithFrom.PartsMetadata
 	myMeta, err := ourDataColumn.PartsMetadata()
 	if err != nil {
@@ -539,12 +554,6 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
 		logger.Debug("republishing due to parts metadata difference")
 	}
-
-	getBlobsCalled := p.getBlobsCalled[string(groupID)]
-	if !getBlobsCalled {
-		p.logger.Debug("GetBlobs not called, skipping republish", "topic", topicID, "group", groupID)
-		return nil
-	}
 	if shouldRepublish {
 		err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
 		if err != nil {
@@ -622,8 +631,10 @@ func (p *PartialColumnBroadcaster) GetBlobsCalled(groupID string) error {
 	}
 	select {
 	case p.incomingReq <- request{
-		kind:                requestKindGetBlobsCalled,
-		getBlobsCalledGroup: groupID,
+		kind: requestKindGetBlobsCalled,
+		getBlobsCalled: getBlobsCalled{
+			groupID: groupID,
+		},
 	}:
 	default:
 		return errors.Errorf("dropping getBlobs called message as incomingReq channel is full, groupID: %s", groupID)
@@ -638,32 +649,35 @@ func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataCol
 	}
 	respCh := make(chan error)
 	p.incomingReq <- request{
-		kind:           requestKindPublish,
-		response:       respCh,
-		getBlobsCalled: getBlobsCalled,
+		kind:     requestKindPublish,
+		response: respCh,
 		publish: publish{
-			topic: topic,
-			c:     c,
+			topic:          topic,
+			c:              c,
+			getBlobsCalled: getBlobsCalled,
 		},
 	}
 	return <-respCh
 }
 
 func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
+	groupId := string(c.GroupID())
 	topicStore, ok := p.partialMsgStore[topic]
 	if !ok {
 		topicStore = make(map[string]*blocks.PartialDataColumn)
 		p.partialMsgStore[topic] = topicStore
 	}
-	existing := topicStore[string(c.GroupID())]
+	var extended bool
+	existing := topicStore[groupId]
 	if existing != nil {
-		var extended bool
 		// Extend the existing column with cells being published here.
 		// The existing column may already contain cells received from peers. We must not overwrite it.
 		for i := range c.Included.Len() {
 			if c.Included.BitAt(i) {
-				extended = existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
+				if existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i]) {
+					extended = true
+				}
 			}
 		}
 		if extended {
@@ -675,15 +689,15 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
 			}
 		}
 	} else {
-		topicStore[string(c.GroupID())] = &c
+		topicStore[groupId] = &c
 		existing = &c
 	}
-	p.groupTTL[string(c.GroupID())] = TTLInSlots
+	p.groupTTL[groupId] = TTLInSlots
 	err := p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
 	if err == nil {
-		p.getBlobsCalled[string(c.GroupID())] = getBlobsCalled
+		p.getBlobsCalled[groupId] = getBlobsCalled
 	}
 	return err
 }
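
Besides hoisting groupId out of four repeated string(c.GroupID()) conversions, the publish rewrite fixes a real bug: the old loop assigned the result of ExtendFromVerifiedCell on every iteration, so a later false overwrote an earlier true. A minimal reproduction of the difference, using a stand-in result slice:

```go
package main

import "fmt"

func main() {
	// Stand-in per-cell results: the first cell extends, later ones do not.
	results := []bool{true, false, false}

	// Buggy pattern from before the diff: the last iteration wins.
	var buggy bool
	for _, r := range results {
		buggy = r
	}

	// Fixed pattern from the diff: true is sticky once any cell extends.
	var fixed bool
	for _, r := range results {
		if r {
			fixed = true
		}
	}

	fmt.Println(buggy, fixed) // false true
}
```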

View File

@@ -443,6 +443,7 @@ func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbr
 		err := s.processDataColumnSidecarsFromExecution(ctx, source)
 		if err != nil {
 			log.WithError(err).Error("Failed to process partial data column header")
+			return
 		}
 		if err := broadcaster.GetBlobsCalled(groupID); err != nil {
 			log.WithError(err).Error("Failed to call getBlobs called on broadcaster")

View File

@@ -25,7 +25,7 @@ type CellProofBundle struct {
 }
 
 // PartialDataColumn is a partially populated DataColumnSidecar used for
-// exchanging cells and metadata with peers.
+// exchanging cells with peers.
 type PartialDataColumn struct {
 	*ethpb.DataColumnSidecar
 	root [fieldparams.RootLength]byte
@@ -44,6 +44,9 @@ func NewPartialDataColumn(
 	kzgCommitments [][]byte,
 	kzgInclusionProof [][]byte,
 ) (PartialDataColumn, error) {
+	if signedBlockHeader == nil {
+		return PartialDataColumn{}, errors.New("signedBlockHeader is nil")
+	}
 	root, err := signedBlockHeader.Header.HashTreeRoot()
 	if err != nil {
 		return PartialDataColumn{}, err
@@ -70,12 +73,6 @@ func NewPartialDataColumn(
 		Included:      bitfield.NewBitlist(uint64(len(sidecar.KzgCommitments))),
 		eagerPushSent: make(map[peer.ID]struct{}),
 	}
-	for i := range len(c.KzgCommitments) {
-		if sidecar.Column[i] == nil {
-			continue
-		}
-		c.Included.SetBitAt(uint64(i), true)
-	}
 	return c, nil
 }
@@ -85,7 +82,7 @@ func (p *PartialDataColumn) GroupID() []byte {
 }
 
 // ClearEagerPushSent resets the set of peers that have received the eager push
-// header, allowing the header to be re-sent on the next ForPeer call.
+// header.
 func (p *PartialDataColumn) ClearEagerPushSent() {
 	p.eagerPushSent = make(map[peer.ID]struct{})
 }
@@ -130,7 +127,8 @@ func marshalPartsMetadata(meta *ethpb.PartialDataColumnPartsMetadata) (partialme
 	return partialmessages.PartsMetadata(b), nil
 }
 
-// NKzgCommitments returns the number of commitments in the column.
+// NKzgCommitments returns the number of commitments in the block header for this column, which
+// in turn equals the number of cells in this column.
 func (p *PartialDataColumn) NKzgCommitments() uint64 {
 	return p.Included.Len()
 }
@@ -138,7 +136,7 @@ func (p *PartialDataColumn) NKzgCommitments() uint64 {
 // ParsePartsMetadata SSZ-decodes bytes back to PartialDataColumnPartsMetadata.
 func ParsePartsMetadata(pm partialmessages.PartsMetadata, expectedLength uint64) (*ethpb.PartialDataColumnPartsMetadata, error) {
 	meta := &ethpb.PartialDataColumnPartsMetadata{}
-	if err := meta.UnmarshalSSZ([]byte(pm)); err != nil {
+	if err := meta.UnmarshalSSZ(pm); err != nil {
 		return nil, err
 	}
 	if meta.Available.Len() != expectedLength || meta.Requests.Len() != expectedLength {
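
Dropping the []byte(pm) conversion compiles because partialmessages.PartsMetadata is, judging by the partialmessages.PartsMetadata(b) conversion in marshalPartsMetadata above, a defined type whose underlying type is []byte. Go's assignability rules let such a value be passed where a plain []byte is expected, since []byte itself is not a defined type. A standalone illustration with stand-in names:

```go
package main

import "fmt"

// Stand-in for partialmessages.PartsMetadata: a defined type over []byte.
type partsMetadata []byte

// Stand-in for UnmarshalSSZ's parameter: a plain []byte.
func unmarshal(b []byte) {
	fmt.Println("decoding", len(b), "bytes")
}

func main() {
	pm := partsMetadata{0x01, 0x02, 0x03}
	unmarshal(pm) // assignable without an explicit []byte(pm) conversion
}
```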
@@ -208,7 +206,7 @@ func (p *PartialDataColumn) PartsMetadata() (partialmessages.PartsMetadata, erro
 	return marshalPartsMetadata(meta)
 }
 
-// MergeAvailableIntoPartsMetadata merges additional availability into base and
+// MergeAvailableIntoPartsMetadata merges additional available cells into the base PartsMetadata's available cells and
 // returns the marshaled metadata.
 func MergeAvailableIntoPartsMetadata(base *ethpb.PartialDataColumnPartsMetadata, additionalAvailable bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
 	if base == nil {
@@ -230,8 +228,8 @@ func MergeAvailableIntoPartsMetadata(base *ethpb.PartialDataColumnPartsMetadata,
 // merge available, keep requests unchanged; if my parts are different, simply overwrite with my parts
 func (p *PartialDataColumn) updateReceivedStateOutgoing(receivedMeta partialmessages.PartsMetadata, cellsSent bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
-	if receivedMeta == nil || len(receivedMeta) == 0 {
-		return nil, errors.New("recievedMeta is nil")
+	if len(receivedMeta) == 0 {
+		return nil, errors.New("receivedMeta is empty")
 	}
 	peerMeta, err := ParsePartsMetadata(receivedMeta, p.Included.Len())
 	if err != nil {
@@ -241,16 +239,17 @@ func (p *PartialDataColumn) updateReceivedStateOutgoing(receivedMeta partialmess
 }
 
 // ForPeer implements partialmessages.Message.
-func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
-	// Eager push - peer has never been seen (RecvdState nil) and message requested.
+func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte,
+	partialmessages.PartsMetadata, error) {
+	// Eager push header: we don't know what the peer has, and the message has been requested.
+	// Only send the header once per peer.
 	if requestedMessage && peerState.RecvdState == nil {
 		if _, sent := p.eagerPushSent[remote]; !sent {
+			p.eagerPushSent[remote] = struct{}{}
 			encoded, err := p.eagerPushBytes()
 			if err != nil {
 				return peerState, nil, nil, err
 			}
-			p.eagerPushSent[remote] = struct{}{}
 			return peerState, encoded, nil, nil
 		}
 		return peerState, nil, nil, nil
@@ -263,6 +262,7 @@ func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerS
 	if peerState.SentState != nil {
 		var ok bool
 		sentMeta, ok = peerState.SentState.(partialmessages.PartsMetadata)
+		// This should never happen, but we check it for safety.
 		if !ok {
 			return peerState, nil, nil, errors.New("SentState is not PartsMetadata")
 		}
@@ -388,7 +388,7 @@ func (p *PartialDataColumn) ExtendFromVerifiedCells(cellIndices []uint64, cells
 	return extended
 }
 
-// Complete returns a verified read-only column if all cells are present.
+// Complete returns a verified read-only column if all cells are now present in this column.
 func (p *PartialDataColumn) Complete(logger *logrus.Logger) (VerifiedRODataColumn, bool) {
 	if uint64(len(p.KzgCommitments)) != p.Included.Count() {
 		return VerifiedRODataColumn{}, false
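
The eager-push bookkeeping that ForPeer relies on is a plain struct{} set keyed by peer ID, now marked before the encode attempt, and reset wholesale by ClearEagerPushSent. A distilled sketch of the once-per-peer guard; column, peerID, and maybeEagerPush are stand-ins, not the real API:

```go
package main

import "fmt"

type peerID string

type column struct {
	eagerPushSent map[peerID]struct{}
}

// maybeEagerPush reports whether the header should be sent to remote,
// recording the peer first so the header goes out at most once.
func (c *column) maybeEagerPush(remote peerID) bool {
	if _, sent := c.eagerPushSent[remote]; sent {
		return false
	}
	c.eagerPushSent[remote] = struct{}{} // mark before sending, as in the diff
	return true
}

// clearEagerPushSent resets the set, allowing headers to be re-sent.
func (c *column) clearEagerPushSent() {
	c.eagerPushSent = make(map[peerID]struct{})
}

func main() {
	c := &column{eagerPushSent: make(map[peerID]struct{})}
	fmt.Println(c.maybeEagerPush("peerA")) // true: first time
	fmt.Println(c.maybeEagerPush("peerA")) // false: already sent
	c.clearEagerPushSent()
	fmt.Println(c.maybeEagerPush("peerA")) // true again after reset
}
```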