Skips Attestation Older than Finalized Slot (#1623)

* fixed epoch_processing

* sync skips attestation with slot < finalized slot

* lint
This commit is contained in:
terence tsao
2019-02-17 14:10:57 -08:00
committed by Raul Jordan
parent 25c5c9f6cc
commit fc1aacaa54
3 changed files with 86 additions and 26 deletions

View File

@@ -10,7 +10,6 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/attestations:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/sync/initial-sync:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",

View File

@@ -8,7 +8,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/ssz"
"github.com/gogo/protobuf/proto"
att "github.com/prysmaticlabs/prysm/beacon-chain/core/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -302,24 +301,41 @@ func (rs *RegularSync) handleChainHeadRequest(msg p2p.Message) {
// receiveAttestation accepts an broadcasted attestation from the p2p layer,
// discard the attestation if we have gotten before, send it to attestation
// service if we have not.
// pool if we have not.
func (rs *RegularSync) receiveAttestation(msg p2p.Message) {
data := msg.Data.(*pb.Attestation)
a := data
h := att.Key(a.Data)
ctx, receiveAttestationSpan := trace.StartSpan(msg.Ctx, "RegularSync_receiveAttestation")
defer receiveAttestationSpan.End()
attestation, err := rs.db.Attestation(h)
attestation := msg.Data.(*pb.Attestation)
attestationRoot, err := ssz.TreeHash(attestation)
if err != nil {
log.Errorf("Could not check for attestation in DB: %v", err)
return
log.Errorf("Could not hash received attestation: %v", err)
}
if rs.db.HasAttestation(h) {
log.Debugf("Received, skipping attestation #%x", h)
// Skip if attestation has been seen before.
if rs.db.HasAttestation(attestationRoot) {
log.Debugf("Received, skipping attestation #%x", attestationRoot)
return
}
log.WithField("attestationHash", fmt.Sprintf("%#x", h)).Debug("Forwarding attestation to subscribed services")
// Skip if attestation slot is older than last finalized slot in state.
beaconState, err := rs.db.State()
if err != nil {
log.Errorf("Failed to get beacon state: %v", err)
return
}
finalizedSlot := beaconState.FinalizedEpoch * params.BeaconConfig().EpochLength
if attestation.Data.Slot < finalizedSlot {
log.Debugf("Skipping received attestation with slot smaller than last finalized slot, %d < %d",
attestation.Data.Slot, finalizedSlot)
return
}
_, sendAttestationSpan := trace.StartSpan(ctx, "sendAttestation")
log.WithField("attestationHash", fmt.Sprintf("%#x", attestationRoot)).Debug("Sending newly received attestation to subscribers")
rs.operationsService.IncomingAttFeed().Send(attestation)
sendAttestationSpan.End()
}
// receiveExitRequest accepts an broadcasted exit from the p2p layer,

View File

@@ -2,13 +2,12 @@ package sync
import (
"context"
"fmt"
"io/ioutil"
"strconv"
"testing"
"time"
"github.com/prysmaticlabs/prysm/shared/ssz"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
@@ -19,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/ssz"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
@@ -381,16 +381,16 @@ func TestReceiveAttestation_Ok(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
if err := db.SaveState(&pb.BeaconState{
FinalizedEpoch: params.BeaconConfig().GenesisEpoch,
}); err != nil {
t.Fatalf("Could not save state: %v", err)
}
cfg := &RegularSyncConfig{
BlockAnnounceBufferSize: 0,
BlockBufferSize: 0,
BlockReqHashBufferSize: 0,
BlockReqSlotBufferSize: 0,
ChainService: ms,
OperationService: os,
P2P: &mockP2P{},
BeaconDB: db,
ChainService: ms,
OperationService: os,
P2P: &mockP2P{},
BeaconDB: db,
}
ss := NewRegularSyncService(context.Background(), cfg)
@@ -401,9 +401,8 @@ func TestReceiveAttestation_Ok(t *testing.T) {
}()
request1 := &pb.Attestation{
AggregationBitfield: []byte{99},
Data: &pb.AttestationData{
Slot: 0,
Slot: params.BeaconConfig().GenesisSlot + 1,
},
}
@@ -416,7 +415,53 @@ func TestReceiveAttestation_Ok(t *testing.T) {
ss.attestationBuf <- msg1
ss.cancel()
<-exitRoutine
testutil.AssertLogsContain(t, hook, "Forwarding attestation to subscribed services")
testutil.AssertLogsContain(t, hook, "Sending newly received attestation to subscribers")
}
func TestReceiveAttestation_OlderThanFinalizedEpoch(t *testing.T) {
hook := logTest.NewGlobal()
ms := &mockChainService{}
os := &mockOperationService{}
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
state := &pb.BeaconState{FinalizedEpoch: params.BeaconConfig().GenesisEpoch + 1}
if err := db.SaveState(state); err != nil {
t.Fatalf("Could not save state: %v", err)
}
cfg := &RegularSyncConfig{
ChainService: ms,
OperationService: os,
P2P: &mockP2P{},
BeaconDB: db,
}
ss := NewRegularSyncService(context.Background(), cfg)
exitRoutine := make(chan bool)
go func() {
ss.run()
exitRoutine <- true
}()
request1 := &pb.Attestation{
Data: &pb.AttestationData{
Slot: params.BeaconConfig().GenesisSlot + 1,
},
}
msg1 := p2p.Message{
Ctx: context.Background(),
Data: request1,
Peer: p2p.Peer{},
}
ss.attestationBuf <- msg1
ss.cancel()
<-exitRoutine
want := fmt.Sprintf(
"Skipping received attestation with slot smaller than last finalized slot, %d < %d",
request1.Data.Slot, state.FinalizedEpoch*params.BeaconConfig().EpochLength)
testutil.AssertLogsContain(t, hook, want)
}
func TestReceiveExitReq_Ok(t *testing.T) {