attempt to fix WaitForTextInFile log corruption (#11819)

* attempt to fix WaitForTextInFile log corruption

* restore Seek, just in case!

* lint

* handle file close error in all cases with a log

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
kasey
2022-12-22 19:08:32 -06:00
committed by GitHub
parent dac555a57c
commit c1563e40bd

View File

@@ -19,7 +19,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/config/params"
eth "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
e2e "github.com/prysmaticlabs/prysm/v3/testing/endtoend/params"
@@ -73,40 +73,71 @@ func DeleteAndCreatePath(fp string) (*os.File, error) {
}
// WaitForTextInFile checks a file every polling interval for the text requested.
func WaitForTextInFile(file *os.File, text string) error {
func WaitForTextInFile(src *os.File, match string) error {
d := time.Now().Add(maxPollingWaitTime)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
// Use a ticker with a deadline to poll a given file.
ticker := time.NewTicker(filePollingInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
contents, err := io.ReadAll(file)
if err != nil {
return err
}
return fmt.Errorf("could not find requested text \"%s\" in logs:\n%s", text, contents)
case <-ticker.C:
fileScanner := bufio.NewScanner(file)
buf := make([]byte, 0, fileBufferSize)
fileScanner.Buffer(buf, maxFileBufferSize)
for fileScanner.Scan() {
scanned := fileScanner.Text()
if strings.Contains(scanned, text) {
return nil
}
}
if err := fileScanner.Err(); err != nil {
return err
}
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return err
// Open a new file descriptor pointed at the same path.
f, err := os.Open(src.Name())
if err != nil {
return err
}
defer func() {
if ferr := f.Close(); ferr != nil {
if !errors.Is(err, os.ErrClosed) {
log.WithError(ferr).Errorf("error calling .Close on the file handle for %s", f.Name())
}
}
}()
// spawn a goroutine to scan
errChan := make(chan error)
foundChan := make(chan struct{})
go func() {
t := time.NewTicker(filePollingInterval)
// This needs to happen in a loop because, even though the other process is still appending to the log file,
// scanner will see EOF once it hits the end of what's been written so far.
for {
select {
case <-ctx.Done():
return
case <-t.C:
// This is a paranoid check because I'm not sure if the underlying fd handle can be stuck mid-line
// when Scanner sees a partially written line at EOF. It's probably safest to just keep this.
_, err = f.Seek(0, io.SeekStart)
if err != nil {
errChan <- err
return
}
lineScanner := bufio.NewScanner(f)
// Scan will return true until it hits EOF or another error.
// If .Close is called on the underlying file, Scan will return false, causing this goroutine to exit.
for lineScanner.Scan() {
line := lineScanner.Text()
if strings.Contains(line, match) {
// closing foundChan causes the <-foundChan case in the outer select to execute,
// ending the function with a nil return (success result).
close(foundChan)
return
}
}
// If Scan returned false for an error (except EOF), Err will return it.
if err = lineScanner.Err(); err != nil {
// Bubble the error back up to the parent goroutine.
errChan <- err
}
}
}
}()
select {
case <-ctx.Done():
return fmt.Errorf("could not find requested text \"%s\" in %s before deadline:\n", match, f.Name())
case <-foundChan:
return nil
case err = <-errChan:
return errors.Wrapf(err, "received error while scanning %s for %s", f.Name(), match)
}
}