Compare commits

...

2 Commits

Author SHA1 Message Date
terence tsao
631b3b7af5 Pcli hack 2024-11-25 10:13:31 -08:00
kasey
944f94a9bf recover from panics when writing the event stream (#14545)
* recover from panics when writing the event stream

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
(cherry picked from commit 1086bdf2b3)
2024-10-16 13:38:56 -05:00
3 changed files with 26 additions and 90 deletions

View File

@@ -71,6 +71,7 @@ var (
errSlowReader = errors.New("client failed to read fast enough to keep outgoing buffer below threshold")
errNotRequested = errors.New("event not requested by client")
errUnhandledEventData = errors.New("unable to represent event data in the event stream")
errWriterUnusable = errors.New("http response writer is unusable")
)
// StreamingResponseWriter defines a type that can be used by the eventStreamer.
@@ -309,10 +310,21 @@ func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.Can
}
}
func writeLazyReaderWithRecover(w StreamingResponseWriter, lr lazyReader) (err error) {
defer func() {
if r := recover(); r != nil {
log.WithField("panic", r).Error("Recovered from panic while writing event to client.")
err = errWriterUnusable
}
}()
_, err = io.Copy(w, lr())
return err
}
func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWriter, first lazyReader) error {
needKeepAlive := true
if first != nil {
if _, err := io.Copy(w, first()); err != nil {
if err := writeLazyReaderWithRecover(w, first); err != nil {
return err
}
needKeepAlive = false
@@ -325,13 +337,13 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWri
case <-ctx.Done():
return ctx.Err()
case rf := <-es.outbox:
if _, err := io.Copy(w, rf()); err != nil {
if err := writeLazyReaderWithRecover(w, rf); err != nil {
return err
}
needKeepAlive = false
default:
if needKeepAlive {
if _, err := io.Copy(w, newlineReader()); err != nil {
if err := writeLazyReaderWithRecover(w, newlineReader); err != nil {
return err
}
}

View File

@@ -101,72 +101,15 @@ func FromForkVersion(cv [fieldparams.VersionLength]byte) (*VersionedUnmarshaler,
// UnmarshalBeaconState uses internal knowledge in the VersionedUnmarshaler to pick the right concrete BeaconState type,
// then Unmarshal()s the type and returns an instance of state.BeaconState if successful.
func (cf *VersionedUnmarshaler) UnmarshalBeaconState(marshaled []byte) (s state.BeaconState, err error) {
forkName := version.String(cf.Fork)
switch fork := cf.Fork; fork {
case version.Phase0:
st := &ethpb.BeaconState{}
err = st.UnmarshalSSZ(marshaled)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
s, err = state_native.InitializeFromProtoUnsafePhase0(st)
if err != nil {
return nil, errors.Wrapf(err, "failed to init state trie from state, detected fork=%s", forkName)
}
case version.Altair:
st := &ethpb.BeaconStateAltair{}
err = st.UnmarshalSSZ(marshaled)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
s, err = state_native.InitializeFromProtoUnsafeAltair(st)
if err != nil {
return nil, errors.Wrapf(err, "failed to init state trie from state, detected fork=%s", forkName)
}
case version.Bellatrix:
st := &ethpb.BeaconStateBellatrix{}
err = st.UnmarshalSSZ(marshaled)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
s, err = state_native.InitializeFromProtoUnsafeBellatrix(st)
if err != nil {
return nil, errors.Wrapf(err, "failed to init state trie from state, detected fork=%s", forkName)
}
case version.Capella:
st := &ethpb.BeaconStateCapella{}
err = st.UnmarshalSSZ(marshaled)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
s, err = state_native.InitializeFromProtoUnsafeCapella(st)
if err != nil {
return nil, errors.Wrapf(err, "failed to init state trie from state, detected fork=%s", forkName)
}
case version.Deneb:
st := &ethpb.BeaconStateDeneb{}
err = st.UnmarshalSSZ(marshaled)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
s, err = state_native.InitializeFromProtoUnsafeDeneb(st)
if err != nil {
return nil, errors.Wrapf(err, "failed to init state trie from state, detected fork=%s", forkName)
}
case version.Electra:
st := &ethpb.BeaconStateElectra{}
err = st.UnmarshalSSZ(marshaled)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
s, err = state_native.InitializeFromProtoUnsafeElectra(st)
if err != nil {
return nil, errors.Wrapf(err, "failed to init state trie from state, detected fork=%s", forkName)
}
default:
return nil, fmt.Errorf("unable to initialize BeaconState for fork version=%s", forkName)
st := &ethpb.BeaconStateElectra{}
err = st.UnmarshalSSZ(marshaled)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", "makong")
}
s, err = state_native.InitializeFromProtoUnsafeElectra(st)
if err != nil {
return nil, errors.Wrapf(err, "failed to init state trie from state, detected fork=%s", "makong")
}
return s, nil
}
@@ -199,24 +142,7 @@ func (cf *VersionedUnmarshaler) UnmarshalBeaconBlock(marshaled []byte) (interfac
return nil, err
}
var blk ssz.Unmarshaler
switch cf.Fork {
case version.Phase0:
blk = &ethpb.SignedBeaconBlock{}
case version.Altair:
blk = &ethpb.SignedBeaconBlockAltair{}
case version.Bellatrix:
blk = &ethpb.SignedBeaconBlockBellatrix{}
case version.Capella:
blk = &ethpb.SignedBeaconBlockCapella{}
case version.Deneb:
blk = &ethpb.SignedBeaconBlockDeneb{}
case version.Electra:
blk = &ethpb.SignedBeaconBlockElectra{}
default:
forkName := version.String(cf.Fork)
return nil, fmt.Errorf("unable to initialize ReadOnlyBeaconBlock for fork version=%s at slot=%d", forkName, slot)
}
blk := &ethpb.SignedBeaconBlockElectra{}
err = blk.UnmarshalSSZ(marshaled)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal ReadOnlySignedBeaconBlock in UnmarshalSSZ")

View File

@@ -70,7 +70,7 @@ var prettyCommand = &cli.Command{
case "block":
data = &ethpb.BeaconBlock{}
case "signed_block":
data = &ethpb.SignedBeaconBlock{}
data = &ethpb.SignedBeaconBlockElectra{}
case "blinded_block":
data = &ethpb.BlindedBeaconBlockBellatrix{}
case "attestation":
@@ -333,9 +333,7 @@ func detectState(fPath string) (state.BeaconState, error) {
return nil, err
}
vu, err := detect.FromState(rawFile)
if err != nil {
return nil, errors.Wrap(err, "error detecting state from file")
}
fmt.Println(err)
s, err := vu.UnmarshalBeaconState(rawFile)
if err != nil {
return nil, errors.Wrap(err, "error unmarshalling state")