diff --git a/beacon-chain/attestation/BUILD.bazel b/beacon-chain/attestation/BUILD.bazel index 048a60dd40..8788329fdc 100644 --- a/beacon-chain/attestation/BUILD.bazel +++ b/beacon-chain/attestation/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//shared/bytesutil:go_default_library", "//shared/event:go_default_library", "//shared/hashutil:go_default_library", + "//shared/messagehandler:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], diff --git a/beacon-chain/attestation/service.go b/beacon-chain/attestation/service.go index 4fcf4cd453..6eb0117443 100644 --- a/beacon-chain/attestation/service.go +++ b/beacon-chain/attestation/service.go @@ -5,6 +5,8 @@ import ( "context" "fmt" + handler "github.com/prysmaticlabs/prysm/shared/messagehandler" + "github.com/gogo/protobuf/proto" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -140,22 +142,27 @@ func (a *Service) attestationPool() { return // Listen for a newly received incoming attestation from the sync service. case attestation := <-a.incomingChan: - enc, err := proto.Marshal(attestation) - if err != nil { - log.Errorf("Could not marshal incoming attestation to bytes: %v", err) - continue - } - h := hashutil.Hash(enc) - - if err := a.updateLatestAttestation(a.ctx, attestation); err != nil { - log.Errorf("Could not update attestation pool: %v", err) - continue - } - log.Infof("Updated attestation pool for attestation %#x", h) + handler.SafelyHandleMessage(a.ctx, a.handleAttestation, attestation) } } } +func (a *Service) handleAttestation(msg proto.Message) { + attestation := msg.(*pb.Attestation) + enc, err := proto.Marshal(attestation) + if err != nil { + log.Errorf("Could not marshal incoming attestation to bytes: %v", err) + return + } + h := hashutil.Hash(enc) + + if err := a.updateLatestAttestation(a.ctx, attestation); err != nil { + log.Errorf("Could not update attestation pool: %v", err) + return + } + log.Infof("Updated attestation pool for attestation %#x", h) +} + // updateLatestAttestation inputs an new attestation and checks whether // the attesters who submitted this attestation with the higher slot number // have been noted in the attestation pool. If not, it updates the diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 95956f9f84..5942f87dd7 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -20,9 +20,11 @@ go_library( "//shared/bytesutil:go_default_library", "//shared/event:go_default_library", "//shared/hashutil:go_default_library", + "//shared/messagehandler:go_default_library", "//shared/params:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//core/types:go_default_library", + "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], ) diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 57115c12b0..e907865d6e 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -8,6 +8,8 @@ import ( "fmt" "time" + "github.com/gogo/protobuf/proto" + "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/prysmaticlabs/prysm/beacon-chain/attestation" @@ -21,6 +23,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/hashutil" + handler "github.com/prysmaticlabs/prysm/shared/messagehandler" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" ) @@ -237,23 +240,27 @@ func (c *ChainService) blockProcessing() { // can be received either from the sync service, the RPC service, // or via p2p. case block := <-c.incomingBlockChan: - beaconState, err := c.beaconDB.State(c.ctx) - if err != nil { - log.Errorf("Unable to retrieve beacon state %v", err) - continue - } + handler.SafelyHandleMessage(c.ctx, c.processBlock, block) + } + } +} - if block.Slot > beaconState.Slot { - computedState, err := c.ReceiveBlock(block, beaconState) - if err != nil { - log.Errorf("Could not process received block: %v", err) - continue - } - if err := c.ApplyForkChoiceRule(block, computedState); err != nil { - log.Errorf("Could not update chain head: %v", err) - continue - } - } +func (c *ChainService) processBlock(message proto.Message) { + block := message.(*pb.BeaconBlock) + beaconState, err := c.beaconDB.State(c.ctx) + if err != nil { + log.Errorf("Unable to retrieve beacon state %v", err) + return + } + if block.Slot > beaconState.Slot { + computedState, err := c.ReceiveBlock(block, beaconState) + if err != nil { + log.Errorf("Could not process received block: %v", err) + return + } + if err := c.ApplyForkChoiceRule(block, computedState); err != nil { + log.Errorf("Could not update chain head: %v", err) + return } } } diff --git a/beacon-chain/operations/BUILD.bazel b/beacon-chain/operations/BUILD.bazel index e72274238d..41206bd385 100644 --- a/beacon-chain/operations/BUILD.bazel +++ b/beacon-chain/operations/BUILD.bazel @@ -10,7 +10,9 @@ go_library( "//proto/beacon/p2p/v1:go_default_library", "//shared/event:go_default_library", "//shared/hashutil:go_default_library", + "//shared/messagehandler:go_default_library", "//shared/params:go_default_library", + "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], ) diff --git a/beacon-chain/operations/service.go b/beacon-chain/operations/service.go index 5d8db25144..b93d1fdc2c 100644 --- a/beacon-chain/operations/service.go +++ b/beacon-chain/operations/service.go @@ -6,10 +6,12 @@ import ( "fmt" "sort" + "github.com/gogo/protobuf/proto" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/hashutil" + handler "github.com/prysmaticlabs/prysm/shared/messagehandler" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" ) @@ -138,31 +140,41 @@ func (s *Service) saveOperations() { return // Listen for a newly received incoming exit from the sync service. case exit := <-s.incomingValidatorExits: - hash, err := hashutil.HashProto(exit) - if err != nil { - log.Errorf("Could not hash exit req proto: %v", err) - continue - } - if err := s.beaconDB.SaveExit(exit); err != nil { - log.Errorf("Could not save exit request: %v", err) - continue - } - log.Infof("Exit request %#x saved in DB", hash) + handler.SafelyHandleMessage(s.ctx, s.handleValidatorExits, exit) case attestation := <-s.incomingAtt: - hash, err := hashutil.HashProto(attestation) - if err != nil { - log.Errorf("Could not hash attestation proto: %v", err) - continue - } - if err := s.beaconDB.SaveAttestation(attestation); err != nil { - log.Errorf("Could not save attestation: %v", err) - continue - } - log.Infof("Attestation %#x saved in DB", hash) + handler.SafelyHandleMessage(s.ctx, s.handleAttestations, attestation) } } } +func (s *Service) handleValidatorExits(message proto.Message) { + exit := message.(*pb.VoluntaryExit) + hash, err := hashutil.HashProto(exit) + if err != nil { + log.Errorf("Could not hash exit req proto: %v", err) + return + } + if err := s.beaconDB.SaveExit(exit); err != nil { + log.Errorf("Could not save exit request: %v", err) + return + } + log.Infof("Exit request %#x saved in DB", hash) +} + +func (s *Service) handleAttestations(message proto.Message) { + attestation := message.(*pb.Attestation) + hash, err := hashutil.HashProto(attestation) + if err != nil { + log.Errorf("Could not hash attestation proto: %v", err) + return + } + if err := s.beaconDB.SaveAttestation(attestation); err != nil { + log.Errorf("Could not save attestation: %v", err) + return + } + log.Infof("Attestation %#x saved in DB", hash) +} + // removeOperations removes the processed operations from operation pool and DB. func (s *Service) removeOperations() { incomingBlockSub := s.incomingProcessedBlockFeed.Subscribe(s.incomingProcessedBlock) @@ -178,19 +190,29 @@ func (s *Service) removeOperations() { return // Listen for processed block from the block chain service. case block := <-s.incomingProcessedBlock: + handler.SafelyHandleMessage(s.ctx, s.handleProcessedBlock, block) // Removes the pending attestations received from processed block body in DB. if err := s.removePendingAttestations(block.Body.Attestations); err != nil { log.Errorf("Could not remove processed attestations from DB: %v", err) return } if err := s.removeEpochOldAttestations(block.Slot); err != nil { - log.Errorf("Could not remove old attestations from DB at slot %d: %v",block.Slot, err) + log.Errorf("Could not remove old attestations from DB at slot %d: %v", block.Slot, err) return } } } } +func (s *Service) handleProcessedBlock(message proto.Message) { + block := message.(*pb.BeaconBlock) + // Removes the pending attestations received from processed block body in DB. + if err := s.removePendingAttestations(block.Body.Attestations); err != nil { + log.Errorf("Could not remove processed attestations from DB: %v", err) + return + } +} + // removePendingAttestations removes a list of attestations from DB. func (s *Service) removePendingAttestations(attestations []*pb.Attestation) error { for _, attestation := range attestations { diff --git a/beacon-chain/powchain/service.go b/beacon-chain/powchain/service.go index 6637f817d8..d9169fc563 100644 --- a/beacon-chain/powchain/service.go +++ b/beacon-chain/powchain/service.go @@ -456,32 +456,41 @@ func (w *Web3Service) run(done <-chan struct{}) { log.Debug("Unsubscribed to head events, exiting goroutine") return case header := <-w.headerChan: - blockNumberGauge.Set(float64(header.Number.Int64())) - w.blockHeight = header.Number - w.blockHash = header.Hash() - w.blockTime = time.Unix(header.Time.Int64(), 0) - log.WithFields(logrus.Fields{ - "blockNumber": w.blockHeight, - "blockHash": w.blockHash.Hex(), - }).Debug("Latest web3 chain event") - - if err := w.blockCache.AddBlock(gethTypes.NewBlockWithHeader(header)); err != nil { - w.runError = err - log.Errorf("Unable to add block data to cache %v", err) - } + w.processSubscribedHeaders(header) case <-ticker.C: - if w.lastRequestedBlock.Cmp(w.blockHeight) == 0 { - continue - } - if err := w.requestBatchedLogs(); err != nil { - w.runError = err - log.Error(err) - } - + w.handleDelayTicker() } } } +func (w *Web3Service) processSubscribedHeaders(header *gethTypes.Header) { + defer safelyHandlePanic() + blockNumberGauge.Set(float64(header.Number.Int64())) + w.blockHeight = header.Number + w.blockHash = header.Hash() + w.blockTime = time.Unix(header.Time.Int64(), 0) + log.WithFields(logrus.Fields{ + "blockNumber": w.blockHeight, + "blockHash": w.blockHash.Hex(), + }).Debug("Latest web3 chain event") + + if err := w.blockCache.AddBlock(gethTypes.NewBlockWithHeader(header)); err != nil { + w.runError = err + log.Errorf("Unable to add block data to cache %v", err) + } +} + +func (w *Web3Service) handleDelayTicker() { + defer safelyHandlePanic() + if w.lastRequestedBlock.Cmp(w.blockHeight) == 0 { + return + } + if err := w.requestBatchedLogs(); err != nil { + w.runError = err + log.Error(err) + } +} + // initDataFromContract calls the deposit contract and finds the deposit count // and deposit root. func (w *Web3Service) initDataFromContract() error { @@ -552,3 +561,13 @@ func (w *Web3Service) requestBatchedLogs() error { w.lastRequestedBlock.Set(requestedBlock) return nil } + +// safelyHandleHeader will recover and log any panic that occurs from the +// block +func safelyHandlePanic() { + if r := recover(); r != nil { + log.WithFields(logrus.Fields{ + "r": r, + }).Error("Panicked when handling data from ETH 1.0 Chain! Recovering...") + } +} diff --git a/beacon-chain/powchain/service_test.go b/beacon-chain/powchain/service_test.go index 67c152c04e..fbb9d3c7ca 100644 --- a/beacon-chain/powchain/service_test.go +++ b/beacon-chain/powchain/service_test.go @@ -1062,3 +1062,18 @@ func TestBlockExists_UsesCachedBlockInfo(t *testing.T) { t.Fatalf("Block height did not equal expected height, expected: %v, got: %v", big.NewInt(42), height) } } + +func TestHandlePanic_OK(t *testing.T) { + hook := logTest.NewGlobal() + endpoint := "ws://127.0.0.1" + web3Service, err := NewWeb3Service(context.Background(), &Web3ServiceConfig{ + Endpoint: endpoint, + BlockFetcher: nil, // nil blockFetcher would panic if cached value not used + }) + if err != nil { + t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err) + } + + web3Service.processSubscribedHeaders(nil) + testutil.AssertLogsContain(t, hook, "Panicked when handling data from ETH 1.0 Chain!") +} diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 9bc0c3c796..d8ae12a7ba 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -219,18 +219,50 @@ func (s *InitialSync) run(delayChan <-chan time.Time) { return } case msg := <-s.blockAnnounceBuf: - s.processBlockAnnounce(msg) + safelyHandleMessage(s.processBlockAnnounce, msg) case msg := <-s.blockBuf: - data := msg.Data.(*pb.BeaconBlockResponse) - s.processBlock(msg.Ctx, data.Block, msg.Peer) + safelyHandleMessage(func(message p2p.Message) { + data := message.Data.(*pb.BeaconBlockResponse) + s.processBlock(message.Ctx, data.Block, message.Peer) + }, msg) case msg := <-s.stateBuf: - s.processState(msg) + safelyHandleMessage(s.processState, msg) case msg := <-s.batchedBlockBuf: - s.processBatchedBlocks(msg) + safelyHandleMessage(s.processBatchedBlocks, msg) } } } +// safelyHandleMessage will recover and log any panic that occurs from the +// function argument. +func safelyHandleMessage(fn func(p2p.Message), msg p2p.Message) { + defer func() { + if r := recover(); r != nil { + printedMsg := "message contains no data" + if msg.Data != nil { + printedMsg = proto.MarshalTextString(msg.Data) + } + log.WithFields(logrus.Fields{ + "r": r, + "msg": printedMsg, + }).Error("Panicked when handling p2p message! Recovering...") + + if msg.Ctx == nil { + return + } + if span := trace.FromContext(msg.Ctx); span != nil { + span.SetStatus(trace.Status{ + Code: trace.StatusCodeInternal, + Message: fmt.Sprintf("Panic: %v", r), + }) + } + } + }() + + // Fingers crossed that it doesn't panic... + fn(msg) +} + // checkInMemoryBlocks is another routine which will run concurrently with the // main routine for initial sync, where it checks the blocks saved in memory regularly // to see if the blocks are valid enough to be processed. diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index 39d996c284..16864382d3 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -495,3 +495,27 @@ func TestRequestBlocksBySlot_OK(t *testing.T) { hook.Reset() } +func TestSafelyHandleMessage(t *testing.T) { + hook := logTest.NewGlobal() + + safelyHandleMessage(func(_ p2p.Message) { + panic("bad!") + }, p2p.Message{ + Data: &pb.BeaconBlock{}, + }) + + testutil.AssertLogsContain(t, hook, "Panicked when handling p2p message!") +} + +func TestSafelyHandleMessage_NoData(t *testing.T) { + hook := logTest.NewGlobal() + + safelyHandleMessage(func(_ p2p.Message) { + panic("bad!") + }, p2p.Message{}) + + entry := hook.LastEntry() + if entry.Data["msg"] != "message contains no data" { + t.Errorf("Message logged was not what was expected: %s", entry.Data["msg"]) + } +} diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index 33e92bfe91..b372e7424f 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -849,28 +849,3 @@ func TestHandleStateReq_OK(t *testing.T) { testutil.AssertLogsContain(t, hook, "Sending beacon state to peer") } - -func TestSafelyHandleMessage(t *testing.T) { - hook := logTest.NewGlobal() - - safelyHandleMessage(func(_ p2p.Message) { - panic("bad!") - }, p2p.Message{ - Data: &pb.BeaconBlock{}, - }) - - testutil.AssertLogsContain(t, hook, "Panicked when handling p2p message!") -} - -func TestSafelyHandleMessage_NoData(t *testing.T) { - hook := logTest.NewGlobal() - - safelyHandleMessage(func(_ p2p.Message) { - panic("bad!") - }, p2p.Message{}) - - entry := hook.LastEntry() - if entry.Data["msg"] != "message contains no data" { - t.Errorf("Message logged was not what was expected: %s", entry.Data["msg"]) - } -} diff --git a/shared/messagehandler/BUILD.bazel b/shared/messagehandler/BUILD.bazel new file mode 100644 index 0000000000..36d90846e4 --- /dev/null +++ b/shared/messagehandler/BUILD.bazel @@ -0,0 +1,25 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["messagehandler.go"], + importpath = "github.com/prysmaticlabs/prysm/shared/messagehandler", + visibility = ["//visibility:public"], + deps = [ + "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@io_opencensus_go//trace:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["messagehandler_test.go"], + embed = [":go_default_library"], + deps = [ + "//proto/beacon/p2p/v1:go_default_library", + "//shared/testutil:go_default_library", + "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", + ], +) diff --git a/shared/messagehandler/messagehandler.go b/shared/messagehandler/messagehandler.go new file mode 100644 index 0000000000..6df19528d4 --- /dev/null +++ b/shared/messagehandler/messagehandler.go @@ -0,0 +1,42 @@ +package messagehandler + +import ( + "context" + "fmt" + + "github.com/gogo/protobuf/proto" + "github.com/sirupsen/logrus" + "go.opencensus.io/trace" +) + +var log = logrus.WithField("prefix", "message-handler") + +// SafelyHandleMessage will recover and log any panic that occurs from the +// function argument. +func SafelyHandleMessage(ctx context.Context, fn func(message proto.Message), msg proto.Message) { + defer func() { + if r := recover(); r != nil { + printedMsg := "message contains no data" + if msg != nil { + printedMsg = proto.MarshalTextString(msg) + } + log.WithFields(logrus.Fields{ + "r": r, + "msg": printedMsg, + }).Error("Panicked when handling p2p message! Recovering...") + + if ctx == nil { + return + } + if span := trace.FromContext(ctx); span != nil { + span.SetStatus(trace.Status{ + Code: trace.StatusCodeInternal, + Message: fmt.Sprintf("Panic: %v", r), + }) + } + } + }() + + // Fingers crossed that it doesn't panic... + fn(msg) +} diff --git a/shared/messagehandler/messagehandler_test.go b/shared/messagehandler/messagehandler_test.go new file mode 100644 index 0000000000..6b5312e5d6 --- /dev/null +++ b/shared/messagehandler/messagehandler_test.go @@ -0,0 +1,34 @@ +package messagehandler + +import ( + "testing" + + "github.com/gogo/protobuf/proto" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/testutil" + + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestSafelyHandleMessage(t *testing.T) { + hook := logTest.NewGlobal() + + SafelyHandleMessage(nil, func(_ proto.Message) { + panic("bad!") + }, &pb.BeaconBlock{}) + + testutil.AssertLogsContain(t, hook, "Panicked when handling p2p message!") +} + +func TestSafelyHandleMessage_NoData(t *testing.T) { + hook := logTest.NewGlobal() + + SafelyHandleMessage(nil, func(_ proto.Message) { + panic("bad!") + }, nil) + + entry := hook.LastEntry() + if entry.Data["msg"] != "message contains no data" { + t.Errorf("Message logged was not what was expected: %s", entry.Data["msg"]) + } +} diff --git a/validator/client/validator_propose_test.go b/validator/client/validator_propose_test.go index a1d8601722..faaab70b68 100644 --- a/validator/client/validator_propose_test.go +++ b/validator/client/validator_propose_test.go @@ -265,13 +265,10 @@ func TestProposeBlock_PendingAttestations_UsesCurrentSlot(t *testing.T) { StateRoot: []byte{'F'}, }, nil /*err*/) - var broadcastedBlock *pbp2p.BeaconBlock m.proposerClient.EXPECT().ProposeBlock( gomock.Any(), // context gomock.AssignableToTypeOf(&pbp2p.BeaconBlock{}), - ).Do(func(_ context.Context, blk *pbp2p.BeaconBlock) { - broadcastedBlock = blk - }).Return(&pb.ProposeResponse{}, nil /*error*/) + ).Return(&pb.ProposeResponse{}, nil /*error*/) validator.ProposeBlock(context.Background(), 55) if req.ProposalBlockSlot != 55 {