feat: update

This commit is contained in:
georgehao
2023-06-30 15:42:22 +08:00
parent 467dfbe37c
commit 05e287fd07
3 changed files with 11 additions and 8 deletions

View File

@@ -500,6 +500,12 @@ func (m *Manager) CollectProofs(sess *session) {
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
case ret := <-sess.finishChan:
m.mu.Lock()
for idx := range sess.sessionInfos {
if sess.sessionInfos[idx].RollerPublicKey == ret.pk {
sess.sessionInfos[idx].ProvingStatus = int(ret.status)
}
}
if sess.isSessionFailed() {
if ret.typ == message.BasicProve {
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
@@ -514,11 +520,8 @@ func (m *Manager) CollectProofs(sess *session) {
coordinatorSessionsFailedTotalCounter.Inc(1)
}
for _, sessionInfo := range sess.sessionInfos {
sessionInfo.ProvingStatus = int(ret.status)
if err := m.orm.SetSessionInfo(sessionInfo); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
}
if err := m.orm.UpdateSessionInfoProvingStatus(m.ctx, ret.id, ret.pk, ret.status); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
}
//Check if all rollers have finished their tasks, and rollers with valid results are indexed by public key.

View File

@@ -44,7 +44,7 @@ type BlockTraceOrm interface {
type SessionInfoOrm interface {
GetSessionInfosByHashes(hashes []string) ([]*types.SessionInfo, error)
SetSessionInfo(rollersInfo *types.SessionInfo) error
UpdateSessionInfoProvingStatus(ctx context.Context, dbTx *sqlx.Tx, hash string, pk string, status types.ProvingStatus) error
UpdateSessionInfoProvingStatus(ctx context.Context, taskID string, pk string, status types.RollerProveStatus) error
}
// AggTaskOrm is aggregator task

View File

@@ -55,8 +55,8 @@ func (o *sessionInfoOrm) SetSessionInfo(rollersInfo *types.SessionInfo) error {
}
// UpdateSessionInfoProvingStatus update the session info proving status
func (o *sessionInfoOrm) UpdateSessionInfoProvingStatus(ctx context.Context, dbTx *sqlx.Tx, hash string, pk string, status types.ProvingStatus) error {
if _, err := dbTx.ExecContext(ctx, o.db.Rebind("update session_info set proving_status = ? where task_id = ? and roller_public_key = ?;"), int(status), hash, pk); err != nil {
func (o *sessionInfoOrm) UpdateSessionInfoProvingStatus(ctx context.Context, taskID string, pk string, status types.RollerProveStatus) error {
if _, err := o.db.ExecContext(ctx, o.db.Rebind("update session_info set proving_status = ? where task_id = ? and roller_public_key = ?;"), int(status), taskID, pk); err != nil {
return err
}
return nil