Compare commits

...

1 Commits

Author SHA1 Message Date
Aarsh Shah
0f3f3124af final changes 2026-02-18 13:07:34 +04:00
2 changed files with 36 additions and 26 deletions

View File

@@ -94,20 +94,24 @@ const (
)
type request struct {
getBlobsCalled bool
kind requestKind
cellsValidated *cellsValidated
response chan error
getBlobsCalledGroup string
unsub unsubscribe
incomingRPC rpcWithFrom
sub subscribe
publish publish
kind requestKind
cellsValidated *cellsValidated
response chan error
unsub unsubscribe
incomingRPC rpcWithFrom
sub subscribe
publish publish
getBlobsCalled getBlobsCalled
}
type getBlobsCalled struct {
groupID string
}
type publish struct {
topic string
c blocks.PartialDataColumn
topic string
c blocks.PartialDataColumn
getBlobsCalled bool
}
type subscribe struct {
@@ -246,7 +250,7 @@ func (p *PartialColumnBroadcaster) loop() {
case req := <-p.incomingReq:
switch req.kind {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c, req.getBlobsCalled)
req.response <- p.publish(req.publish.topic, req.publish.c, req.publish.getBlobsCalled)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t)
case requestKindUnsubscribe:
@@ -262,7 +266,7 @@ func (p *PartialColumnBroadcaster) loop() {
p.logger.Error("Failed to handle cells validated", "err", err)
}
case requestKindGetBlobsCalled:
p.handleGetBlobsCalled(req.getBlobsCalledGroup)
p.handleGetBlobsCalled(req.getBlobsCalled.groupID)
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
@@ -515,8 +519,10 @@ func (p *PartialColumnBroadcaster) GetBlobsCalled(groupID string) error {
}
select {
case p.incomingReq <- request{
kind: requestKindGetBlobsCalled,
getBlobsCalledGroup: groupID,
kind: requestKindGetBlobsCalled,
getBlobsCalled: getBlobsCalled{
groupID: groupID,
},
}:
default:
return errors.Errorf("dropping getBlobs called message as incomingReq channel is full, groupID: %s", groupID)
@@ -531,32 +537,35 @@ func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataCol
}
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindPublish,
response: respCh,
getBlobsCalled: getBlobsCalled,
kind: requestKindPublish,
response: respCh,
publish: publish{
topic: topic,
c: c,
topic: topic,
c: c,
getBlobsCalled: getBlobsCalled,
},
}
return <-respCh
}
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
groupId := string(c.GroupID())
topicStore, ok := p.partialMsgStore[topic]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
var extended bool
existing := topicStore[string(c.GroupID())]
existing := topicStore[groupId]
if existing != nil {
var extended bool
// Extend the existing column with cells being published here.
// The existing column may already contain cells received from peers. We must not overwrite it.
for i := range c.Included.Len() {
if c.Included.BitAt(i) {
extended = existing.ExtendFromVerfifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
if existing.ExtendFromVerfifiedCell(uint64(i), c.Column[i], c.KzgProofs[i]) {
extended = true
}
}
}
if extended {
@@ -568,15 +577,15 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
}
}
} else {
topicStore[string(c.GroupID())] = &c
topicStore[groupId] = &c
existing = &c
}
p.groupTTL[string(c.GroupID())] = TTLInSlots
p.groupTTL[groupId] = TTLInSlots
err := p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
if err == nil {
p.getBlobsCalled[string(c.GroupID())] = getBlobsCalled
p.getBlobsCalled[groupId] = getBlobsCalled
}
return err
}

View File

@@ -443,6 +443,7 @@ func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbr
err := s.processDataColumnSidecarsFromExecution(ctx, source)
if err != nil {
log.WithError(err).Error("Failed to process partial data column header")
return
}
if err := broadcaster.GetBlobsCalled(groupID); err != nil {
log.WithError(err).Error("Failed to call getBlobs called on broadcaster")