merge feat/record_transaction branch

This commit is contained in:
maskpp
2023-04-06 15:55:08 +08:00
12 changed files with 185 additions and 66 deletions

View File

@@ -162,6 +162,7 @@ type SessionInfo struct {
ID string `json:"id"`
Rollers map[string]*RollerStatus `json:"rollers"`
StartTimestamp int64 `json:"start_timestamp"`
Attempts uint8 `json:"attempts,omitempty"`
}
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)

View File

@@ -6,12 +6,12 @@ import (
"math/big"
)
// TxType scroll tx type (l1_message_tx, l1_gasOracle_tx, l2_message_tx, l2_gasOracle_tx, l2_rollupCommit_tx, l2_rollupFinalize_tx)
type TxType int
// ScrollTxType scroll tx type (l1_message_tx, l1_gasOracle_tx, l2_message_tx, l2_gasOracle_tx, l2_rollupCommit_tx, l2_rollupFinalize_tx)
type ScrollTxType int
const (
// UndefinedTx undefined scroll tx type
UndefinedTx TxType = iota
UndefinedTx ScrollTxType = iota
// L1toL2MessageTx is sent by l1 relayer but to L2
L1toL2MessageTx
// L1toL2GasOracleTx is sent by l1 relayer but to L2
@@ -20,9 +20,9 @@ const (
L2toL1MessageTx
// L2toL1GasOracleTx is sent by l2 relayer but to L1
L2toL1GasOracleTx
// RollUpCommitTx is sent to L2
// RollUpCommitTx is sent to L1
RollUpCommitTx
// RollupFinalizeTx is sent to L2
// RollupFinalizeTx is sent to L1
RollupFinalizeTx
)

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v3.0.4"
var tag = "v3.0.6"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -2,6 +2,7 @@
"roller_manager_config": {
"compression_level": 9,
"rollers_per_session": 1,
"session_attempts": 2,
"collection_time": 180,
"token_time_to_live": 60,
"verifier": {

View File

@@ -11,7 +11,8 @@ import (
)
const (
defaultNumberOfVerifierWorkers = 10
defaultNumberOfVerifierWorkers = 10
defaultNumberOfSessionRetryAttempts = 2
)
// RollerManagerConfig loads sequencer configuration items.
@@ -21,6 +22,9 @@ type RollerManagerConfig struct {
OrderSession string `json:"order_session,omitempty"`
// The amount of rollers to pick per proof generation session.
RollersPerSession uint8 `json:"rollers_per_session"`
// Number of attempts that a session can be retried if previous attempts failed.
// Currently we only consider proving timeout as failure here.
SessionAttempts uint8 `json:"session_attempts,omitempty"`
// Zk verifier config.
Verifier *VerifierConfig `json:"verifier,omitempty"`
// Proof collection time (in minutes).
@@ -74,6 +78,9 @@ func NewConfig(file string) (*Config, error) {
if cfg.RollerManagerConfig.MaxVerifierWorkers == 0 {
cfg.RollerManagerConfig.MaxVerifierWorkers = defaultNumberOfVerifierWorkers
}
if cfg.RollerManagerConfig.SessionAttempts == 0 {
cfg.RollerManagerConfig.SessionAttempts = defaultNumberOfSessionRetryAttempts
}
return cfg, nil
}

View File

@@ -177,7 +177,7 @@ func (m *Manager) Loop() {
}
}
// Select roller and send message
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0]) {
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0], nil) {
tasks = tasks[1:]
}
case <-m.ctx.Done():
@@ -339,20 +339,22 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
//Cleanup roller sessions before return.
defer func() {
// TODO: remove the clean-up, rollers report healthy status.
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
}()
for {
select {
//Execute after timeout, set in config.json. Consider all rollers failed.
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
// Check if session can be replayed
if sess.info.Attempts < m.cfg.SessionAttempts {
if m.StartProofGenerationSession(nil, sess) {
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
m.mu.Unlock()
log.Info("Retrying session", "session id:", sess.info.ID)
return
}
}
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
@@ -364,6 +366,12 @@ func (m *Manager) CollectProofs(sess *session) {
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
}
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
return
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
@@ -387,6 +395,11 @@ func (m *Manager) CollectProofs(sess *session) {
randIndex := mathrand.Intn(len(validRollers))
_ = validRollers[randIndex]
// TODO: reward winner
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
return
}
@@ -440,27 +453,39 @@ func (m *Manager) APIs() []rpc.API {
}
// StartProofGenerationSession starts a proof generation session
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success bool) {
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) {
var taskId string
if task != nil {
taskId = task.Hash
} else {
taskId = prevSession.info.ID
}
if m.GetNumberOfIdleRollers() == 0 {
log.Warn("no idle roller when starting proof generation session", "id", task.Hash)
log.Warn("no idle roller when starting proof generation session", "id", taskId)
return false
}
log.Info("start proof generation session", "id", task.Hash)
log.Info("start proof generation session", "id", taskId)
defer func() {
if !success {
if err := m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", task.Hash, "err", err)
if task != nil {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err)
}
} else {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Failed", "id", taskId, "err", err)
}
}
}
}()
// Get block traces.
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": task.Hash})
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": taskId})
if err != nil {
log.Error(
"could not GetBlockInfos",
"batch_hash", task.Hash,
"batch_hash", taskId,
"error", err,
)
return false
@@ -487,35 +512,39 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
log.Info("selectRoller returns nil")
break
}
log.Info("roller is picked", "session id", task.Hash, "name", roller.Name, "public key", roller.PublicKey)
log.Info("roller is picked", "session id", taskId, "name", roller.Name, "public key", roller.PublicKey)
// send trace to roller
if !roller.sendTask(task.Hash, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.Hash)
if !roller.sendTask(taskId, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
continue
}
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
}
// No roller assigned.
if len(rollers) == 0 {
log.Error("no roller assigned", "id", task.Hash, "number of idle rollers", m.GetNumberOfIdleRollers())
log.Error("no roller assigned", "id", taskId, "number of idle rollers", m.GetNumberOfIdleRollers())
return false
}
// Update session proving status as assigned.
if err = m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", task.Hash, "err", err)
if err = m.orm.UpdateProvingStatus(taskId, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", taskId, "err", err)
return false
}
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: task.Hash,
ID: taskId,
Rollers: rollers,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
@@ -532,7 +561,7 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
}
m.mu.Lock()
m.sessions[task.Hash] = sess
m.sessions[taskId] = sess
m.mu.Unlock()
go m.CollectProofs(sess)

View File

@@ -87,6 +87,7 @@ func TestApis(t *testing.T) {
t.Run("TestSeveralConnections", testSeveralConnections)
t.Run("TestValidProof", testValidProof)
t.Run("TestInvalidProof", testInvalidProof)
t.Run("TestTimedoutProof", testTimedoutProof)
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
// TODO: Restart roller alone when received task, can add this test case in integration-test.
//t.Run("TestRollerReconnect", testRollerReconnect)
@@ -356,6 +357,86 @@ func testInvalidProof(t *testing.T) {
}
}
func testTimedoutProof(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
defer l2db.Close()
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
}()
// create first mock roller, that will not send any proof.
roller1 := newMockRoller(t, "roller_test"+strconv.Itoa(0), wsURL)
defer func() {
// close connection
roller1.close()
}()
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
var hashes = make([]string, 1)
dbTx, err := l2db.Beginx()
assert.NoError(t, err)
for i := range hashes {
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
hashes[i] = batchData.Hash().Hex()
}
assert.NoError(t, dbTx.Commit())
// verify proof status, it should be assigned, because roller didn't send any proof
var (
tick = time.Tick(500 * time.Millisecond)
tickStop = time.Tick(10 * time.Second)
)
for len(hashes) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByHash(hashes[0])
assert.NoError(t, err)
if status == types.ProvingTaskAssigned {
hashes = hashes[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
// create second mock roller, that will send valid proof.
roller2 := newMockRoller(t, "roller_test"+strconv.Itoa(1), wsURL)
roller2.waitTaskAndSendProof(t, time.Second, false, true)
defer func() {
// close connection
roller2.close()
}()
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
// wait manager to finish first CollectProofs
<-time.After(60 * time.Second)
// verify proof status, it should be verified now, because second roller sent valid proof
for len(hashes) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByHash(hashes[0])
assert.NoError(t, err)
if status == types.ProvingTaskVerified {
hashes = hashes[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
}
func testIdleRollerSelection(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
@@ -505,6 +586,7 @@ func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, rollersPerSession
CollectionTime: 1,
TokenTimeToLive: 5,
MaxVerifierWorkers: 10,
SessionAttempts: 2,
}, db, nil)
assert.NoError(t, err)
assert.NoError(t, rollerManager.Start())

View File

@@ -1,7 +1,7 @@
-- +goose Up
-- +goose StatementBegin
create table transaction
create table scroll_transaction
(
id VARCHAR NOT NULL,
tx_hash VARCHAR NOT NULL,
@@ -14,15 +14,15 @@ create table transaction
created_time TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP
);
create unique index transaction_id_uindex
on transaction (id);
create unique index scroll_transaction_id_uindex
on scroll_transaction (id);
create unique index transaction_tx_hash_uindex
on transaction (tx_hash);
create unique index scroll_transaction_tx_hash_uindex
on scroll_transaction (tx_hash);
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
drop table if exists transaction;
drop table if exists scroll_transaction;
-- +goose StatementEnd

View File

@@ -112,9 +112,9 @@ type L2MessageOrm interface {
GetRelayL2MessageTxHash(nonce uint64) (sql.NullString, error) // for unit tests only
}
// TxOrm transaction operation interfaces.
type TxOrm interface {
SaveTx(id, sender string, txType types.TxType, tx *etypes.Transaction) error
// ScrollTxOrm transaction operation interfaces.
type ScrollTxOrm interface {
SaveTx(id, sender string, txType types.ScrollTxType, tx *etypes.Transaction) error
UpdateTxMsgByID(hash string, txHash string) error
GetTxByID(id string) (*types.ScrollTx, error)
GetL1TxMessages(fields map[string]interface{}, args ...string) (uint64, []*types.ScrollTx, error)

View File

@@ -11,19 +11,19 @@ import (
stypes "scroll-tech/common/types"
)
type txOrm struct {
type scrollTxOrm struct {
db *sqlx.DB
}
var _ TxOrm = (*txOrm)(nil)
var _ ScrollTxOrm = (*scrollTxOrm)(nil)
// NewTxOrm create an TxOrm instance.
func NewTxOrm(db *sqlx.DB) TxOrm {
return &txOrm{db: db}
// NewScrollTxOrm create an ScrollTxOrm instance.
func NewScrollTxOrm(db *sqlx.DB) ScrollTxOrm {
return &scrollTxOrm{db: db}
}
// SaveTx stores tx message into db.
func (t *txOrm) SaveTx(id, sender string, txType stypes.TxType, tx *types.Transaction) error {
func (t *scrollTxOrm) SaveTx(id, sender string, txType stypes.ScrollTxType, tx *types.Transaction) error {
if tx == nil {
return nil
}
@@ -32,7 +32,7 @@ func (t *txOrm) SaveTx(id, sender string, txType stypes.TxType, tx *types.Transa
target = tx.To().String()
}
_, err := t.db.Exec(
t.db.Rebind("INSERT INTO transaction (id, tx_hash, sender, nonce, target, value, data, type) VALUES (?, ?, ?, ?, ?, ?, ?, ?);"),
t.db.Rebind("INSERT INTO scroll_transaction (id, tx_hash, sender, nonce, target, value, data, type) VALUES (?, ?, ?, ?, ?, ?, ?, ?);"),
id,
tx.Hash().String(),
sender,
@@ -46,16 +46,16 @@ func (t *txOrm) SaveTx(id, sender string, txType stypes.TxType, tx *types.Transa
}
// UpdateTxMsgByID remove data content by id.
func (t *txOrm) UpdateTxMsgByID(id string, txHash string) error {
func (t *scrollTxOrm) UpdateTxMsgByID(id string, txHash string) error {
db := t.db
_, err := db.Exec(db.Rebind("UPDATE transaction SET data = '', tx_hash = ? WHERE id = ?;"), txHash, id)
_, err := db.Exec(db.Rebind("UPDATE scroll_transaction SET data = '', tx_hash = ? WHERE id = ?;"), txHash, id)
return err
}
// GetTxByID returns tx message by message id.
func (t *txOrm) GetTxByID(id string) (*stypes.ScrollTx, error) {
func (t *scrollTxOrm) GetTxByID(id string) (*stypes.ScrollTx, error) {
db := t.db
row := db.QueryRowx(db.Rebind("SELECT id, tx_hash, sender, nonce, target, value, data FROM transaction WHERE id = ?"), id)
row := db.QueryRowx(db.Rebind("SELECT id, tx_hash, sender, nonce, target, value, data FROM scroll_transaction WHERE id = ?"), id)
txMsg := &stypes.ScrollTx{}
if err := row.StructScan(txMsg); err != nil {
return nil, err
@@ -63,18 +63,17 @@ func (t *txOrm) GetTxByID(id string) (*stypes.ScrollTx, error) {
return txMsg, nil
}
// GetL1TxMessages gets tx messages by transaction right join l1_message.
// GetL1TxMessages gets tx messages by scroll_transaction right join l1_message.
// sql i.g:
// select l1.msg_hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data
// from transaction as tx
// from scroll_transaction as tx
// right join (select msg_hash
//
// from l1_message
// where 1 = 1 AND status = :status AND queue_index > 0
// ORDER BY queue_index ASC
// LIMIT 10) as l1 on tx.id = l1.msg_hash;
func (t *txOrm) GetL1TxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.ScrollTx, error) {
func (t *scrollTxOrm) GetL1TxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.ScrollTx, error) {
query := "select msg_hash, queue_index from l1_message where 1 = 1"
for key := range fields {
query = query + fmt.Sprintf(" AND %s = :%s", key, key)
@@ -107,7 +106,7 @@ func (t *txOrm) GetL1TxMessages(fields map[string]interface{}, args ...string) (
}
// GetL2TxMessages gets tx messages by transaction right join l2_message.
func (t *txOrm) GetL2TxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.ScrollTx, error) {
func (t *scrollTxOrm) GetL2TxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.ScrollTx, error) {
query := "select msg_hash from l2_message where 1 = 1"
for key := range fields {
query = query + fmt.Sprintf(" AND %s = :%s", key, key)
@@ -140,7 +139,7 @@ func (t *txOrm) GetL2TxMessages(fields map[string]interface{}, args ...string) (
}
// GetBlockBatchTxMessages gets tx messages by transaction right join block_batch.
func (t *txOrm) GetBlockBatchTxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.ScrollTx, error) {
func (t *scrollTxOrm) GetBlockBatchTxMessages(fields map[string]interface{}, args ...string) (uint64, []*stypes.ScrollTx, error) {
query := "select hash, index from block_batch where 1 = 1"
for key := range fields {
query = query + fmt.Sprintf(" AND %s = :%s", key, key)

View File

@@ -15,7 +15,7 @@ type OrmFactory interface {
orm.L1MessageOrm
orm.L2MessageOrm
orm.SessionInfoOrm
orm.TxOrm
orm.ScrollTxOrm
GetDB() *sqlx.DB
Beginx() (*sqlx.Tx, error)
Close() error
@@ -28,7 +28,7 @@ type ormFactory struct {
orm.L1MessageOrm
orm.L2MessageOrm
orm.SessionInfoOrm
orm.TxOrm
orm.ScrollTxOrm
*sqlx.DB
}
@@ -53,7 +53,7 @@ func NewOrmFactory(cfg *DBConfig) (OrmFactory, error) {
L2MessageOrm: orm.NewL2MessageOrm(db),
L1BlockOrm: orm.NewL1BlockOrm(db),
SessionInfoOrm: orm.NewSessionInfoOrm(db),
TxOrm: orm.NewTxOrm(db),
ScrollTxOrm: orm.NewScrollTxOrm(db),
DB: db,
}, nil
}

View File

@@ -86,7 +86,7 @@ var (
ormLayer2 orm.L2MessageOrm
ormBatch orm.BlockBatchOrm
ormSession orm.SessionInfoOrm
ormTx orm.TxOrm
ormTx orm.ScrollTxOrm
auth *bind.TransactOpts
)
@@ -94,8 +94,8 @@ var (
func setupEnv(t *testing.T) error {
// Init db config and start db container.
dbConfig = &database.DBConfig{DriverName: "postgres"}
base.RunImages(t)
dbConfig.DSN = base.DBEndpoint()
//base.RunImages(t)
dbConfig.DSN = "postgres://maskpp:123456@localhost:5432/postgres?sslmode=disable" // base.DBEndpoint()
// Create db handler and reset db.
factory, err := database.NewOrmFactory(dbConfig)
@@ -109,7 +109,7 @@ func setupEnv(t *testing.T) error {
ormLayer2 = orm.NewL2MessageOrm(db)
ormBatch = orm.NewBlockBatchOrm(db)
ormSession = orm.NewSessionInfoOrm(db)
ormTx = orm.NewTxOrm(db)
ormTx = orm.NewScrollTxOrm(db)
templateBlockTrace, err := os.ReadFile("../common/testdata/blockTrace_02.json")
if err != nil {