Use transactions for batch insert

This commit is contained in:
rijkvanzanten
2020-07-17 12:38:07 -04:00
parent efae7f57fb
commit acd8ee1460
3 changed files with 149 additions and 97 deletions

View File

@@ -5,6 +5,7 @@ import sanitizeQuery from '../middleware/sanitize-query';
import collectionExists from '../middleware/collection-exists';
import * as MetaService from '../services/meta';
import { RouteNotFoundException } from '../exceptions';
import { Accountability } from '../types';
const router = express.Router();
@@ -17,34 +18,36 @@ router.post(
throw new RouteNotFoundException(req.path);
}
if (Array.isArray(req.body)) {
const items = await Promise.all(req.body.map(createItem));
const accountability: Accountability = {
user: req.user,
role: req.role,
admin: req.admin,
ip: req.ip,
userAgent: req.get('user-agent'),
};
const isBatch = Array.isArray(req.body);
if (isBatch) {
const body: Record<string, any>[] = req.body;
const primaryKeys = await ItemsService.createItem(req.collection, body, accountability);
const items = await ItemsService.readItem(
req.collection,
primaryKeys,
req.sanitizedQuery,
accountability
);
res.json({ data: items || null });
} else {
const item = await createItem(req.body);
res.json({ data: item || null });
}
async function createItem(body: Record<string, any>) {
const primaryKey = await ItemsService.createItem(req.collection, body, {
user: req.user,
role: req.role,
admin: req.admin,
ip: req.ip,
userAgent: req.get('user-agent'),
});
const body: Record<string, any> = req.body;
const primaryKey = await ItemsService.createItem(req.collection, body, accountability);
const item = await ItemsService.readItem(
req.collection,
primaryKey,
req.sanitizedQuery,
{
role: req.role,
admin: req.admin,
}
accountability
);
return item;
res.json({ data: item || null });
}
})
);
@@ -101,24 +104,22 @@ router.patch(
collectionExists,
sanitizeQuery,
asyncHandler(async (req, res) => {
if (req.single === false) {
throw new RouteNotFoundException(req.path);
if (req.single === true) {
await ItemsService.upsertSingleton(req.collection, req.body, {
role: req.role,
admin: req.admin,
ip: req.ip,
userAgent: req.get('user-agent'),
user: req.user,
});
const item = await ItemsService.readSingleton(req.collection, req.sanitizedQuery, {
role: req.role,
admin: req.admin,
});
return res.json({ data: item || null });
}
await ItemsService.upsertSingleton(req.collection, req.body, {
role: req.role,
admin: req.admin,
ip: req.ip,
userAgent: req.get('user-agent'),
user: req.user,
});
const item = await ItemsService.readSingleton(req.collection, req.sanitizedQuery, {
role: req.role,
admin: req.admin,
});
return res.json({ data: item || null });
})
);
@@ -132,7 +133,6 @@ router.patch(
}
const primaryKey = req.params.pk;
const isBatch = primaryKey.includes(',');
if (isBatch) {

View File

@@ -41,56 +41,80 @@ async function saveActivityAndRevision(
}
}
export const createItem = async (
export async function createItem(
collection: string,
data: Record<string, any>[],
accountability?: Accountability
): Promise<(string | number)[]>;
export async function createItem(
collection: string,
data: Record<string, any>,
accountability?: Accountability
): Promise<string | number> => {
let payload = data;
): Promise<string | number>;
export async function createItem(
collection: string,
data: Record<string, any> | Record<string, any>[],
accountability?: Accountability
): Promise<string | number | (string | number)[]> {
const isBatch = Array.isArray(data);
if (accountability && accountability.admin === false) {
payload = await PermissionsService.processValues(
'create',
collection,
accountability?.role,
data
return database.transaction(async (tx) => {
let payloads = isBatch ? data : [data];
const primaryKeys: (string | number)[] = await Promise.all(
payloads.map(async (payload: Record<string, any>) => {
if (accountability && accountability.admin === false) {
payload = await PermissionsService.processValues(
'create',
collection,
accountability?.role,
payload
);
}
payload = await PayloadService.processValues('create', collection, payload);
payload = await PayloadService.processM2O(collection, payload);
const primaryKeyField = await schemaInspector.primary(collection);
// Only insert the values that actually save to an existing column. This ensures we ignore aliases etc
const columns = await schemaInspector.columns(collection);
const payloadWithoutAlias = pick(
payload,
columns.map(({ column }) => column)
);
const primaryKeys = await tx(collection)
.insert(payloadWithoutAlias)
.returning(primaryKeyField);
// This allows the o2m values to be populated correctly
payload[primaryKeyField] = primaryKeys[0];
await PayloadService.processO2M(collection, payload);
if (accountability) {
// Don't await this. It can run async in the background
saveActivityAndRevision(
ActivityService.Action.CREATE,
collection,
primaryKeys[0],
payloadWithoutAlias,
accountability
).catch((err) => logger.error(err));
}
return primaryKeys[0];
})
);
}
payload = await PayloadService.processValues('create', collection, payload);
payload = await PayloadService.processM2O(collection, payload);
const primaryKeyField = await schemaInspector.primary(collection);
// Only insert the values that actually save to an existing column. This ensures we ignore aliases etc
const columns = await schemaInspector.columns(collection);
const payloadWithoutAlias = pick(
payload,
columns.map(({ column }) => column)
);
const primaryKeys = await database(collection)
.insert(payloadWithoutAlias)
.returning(primaryKeyField);
// This allows the o2m values to be populated correctly
payload[primaryKeyField] = primaryKeys[0];
await PayloadService.processO2M(collection, payload);
if (accountability) {
// Don't await this. It can run async in the background
saveActivityAndRevision(
ActivityService.Action.CREATE,
collection,
primaryKeys[0],
payloadWithoutAlias,
accountability
).catch((err) => logger.error(err));
}
return primaryKeys[0];
};
if (isBatch) {
return primaryKeys;
} else {
return primaryKeys[0];
}
});
}
export const readItems = async <T = Record<string, any>>(
collection: string,
@@ -107,28 +131,55 @@ export const readItems = async <T = Record<string, any>>(
return await PayloadService.processValues('read', collection, records);
};
export const readItem = async <T = any>(
export async function readItem<T = Record<string, any>>(
collection: string,
pk: number | string,
query?: Query,
accountability?: Accountability,
operation?: Operation
): Promise<T>;
export async function readItem<T = Record<string, any>>(
collection: string,
pk: (number | string)[],
query?: Query,
accountability?: Accountability,
operation?: Operation
): Promise<T[]>;
export async function readItem<T = Record<string, any>>(
collection: string,
pk: number | string | (number | string)[],
query: Query = {},
accountability?: Accountability,
operation?: Operation
): Promise<T> => {
): Promise<T | T[]> {
// We allow overriding the operation, so we can use the item read logic to validate permissions
// for update and delete as well
operation = operation || 'read';
const primaryKeyField = await schemaInspector.primary(collection);
const isBatch = Array.isArray(pk);
query = {
...query,
filter: {
...(query.filter || {}),
[primaryKeyField]: {
_eq: pk,
if (isBatch) {
query = {
...query,
filter: {
...(query.filter || {}),
[primaryKeyField]: {
_in: pk,
},
},
},
};
};
} else {
query = {
...query,
filter: {
...(query.filter || {}),
[primaryKeyField]: {
_eq: pk,
},
},
};
}
let ast = await getASTFromQuery(collection, query, accountability, operation);
@@ -137,8 +188,9 @@ export const readItem = async <T = any>(
}
const records = await runAST(ast);
return await PayloadService.processValues('read', collection, records[0]);
};
const processedRecords = await PayloadService.processValues('read', collection, records);
return isBatch ? processedRecords : processedRecords[0];
}
export const updateItem = async (
collection: string,

View File

@@ -145,7 +145,7 @@ export const processM2O = async (collection: string, payload: Record<string, any
// Save all nested m2o records
await Promise.all(
relationsToProcess.map(async (relation) => {
const relatedRecord = payloadClone[relation.field_many];
const relatedRecord: Record<string, any> = payloadClone[relation.field_many];
const hasPrimaryKey = relatedRecord.hasOwnProperty(relation.primary_one);
let relatedPrimaryKey: string | number;
@@ -194,7 +194,7 @@ export const processO2M = async (collection: string, payload: Record<string, any
const relatedRecords = payloadClone[relation.field_one];
await Promise.all(
relatedRecords.map(async (relatedRecord: any, index: number) => {
relatedRecords.map(async (relatedRecord: Record<string, any>, index: number) => {
relatedRecord[relation.field_many] = payloadClone[relation.primary_one];
const hasPrimaryKey = relatedRecord.hasOwnProperty(relation.primary_many);