mirror of
https://github.com/zkopru-network/zkopru.git
synced 2026-04-24 03:00:03 -04:00
297 lines
7.9 KiB
TypeScript
297 lines
7.9 KiB
TypeScript
import Web3 from 'web3'
|
|
import { logger } from '@zkopru/utils'
|
|
import {
|
|
DB,
|
|
UpsertOptions,
|
|
UpdateOptions,
|
|
DeleteManyOptions,
|
|
TransactionDB,
|
|
} from './types'
|
|
|
|
// process block in memory, write to DB when confirmed by enough blocks
|
|
|
|
const DEFAULT_BLOCK_CONFIRMATIONS = 8
|
|
|
|
enum OperationType {
|
|
UPSERT,
|
|
CREATE,
|
|
UPDATE,
|
|
TRANSACTION,
|
|
DELETE,
|
|
}
|
|
|
|
type TransactionOperation = {
|
|
collection: string
|
|
type: OperationType
|
|
where?: any
|
|
create?: any
|
|
update?: any
|
|
}
|
|
|
|
type PendingOperation = {
|
|
blockNumber: number
|
|
blockHash: string
|
|
collection: string
|
|
type: OperationType
|
|
where?: any
|
|
create?: any
|
|
update?: any
|
|
transactionOperations?: TransactionOperation[]
|
|
onWrite?: () => void
|
|
}
|
|
|
|
export class BlockCache {
|
|
web3: Web3
|
|
|
|
db: DB
|
|
|
|
pendingOperations = [] as PendingOperation[]
|
|
|
|
currentBlockNumber = 0
|
|
|
|
blockHeaderSubscription: any
|
|
|
|
BLOCK_CONFIRMATIONS = +(
|
|
process.env.BLOCK_CONFIRMATIONS ?? DEFAULT_BLOCK_CONFIRMATIONS
|
|
)
|
|
|
|
constructor(web3: Web3, db: DB) {
|
|
this.web3 = web3
|
|
this.db = db
|
|
this.blockHeaderSubscription = this.web3.eth
|
|
.subscribe('newBlockHeaders', err => {
|
|
if (err) {
|
|
logger.info(
|
|
`database/block-cache - Error subscribing to block headers: ${err.toString()}`,
|
|
)
|
|
}
|
|
})
|
|
.on('data', async blockHeader => {
|
|
this.currentBlockNumber = blockHeader.number
|
|
// write stuff if needed
|
|
try {
|
|
await this.writeChangesIfNeeded()
|
|
} catch (err) {
|
|
logger.info(
|
|
`database/block-cache - Error writing block cache changes: ${err.toString()}`,
|
|
)
|
|
}
|
|
})
|
|
}
|
|
|
|
async blockNumber() {
|
|
if (this.currentBlockNumber === 0) {
|
|
// async load it
|
|
this.currentBlockNumber = await this.web3.eth.getBlockNumber()
|
|
}
|
|
return this.currentBlockNumber
|
|
}
|
|
|
|
async clearChangesForBlockHash(hash: string) {
|
|
this.pendingOperations = this.pendingOperations.filter(({ blockHash }) => {
|
|
return blockHash !== hash
|
|
})
|
|
}
|
|
|
|
// Write any data that is old enough to be considered confirmed
|
|
async writeChangesIfNeeded() {
|
|
const docsToRemove = [] as any[]
|
|
for (const op of this.pendingOperations) {
|
|
if (this.currentBlockNumber - op.blockNumber < this.BLOCK_CONFIRMATIONS) {
|
|
// eslint-disable-next-line no-continue
|
|
continue
|
|
}
|
|
// otherwise write
|
|
try {
|
|
logger.info(`Writing ${op.collection}`)
|
|
await this.writeChange(op)
|
|
docsToRemove.push(op)
|
|
} catch (err) {
|
|
console.log(err)
|
|
console.log(`Error writing document`)
|
|
}
|
|
}
|
|
this.pendingOperations = this.pendingOperations.filter(
|
|
doc => docsToRemove.indexOf(doc) === -1,
|
|
)
|
|
}
|
|
|
|
async writeChange(operation: PendingOperation) {
|
|
if (operation.type === OperationType.CREATE) {
|
|
await this.db.create(operation.collection, operation.create)
|
|
} else if (operation.type === OperationType.UPSERT) {
|
|
await this.db.upsert(operation.collection, {
|
|
where: operation.where,
|
|
create: operation.create,
|
|
update: operation.update,
|
|
})
|
|
} else if (operation.type === OperationType.UPDATE) {
|
|
await this.db.update(operation.collection, {
|
|
where: operation.where,
|
|
update: operation.update,
|
|
})
|
|
} else if (operation.type === OperationType.TRANSACTION) {
|
|
await this.db.transaction(db => {
|
|
if (!operation.transactionOperations) return
|
|
for (const op of operation.transactionOperations) {
|
|
if (op.type === OperationType.CREATE) {
|
|
db.create(op.collection, op.create)
|
|
} else if (op.type === OperationType.UPSERT) {
|
|
db.upsert(op.collection, {
|
|
where: op.where,
|
|
create: op.create,
|
|
update: op.update,
|
|
})
|
|
} else if (op.type === OperationType.UPDATE) {
|
|
db.update(op.collection, {
|
|
where: op.where,
|
|
update: op.update,
|
|
})
|
|
} else if (op.type === OperationType.DELETE) {
|
|
db.delete(op.collection, {
|
|
where: op.where,
|
|
})
|
|
} else {
|
|
throw new Error(
|
|
`Unrecognized transaction operation type: "${op.type}"`,
|
|
)
|
|
}
|
|
}
|
|
})
|
|
} else if (operation.type === OperationType.DELETE) {
|
|
await this.db.delete(operation.collection, {
|
|
where: operation.where,
|
|
})
|
|
} else {
|
|
throw new Error(`Unrecognized operation type: "${operation.type}"`)
|
|
}
|
|
if (typeof operation.onWrite === 'function') {
|
|
operation.onWrite()
|
|
}
|
|
}
|
|
|
|
async upsertCache(
|
|
collection: string,
|
|
options: UpsertOptions,
|
|
blockNumber: number,
|
|
blockHash: string,
|
|
) {
|
|
if (typeof blockNumber !== 'number')
|
|
throw new Error('Invalid block number provided to BlockCache.upsertCache')
|
|
const currentBlockNumber = await this.blockNumber()
|
|
const pendingOperation = {
|
|
blockNumber,
|
|
blockHash,
|
|
collection,
|
|
type: OperationType.UPSERT,
|
|
...options,
|
|
}
|
|
if (
|
|
this.BLOCK_CONFIRMATIONS === 0 ||
|
|
currentBlockNumber - blockNumber >= this.BLOCK_CONFIRMATIONS
|
|
) {
|
|
await this.writeChange(pendingOperation)
|
|
} else {
|
|
// store in memory
|
|
this.pendingOperations.push(pendingOperation)
|
|
}
|
|
}
|
|
|
|
async updateCache(
|
|
collection: string,
|
|
options: UpdateOptions,
|
|
blockNumber: number,
|
|
blockHash: string,
|
|
) {
|
|
if (typeof blockNumber !== 'number')
|
|
throw new Error('Invalid block number provided to BlockCache.upsertCache')
|
|
const currentBlockNumber = await this.blockNumber()
|
|
const pendingOperation = {
|
|
blockNumber,
|
|
blockHash,
|
|
collection,
|
|
type: OperationType.UPDATE,
|
|
...options,
|
|
}
|
|
if (
|
|
this.BLOCK_CONFIRMATIONS === 0 ||
|
|
currentBlockNumber - blockNumber >= this.BLOCK_CONFIRMATIONS
|
|
) {
|
|
await this.writeChange(pendingOperation)
|
|
} else {
|
|
// store in memory
|
|
this.pendingOperations.push(pendingOperation)
|
|
}
|
|
}
|
|
|
|
async transactionCache(
|
|
operation: (db: TransactionDB) => void | Promise<void>,
|
|
blockNumber: number,
|
|
blockHash: string,
|
|
onWrite?: () => void,
|
|
) {
|
|
if (typeof blockNumber !== 'number')
|
|
throw new Error('Invalid block number provided to BlockCache.upsertCache')
|
|
const currentBlockNumber = await this.blockNumber()
|
|
// store in memory
|
|
const operations = [] as TransactionOperation[]
|
|
const db = {
|
|
create: (collection: string, doc: any | any[]) => {
|
|
operations.push({
|
|
type: OperationType.CREATE,
|
|
collection,
|
|
create: doc,
|
|
})
|
|
},
|
|
update: (collection: string, options: UpdateOptions) => {
|
|
operations.push({
|
|
type: OperationType.UPDATE,
|
|
collection,
|
|
...options,
|
|
})
|
|
},
|
|
upsert: (collection: string, options: UpsertOptions) => {
|
|
operations.push({
|
|
type: OperationType.UPSERT,
|
|
collection,
|
|
...options,
|
|
})
|
|
},
|
|
delete: (collection: string, options: DeleteManyOptions) => {
|
|
operations.push({
|
|
type: OperationType.DELETE,
|
|
collection,
|
|
...options,
|
|
})
|
|
},
|
|
onCommit: () => {
|
|
throw new Error('Not supported')
|
|
},
|
|
onError: () => {
|
|
throw new Error('Not supported')
|
|
},
|
|
onComplete: () => {
|
|
throw new Error('Not supported')
|
|
},
|
|
}
|
|
await Promise.resolve(operation(db))
|
|
const pendingOperation = {
|
|
blockNumber,
|
|
blockHash,
|
|
collection: '',
|
|
type: OperationType.TRANSACTION,
|
|
transactionOperations: operations,
|
|
onWrite,
|
|
}
|
|
if (
|
|
this.BLOCK_CONFIRMATIONS === 0 ||
|
|
currentBlockNumber - blockNumber >= this.BLOCK_CONFIRMATIONS
|
|
) {
|
|
await this.writeChange(pendingOperation)
|
|
if (typeof onWrite === 'function') onWrite()
|
|
} else {
|
|
this.pendingOperations.push(pendingOperation)
|
|
}
|
|
}
|
|
}
|