Add max batch mutation (#17535)

Co-authored-by: Brainslug <br41nslug@users.noreply.github.com>
Co-authored-by: Pascal Jufer <pascal-jufer@bluewin.ch>
This commit is contained in:
ian
2023-04-08 00:25:25 +08:00
committed by GitHub
parent ec5852b23b
commit fdf0fa2fb8
20 changed files with 2376 additions and 67 deletions

View File

@@ -43,6 +43,9 @@ PUBLIC_URL="/"
# Whether or not to enable GraphQL Introspection [true]
# GRAPHQL_INTROSPECTION=true
# The maximum number of items for batch mutations when creating, updating and deleting. ["Infinity"]
# MAX_BATCH_MUTATION="Infinity"
####################################################################################################
### Database

View File

@@ -26,6 +26,7 @@ const allowedEnvironmentVars = [
'ROOT_REDIRECT',
'SERVE_APP',
'GRAPHQL_INTROSPECTION',
'MAX_BATCH_MUTATION',
'LOGGER_.+',
'ROBOTS_TXT',
// server
@@ -201,6 +202,7 @@ const defaults: Record<string, any> = {
PUBLIC_URL: '/',
MAX_PAYLOAD_SIZE: '1mb',
MAX_RELATIONAL_DEPTH: 10,
MAX_BATCH_MUTATION: Infinity,
ROBOTS_TXT: 'User-agent: *\nDisallow: /',
DB_EXCLUDE_TABLES: 'spatial_ref_sys,sysdiagrams',
@@ -312,6 +314,8 @@ const typeMap: Record<string, string> = {
GRAPHQL_INTROSPECTION: 'boolean',
MAX_BATCH_MUTATION: 'number',
SERVER_SHUTDOWN_TIMEOUT: 'number',
};

View File

@@ -143,6 +143,7 @@ export class CollectionsService {
await fieldItemsService.createMany(fieldPayloads, {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
bypassLimits: true,
});
}

View File

@@ -29,6 +29,11 @@ export type QueryOptions = {
emitEvents?: boolean;
};
export type MutationTracker = {
trackMutations: (count: number) => void;
getCount: () => number;
};
export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractService {
collection: string;
knex: Knex;
@@ -48,6 +53,22 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
return this;
}
createMutationTracker(initialCount = 0): MutationTracker {
const maxCount = Number(env['MAX_BATCH_MUTATION']);
let mutationCount = initialCount;
return {
trackMutations(count: number) {
mutationCount += count;
if (mutationCount > maxCount) {
throw new InvalidPayloadException(`Exceeded max batch mutation limit of ${maxCount}.`);
}
},
getCount() {
return mutationCount;
},
};
}
async getKeysByQuery(query: Query): Promise<PrimaryKey[]> {
const primaryKeyField = this.schema.collections[this.collection]!.primary;
const readQuery = cloneDeep(query);
@@ -68,7 +89,12 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
/**
* Create a single new item.
*/
async createOne(data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
async createOne(data: Partial<Item>, opts: MutationOptions = {}): Promise<PrimaryKey> {
if (!opts.mutationTracker) opts.mutationTracker = this.createMutationTracker();
if (!opts.bypassLimits) {
opts.mutationTracker.trackMutations(1);
}
const { ActivityService } = await import('./activity.js');
const { RevisionsService } = await import('./revisions.js');
@@ -102,7 +128,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
// Run all hooks that are attached to this event so the end user has the chance to augment the
// item that is about to be saved
const payloadAfterHooks =
opts?.emitEvents !== false
opts.emitEvents !== false
? await emitter.emitFilter(
this.eventScope === 'items'
? ['items.create', `${this.collection}.items.create`]
@@ -123,7 +149,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
? authorizationService.validatePayload('create', this.collection, payloadAfterHooks)
: payloadAfterHooks;
if (opts?.preMutationException) {
if (opts.preMutationException) {
throw opts.preMutationException;
}
@@ -222,10 +248,10 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
const childrenRevisions = [...revisionsM2O, ...revisionsA2O, ...revisionsO2M];
if (childrenRevisions.length > 0) {
await revisionsService.updateMany(childrenRevisions, { parent: revision });
await revisionsService.updateMany(childrenRevisions, { parent: revision }, { bypassLimits: true });
}
if (opts?.onRevisionCreate) {
if (opts.onRevisionCreate) {
opts.onRevisionCreate(revision);
}
}
@@ -234,7 +260,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
return primaryKey;
});
if (opts?.emitEvents !== false) {
if (opts.emitEvents !== false) {
const actionEvent = {
event:
this.eventScope === 'items'
@@ -252,14 +278,14 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
},
};
if (opts?.bypassEmitAction) {
if (opts.bypassEmitAction) {
opts.bypassEmitAction(actionEvent);
} else {
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
}
for (const nestedActionEvent of nestedActionEvents) {
if (opts?.bypassEmitAction) {
if (opts.bypassEmitAction) {
opts.bypassEmitAction(nestedActionEvent);
} else {
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
@@ -267,7 +293,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
}
}
if (this.cache && env['CACHE_AUTO_PURGE'] && opts?.autoPurgeCache !== false) {
if (this.cache && env['CACHE_AUTO_PURGE'] && opts.autoPurgeCache !== false) {
await this.cache.clear();
}
@@ -277,7 +303,9 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
/**
* Create multiple new items at once. Inserts all provided records sequentially wrapped in a transaction.
*/
async createMany(data: Partial<Item>[], opts?: MutationOptions): Promise<PrimaryKey[]> {
async createMany(data: Partial<Item>[], opts: MutationOptions = {}): Promise<PrimaryKey[]> {
if (!opts.mutationTracker) opts.mutationTracker = this.createMutationTracker();
const { primaryKeys, nestedActionEvents } = await this.knex.transaction(async (trx) => {
const service = new ItemsService(this.collection, {
accountability: this.accountability,
@@ -293,6 +321,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
...(opts || {}),
autoPurgeCache: false,
bypassEmitAction: (params) => nestedActionEvents.push(params),
mutationTracker: opts.mutationTracker,
});
primaryKeys.push(primaryKey);
}
@@ -300,9 +329,9 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
return { primaryKeys, nestedActionEvents };
});
if (opts?.emitEvents !== false) {
if (opts.emitEvents !== false) {
for (const nestedActionEvent of nestedActionEvents) {
if (opts?.bypassEmitAction) {
if (opts.bypassEmitAction) {
opts.bypassEmitAction(nestedActionEvent);
} else {
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
@@ -310,7 +339,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
}
}
if (this.cache && env['CACHE_AUTO_PURGE'] && opts?.autoPurgeCache !== false) {
if (this.cache && env['CACHE_AUTO_PURGE'] && opts.autoPurgeCache !== false) {
await this.cache.clear();
}
@@ -469,11 +498,13 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
/**
* Update multiple items in a single transaction
*/
async updateBatch(data: Partial<Item>[], opts?: MutationOptions): Promise<PrimaryKey[]> {
async updateBatch(data: Partial<Item>[], opts: MutationOptions = {}): Promise<PrimaryKey[]> {
if (!Array.isArray(data)) {
throw new InvalidPayloadException('Input should be an array of items.');
}
if (!opts.mutationTracker) opts.mutationTracker = this.createMutationTracker();
const primaryKeyField = this.schema.collections[this.collection]!.primary;
const keys: PrimaryKey[] = [];
@@ -493,7 +524,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
}
});
} finally {
if (this.cache && env['CACHE_AUTO_PURGE'] && opts?.autoPurgeCache !== false) {
if (this.cache && env['CACHE_AUTO_PURGE'] && opts.autoPurgeCache !== false) {
await this.cache.clear();
}
}
@@ -504,7 +535,12 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
/**
* Update many items by primary key, setting all items to the same change
*/
async updateMany(keys: PrimaryKey[], data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey[]> {
async updateMany(keys: PrimaryKey[], data: Partial<Item>, opts: MutationOptions = {}): Promise<PrimaryKey[]> {
if (!opts.mutationTracker) opts.mutationTracker = this.createMutationTracker();
if (!opts.bypassLimits) {
opts.mutationTracker.trackMutations(keys.length);
}
const { ActivityService } = await import('./activity.js');
const { RevisionsService } = await import('./revisions.js');
@@ -528,7 +564,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
// Run all hooks that are attached to this event so the end user has the chance to augment the
// item that is about to be saved
const payloadAfterHooks =
opts?.emitEvents !== false
opts.emitEvents !== false
? await emitter.emitFilter(
this.eventScope === 'items'
? ['items.update', `${this.collection}.items.update`]
@@ -557,7 +593,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
? authorizationService.validatePayload('update', this.collection, payloadAfterHooks)
: payloadAfterHooks;
if (opts?.preMutationException) {
if (opts.preMutationException) {
throw opts.preMutationException;
}
@@ -621,7 +657,8 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
user_agent: this.accountability!.userAgent,
origin: this.accountability!.origin,
item: key,
}))
})),
{ bypassLimits: true }
);
if (this.schema.collections[this.collection]!.accountability === 'all') {
@@ -650,12 +687,12 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
)
).filter((revision) => revision.delta);
const revisionIDs = await revisionsService.createMany(revisions);
const revisionIDs = await revisionsService.createMany(revisions, { bypassLimits: true });
for (let i = 0; i < revisionIDs.length; i++) {
const revisionID = revisionIDs[i]!;
if (opts?.onRevisionCreate) {
if (opts.onRevisionCreate) {
opts.onRevisionCreate(revisionID);
}
@@ -665,7 +702,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
// with all other revisions on the current level as regular "flat" updates, and
// nested revisions as children of this first "root" item.
if (childrenRevisions.length > 0) {
await revisionsService.updateMany(childrenRevisions, { parent: revisionID });
await revisionsService.updateMany(childrenRevisions, { parent: revisionID }, { bypassLimits: true });
}
}
}
@@ -673,11 +710,11 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
}
});
if (this.cache && env['CACHE_AUTO_PURGE'] && opts?.autoPurgeCache !== false) {
if (this.cache && env['CACHE_AUTO_PURGE'] && opts.autoPurgeCache !== false) {
await this.cache.clear();
}
if (opts?.emitEvents !== false) {
if (opts.emitEvents !== false) {
const actionEvent = {
event:
this.eventScope === 'items'
@@ -695,14 +732,14 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
},
};
if (opts?.bypassEmitAction) {
if (opts.bypassEmitAction) {
opts.bypassEmitAction(actionEvent);
} else {
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
}
for (const nestedActionEvent of nestedActionEvents) {
if (opts?.bypassEmitAction) {
if (opts.bypassEmitAction) {
opts.bypassEmitAction(nestedActionEvent);
} else {
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
@@ -742,7 +779,9 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
/**
* Upsert many items
*/
async upsertMany(payloads: Partial<Item>[], opts?: MutationOptions): Promise<PrimaryKey[]> {
async upsertMany(payloads: Partial<Item>[], opts: MutationOptions = {}): Promise<PrimaryKey[]> {
if (!opts.mutationTracker) opts.mutationTracker = this.createMutationTracker();
const primaryKeys = await this.knex.transaction(async (trx) => {
const service = new ItemsService(this.collection, {
accountability: this.accountability,
@@ -760,7 +799,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
return primaryKeys;
});
if (this.cache && env['CACHE_AUTO_PURGE'] && opts?.autoPurgeCache !== false) {
if (this.cache && env['CACHE_AUTO_PURGE'] && opts.autoPurgeCache !== false) {
await this.cache.clear();
}
@@ -793,7 +832,12 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
/**
* Delete multiple items by primary key
*/
async deleteMany(keys: PrimaryKey[], opts?: MutationOptions): Promise<PrimaryKey[]> {
async deleteMany(keys: PrimaryKey[], opts: MutationOptions = {}): Promise<PrimaryKey[]> {
if (!opts.mutationTracker) opts.mutationTracker = this.createMutationTracker();
if (!opts.bypassLimits) {
opts.mutationTracker.trackMutations(keys.length);
}
const { ActivityService } = await import('./activity.js');
const primaryKeyField = this.schema.collections[this.collection]!.primary;
@@ -809,11 +853,11 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
await authorizationService.checkAccess('delete', this.collection, keys);
}
if (opts?.preMutationException) {
if (opts.preMutationException) {
throw opts.preMutationException;
}
if (opts?.emitEvents !== false) {
if (opts.emitEvents !== false) {
await emitter.emitFilter(
this.eventScope === 'items' ? ['items.delete', `${this.collection}.items.delete`] : `${this.eventScope}.delete`,
keys,
@@ -846,7 +890,8 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
user_agent: this.accountability!.userAgent,
origin: this.accountability!.origin,
item: key,
}))
})),
{ bypassLimits: true }
);
}
});
@@ -855,7 +900,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
await this.cache.clear();
}
if (opts?.emitEvents !== false) {
if (opts.emitEvents !== false) {
const actionEvent = {
event:
this.eventScope === 'items'
@@ -873,7 +918,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
},
};
if (opts?.bypassEmitAction) {
if (opts.bypassEmitAction) {
opts.bypassEmitAction(actionEvent);
} else {
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);

View File

@@ -444,6 +444,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
});
}
} else {
@@ -452,6 +453,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
});
}
@@ -523,6 +525,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
});
}
} else {
@@ -531,6 +534,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
});
}
@@ -638,6 +642,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
}))
);
@@ -665,6 +670,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
});
} else {
await itemsService.updateByQuery(
@@ -675,6 +681,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
}
);
}
@@ -723,6 +730,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
});
}
@@ -741,6 +749,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
}
);
}
@@ -769,6 +778,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
});
} else {
await itemsService.updateByQuery(
@@ -779,6 +789,7 @@ export class PayloadService {
bypassEmitAction: (params) =>
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
emitEvents: opts?.emitEvents,
mutationTracker: opts?.mutationTracker,
}
);
}

View File

@@ -159,14 +159,14 @@ export class RolesService extends ItemsService {
{
filter: { role: { _in: keys } },
},
opts
{ ...opts, bypassLimits: true }
);
await presetsService.deleteByQuery(
{
filter: { role: { _in: keys } },
},
opts
{ ...opts, bypassLimits: true }
);
await usersService.updateByQuery(
@@ -177,7 +177,7 @@ export class RolesService extends ItemsService {
status: 'suspended',
role: null,
},
opts
{ ...opts, bypassLimits: true }
);
await itemsService.deleteMany(keys, opts);

View File

@@ -180,7 +180,9 @@ export class UsersService extends ItemsService {
return key;
}
override async updateBatch(data: Partial<Item>[], opts?: MutationOptions): Promise<PrimaryKey[]> {
override async updateBatch(data: Partial<Item>[], opts: MutationOptions = {}): Promise<PrimaryKey[]> {
if (!opts.mutationTracker) opts.mutationTracker = this.createMutationTracker();
const primaryKeyField = this.schema.collections[this.collection]!.primary;
const keys: PrimaryKey[] = [];

View File

@@ -5,6 +5,7 @@
import type { BaseException } from '@directus/exceptions';
import type { EventContext } from '@directus/types';
import type { MutationTracker } from '../services/items.js';
export type Item = Record<string, any>;
@@ -24,17 +25,17 @@ export type MutationOptions = {
/**
* Callback function that's fired whenever a revision is made in the mutation
*/
onRevisionCreate?: (pk: PrimaryKey) => void;
onRevisionCreate?: ((pk: PrimaryKey) => void) | undefined;
/**
* Flag to disable the auto purging of the cache. Is ignored when CACHE_AUTO_PURGE isn't enabled.
*/
autoPurgeCache?: false;
autoPurgeCache?: false | undefined;
/**
* Flag to disable the auto purging of the system cache.
*/
autoPurgeSystemCache?: false;
autoPurgeSystemCache?: false | undefined;
/**
* Allow disabling the emitting of hooks. Useful if a custom hook is fired (like files.upload)
@@ -45,12 +46,22 @@ export type MutationOptions = {
* To bypass the emitting of action events if emitEvents is enabled
* Can be used to queue up the nested events from item service's create, update and delete
*/
bypassEmitAction?: (params: ActionEventParams) => void;
bypassEmitAction?: ((params: ActionEventParams) => void) | undefined;
/**
* To bypass limits so that functions would work as intended
*/
bypassLimits?: boolean | undefined;
/**
* To keep track of mutation limits
*/
mutationTracker?: MutationTracker | undefined;
/*
* The validation error to throw right before the mutation takes place
*/
preMutationException?: BaseException;
preMutationException?: BaseException | undefined;
};
export type ActionEventParams = {

View File

@@ -38,6 +38,7 @@ export async function applyDiff(
const mutationOptions: MutationOptions = {
autoPurgeSystemCache: false,
bypassEmitAction: (params) => nestedActionEvents.push(params),
bypassLimits: true,
};
await database.transaction(async (trx) => {

View File

@@ -23,6 +23,7 @@ describe('applySnapshot', () => {
const mutationOptions = {
autoPurgeSystemCache: false,
bypassEmitAction: expect.any(Function),
bypassLimits: true,
};
beforeEach(() => {