diff --git a/endtoend/components/BUILD.bazel b/endtoend/components/BUILD.bazel index 5cdfc9e78d..c875280c2f 100644 --- a/endtoend/components/BUILD.bazel +++ b/endtoend/components/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "eth1.go", "log.go", "slasher.go", + "tracing_sink.go", "validator.go", ], importpath = "github.com/prysmaticlabs/prysm/endtoend/components", diff --git a/endtoend/components/tracing_sink.go b/endtoend/components/tracing_sink.go new file mode 100644 index 0000000000..892c1450fc --- /dev/null +++ b/endtoend/components/tracing_sink.go @@ -0,0 +1,106 @@ +package components + +import ( + "bytes" + "context" + "encoding/base64" + "io" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/prysmaticlabs/prysm/endtoend/helpers" + e2e "github.com/prysmaticlabs/prysm/endtoend/params" +) + +// TracingSink to capture HTTP requests from opentracing pushes. This is meant +// to capture all opentracing spans from Prysm during an end-to-end test. Spans +// are normally sent to a jaeger (https://www.jaegertracing.io/docs/1.25/getting-started/) +// endpoint, but here we instead replace that with our own http request sink. +// The request sink receives any requests, raw marshals them and base64-encodes them, +// then writes them newline-delimited into a file. +// +// The output file from this component can then be used by tools/replay-http in +// the Prysm repository to replay requests to a jaeger collector endpoint. This +// can then be used to visualize the spans themselves in the jaeger UI. +type TracingSink struct { + started chan struct{} + endpoint string + server *http.Server +} + +// NewTracingSink initializes the tracing sink component. +func NewTracingSink(endpoint string) *TracingSink { + return &TracingSink{ + started: make(chan struct{}, 1), + endpoint: endpoint, + } +} + +// Start the tracing sink. +func (ts *TracingSink) Start(ctx context.Context) error { + go ts.initializeSink() + close(ts.started) + return nil +} + +// Started checks whether a tracing sink is started and ready to be queried. +func (ts *TracingSink) Started() <-chan struct{} { + return ts.started +} + +// Initialize an http handler that writes all requests to a file. +func (ts *TracingSink) initializeSink() { + ts.server = &http.Server{Addr: ts.endpoint} + defer func() { + if err := ts.server.Close(); err != nil { + log.WithError(err).Error("Failed to close http server") + return + } + }() + stdOutFile, err := helpers.DeleteAndCreateFile(e2e.TestParams.LogPath, e2e.TracingRequestSinkFileName) + if err != nil { + log.WithError(err).Error("Failed to create stdout file") + return + } + cleanup := func() { + if err := stdOutFile.Close(); err != nil { + log.WithError(err).Error("Could not close stdout file") + } + } + + http.HandleFunc("/", func(_ http.ResponseWriter, r *http.Request) { + if err := captureRequest(stdOutFile, r); err != nil { + log.WithError(err).Error("Failed to capture http request") + return + } + }) + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigs + cleanup() + os.Exit(0) + }() + if err := ts.server.ListenAndServe(); err != http.ErrServerClosed { + log.WithError(err).Error("Failed to serve http") + } +} + +// Captures raw requests in base64 encoded form in a line-delimited file. +func captureRequest(f io.Writer, r *http.Request) error { + buf := bytes.NewBuffer(nil) + err := r.Write(buf) + if err != nil { + return err + } + encoded := make([]byte, base64.StdEncoding.EncodedLen(len(buf.Bytes()))) + base64.StdEncoding.Encode(encoded, buf.Bytes()) + encoded = append(encoded, []byte("\n")...) + _, err = f.Write(encoded) + if err != nil { + return err + } + return nil +} diff --git a/endtoend/endtoend_test.go b/endtoend/endtoend_test.go index 2949dc87a5..c9d3f0b8e4 100644 --- a/endtoend/endtoend_test.go +++ b/endtoend/endtoend_test.go @@ -70,6 +70,11 @@ func (r *testRunner) run() { ctx, done := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) + tracingSink := components.NewTracingSink(config.TracingSinkEndpoint) + g.Go(func() error { + return tracingSink.Start(ctx) + }) + // ETH1 node. eth1Node := components.NewEth1Node() g.Go(func() error { @@ -129,7 +134,7 @@ func (r *testRunner) run() { // Wait for all required nodes to start. requiredComponents := []e2etypes.ComponentRunner{ - eth1Node, bootNode, beaconNodes, validatorNodes, + tracingSink, eth1Node, bootNode, beaconNodes, validatorNodes, } if config.TestSlasher && slasherNodes != nil { requiredComponents = append(requiredComponents, slasherNodes) diff --git a/endtoend/minimal_e2e_test.go b/endtoend/minimal_e2e_test.go index e16121c758..a1520f76a5 100644 --- a/endtoend/minimal_e2e_test.go +++ b/endtoend/minimal_e2e_test.go @@ -34,10 +34,13 @@ func e2eMinimal(t *testing.T, usePrysmSh bool) { epochsToRun, err = strconv.Atoi(epochStr) require.NoError(t, err) } - + const tracingEndpoint = "127.0.0.1:9411" testConfig := &types.E2EConfig{ BeaconFlags: []string{ fmt.Sprintf("--slots-per-archive-point=%d", params.BeaconConfig().SlotsPerEpoch*16), + fmt.Sprintf("--tracing-endpoint=http://%s", tracingEndpoint), + "--enable-tracing", + "--trace-sample-fraction=1.0", }, ValidatorFlags: []string{}, EpochsToRun: uint64(epochsToRun), @@ -46,6 +49,7 @@ func e2eMinimal(t *testing.T, usePrysmSh bool) { TestSlasher: true, UsePrysmShValidator: usePrysmSh, UsePprof: !longRunning, + TracingSinkEndpoint: tracingEndpoint, Evaluators: []types.Evaluator{ ev.PeersConnect, ev.HealthzCheck, diff --git a/endtoend/params/params.go b/endtoend/params/params.go index b22eedcb4a..f16154e38d 100644 --- a/endtoend/params/params.go +++ b/endtoend/params/params.go @@ -36,6 +36,9 @@ var TestParams *params // BootNodeLogFileName is the file name used for the beacon chain node logs. var BootNodeLogFileName = "bootnode.log" +// TracingRequestSinkFileName is the file name for writing raw trace requests. +var TracingRequestSinkFileName = "tracing-http-requests.log.gz" + // BeaconNodeLogFileName is the file name used for the beacon chain node logs. var BeaconNodeLogFileName = "beacon-%d.log" diff --git a/endtoend/types/types.go b/endtoend/types/types.go index a403c3e08e..f5a2f628fd 100644 --- a/endtoend/types/types.go +++ b/endtoend/types/types.go @@ -11,15 +11,16 @@ import ( // E2EConfig defines the struct for all configurations needed for E2E testing. type E2EConfig struct { + TestSync bool + UsePrysmShValidator bool + UsePprof bool + TestDeposits bool + TestSlasher bool + EpochsToRun uint64 + TracingSinkEndpoint string + Evaluators []Evaluator BeaconFlags []string ValidatorFlags []string - EpochsToRun uint64 - TestSync bool - TestSlasher bool - TestDeposits bool - UsePprof bool - UsePrysmShValidator bool - Evaluators []Evaluator } // Evaluator defines the structure of the evaluators used to diff --git a/tools/replay-http/BUILD.bazel b/tools/replay-http/BUILD.bazel new file mode 100644 index 0000000000..1351379223 --- /dev/null +++ b/tools/replay-http/BUILD.bazel @@ -0,0 +1,16 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary") +load("@prysm//tools/go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importpath = "github.com/prysmaticlabs/prysm/tools/replay-http", + visibility = ["//visibility:private"], + deps = ["@com_github_sirupsen_logrus//:go_default_library"], +) + +go_binary( + name = "replay-http", + embed = [":go_default_library"], + visibility = ["//visibility:public"], +) diff --git a/tools/replay-http/main.go b/tools/replay-http/main.go new file mode 100644 index 0000000000..c35d2fffd3 --- /dev/null +++ b/tools/replay-http/main.go @@ -0,0 +1,78 @@ +/** +Tool for replaying http requests from a file of base64 encoded, line-delimited +Go http raw requests. Credits to https://gist.github.com/kasey/c9e663eae5baebbf8fbe548c2b1d961b. +*/ +package main + +import ( + "bufio" + "bytes" + "encoding/base64" + "flag" + "io" + "net/http" + "net/url" + "os" + "path" + + log "github.com/sirupsen/logrus" +) + +var ( + filePath = flag.String("file", "", "file of line-delimited, base64-encoded Go http requests") + endpoint = flag.String("endpoint", "http://localhost:14268/api/traces", "host:port endpoint to make HTTP requests to") +) + +func main() { + flag.Parse() + if *filePath == "" { + log.Fatal("Must provide --file") + } + + f, err := os.Open(path.Clean(*filePath)) + if err != nil { + log.Fatal(err) + } + defer func() { + if err := f.Close(); err != nil { + log.WithError(err).Error("Could not close stdout file") + } + }() + lr := bufio.NewReader(f) + for { + line, err := lr.ReadBytes([]byte("\n")[0]) + if err == io.EOF { + os.Exit(0) + } + if err != nil { + log.Fatal(err) + } + line = line[0 : len(line)-1] + decoded := make([]byte, base64.StdEncoding.DecodedLen(len(line))) + _, err = base64.StdEncoding.Decode(decoded, line) + if err != nil { + log.Fatal(err) + } + dbuf := bytes.NewBuffer(decoded) + req, err := http.ReadRequest(bufio.NewReader(dbuf)) + if err != nil { + log.Fatal(err) + } + parsed, err := url.Parse(*endpoint) + if err != nil { + log.Fatal(err) + } + req.URL = parsed + req.RequestURI = "" + log.Println(req) + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Fatal(err) + } + respBuf := bytes.NewBuffer(nil) + if err := resp.Write(respBuf); err != nil { + log.Fatal(err) + } + log.Println(respBuf.String()) + } +}