mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Run text/event-stream values returned from grpc-gateway through API Middleware (#9080)
* register events handler in v1 * begin `status.Error` messages with uppercase * something works * handle errors * topic comments * handle the rest of event types * gzl * make code more testable + test * better error handling * tests * gzl * fix deps Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
@@ -665,13 +665,13 @@ func (b *BeaconNode) registerGRPCGateway() error {
|
||||
ethpb.RegisterNodeHandler,
|
||||
ethpb.RegisterBeaconChainHandler,
|
||||
ethpb.RegisterBeaconNodeValidatorHandler,
|
||||
ethpbv1.RegisterEventsHandler,
|
||||
pbrpc.RegisterHealthHandler,
|
||||
}
|
||||
v1Registrations := []gateway.PbHandlerRegistration{
|
||||
ethpbv1.RegisterBeaconNodeHandler,
|
||||
ethpbv1.RegisterBeaconChainHandler,
|
||||
ethpbv1.RegisterBeaconValidatorHandler,
|
||||
ethpbv1.RegisterEventsHandler,
|
||||
}
|
||||
if enableDebugRPCEndpoints {
|
||||
v1Alpha1Registrations = append(v1Alpha1Registrations, pbrpc.RegisterDebugHandler)
|
||||
|
||||
@@ -12,11 +12,13 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc/apimiddleware",
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//beacon-chain/rpc/eventsv1:go_default_library",
|
||||
"//shared/bytesutil:go_default_library",
|
||||
"//shared/gateway:go_default_library",
|
||||
"//shared/grpcutils:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_r3labs_sse//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -28,10 +30,12 @@ go_test(
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//beacon-chain/rpc/eventsv1:go_default_library",
|
||||
"//shared/bytesutil:go_default_library",
|
||||
"//shared/gateway:go_default_library",
|
||||
"//shared/grpcutils:go_default_library",
|
||||
"//shared/testutil/assert:go_default_library",
|
||||
"//shared/testutil/require:go_default_library",
|
||||
"@com_github_r3labs_sse//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -3,6 +3,8 @@ package apimiddleware
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
@@ -10,8 +12,10 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/rpc/eventsv1"
|
||||
"github.com/prysmaticlabs/prysm/shared/gateway"
|
||||
"github.com/prysmaticlabs/prysm/shared/grpcutils"
|
||||
"github.com/r3labs/sse"
|
||||
)
|
||||
|
||||
type sszConfig struct {
|
||||
@@ -157,3 +161,120 @@ func writeSSZResponseHeaderAndBody(grpcResp *http.Response, w http.ResponseWrite
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleEvents(m *gateway.ApiProxyMiddleware, _ gateway.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
|
||||
sseClient := sse.NewClient("http://" + m.GatewayAddress + req.URL.RequestURI())
|
||||
eventChan := make(chan *sse.Event)
|
||||
|
||||
// We use grpc-gateway as the server side of events, not the sse library.
|
||||
// Because of this subscribing to streams doesn't work as intended, resulting in each event being handled by all subscriptions.
|
||||
// To handle events properly, we subscribe just once using a placeholder value ('events') and handle all topics inside this subscription.
|
||||
if err := sseClient.SubscribeChan("events", eventChan); err != nil {
|
||||
gateway.WriteError(w, &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}, nil)
|
||||
sseClient.Unsubscribe(eventChan)
|
||||
return
|
||||
}
|
||||
|
||||
errJson := receiveEvents(eventChan, w, req)
|
||||
if errJson != nil {
|
||||
gateway.WriteError(w, errJson, nil)
|
||||
}
|
||||
|
||||
sseClient.Unsubscribe(eventChan)
|
||||
return true
|
||||
}
|
||||
|
||||
func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http.Request) gateway.ErrorJson {
|
||||
for {
|
||||
select {
|
||||
case msg := <-eventChan:
|
||||
var data interface{}
|
||||
|
||||
switch strings.TrimSpace(string(msg.Event)) {
|
||||
case eventsv1.HeadTopic:
|
||||
data = &eventHeadJson{}
|
||||
case eventsv1.BlockTopic:
|
||||
data = &receivedBlockDataJson{}
|
||||
case eventsv1.AttestationTopic:
|
||||
data = &attestationJson{}
|
||||
|
||||
// Data received in the event does not fit the expected event stream output.
|
||||
// We extract the underlying attestation from event data
|
||||
// and assign the attestation back to event data for further processing.
|
||||
eventData := &aggregatedAttReceivedDataJson{}
|
||||
if err := json.Unmarshal(msg.Data, eventData); err != nil {
|
||||
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
|
||||
}
|
||||
attData, err := json.Marshal(eventData.Aggregate)
|
||||
if err != nil {
|
||||
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
|
||||
}
|
||||
msg.Data = attData
|
||||
case eventsv1.VoluntaryExitTopic:
|
||||
data = &signedVoluntaryExitJson{}
|
||||
case eventsv1.FinalizedCheckpointTopic:
|
||||
data = &eventFinalizedCheckpointJson{}
|
||||
case eventsv1.ChainReorgTopic:
|
||||
data = &eventChainReorgJson{}
|
||||
case "error":
|
||||
data = &eventErrorJson{}
|
||||
default:
|
||||
return &gateway.DefaultErrorJson{
|
||||
Message: fmt.Sprintf("Event type '%s' not supported", strings.TrimSpace(string(msg.Event))),
|
||||
Code: http.StatusInternalServerError,
|
||||
}
|
||||
}
|
||||
|
||||
if errJson := writeEvent(msg, w, data); errJson != nil {
|
||||
return errJson
|
||||
}
|
||||
if errJson := flushEvent(w); errJson != nil {
|
||||
return errJson
|
||||
}
|
||||
case <-req.Context().Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writeEvent(msg *sse.Event, w http.ResponseWriter, data interface{}) gateway.ErrorJson {
|
||||
if err := json.Unmarshal(msg.Data, data); err != nil {
|
||||
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
|
||||
}
|
||||
if errJson := gateway.ProcessMiddlewareResponseFields(data); errJson != nil {
|
||||
return errJson
|
||||
}
|
||||
dataJson, errJson := gateway.SerializeMiddlewareResponseIntoJson(data)
|
||||
if errJson != nil {
|
||||
return errJson
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
|
||||
if _, err := w.Write([]byte("event: ")); err != nil {
|
||||
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
|
||||
}
|
||||
if _, err := w.Write(msg.Event); err != nil {
|
||||
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
|
||||
}
|
||||
if _, err := w.Write([]byte("\ndata: ")); err != nil {
|
||||
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
|
||||
}
|
||||
if _, err := w.Write(dataJson); err != nil {
|
||||
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
|
||||
}
|
||||
if _, err := w.Write([]byte("\n\n")); err != nil {
|
||||
return &gateway.DefaultErrorJson{Message: err.Error(), Code: http.StatusInternalServerError}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func flushEvent(w http.ResponseWriter) gateway.ErrorJson {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
return &gateway.DefaultErrorJson{Message: fmt.Sprintf("Flush not supported in %T", w), Code: http.StatusInternalServerError}
|
||||
}
|
||||
flusher.Flush()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2,15 +2,20 @@ package apimiddleware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/rpc/eventsv1"
|
||||
"github.com/prysmaticlabs/prysm/shared/gateway"
|
||||
"github.com/prysmaticlabs/prysm/shared/grpcutils"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
"github.com/r3labs/sse"
|
||||
)
|
||||
|
||||
func TestSSZRequested(t *testing.T) {
|
||||
@@ -136,3 +141,74 @@ func TestWriteSSZResponseHeaderAndBody(t *testing.T) {
|
||||
assert.Equal(t, http.StatusInternalServerError, errJson.StatusCode())
|
||||
})
|
||||
}
|
||||
|
||||
func TestReceiveEvents(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ch := make(chan *sse.Event)
|
||||
w := httptest.NewRecorder()
|
||||
w.Body = &bytes.Buffer{}
|
||||
req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{})
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
go func() {
|
||||
base64Val := "Zm9v"
|
||||
data := &eventFinalizedCheckpointJson{
|
||||
Block: base64Val,
|
||||
State: base64Val,
|
||||
Epoch: "1",
|
||||
}
|
||||
bData, err := json.Marshal(data)
|
||||
require.NoError(t, err)
|
||||
msg := &sse.Event{
|
||||
Data: bData,
|
||||
Event: []byte(eventsv1.FinalizedCheckpointTopic),
|
||||
}
|
||||
ch <- msg
|
||||
time.Sleep(time.Second)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
errJson := receiveEvents(ch, w, req)
|
||||
assert.Equal(t, true, errJson == nil)
|
||||
}
|
||||
|
||||
func TestReceiveEvents_EventNotSupported(t *testing.T) {
|
||||
ch := make(chan *sse.Event)
|
||||
w := httptest.NewRecorder()
|
||||
w.Body = &bytes.Buffer{}
|
||||
req := httptest.NewRequest("GET", "http://foo.example", &bytes.Buffer{})
|
||||
|
||||
go func() {
|
||||
msg := &sse.Event{
|
||||
Data: []byte("foo"),
|
||||
Event: []byte("not_supported"),
|
||||
}
|
||||
ch <- msg
|
||||
}()
|
||||
|
||||
errJson := receiveEvents(ch, w, req)
|
||||
require.NotNil(t, errJson)
|
||||
assert.Equal(t, "Event type 'not_supported' not supported", errJson.Msg())
|
||||
}
|
||||
|
||||
func TestWriteEvent(t *testing.T) {
|
||||
base64Val := "Zm9v"
|
||||
data := &eventFinalizedCheckpointJson{
|
||||
Block: base64Val,
|
||||
State: base64Val,
|
||||
Epoch: "1",
|
||||
}
|
||||
bData, err := json.Marshal(data)
|
||||
require.NoError(t, err)
|
||||
msg := &sse.Event{
|
||||
Data: bData,
|
||||
Event: []byte("test_event"),
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
w.Body = &bytes.Buffer{}
|
||||
|
||||
errJson := writeEvent(msg, w, &eventFinalizedCheckpointJson{})
|
||||
require.Equal(t, true, errJson == nil)
|
||||
written := w.Body.String()
|
||||
assert.Equal(t, "event: test_event\ndata: {\"block\":\"0x666f6f\",\"state\":\"0x666f6f\",\"epoch\":\"1\"}\n\n", written)
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@ func (f *BeaconEndpointFactory) Paths() []string {
|
||||
"/eth/v1/config/fork_schedule",
|
||||
"/eth/v1/config/deposit_contract",
|
||||
"/eth/v1/config/spec",
|
||||
"/eth/v1/events",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -217,6 +218,13 @@ func (f *BeaconEndpointFactory) Create(path string) (*gateway.Endpoint, error) {
|
||||
GetResponse: &specResponseJson{},
|
||||
Err: &gateway.DefaultErrorJson{},
|
||||
}
|
||||
case "/eth/v1/events":
|
||||
endpoint = gateway.Endpoint{
|
||||
Err: &gateway.DefaultErrorJson{},
|
||||
Hooks: gateway.HookCollection{
|
||||
CustomHandlers: []gateway.CustomHandler{handleEvents},
|
||||
},
|
||||
}
|
||||
default:
|
||||
return nil, errors.New("invalid path")
|
||||
}
|
||||
|
||||
@@ -455,6 +455,45 @@ func (ssz *beaconStateSSZResponseJson) SSZData() string {
|
||||
return ssz.Data
|
||||
}
|
||||
|
||||
// TODO: Documentation
|
||||
// ---------------
|
||||
// Events.
|
||||
// ---------------
|
||||
|
||||
type eventHeadJson struct {
|
||||
Slot string `json:"slot"`
|
||||
Block string `json:"block" hex:"true"`
|
||||
State string `json:"state" hex:"true"`
|
||||
EpochTransition bool `json:"epoch_transition"`
|
||||
PreviousDutyDependentRoot string `json:"previous_duty_dependent_root" hex:"true"`
|
||||
CurrentDutyDependentRoot string `json:"current_duty_dependent_root" hex:"true"`
|
||||
}
|
||||
|
||||
type receivedBlockDataJson struct {
|
||||
Slot string `json:"slot"`
|
||||
Block string `json:"block" hex:"true"`
|
||||
}
|
||||
|
||||
type aggregatedAttReceivedDataJson struct {
|
||||
Aggregate *attestationJson `json:"aggregate"`
|
||||
}
|
||||
|
||||
type eventFinalizedCheckpointJson struct {
|
||||
Block string `json:"block" hex:"true"`
|
||||
State string `json:"state" hex:"true"`
|
||||
Epoch string `json:"epoch"`
|
||||
}
|
||||
|
||||
type eventChainReorgJson struct {
|
||||
Slot string `json:"slot"`
|
||||
Depth string `json:"depth"`
|
||||
OldHeadBlock string `json:"old_head_block" hex:"true"`
|
||||
NewHeadBlock string `json:"old_head_state" hex:"true"`
|
||||
OldHeadState string `json:"new_head_block" hex:"true"`
|
||||
NewHeadState string `json:"new_head_state" hex:"true"`
|
||||
Epoch string `json:"epoch"`
|
||||
}
|
||||
|
||||
// ---------------
|
||||
// Error handling.
|
||||
// ---------------
|
||||
@@ -470,3 +509,8 @@ type singleAttestationVerificationFailureJson struct {
|
||||
Index int `json:"index"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type eventErrorJson struct {
|
||||
StatusCode int `json:"status_code"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ go_library(
|
||||
"//proto/eth/v1:go_default_library",
|
||||
"//proto/migration:go_default_library",
|
||||
"@com_github_grpc_ecosystem_grpc_gateway_v2//proto/gateway:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@org_golang_google_grpc//codes:go_default_library",
|
||||
"@org_golang_google_grpc//status:go_default_library",
|
||||
"@org_golang_google_protobuf//proto:go_default_library",
|
||||
|
||||
@@ -2,6 +2,7 @@ package eventsv1
|
||||
|
||||
import (
|
||||
gwpb "github.com/grpc-ecosystem/grpc-gateway/v2/proto/gateway"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
|
||||
@@ -15,21 +16,27 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
headTopic = "head"
|
||||
blockTopic = "block"
|
||||
attestationTopic = "attestation"
|
||||
voluntaryExitTopic = "voluntary_exit"
|
||||
finalizedCheckpointTopic = "finalized_checkpoint"
|
||||
chainReorgTopic = "chain_reorg"
|
||||
// HeadTopic represents a new chain head event topic.
|
||||
HeadTopic = "head"
|
||||
// BlockTopic represents a new produced block event topic.
|
||||
BlockTopic = "block"
|
||||
// AttestationTopic represents a new submitted attestation event topic.
|
||||
AttestationTopic = "attestation"
|
||||
// VoluntaryExitTopic represents a new performed voluntary exit event topic.
|
||||
VoluntaryExitTopic = "voluntary_exit"
|
||||
// FinalizedCheckpointTopic represents a new finalized checkpoint event topic.
|
||||
FinalizedCheckpointTopic = "finalized_checkpoint"
|
||||
// ChainReorgTopic represents a chain reorganization event topic.
|
||||
ChainReorgTopic = "chain_reorg"
|
||||
)
|
||||
|
||||
var casesHandled = map[string]bool{
|
||||
headTopic: true,
|
||||
blockTopic: true,
|
||||
attestationTopic: true,
|
||||
voluntaryExitTopic: true,
|
||||
finalizedCheckpointTopic: true,
|
||||
chainReorgTopic: true,
|
||||
HeadTopic: true,
|
||||
BlockTopic: true,
|
||||
AttestationTopic: true,
|
||||
VoluntaryExitTopic: true,
|
||||
FinalizedCheckpointTopic: true,
|
||||
ChainReorgTopic: true,
|
||||
}
|
||||
|
||||
// StreamEvents allows requesting all events from a set of topics defined in the eth2.0-apis standard.
|
||||
@@ -39,13 +46,13 @@ func (s *Server) StreamEvents(
|
||||
req *ethpb.StreamEventsRequest, stream ethpb.Events_StreamEventsServer,
|
||||
) error {
|
||||
if req == nil || len(req.Topics) == 0 {
|
||||
return status.Error(codes.InvalidArgument, "no topics specified to subscribe to")
|
||||
return status.Error(codes.InvalidArgument, "No topics specified to subscribe to")
|
||||
}
|
||||
// Check if the topics in the request are valid.
|
||||
requestedTopics := make(map[string]bool)
|
||||
for _, topic := range req.Topics {
|
||||
if _, ok := casesHandled[topic]; !ok {
|
||||
return status.Errorf(codes.InvalidArgument, "topic %s not allowed for event subscriptions", topic)
|
||||
return status.Errorf(codes.InvalidArgument, "Topic %s not allowed for event subscriptions", topic)
|
||||
}
|
||||
requestedTopics[topic] = true
|
||||
}
|
||||
@@ -69,20 +76,20 @@ func (s *Server) StreamEvents(
|
||||
select {
|
||||
case event := <-blockChan:
|
||||
if err := s.handleBlockEvents(stream, requestedTopics, event); err != nil {
|
||||
return status.Errorf(codes.Internal, "could not handle block event: %v", err)
|
||||
return status.Errorf(codes.Internal, "Could not handle block event: %v", err)
|
||||
}
|
||||
case event := <-opsChan:
|
||||
if err := s.handleBlockOperationEvents(stream, requestedTopics, event); err != nil {
|
||||
return status.Errorf(codes.Internal, "could not handle block operations event: %v", err)
|
||||
return status.Errorf(codes.Internal, "Could not handle block operations event: %v", err)
|
||||
}
|
||||
case event := <-stateChan:
|
||||
if err := s.handleStateEvents(stream, requestedTopics, event); err != nil {
|
||||
return status.Errorf(codes.Internal, "could not handle state event: %v", err)
|
||||
return status.Errorf(codes.Internal, "Could not handle state event: %v", err)
|
||||
}
|
||||
case <-s.Ctx.Done():
|
||||
return status.Errorf(codes.Canceled, "context canceled")
|
||||
return status.Errorf(codes.Canceled, "Context canceled")
|
||||
case <-stream.Context().Done():
|
||||
return status.Errorf(codes.Canceled, "context canceled")
|
||||
return status.Errorf(codes.Canceled, "Context canceled")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -92,7 +99,7 @@ func (s *Server) handleBlockEvents(
|
||||
) error {
|
||||
switch event.Type {
|
||||
case blockfeed.ReceivedBlock:
|
||||
if _, ok := requestedTopics[blockTopic]; !ok {
|
||||
if _, ok := requestedTopics[BlockTopic]; !ok {
|
||||
return nil
|
||||
}
|
||||
blkData, ok := event.Data.(*blockfeed.ReceivedBlockData)
|
||||
@@ -105,13 +112,13 @@ func (s *Server) handleBlockEvents(
|
||||
}
|
||||
item, err := v1Data.HashTreeRoot()
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "could not hash tree root block %v", err)
|
||||
return errors.Wrap(err, "could not hash tree root block")
|
||||
}
|
||||
eventBlock := ðpb.EventBlock{
|
||||
Slot: v1Data.Message.Slot,
|
||||
Block: item[:],
|
||||
}
|
||||
return s.streamData(stream, blockTopic, eventBlock)
|
||||
return s.streamData(stream, BlockTopic, eventBlock)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
@@ -122,7 +129,7 @@ func (s *Server) handleBlockOperationEvents(
|
||||
) error {
|
||||
switch event.Type {
|
||||
case operation.AggregatedAttReceived:
|
||||
if _, ok := requestedTopics[attestationTopic]; !ok {
|
||||
if _, ok := requestedTopics[AttestationTopic]; !ok {
|
||||
return nil
|
||||
}
|
||||
attData, ok := event.Data.(*operation.AggregatedAttReceivedData)
|
||||
@@ -130,9 +137,9 @@ func (s *Server) handleBlockOperationEvents(
|
||||
return nil
|
||||
}
|
||||
v1Data := migration.V1Alpha1AggregateAttAndProofToV1(attData.Attestation)
|
||||
return s.streamData(stream, attestationTopic, v1Data)
|
||||
return s.streamData(stream, AttestationTopic, v1Data)
|
||||
case operation.UnaggregatedAttReceived:
|
||||
if _, ok := requestedTopics[attestationTopic]; !ok {
|
||||
if _, ok := requestedTopics[AttestationTopic]; !ok {
|
||||
return nil
|
||||
}
|
||||
attData, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
|
||||
@@ -140,9 +147,9 @@ func (s *Server) handleBlockOperationEvents(
|
||||
return nil
|
||||
}
|
||||
v1Data := migration.V1Alpha1AttestationToV1(attData.Attestation)
|
||||
return s.streamData(stream, attestationTopic, v1Data)
|
||||
return s.streamData(stream, AttestationTopic, v1Data)
|
||||
case operation.ExitReceived:
|
||||
if _, ok := requestedTopics[voluntaryExitTopic]; !ok {
|
||||
if _, ok := requestedTopics[VoluntaryExitTopic]; !ok {
|
||||
return nil
|
||||
}
|
||||
exitData, ok := event.Data.(*operation.ExitReceivedData)
|
||||
@@ -150,7 +157,7 @@ func (s *Server) handleBlockOperationEvents(
|
||||
return nil
|
||||
}
|
||||
v1Data := migration.V1Alpha1ExitToV1(exitData.Exit)
|
||||
return s.streamData(stream, voluntaryExitTopic, v1Data)
|
||||
return s.streamData(stream, VoluntaryExitTopic, v1Data)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
@@ -161,32 +168,32 @@ func (s *Server) handleStateEvents(
|
||||
) error {
|
||||
switch event.Type {
|
||||
case statefeed.NewHead:
|
||||
if _, ok := requestedTopics[headTopic]; !ok {
|
||||
if _, ok := requestedTopics[HeadTopic]; !ok {
|
||||
return nil
|
||||
}
|
||||
head, ok := event.Data.(*ethpb.EventHead)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return s.streamData(stream, headTopic, head)
|
||||
return s.streamData(stream, HeadTopic, head)
|
||||
case statefeed.FinalizedCheckpoint:
|
||||
if _, ok := requestedTopics[finalizedCheckpointTopic]; !ok {
|
||||
if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok {
|
||||
return nil
|
||||
}
|
||||
finalizedCheckpoint, ok := event.Data.(*ethpb.EventFinalizedCheckpoint)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return s.streamData(stream, finalizedCheckpointTopic, finalizedCheckpoint)
|
||||
return s.streamData(stream, FinalizedCheckpointTopic, finalizedCheckpoint)
|
||||
case statefeed.Reorg:
|
||||
if _, ok := requestedTopics[chainReorgTopic]; !ok {
|
||||
if _, ok := requestedTopics[ChainReorgTopic]; !ok {
|
||||
return nil
|
||||
}
|
||||
reorg, ok := event.Data.(*ethpb.EventChainReorg)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return s.streamData(stream, chainReorgTopic, reorg)
|
||||
return s.streamData(stream, ChainReorgTopic, reorg)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ func TestStreamEvents_Preconditions(t *testing.T) {
|
||||
defer ctrl.Finish()
|
||||
mockStream := mock.NewMockEvents_StreamEventsServer(ctrl)
|
||||
err := srv.StreamEvents(ðpb.StreamEventsRequest{Topics: nil}, mockStream)
|
||||
require.ErrorContains(t, "no topics specified", err)
|
||||
require.ErrorContains(t, "No topics specified", err)
|
||||
})
|
||||
t.Run("topic_not_allowed", func(t *testing.T) {
|
||||
srv := &Server{}
|
||||
@@ -38,12 +38,12 @@ func TestStreamEvents_Preconditions(t *testing.T) {
|
||||
defer ctrl.Finish()
|
||||
mockStream := mock.NewMockEvents_StreamEventsServer(ctrl)
|
||||
err := srv.StreamEvents(ðpb.StreamEventsRequest{Topics: []string{"foobar"}}, mockStream)
|
||||
require.ErrorContains(t, "topic foobar not allowed", err)
|
||||
require.ErrorContains(t, "Topic foobar not allowed", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamEvents_BlockEvents(t *testing.T) {
|
||||
t.Run(blockTopic, func(t *testing.T) {
|
||||
t.Run(BlockTopic, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, ctrl, mockStream := setupServer(ctx, t)
|
||||
defer ctrl.Finish()
|
||||
@@ -61,14 +61,14 @@ func TestStreamEvents_BlockEvents(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
wantedMessage := &gateway.EventSource{
|
||||
Event: blockTopic,
|
||||
Event: BlockTopic,
|
||||
Data: genericResponse,
|
||||
}
|
||||
|
||||
assertFeedSendAndReceive(ctx, &assertFeedArgs{
|
||||
t: t,
|
||||
srv: srv,
|
||||
topics: []string{blockTopic},
|
||||
topics: []string{BlockTopic},
|
||||
stream: mockStream,
|
||||
shouldReceive: wantedMessage,
|
||||
itemToSend: &feed.Event{
|
||||
@@ -98,14 +98,14 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wantedMessage := &gateway.EventSource{
|
||||
Event: attestationTopic,
|
||||
Event: AttestationTopic,
|
||||
Data: genericResponse,
|
||||
}
|
||||
|
||||
assertFeedSendAndReceive(ctx, &assertFeedArgs{
|
||||
t: t,
|
||||
srv: srv,
|
||||
topics: []string{attestationTopic},
|
||||
topics: []string{AttestationTopic},
|
||||
stream: mockStream,
|
||||
shouldReceive: wantedMessage,
|
||||
itemToSend: &feed.Event{
|
||||
@@ -130,14 +130,14 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wantedMessage := &gateway.EventSource{
|
||||
Event: attestationTopic,
|
||||
Event: AttestationTopic,
|
||||
Data: genericResponse,
|
||||
}
|
||||
|
||||
assertFeedSendAndReceive(ctx, &assertFeedArgs{
|
||||
t: t,
|
||||
srv: srv,
|
||||
topics: []string{attestationTopic},
|
||||
topics: []string{AttestationTopic},
|
||||
stream: mockStream,
|
||||
shouldReceive: wantedMessage,
|
||||
itemToSend: &feed.Event{
|
||||
@@ -149,7 +149,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
|
||||
feed: srv.OperationNotifier.OperationFeed(),
|
||||
})
|
||||
})
|
||||
t.Run(voluntaryExitTopic, func(t *testing.T) {
|
||||
t.Run(VoluntaryExitTopic, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, ctrl, mockStream := setupServer(ctx, t)
|
||||
defer ctrl.Finish()
|
||||
@@ -166,14 +166,14 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wantedMessage := &gateway.EventSource{
|
||||
Event: voluntaryExitTopic,
|
||||
Event: VoluntaryExitTopic,
|
||||
Data: genericResponse,
|
||||
}
|
||||
|
||||
assertFeedSendAndReceive(ctx, &assertFeedArgs{
|
||||
t: t,
|
||||
srv: srv,
|
||||
topics: []string{voluntaryExitTopic},
|
||||
topics: []string{VoluntaryExitTopic},
|
||||
stream: mockStream,
|
||||
shouldReceive: wantedMessage,
|
||||
itemToSend: &feed.Event{
|
||||
@@ -188,7 +188,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStreamEvents_StateEvents(t *testing.T) {
|
||||
t.Run(headTopic, func(t *testing.T) {
|
||||
t.Run(HeadTopic, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, ctrl, mockStream := setupServer(ctx, t)
|
||||
defer ctrl.Finish()
|
||||
@@ -204,14 +204,14 @@ func TestStreamEvents_StateEvents(t *testing.T) {
|
||||
genericResponse, err := anypb.New(wantedHead)
|
||||
require.NoError(t, err)
|
||||
wantedMessage := &gateway.EventSource{
|
||||
Event: headTopic,
|
||||
Event: HeadTopic,
|
||||
Data: genericResponse,
|
||||
}
|
||||
|
||||
assertFeedSendAndReceive(ctx, &assertFeedArgs{
|
||||
t: t,
|
||||
srv: srv,
|
||||
topics: []string{headTopic},
|
||||
topics: []string{HeadTopic},
|
||||
stream: mockStream,
|
||||
shouldReceive: wantedMessage,
|
||||
itemToSend: &feed.Event{
|
||||
@@ -221,7 +221,7 @@ func TestStreamEvents_StateEvents(t *testing.T) {
|
||||
feed: srv.StateNotifier.StateFeed(),
|
||||
})
|
||||
})
|
||||
t.Run(finalizedCheckpointTopic, func(t *testing.T) {
|
||||
t.Run(FinalizedCheckpointTopic, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, ctrl, mockStream := setupServer(ctx, t)
|
||||
defer ctrl.Finish()
|
||||
@@ -234,14 +234,14 @@ func TestStreamEvents_StateEvents(t *testing.T) {
|
||||
genericResponse, err := anypb.New(wantedCheckpoint)
|
||||
require.NoError(t, err)
|
||||
wantedMessage := &gateway.EventSource{
|
||||
Event: finalizedCheckpointTopic,
|
||||
Event: FinalizedCheckpointTopic,
|
||||
Data: genericResponse,
|
||||
}
|
||||
|
||||
assertFeedSendAndReceive(ctx, &assertFeedArgs{
|
||||
t: t,
|
||||
srv: srv,
|
||||
topics: []string{finalizedCheckpointTopic},
|
||||
topics: []string{FinalizedCheckpointTopic},
|
||||
stream: mockStream,
|
||||
shouldReceive: wantedMessage,
|
||||
itemToSend: &feed.Event{
|
||||
@@ -251,7 +251,7 @@ func TestStreamEvents_StateEvents(t *testing.T) {
|
||||
feed: srv.StateNotifier.StateFeed(),
|
||||
})
|
||||
})
|
||||
t.Run(chainReorgTopic, func(t *testing.T) {
|
||||
t.Run(ChainReorgTopic, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, ctrl, mockStream := setupServer(ctx, t)
|
||||
defer ctrl.Finish()
|
||||
@@ -268,14 +268,14 @@ func TestStreamEvents_StateEvents(t *testing.T) {
|
||||
genericResponse, err := anypb.New(wantedReorg)
|
||||
require.NoError(t, err)
|
||||
wantedMessage := &gateway.EventSource{
|
||||
Event: chainReorgTopic,
|
||||
Event: ChainReorgTopic,
|
||||
Data: genericResponse,
|
||||
}
|
||||
|
||||
assertFeedSendAndReceive(ctx, &assertFeedArgs{
|
||||
t: t,
|
||||
srv: srv,
|
||||
topics: []string{chainReorgTopic},
|
||||
topics: []string{ChainReorgTopic},
|
||||
stream: mockStream,
|
||||
shouldReceive: wantedMessage,
|
||||
itemToSend: &feed.Event{
|
||||
|
||||
@@ -46,13 +46,13 @@ func main() {
|
||||
ethpb.RegisterNodeHandler,
|
||||
ethpb.RegisterBeaconChainHandler,
|
||||
ethpb.RegisterBeaconNodeValidatorHandler,
|
||||
ethpbv1.RegisterEventsHandler,
|
||||
pbrpc.RegisterHealthHandler,
|
||||
}
|
||||
v1Registrations := []gateway.PbHandlerRegistration{
|
||||
ethpbv1.RegisterBeaconNodeHandler,
|
||||
ethpbv1.RegisterBeaconChainHandler,
|
||||
ethpbv1.RegisterBeaconValidatorHandler,
|
||||
ethpbv1.RegisterEventsHandler,
|
||||
}
|
||||
if *enableDebugRPCEndpoints {
|
||||
v1Alpha1Registrations = append(v1Alpha1Registrations, pbrpc.RegisterDebugHandler)
|
||||
|
||||
15
deps.bzl
15
deps.bzl
@@ -1800,6 +1800,7 @@ def prysm_deps():
|
||||
sum = "h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=",
|
||||
version = "v1.2.3",
|
||||
)
|
||||
|
||||
go_repository(
|
||||
name = "com_github_klauspost_cpuid_v2",
|
||||
importpath = "github.com/klauspost/cpuid/v2",
|
||||
@@ -2805,6 +2806,13 @@ def prysm_deps():
|
||||
sum = "h1:JCHLVE3B+kJde7bIEo5N4J+ZbLhp0J1Fs+ulyRws4gE=",
|
||||
version = "v0.0.0-20160726150825-5bd2802263f2",
|
||||
)
|
||||
go_repository(
|
||||
name = "com_github_r3labs_sse",
|
||||
importpath = "github.com/r3labs/sse",
|
||||
sum = "h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=",
|
||||
version = "v0.0.0-20210224172625-26fe804710bc",
|
||||
)
|
||||
|
||||
go_repository(
|
||||
name = "com_github_rcrowley_go_metrics",
|
||||
importpath = "github.com/rcrowley/go-metrics",
|
||||
@@ -3411,6 +3419,13 @@ def prysm_deps():
|
||||
sum = "h1:stTHdEoWg1pQ8riaP5ROrjS6zy6wewH/Q2iwnLCQUXY=",
|
||||
version = "v1.0.0-20160220154919-db14e161995a",
|
||||
)
|
||||
go_repository(
|
||||
name = "in_gopkg_cenkalti_backoff_v1",
|
||||
importpath = "gopkg.in/cenkalti/backoff.v1",
|
||||
sum = "h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=",
|
||||
version = "v1.1.0",
|
||||
)
|
||||
|
||||
go_repository(
|
||||
name = "in_gopkg_check_v1",
|
||||
importpath = "gopkg.in/check.v1",
|
||||
|
||||
3
go.mod
3
go.mod
@@ -90,6 +90,7 @@ require (
|
||||
github.com/prysmaticlabs/go-bitfield v0.0.0-20210607200045-4da71aaf6c2d
|
||||
github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c
|
||||
github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20210504233148-1e141af6a0a1
|
||||
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
|
||||
github.com/rs/cors v1.7.0
|
||||
github.com/schollz/progressbar/v3 v3.3.4
|
||||
github.com/sirupsen/logrus v1.6.0
|
||||
@@ -133,6 +134,6 @@ replace github.com/ethereum/go-ethereum => github.com/prysmaticlabs/bazel-go-eth
|
||||
replace github.com/json-iterator/go => github.com/prestonvanloon/go v1.1.7-0.20190722034630-4f2e55fcf87b
|
||||
|
||||
// See https://github.com/prysmaticlabs/grpc-gateway/issues/2
|
||||
replace github.com/grpc-ecosystem/grpc-gateway/v2 => github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210604200058-f148bcf3f503
|
||||
replace github.com/grpc-ecosystem/grpc-gateway/v2 => github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210622145107-ca3041e1b380
|
||||
|
||||
replace github.com/ferranbt/fastssz => github.com/rauljordan/fastssz v0.0.0-20210622230010-a131010e198f
|
||||
|
||||
9
go.sum
9
go.sum
@@ -1071,12 +1071,14 @@ github.com/prysmaticlabs/eth2-types v0.0.0-20210303084904-c9735a06829d/go.mod h1
|
||||
github.com/prysmaticlabs/go-bitfield v0.0.0-20210108222456-8e92c3709aa0/go.mod h1:hCwmef+4qXWjv0jLDbQdWnL0Ol7cS7/lCSS26WR+u6s=
|
||||
github.com/prysmaticlabs/go-bitfield v0.0.0-20210607200045-4da71aaf6c2d h1:46gKr69IlRpv/ENdlzG0SWo5nMLKJxS3tI5NOSdZndQ=
|
||||
github.com/prysmaticlabs/go-bitfield v0.0.0-20210607200045-4da71aaf6c2d/go.mod h1:hCwmef+4qXWjv0jLDbQdWnL0Ol7cS7/lCSS26WR+u6s=
|
||||
github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210604200058-f148bcf3f503 h1:QzTDCXA7FV2tIJ7TGHfEsYfa8QaAeMB1F4B5jAsGQNg=
|
||||
github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210604200058-f148bcf3f503/go.mod h1:IOyTYjcIO0rkmnGBfJTL0NJ11exy/Tc2QEuv7hCXp24=
|
||||
github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210622145107-ca3041e1b380 h1:KzQOksIZB8poBiMk8h5Txzbp/OoBLFhS3H20ZN06hWg=
|
||||
github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20210622145107-ca3041e1b380/go.mod h1:IOyTYjcIO0rkmnGBfJTL0NJ11exy/Tc2QEuv7hCXp24=
|
||||
github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c h1:9PHRCuO/VN0s9k+RmLykho7AjDxblNYI5bYKed16NPU=
|
||||
github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c/go.mod h1:ZRws458tYHS/Zs936OQ6oCrL+Ict5O4Xpwve1UQ6C9M=
|
||||
github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20210504233148-1e141af6a0a1 h1:k7CCMwN7VooQ7GhfySnaVyI4/9+QbhJTdasoC6VOZOI=
|
||||
github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20210504233148-1e141af6a0a1/go.mod h1:au9l1XcWNEKixIlSRzEe54fYGhyELWgJJIxKu8W75Mc=
|
||||
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
|
||||
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
|
||||
github.com/rauljordan/fastssz v0.0.0-20210622230010-a131010e198f h1:CoD/RaM9s8qfHA7jAqntW3jv+z9zPBXaxCaCByrKOmg=
|
||||
github.com/rauljordan/fastssz v0.0.0-20210622230010-a131010e198f/go.mod h1:DyEu2iuLBnb/T51BlsiO3yLYdJC6UbGMrIkqK1KmQxM=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
@@ -1353,6 +1355,7 @@ golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLL
|
||||
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@@ -1682,6 +1685,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610=
|
||||
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
|
||||
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
|
||||
Reference in New Issue
Block a user