Prover: the controller skips waiting for the sub-process to exit on SIGTERM in spot-instance mode (#1049)

* fix(controller): exit immediately when SIGTERM is received, without waiting for the kill to complete (a minimal sketch of the pattern follows the commit metadata below)

* feat(tmp-response): also remove the tmp-response file if it exists

* minor(execution): add a comment explaining why we can safely close the done channel
Author: AlexandreBelling (committed by GitHub)
Date:   2025-05-27 07:17:05 +02:00
Parent: d1ea5ea7d5
Commit: 91f3865438
2 changed files with 69 additions and 66 deletions
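For context, the change hinges on the controller's root context being cancelled when SIGTERM arrives, so that <-ctx.Done() unblocks in the logging goroutine and in runCmd. The sketch below shows that pattern in isolation; it assumes the context is wired to SIGTERM with signal.NotifyContext (the wiring itself is not part of this diff), and spotInstanceMode is a stand-in for cfg.Controller.SpotInstanceMode.

```go
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
)

// watchCancellation mirrors the logging goroutine in runController.
// spotInstanceMode stands in for cfg.Controller.SpotInstanceMode.
func watchCancellation(ctx context.Context, spotInstanceMode bool) {
	// Block until the context is cancelled (e.g. by SIGTERM).
	<-ctx.Done()
	if spotInstanceMode {
		log.Println("Received cancellation request. Killing the ongoing process and exiting immediately after.")
	} else {
		log.Println("Received cancellation request, will exit as soon as possible or once current proof task is complete.")
	}
}

func main() {
	// Assumption: the real controller cancels its root context on SIGTERM
	// in a similar way; the exact wiring is outside this diff.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer stop()

	go watchCancellation(ctx, true)

	// ... the job loop would run here until ctx is cancelled ...
	<-ctx.Done()
}
```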

@@ -50,7 +50,12 @@ func runController(ctx context.Context, cfg *config.Config) {
 		// SIGTERM is received, there would be no log entry about the signal
 		// until the proof completes.
 		<-ctx.Done()
-		cLog.Infoln("Received cancellation request, will exit as soon as possible or once current proof task is complete.")
+
+		if cfg.Controller.SpotInstanceMode {
+			cLog.Infoln("Received cancellation request. Killing the ongoing process and exiting immediately after.")
+		} else {
+			cLog.Infoln("Received cancellation request, will exit as soon as possible or once current proof task is complete.")
+		}
 	}()

 	for {
@@ -194,6 +199,13 @@ func runController(ctx context.Context, cfg *config.Config) {
 			)
 		}

+			// As an edge-case, it's possible (in theory) that the process
+			// completes exactly when we receive the kill signal. So we
+			// could end up in a situation where the tmp-response file
+			// exists. In that case, we simply delete it before exiting to
+			// keep the FS clean.
+			os.Remove(job.TmpResponseFile(cfg))
+
 		// Failure case
 		default:
 			// Move the inprogress to the done directory
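
One note on the cleanup added above: os.Remove is called without checking its error, which is reasonable when the only expected failure is that the file is already gone. A small illustration of that behaviour, with a hypothetical path standing in for job.TmpResponseFile(cfg):

```go
package main

import (
	"errors"
	"io/fs"
	"log"
	"os"
)

func main() {
	// Hypothetical path; the real one comes from job.TmpResponseFile(cfg).
	tmpResponse := "/tmp/prover-response.json.tmp"

	// os.Remove returns an error wrapping fs.ErrNotExist when the file is
	// already absent, which is the common case here and safe to ignore.
	if err := os.Remove(tmpResponse); err != nil && !errors.Is(err, fs.ErrNotExist) {
		log.Printf("could not clean up %s: %v", tmpResponse, err)
	}
}
```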

@@ -6,7 +6,6 @@ import (
 	"os"
 	"os/exec"
 	"strings"
-	"sync/atomic"
 	"syscall"
 	"time"
@@ -231,55 +230,65 @@ func runCmd(ctx context.Context, cmd string, job *Job, retry bool) Status {
 		}
 	}

 	// The motivation for this goroutine is to early-exit the process via a
 	// kill if the context is cancelled. KilledByUs is set if the goroutine
 	// detects that the context has been cancelled. And the done channel
 	// indicates that the process has finished.
-	var (
-		killedByUs = &atomic.Bool{}
-		done       = make(chan struct{})
-	)
+	done := make(chan Status)

 	go func() {
-		select {
-		case <-ctx.Done():
-			_ = curProcess.Kill()
-			killedByUs.Store(true)
-		case <-done:
-		}
+		// Since the channel is used for sending only once and only in this
+		// goroutine, we can safely close it.
+		defer close(done)
+
+		// Lock on the process until it finishes
+		pstate, err := curProcess.Wait()
+		if err != nil {
+			// Here it means, the "os" package could not "wait" the process. It
+			// can happen for many different reasons essentially pertaining to
+			// the initialization of the process. It may be that some of theses
+			// errors are retryables but it remains to see which one. Until then
+			// we exited with a fatal code and the files will need to be
+			// manually reprocessed.
+			logrus.Errorf("unexpected : got an error trying to lock on %v : %v", pname, err)
+			done <- Status{
+				ExitCode: CodeFatal,
+				What:     "got an error waiting for the process",
+				Err:      err,
+			}
+			return
+		}
+
+		processingTime := time.Since(startTime)
+		exitCode, err := unixExitCode(pstate)
+		if err != nil {
+			// NB: this would be only possible if we did not wait for the process
+			// to finish trying to read the exit code.
+			utils.Panic("unexpectedly got an error while trying to read the exit code of the process: %v", err)
+		}
+
+		logrus.Infof(
+			"The processing of file `%s` (process=%v) took %v seconds to complete and returned exit code %v",
+			job.OriginalFile, pname, processingTime.Seconds(), exitCode,
+		)
+
+		// Build the response status
+		status := Status{ExitCode: exitCode}
+		switch status.ExitCode {
+		case CodeSuccess:
+			status.What = "success"
+		case CodeOom:
+			status.What = "out of memory error"
+		case CodeTraceLimit:
+			status.What = "trace limit overflow"
+		}
+
+		metrics.CollectPostProcess(job.Def.Name, status.ExitCode, processingTime, retry)
+		done <- status
 	}()

-	// Lock on the process until it finishes
-	pstate, err := curProcess.Wait()
-	if err != nil {
-		// Here it means, the "os" package could not "wait" the process. It
-		// can happen for many different reasons essentially pertaining to
-		// the initialization of the process. It may be that some of theses
-		// errors are retryables but it remains to see which one. Until then
-		// we exited with a fatal code and the files will need to be
-		// manually reprocessed.
-		logrus.Errorf("unexpected : got an error trying to lock on %v : %v", pname, err)
-		return Status{
-			ExitCode: CodeFatal,
-			What:     "got an error waiting for the process",
-			Err:      err,
-		}
-	}
-
-	processingTime := time.Since(startTime)
-	exitCode, err := unixExitCode(pstate)
-	if err != nil {
-		// NB: this would be only possible if we did not wait for the process
-		// to finish trying to read the exit code.
-		utils.Panic("unexpectedly got an error while trying to read the exit code of the process: %v", err)
-	}
-
-	// Closing the done channel tells the above goroutine to stop waiting for
-	// a context cancellation.
-	close(done)
-
-	if killedByUs.Load() {
-		logrus.Infof("The process %v was killed by the controller", pname)
+	select {
+	case <-ctx.Done():
+		logrus.Infof("The process %v is being killed by the controller", pname)
+		_ = curProcess.Kill()

 		// Not that we exit without providing post-execution metrics to
 		// prometheus as these would not be relevant anyway.
@@ -287,27 +296,9 @@ func runCmd(ctx context.Context, cmd string, job *Job, retry bool) Status {
 			ExitCode: CodeKilledByUs,
 			What:     "the process was killed by the controller",
 		}
+	case status := <-done:
+		return status
 	}
-
-	logrus.Infof(
-		"The processing of file `%s` (process=%v) took %v seconds to complete and returned exit code %v",
-		job.OriginalFile, pname, processingTime.Seconds(), exitCode,
-	)
-
-	// Build the response status
-	status := Status{ExitCode: exitCode}
-	switch status.ExitCode {
-	case CodeSuccess:
-		status.What = "success"
-	case CodeOom:
-		status.What = "out of memory error"
-	case CodeTraceLimit:
-		status.What = "trace limit overflow"
-	}
-
-	metrics.CollectPostProcess(job.Def.Name, status.ExitCode, processingTime, retry)
-	return status
 }
// Returns a human-readable process name. The process name is formatted as in
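
Stripped of the prover-specific details, the refactor above boils down to a single worker goroutine that waits on the child process and reports its result over a channel, while the caller races that channel against context cancellation. The sketch below shows the same pattern in isolation; the status type, exit codes and command are illustrative stand-ins, not the prover's real ones.

```go
package main

import (
	"context"
	"log"
	"os/exec"
	"time"
)

// status is an illustrative stand-in for the prover's Status type.
type status struct {
	exitCode int
	what     string
	err      error
}

func runCmd(ctx context.Context, name string, args ...string) status {
	cmd := exec.Command(name, args...)
	if err := cmd.Start(); err != nil {
		return status{exitCode: -1, what: "could not start the process", err: err}
	}

	// The goroutine below is the only sender on done, so it can safely
	// close the channel once it has sent the final status.
	done := make(chan status)
	go func() {
		defer close(done)
		err := cmd.Wait()
		done <- status{exitCode: cmd.ProcessState.ExitCode(), what: "process finished", err: err}
	}()

	select {
	case <-ctx.Done():
		// Context cancelled (e.g. SIGTERM in spot-instance mode): kill the
		// child and return immediately instead of waiting for Wait.
		_ = cmd.Process.Kill()
		return status{exitCode: -2, what: "killed by the controller"}
	case st := <-done:
		return st
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	log.Printf("%+v", runCmd(ctx, "sleep", "10"))
}
```

Note that with an unbuffered channel the worker goroutine stays blocked on its send if the caller returns through the cancellation branch; in the prover this is harmless because the controller exits right after killing the process, but a longer-lived caller would want a buffered channel (or to drain the channel) instead.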