From 96c7c03b1696186a315d135d990adea081baf0fd Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 5 Jan 2026 12:31:59 +0530 Subject: [PATCH] remove batchOrBroadcast method --- beacon-chain/p2p/broadcaster.go | 67 ++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go index d03bf1b38a..d15c5b69c4 100644 --- a/beacon-chain/p2p/broadcaster.go +++ b/beacon-chain/p2p/broadcaster.go @@ -511,47 +511,68 @@ func (s *Service) findPeersIfNeeded( return nil } -// method to broadcast messages to other peers in our gossip mesh. +// encodeGossipMessage encodes an object for gossip transmission. +// It returns the encoded bytes and the full topic with protocol suffix. +func (s *Service) encodeGossipMessage(obj ssz.Marshaler, topic string) ([]byte, string, error) { + buf := new(bytes.Buffer) + if _, err := s.Encoding().EncodeGossip(buf, obj); err != nil { + return nil, "", fmt.Errorf("could not encode message: %w", err) + } + return buf.Bytes(), topic + s.Encoding().ProtocolSuffix(), nil +} + +// broadcastObject broadcasts a message to other peers in our gossip mesh. func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error { - return s.broadcastOrBatchObject(ctx, nil, obj, topic) -} - -// method to add an object to a message batch for a future broadcast. -func (s *Service) batchObject(ctx context.Context, batch *pubsub.MessageBatch, obj ssz.Marshaler, topic string) error { - return s.broadcastOrBatchObject(ctx, batch, obj, topic) -} - -// method to broadcast or batch messages to other peers in our gossip mesh. If -// batch is non-nil the message is added to the batch WITHOUT publishing. The -// caller MUST publish the batch after all messages have been added to the batch -func (s *Service) broadcastOrBatchObject(ctx context.Context, batch *pubsub.MessageBatch, obj ssz.Marshaler, topic string) error { ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject") defer span.End() span.SetAttributes(trace.StringAttribute("topic", topic)) - buf := new(bytes.Buffer) - if _, err := s.Encoding().EncodeGossip(buf, obj); err != nil { - err := errors.Wrap(err, "could not encode message") + data, fullTopic, err := s.encodeGossipMessage(obj, topic) + if err != nil { tracing.AnnotateError(span, err) return err } if span.IsRecording() { - id := hash.FastSum64(buf.Bytes()) - messageLen := int64(buf.Len()) + id := hash.FastSum64(data) + messageLen := int64(len(data)) // lint:ignore uintcast -- It's safe to do this for tracing. iid := int64(id) span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/) } - var err error - if batch != nil { - err = s.addToBatch(ctx, batch, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()) - } else { - err = s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()) + if err := s.PublishToTopic(ctx, fullTopic, data); err != nil { + err := errors.Wrap(err, "could not publish message") + tracing.AnnotateError(span, err) + return err } + return nil +} + +// batchObject adds an object to a message batch for a future broadcast. +// The caller MUST publish the batch after all messages have been added. +func (s *Service) batchObject(ctx context.Context, batch *pubsub.MessageBatch, obj ssz.Marshaler, topic string) error { + ctx, span := trace.StartSpan(ctx, "p2p.batchObject") + defer span.End() + + span.SetAttributes(trace.StringAttribute("topic", topic)) + + data, fullTopic, err := s.encodeGossipMessage(obj, topic) if err != nil { + tracing.AnnotateError(span, err) + return err + } + + if span.IsRecording() { + id := hash.FastSum64(data) + messageLen := int64(len(data)) + // lint:ignore uintcast -- It's safe to do this for tracing. + iid := int64(id) + span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/) + } + + if err := s.addToBatch(ctx, batch, fullTopic, data); err != nil { err := errors.Wrap(err, "could not publish message") tracing.AnnotateError(span, err) return err