mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-04-19 03:01:06 -04:00
Merge branch 'feat/process-eager-partial-header' into feat/update-gossipsub-update-new-partial-API
This commit is contained in:
@@ -94,22 +94,27 @@ const (
|
||||
)
|
||||
|
||||
type request struct {
|
||||
getBlobsCalled bool
|
||||
kind requestKind
|
||||
cellsValidated *cellsValidated
|
||||
response chan error
|
||||
getBlobsCalledGroup string
|
||||
unsub unsubscribe
|
||||
incomingRPC rpcWithFrom
|
||||
gossipForPeer gossipForPeer
|
||||
gossipForPeerResp chan gossipForPeerResponse
|
||||
sub subscribe
|
||||
publish publish
|
||||
kind requestKind
|
||||
cellsValidated *cellsValidated
|
||||
response chan error
|
||||
unsub unsubscribe
|
||||
incomingRPC rpcWithFrom
|
||||
sub subscribe
|
||||
publish publish
|
||||
getBlobsCalled getBlobsCalled
|
||||
gossipForPeer gossipForPeer
|
||||
gossipForPeerResp chan gossipForPeerResponse
|
||||
}
|
||||
|
||||
type getBlobsCalled struct {
|
||||
groupID string
|
||||
}
|
||||
|
||||
type publish struct {
|
||||
topic string
|
||||
c blocks.PartialDataColumn
|
||||
topic string
|
||||
c blocks.PartialDataColumn
|
||||
getBlobsCalled bool
|
||||
}
|
||||
|
||||
type subscribe struct {
|
||||
@@ -273,7 +278,7 @@ func (p *PartialColumnBroadcaster) loop() {
|
||||
case req := <-p.incomingReq:
|
||||
switch req.kind {
|
||||
case requestKindPublish:
|
||||
req.response <- p.publish(req.publish.topic, req.publish.c, req.getBlobsCalled)
|
||||
req.response <- p.publish(req.publish.topic, req.publish.c, req.publish.getBlobsCalled)
|
||||
case requestKindSubscribe:
|
||||
req.response <- p.subscribe(req.sub.t)
|
||||
case requestKindUnsubscribe:
|
||||
@@ -297,7 +302,7 @@ func (p *PartialColumnBroadcaster) loop() {
|
||||
p.logger.Error("Failed to handle cells validated", "err", err)
|
||||
}
|
||||
case requestKindGetBlobsCalled:
|
||||
p.handleGetBlobsCalled(req.getBlobsCalledGroup)
|
||||
p.handleGetBlobsCalled(req.getBlobsCalled.groupID)
|
||||
default:
|
||||
p.logger.Error("Unknown request kind", "kind", req.kind)
|
||||
}
|
||||
@@ -622,8 +627,10 @@ func (p *PartialColumnBroadcaster) GetBlobsCalled(groupID string) error {
|
||||
}
|
||||
select {
|
||||
case p.incomingReq <- request{
|
||||
kind: requestKindGetBlobsCalled,
|
||||
getBlobsCalledGroup: groupID,
|
||||
kind: requestKindGetBlobsCalled,
|
||||
getBlobsCalled: getBlobsCalled{
|
||||
groupID: groupID,
|
||||
},
|
||||
}:
|
||||
default:
|
||||
return errors.Errorf("dropping getBlobs called message as incomingReq channel is full, groupID: %s", groupID)
|
||||
@@ -638,32 +645,35 @@ func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataCol
|
||||
}
|
||||
respCh := make(chan error)
|
||||
p.incomingReq <- request{
|
||||
kind: requestKindPublish,
|
||||
response: respCh,
|
||||
getBlobsCalled: getBlobsCalled,
|
||||
kind: requestKindPublish,
|
||||
response: respCh,
|
||||
publish: publish{
|
||||
topic: topic,
|
||||
c: c,
|
||||
topic: topic,
|
||||
c: c,
|
||||
getBlobsCalled: getBlobsCalled,
|
||||
},
|
||||
}
|
||||
return <-respCh
|
||||
}
|
||||
|
||||
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
|
||||
groupId := string(c.GroupID())
|
||||
topicStore, ok := p.partialMsgStore[topic]
|
||||
if !ok {
|
||||
topicStore = make(map[string]*blocks.PartialDataColumn)
|
||||
p.partialMsgStore[topic] = topicStore
|
||||
}
|
||||
|
||||
var extended bool
|
||||
existing := topicStore[string(c.GroupID())]
|
||||
existing := topicStore[groupId]
|
||||
if existing != nil {
|
||||
var extended bool
|
||||
// Extend the existing column with cells being published here.
|
||||
// The existing column may already contain cells received from peers. We must not overwrite it.
|
||||
for i := range c.Included.Len() {
|
||||
if c.Included.BitAt(i) {
|
||||
extended = existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
|
||||
if existing.ExtendFromVerfifiedCell(uint64(i), c.Column[i], c.KzgProofs[i]) {
|
||||
extended = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if extended {
|
||||
@@ -675,15 +685,15 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
|
||||
}
|
||||
}
|
||||
} else {
|
||||
topicStore[string(c.GroupID())] = &c
|
||||
topicStore[groupId] = &c
|
||||
existing = &c
|
||||
}
|
||||
|
||||
p.groupTTL[string(c.GroupID())] = TTLInSlots
|
||||
p.groupTTL[groupId] = TTLInSlots
|
||||
|
||||
err := p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
|
||||
if err == nil {
|
||||
p.getBlobsCalled[string(c.GroupID())] = getBlobsCalled
|
||||
p.getBlobsCalled[groupId] = getBlobsCalled
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -443,6 +443,7 @@ func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbr
|
||||
err := s.processDataColumnSidecarsFromExecution(ctx, source)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to process partial data column header")
|
||||
return
|
||||
}
|
||||
if err := broadcaster.GetBlobsCalled(groupID); err != nil {
|
||||
log.WithError(err).Error("Failed to call getBlobs called on broadcaster")
|
||||
|
||||
Reference in New Issue
Block a user