feat: database transactions

Fix upserts; allow an explicit `constraintKey` to be specified as the ON CONFLICT target.
This commit is contained in:
Chance Hudson
2021-03-10 23:40:21 -06:00
parent 986a9148b2
commit be8b176c03
7 changed files with 166 additions and 108 deletions

View File

@@ -260,55 +260,49 @@ export class BlockProcessor extends EventEmitter {
}
})
// TODO use a mutex lock here
for (const input of inputs) {
await this.db.upsert('Utxo', {
where: { hash: input.hash },
create: input,
update: input,
})
}
// await this.db.write(prisma =>
// prisma.$transaction(
// inputs.map(input =>
// prisma.utxo.upsert({
// where: { hash: input.hash },
// create: input,
// update: input,
// }),
// ),
// ),
// )
// for (const input of inputs) {
// await this.db.upsert('Utxo', {
// where: { hash: input.hash },
// create: input,
// update: input,
// })
// }
await this.db.transaction(db => {
inputs.map(input =>
db.upsert('Utxo', {
where: { hash: input.hash },
create: input,
update: input,
}),
)
})
}
private async saveTransactions(block: Block, challenged = false) {
for (const tx of block.body.txs) {
await this.db.create('Tx', {
hash: tx.hash().toString(),
blockHash: block.hash.toString(),
inflowCount: tx.inflow.length,
outflowCount: tx.outflow.length,
fee: tx.fee.toHex(),
challenged,
slashed: false,
})
}
// await this.db.write(prisma =>
// prisma.$transaction(
// block.body.txs.map(tx =>
// prisma.tx.create({
// data: {
// hash: tx.hash().toString(),
// blockHash: block.hash.toString(),
// inflowCount: tx.inflow.length,
// outflowCount: tx.outflow.length,
// fee: tx.fee.toHex(),
// challenged,
// slashed: false,
// },
// }),
// ),
// ),
// )
// for (const tx of block.body.txs) {
// await this.db.create('Tx', {
// hash: tx.hash().toString(),
// blockHash: block.hash.toString(),
// inflowCount: tx.inflow.length,
// outflowCount: tx.outflow.length,
// fee: tx.fee.toHex(),
// challenged,
// slashed: false,
// })
// }
await this.db.transaction(db => {
block.body.txs.map(tx =>
db.create('Tx', {
hash: tx.hash().toString(),
blockHash: block.hash.toString(),
inflowCount: tx.inflow.length,
outflowCount: tx.outflow.length,
fee: tx.fee.toHex(),
challenged,
slashed: false,
}),
)
})
}
private async saveMyWithdrawals(txs: ZkTx[], accounts: Address[]) {
@@ -501,28 +495,26 @@ export class BlockProcessor extends EventEmitter {
nullifier: nullifier.toString(),
})
}
for (const utxo of utxosToUpdate) {
await this.db.update('Utxo', {
where: { hash: utxo.hash },
update: {
index: utxo.index,
nullifier: utxo.nullifier,
},
})
}
// await this.db.write(prisma =>
// prisma.$transaction(
// utxosToUpdate.map(utxo =>
// prisma.utxo.update({
// where: { hash: utxo.hash },
// data: {
// index: utxo.index,
// nullifier: utxo.nullifier,
// },
// }),
// ),
// ),
// )
// for (const utxo of utxosToUpdate) {
// await this.db.update('Utxo', {
// where: { hash: utxo.hash },
// update: {
// index: utxo.index,
// nullifier: utxo.nullifier,
// },
// })
// }
await this.db.transaction(db => {
utxosToUpdate.map(utxo =>
db.update('Utxo', {
where: { hash: utxo.hash },
update: {
index: utxo.index,
nullifier: utxo.nullifier,
},
}),
)
})
}
private async updateMyWithdrawals(accounts: Address[], patch: Patch) {
@@ -563,29 +555,27 @@ export class BlockProcessor extends EventEmitter {
),
})
}
for (const withdrawal of withdrawalsToUpdate) {
await this.db.update('Withdrawal', {
where: { hash: withdrawal.hash },
update: {
index: withdrawal.index,
includedIn: withdrawal.includedIn,
siblings: withdrawal.siblings,
},
})
}
// await this.db.write(prisma =>
// prisma.$transaction(
// withdrawalsToUpdate.map(withdrawal =>
// prisma.withdrawal.update({
// where: { hash: withdrawal.hash },
// data: {
// index: withdrawal.index,
// includedIn: withdrawal.includedIn,
// siblings: withdrawal.siblings,
// },
// }),
// ),
// ),
// )
// for (const withdrawal of withdrawalsToUpdate) {
// await this.db.update('Withdrawal', {
// where: { hash: withdrawal.hash },
// update: {
// index: withdrawal.index,
// includedIn: withdrawal.includedIn,
// siblings: withdrawal.siblings,
// },
// })
// }
await this.db.transaction(db => {
withdrawalsToUpdate.map(withdrawal =>
db.update('Withdrawal', {
where: { hash: withdrawal.hash },
update: {
index: withdrawal.index,
includedIn: withdrawal.includedIn,
siblings: withdrawal.siblings,
},
}),
)
})
}
}

View File

@@ -13,6 +13,7 @@ import {
DeleteManyOptions,
TableData,
Relation,
TransactionDB,
} from '../types'
import {
tableCreationSql,
@@ -251,6 +252,58 @@ export class PostgresConnector implements DB {
await this.db.query(createTablesCommand)
}
// Run `operation` against a write-only transaction facade and apply the
// collected statements atomically. Serialized behind the 'db' lock so
// concurrent transactions cannot interleave their statements.
// NOTE(review): presumably `this.lock` is an async-lock instance whose
// acquire() releases when the returned promise settles — confirm.
async transaction(operation: (db: TransactionDB) => void) {
return this.lock.acquire('db', async () => this._transaction(operation))
}
// Collect the SQL for every write requested by `operation`, then execute
// the whole batch as a single BEGIN/COMMIT transaction. The callback never
// touches the database directly — it only queues statements.
private async _transaction(operation: (db: TransactionDB) => void) {
if (typeof operation !== 'function') throw new Error('Invalid operation')
// SQL statements accumulated in the order the callback requests them.
const sqlOperations = [] as string[]
// Write-only facade handed to the callback: each method validates the
// collection against the schema and pushes SQL instead of executing it.
const transactionDB = {
create: (collection: string, _doc: any) => {
const table = this.schema[collection]
if (!table)
throw new Error(`Unable to find table ${collection} in schema`)
// Accept either a single document or an array of documents.
const docs = [_doc].flat()
const { sql } = createSql(table, docs)
sqlOperations.push(sql)
},
update: (collection: string, options: UpdateOptions) => {
const table = this.schema[collection]
if (!table)
throw new Error(`Unable to find table ${collection} in schema`)
sqlOperations.push(updateSql(table, options))
},
deleteOne: (collection: string, options: FindOneOptions) => {
const table = this.schema[collection]
if (!table) throw new Error(`Unable to find table "${collection}"`)
// deleteOne is deleteMany constrained to a single row.
const sql = deleteManySql(table, {
...options,
limit: 1,
})
sqlOperations.push(sql)
},
deleteMany: (collection: string, options: DeleteManyOptions) => {
const table = this.schema[collection]
if (!table) throw new Error(`Unable to find table "${collection}"`)
const sql = deleteManySql(table, options)
sqlOperations.push(sql)
},
upsert: (collection: string, options: UpsertOptions) => {
const table = this.schema[collection]
if (!table) throw new Error(`Unable to find table "${collection}"`)
const sql = upsertSql(table, options)
sqlOperations.push(sql)
},
}
// Support both synchronous and promise-returning callbacks: an error
// thrown here propagates before any SQL has been executed.
await Promise.resolve(operation(transactionDB))
// now apply the transaction
// NOTE(review): statements are joined into one multi-statement query
// string rather than parameterized — assumes createSql/updateSql/
// deleteManySql/upsertSql escape all values; confirm.
const transactionSql = `BEGIN TRANSACTION;
${sqlOperations.join('\n')}
COMMIT;`
await this.db.query(transactionSql)
}
// Close the underlying postgres connection and release its resources.
// No writes are flushed here — callers must complete transactions first.
async close() {
await this.db.end()
}

View File

@@ -224,8 +224,13 @@ export class SQLiteConnector implements DB {
const table = this.schema[collection]
if (!table) throw new Error(`Unable to find table ${collection} in schema`)
const sql = upsertSql(table, options)
const { changes } = await this.db.run(sql)
return changes
try {
const { changes } = await this.db.run(sql)
return changes
} catch (err) {
console.log(sql)
throw err
}
}
async deleteOne(collection: string, options: FindOneOptions) {
@@ -249,8 +254,12 @@ export class SQLiteConnector implements DB {
return changes || 0
}
// Allow only updates, upserts, deletes, and creates
// Serialize the transaction behind the 'db' lock, then delegate to
// _transaction, which batches the callback's writes into one atomic run.
// Mirrors the PostgresConnector implementation of the same interface.
async transaction(operation: (db: TransactionDB) => void) {
return this.lock.acquire('db', async () => this._transaction(operation))
}
// Allow only updates, upserts, deletes, and creates
private async _transaction(operation: (db: TransactionDB) => void) {
if (typeof operation !== 'function') throw new Error('Invalid operation')
const sqlOperations = [] as string[]
const transactionDB = {
@@ -290,19 +299,12 @@ export class SQLiteConnector implements DB {
sqlOperations.push(sql)
},
}
try {
await Promise.resolve(operation(transactionDB))
} catch (err) {
console.log('Error in transaction operation')
console.log(err)
}
await Promise.resolve(operation(transactionDB))
// now apply the transaction
const transactionSql = `BEGIN TRANSACTION;
${sqlOperations.join('\n')}
COMMIT;`
await this.lock.acquire('db', async () => {
await this.db.run(transactionSql)
})
await this.db.exec(transactionSql)
}
async close() {

View File

@@ -270,12 +270,20 @@ export function upsertSql(table: SchemaTable, options: UpsertOptions): string {
const { sql } = createSql(table, options.create)
// remove the semicolon in the creation sql command
const creationSql = sql.replace(';', '')
const uniqueFields = [table.primaryKey].flat() as string[]
const uniqueFields = [] as string[]
for (const rawRow of table.rows) {
const row = normalizeRowDef(rawRow)
if ([table.primaryKey].flat().indexOf(row.name) === -1 && row.unique)
if ([table.primaryKey].flat().indexOf(row.name) === -1 && !row.unique)
// eslint-disable-next-line no-continue
continue
// otherwise check if the key is present in the where clause
if (typeof options.where[row.name] !== 'undefined')
uniqueFields.push(row.name)
}
const conflictConstraint = [options.constraintKey || uniqueFields]
.flat()
.map(name => `"${name}"`)
.join(',')
const updateSqlCommand = Object.keys(options.update)
.map(key => {
const rowDef = table.rows[key]
@@ -289,6 +297,6 @@ export function upsertSql(table: SchemaTable, options: UpsertOptions): string {
? 'DO NOTHING;'
: `DO UPDATE SET ${updateSqlCommand};`
return `${creationSql}
ON CONFLICT(${uniqueFields.map(name => `"${name}"`).join(',')})
ON CONFLICT(${conflictConstraint})
${conflictClause}`
}

View File

@@ -38,6 +38,7 @@ export type UpsertOptions = {
where: WhereClause
update: any
create: any
constraintKey?: string
}
export type DataType = 'Int' | 'Bool' | 'String' | 'Object'
@@ -92,7 +93,7 @@ export interface DB {
collection: string,
options: DeleteManyOptions,
) => Promise<number>
transaction?: (operation: (db: TransactionDB) => void) => Promise<void>
transaction: (operation: (db: TransactionDB) => void) => Promise<void>
// close the db and cleanup
close: () => Promise<void>
}

View File

@@ -414,6 +414,7 @@ export class Grove {
where: { species: TreeSpecies.UTXO },
update: { ...data },
create: { species: TreeSpecies.UTXO, ...data },
constraintKey: 'species',
})
const treeSql = await this.db.findOne('LightTree', {
where: {
@@ -460,6 +461,7 @@ export class Grove {
where: { species: TreeSpecies.WITHDRAWAL },
update: { ...data },
create: { species: TreeSpecies.WITHDRAWAL, ...data },
constraintKey: 'species',
})
const treeSql = await this.db.findOne('LightTree', {
where: {

View File

@@ -425,6 +425,7 @@ export abstract class LightRollUpTree<T extends Fp | BN> {
...rollUpSnapshot,
species: this.species,
},
constraintKey: 'species',
})
// update cached nodes
for (const nodeIndex of Object.keys(cached)) {
@@ -507,6 +508,7 @@ export abstract class LightRollUpTree<T extends Fp | BN> {
create: {
...tree,
},
constraintKey: 'species',
})
const newTree = await db.findOne('LightTree', {
where: {