From 73fa8d66acb1e05f015adbb1598229afae69ca08 Mon Sep 17 00:00:00 2001 From: ian Date: Fri, 19 Aug 2022 04:07:33 +0800 Subject: [PATCH] Emit nested action events after the transaction completes (#14981) * Emit nested action events after the transaction completes * Propagate mutation options for the disabling of nested emits * Disable emit in test * Shift emitting --- api/src/services/items.test.ts | 10 ++- api/src/services/items.ts | 138 +++++++++++++++++++++++++-------- api/src/services/payload.ts | 58 +++++++++++--- api/src/types/items.ts | 14 ++++ 4 files changed, 174 insertions(+), 46 deletions(-) diff --git a/api/src/services/items.test.ts b/api/src/services/items.test.ts index f2eb3ff95c..829f32cb7b 100644 --- a/api/src/services/items.test.ts +++ b/api/src/services/items.test.ts @@ -938,9 +938,13 @@ describe('Integration Tests', () => { }, schema: schemas[schema].schema, }); - const response = await itemsService.updateOne(item.id, { - items: [], - }); + const response = await itemsService.updateOne( + item.id, + { + items: [], + }, + { emitEvents: false } + ); expect(tracker.history.select.length).toBe(4); expect(tracker.history.select[0].bindings).toStrictEqual([item.id, 1]); diff --git a/api/src/services/items.ts b/api/src/services/items.ts index 49e8a44cc2..eefaaf43d2 100644 --- a/api/src/services/items.ts +++ b/api/src/services/items.ts @@ -9,7 +9,14 @@ import emitter from '../emitter'; import env from '../env'; import { ForbiddenException, InvalidPayloadException } from '../exceptions'; import { translateDatabaseError } from '../exceptions/database/translate'; -import { AbstractService, AbstractServiceOptions, Item as AnyItem, MutationOptions, PrimaryKey } from '../types'; +import { + AbstractService, + AbstractServiceOptions, + ActionEventParams, + Item as AnyItem, + MutationOptions, + PrimaryKey, +} from '../types'; import getASTFromQuery from '../utils/get-ast-from-query'; import { validateKeys } from '../utils/validate-keys'; import { AuthorizationService } from './authorization'; @@ -69,6 +76,7 @@ export class ItemsService implements AbstractSer .map((field) => field.field); const payload: AnyItem = cloneDeep(data); + const nestedActionEvents: ActionEventParams[] = []; // By wrapping the logic in a transaction, we make sure we automatically roll back all the // changes in the DB if any of the parts contained within throws an error. This also means @@ -112,8 +120,16 @@ export class ItemsService implements AbstractSer ? await authorizationService.validatePayload('create', this.collection, payloadAfterHooks) : payloadAfterHooks; - const { payload: payloadWithM2O, revisions: revisionsM2O } = await payloadService.processM2O(payloadWithPresets); - const { payload: payloadWithA2O, revisions: revisionsA2O } = await payloadService.processA2O(payloadWithM2O); + const { + payload: payloadWithM2O, + revisions: revisionsM2O, + nestedActionEvents: nestedActionEventsM2O, + } = await payloadService.processM2O(payloadWithPresets, opts); + const { + payload: payloadWithA2O, + revisions: revisionsA2O, + nestedActionEvents: nestedActionEventsA2O, + } = await payloadService.processA2O(payloadWithM2O, opts); const payloadWithoutAliases = pick(payloadWithA2O, without(fields, ...aliases)); const payloadWithTypeCasting = await payloadService.processValues('create', payloadWithoutAliases); @@ -146,7 +162,15 @@ export class ItemsService implements AbstractSer payload[primaryKeyField] = primaryKey; } - const { revisions: revisionsO2M } = await payloadService.processO2M(payload, primaryKey); + const { revisions: revisionsO2M, nestedActionEvents: nestedActionEventsO2M } = await payloadService.processO2M( + payload, + primaryKey, + opts + ); + + nestedActionEvents.push(...nestedActionEventsM2O); + nestedActionEvents.push(...nestedActionEventsA2O); + nestedActionEvents.push(...nestedActionEventsO2M); // If this is an authenticated action, and accountability tracking is enabled, save activity row if (this.accountability && this.schema.collections[this.collection].accountability !== null) { @@ -196,21 +220,32 @@ export class ItemsService implements AbstractSer }); if (opts?.emitEvents !== false) { - emitter.emitAction( - this.eventScope === 'items' ? ['items.create', `${this.collection}.items.create`] : `${this.eventScope}.create`, - { + const actionEvent = { + event: + this.eventScope === 'items' + ? ['items.create', `${this.collection}.items.create`] + : `${this.eventScope}.create`, + meta: { payload, key: primaryKey, collection: this.collection, }, - { - // This hook is called async. If we would pass the transaction here, the hook can be - // called after the transaction is done #5460 - database: this.knex || getDatabase(), + context: { + database: getDatabase(), schema: this.schema, accountability: this.accountability, - } - ); + }, + }; + + if (!opts?.bypassEmitAction) { + emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context); + } else { + opts.bypassEmitAction(actionEvent); + } + + for (const nestedActionEvent of nestedActionEvents) { + emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context); + } } if (this.cache && env.CACHE_AUTO_PURGE && opts?.autoPurgeCache !== false) { @@ -416,6 +451,7 @@ export class ItemsService implements AbstractSer .map((field) => field.field); const payload: Partial = cloneDeep(data); + const nestedActionEvents: ActionEventParams[] = []; const authorizationService = new AuthorizationService({ accountability: this.accountability, @@ -462,8 +498,16 @@ export class ItemsService implements AbstractSer schema: this.schema, }); - const { payload: payloadWithM2O, revisions: revisionsM2O } = await payloadService.processM2O(payloadWithPresets); - const { payload: payloadWithA2O, revisions: revisionsA2O } = await payloadService.processA2O(payloadWithM2O); + const { + payload: payloadWithM2O, + revisions: revisionsM2O, + nestedActionEvents: nestedActionEventsM2O, + } = await payloadService.processM2O(payloadWithPresets, opts); + const { + payload: payloadWithA2O, + revisions: revisionsA2O, + nestedActionEvents: nestedActionEventsA2O, + } = await payloadService.processA2O(payloadWithM2O, opts); const payloadWithoutAliasAndPK = pick(payloadWithA2O, without(fields, primaryKeyField, ...aliases)); const payloadWithTypeCasting = await payloadService.processValues('update', payloadWithoutAliasAndPK); @@ -478,9 +522,17 @@ export class ItemsService implements AbstractSer const childrenRevisions = [...revisionsM2O, ...revisionsA2O]; + nestedActionEvents.push(...nestedActionEventsM2O); + nestedActionEvents.push(...nestedActionEventsA2O); + for (const key of keys) { - const { revisions } = await payloadService.processO2M(payload, key); + const { revisions, nestedActionEvents: nestedActionEventsO2M } = await payloadService.processO2M( + payload, + key, + opts + ); childrenRevisions.push(...revisions); + nestedActionEvents.push(...nestedActionEventsO2M); } // If this is an authenticated action, and accountability tracking is enabled, save activity row @@ -555,21 +607,32 @@ export class ItemsService implements AbstractSer } if (opts?.emitEvents !== false) { - emitter.emitAction( - this.eventScope === 'items' ? ['items.update', `${this.collection}.items.update`] : `${this.eventScope}.update`, - { + const actionEvent = { + event: + this.eventScope === 'items' + ? ['items.update', `${this.collection}.items.update`] + : `${this.eventScope}.update`, + meta: { payload, keys, collection: this.collection, }, - { - // This hook is called async. If we would pass the transaction here, the hook can be - // called after the transaction is done #5460 - database: this.knex || getDatabase(), + context: { + database: getDatabase(), schema: this.schema, accountability: this.accountability, - } - ); + }, + }; + + if (!opts?.bypassEmitAction) { + emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context); + } else { + opts.bypassEmitAction(actionEvent); + } + + for (const nestedActionEvent of nestedActionEvents) { + emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context); + } } return keys; @@ -711,21 +774,28 @@ export class ItemsService implements AbstractSer } if (opts?.emitEvents !== false) { - emitter.emitAction( - this.eventScope === 'items' ? ['items.delete', `${this.collection}.items.delete`] : `${this.eventScope}.delete`, - { + const actionEvent = { + event: + this.eventScope === 'items' + ? ['items.delete', `${this.collection}.items.delete`] + : `${this.eventScope}.delete`, + meta: { payload: keys, keys: keys, collection: this.collection, }, - { - // This hook is called async. If we would pass the transaction here, the hook can be - // called after the transaction is done #5460 - database: this.knex || getDatabase(), + context: { + database: getDatabase(), schema: this.schema, accountability: this.accountability, - } - ); + }, + }; + + if (!opts?.bypassEmitAction) { + emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context); + } else { + opts.bypassEmitAction(actionEvent); + } } return keys; diff --git a/api/src/services/payload.ts b/api/src/services/payload.ts index 2b5f628512..c1e2b65666 100644 --- a/api/src/services/payload.ts +++ b/api/src/services/payload.ts @@ -10,7 +10,7 @@ import { parse as wktToGeoJSON } from 'wellknown'; import getDatabase from '../database'; import { getHelpers, Helpers } from '../database/helpers'; import { ForbiddenException, InvalidPayloadException } from '../exceptions'; -import { AbstractServiceOptions, Alterations, Item, PrimaryKey } from '../types'; +import { AbstractServiceOptions, ActionEventParams, Alterations, Item, MutationOptions, PrimaryKey } from '../types'; import { generateHash } from '../utils/generate-hash'; import { ItemsService } from './items'; @@ -364,13 +364,18 @@ export class PayloadService { /** * Recursively save/update all nested related Any-to-One items */ - async processA2O(data: Partial): Promise<{ payload: Partial; revisions: PrimaryKey[] }> { + async processA2O( + data: Partial, + opts?: MutationOptions + ): Promise<{ payload: Partial; revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> { const relations = this.schema.relations.filter((relation) => { return relation.collection === this.collection; }); const revisions: PrimaryKey[] = []; + const nestedActionEvents: ActionEventParams[] = []; + const payload = cloneDeep(data); // Only process related records that are actually in the payload @@ -427,11 +432,15 @@ export class PayloadService { if (Object.keys(fieldsToUpdate).length > 0) { await itemsService.updateOne(relatedPrimaryKey, relatedRecord, { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, }); } } else { relatedPrimaryKey = await itemsService.createOne(relatedRecord, { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, }); } @@ -439,18 +448,23 @@ export class PayloadService { payload[relation.field] = relatedPrimaryKey; } - return { payload, revisions }; + return { payload, revisions, nestedActionEvents }; } /** * Save/update all nested related m2o items inside the payload */ - async processM2O(data: Partial): Promise<{ payload: Partial; revisions: PrimaryKey[] }> { + async processM2O( + data: Partial, + opts?: MutationOptions + ): Promise<{ payload: Partial; revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> { const payload = cloneDeep(data); // All the revisions saved on this level const revisions: PrimaryKey[] = []; + const nestedActionEvents: ActionEventParams[] = []; + // Many to one relations that exist on the current collection const relations = this.schema.relations.filter((relation) => { return relation.collection === this.collection; @@ -495,11 +509,15 @@ export class PayloadService { if (Object.keys(fieldsToUpdate).length > 0) { await itemsService.updateOne(relatedPrimaryKey, relatedRecord, { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, }); } } else { relatedPrimaryKey = await itemsService.createOne(relatedRecord, { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, }); } @@ -507,15 +525,21 @@ export class PayloadService { payload[relation.field] = relatedPrimaryKey; } - return { payload, revisions }; + return { payload, revisions, nestedActionEvents }; } /** * Recursively save/update all nested related o2m items */ - async processO2M(data: Partial, parent: PrimaryKey): Promise<{ revisions: PrimaryKey[] }> { + async processO2M( + data: Partial, + parent: PrimaryKey, + opts?: MutationOptions + ): Promise<{ revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> { const revisions: PrimaryKey[] = []; + const nestedActionEvents: ActionEventParams[] = []; + const relations = this.schema.relations.filter((relation) => { return relation.related_collection === this.collection; }); @@ -598,6 +622,8 @@ export class PayloadService { savedPrimaryKeys.push( ...(await itemsService.upsertMany(recordsToUpsert, { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, })) ); @@ -621,13 +647,18 @@ export class PayloadService { // Nullify all related items that aren't included in the current payload if (relation.meta.one_deselect_action === 'delete') { // There's no revision for a deletion - await itemsService.deleteByQuery(query); + await itemsService.deleteByQuery(query, { + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, + }); } else { await itemsService.updateByQuery( query, { [relation.field]: null }, { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, } ); } @@ -646,6 +677,8 @@ export class PayloadService { })), { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, } ); } @@ -662,6 +695,8 @@ export class PayloadService { }, { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, } ); } @@ -686,13 +721,18 @@ export class PayloadService { }; if (relation.meta.one_deselect_action === 'delete') { - await itemsService.deleteByQuery(query); + await itemsService.deleteByQuery(query, { + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, + }); } else { await itemsService.updateByQuery( query, { [relation.field]: null }, { onRevisionCreate: (pk) => revisions.push(pk), + bypassEmitAction: (params) => nestedActionEvents.push(params), + emitEvents: opts?.emitEvents, } ); } @@ -700,7 +740,7 @@ export class PayloadService { } } - return { revisions }; + return { revisions, nestedActionEvents }; } /** diff --git a/api/src/types/items.ts b/api/src/types/items.ts index 5add72b3f5..2c49dc0c00 100644 --- a/api/src/types/items.ts +++ b/api/src/types/items.ts @@ -3,6 +3,8 @@ * expecting an item vs any other generic object. */ +import { EventContext } from '@directus/shared/types'; + export type Item = Record; export type PrimaryKey = string | number; @@ -32,4 +34,16 @@ export type MutationOptions = { * Allow disabling the emitting of hooks. Useful if a custom hook is fired (like files.upload) */ emitEvents?: boolean; + + /** + * To bypass the emitting of action events if emitEvents is enabled + * Can be used to queue up the nested events from item service's create, update and delete + */ + bypassEmitAction?: (params: ActionEventParams) => void; +}; + +export type ActionEventParams = { + event: string | string[]; + meta: Record; + context: EventContext; };