REST VC: Subscribe to Beacon API events (#13354)

* Initial code for head event streaming

* handle events and error

* keepalive event

* tests

* generate new mock

* remove single case select

* cleanup

* explain eventByteLimit

* use 2 channels in test

* review

* more review

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
This commit is contained in:
Radosław Kapka
2024-01-04 18:14:45 +01:00
committed by GitHub
parent cfef8f4676
commit e68b2821c1
25 changed files with 351 additions and 49 deletions

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/network/httputil"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
@@ -39,7 +40,7 @@ func (s *Server) StreamBeaconLogs(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamBeaconLogs")
defer span.End()
// Set up SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Content-Type", api.EventStreamMediaType)
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
@@ -108,7 +109,7 @@ func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) {
close(ch)
}()
// Set up SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Content-Type", api.EventStreamMediaType)
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

View File

@@ -11,6 +11,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes/empty"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/io/logs/mock"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -92,7 +93,7 @@ func TestStreamBeaconLogs(t *testing.T) {
}
ct, ok := resp.Header["Content-Type"]
require.Equal(t, ok, true)
require.Equal(t, ct[0], "text/event-stream")
require.Equal(t, ct[0], api.EventStreamMediaType)
cn, ok := resp.Header["Connection"]
require.Equal(t, ok, true)
require.Equal(t, cn[0], "keep-alive")
@@ -143,7 +144,7 @@ func TestStreamValidatorLogs(t *testing.T) {
}
ct, ok := resp.Header["Content-Type"]
require.Equal(t, ok, true)
require.Equal(t, ct[0], "text/event-stream")
require.Equal(t, ct[0], api.EventStreamMediaType)
cn, ok := resp.Header["Connection"]
require.Equal(t, ok, true)
require.Equal(t, cn[0], "keep-alive")