mirror of
https://github.com/directus/directus.git
synced 2026-01-23 11:47:59 -05:00
Emit nested action events after the transaction completes (#14981)
* Emit nested action events after the transaction completes * Propagate mutation options for the disabling of nested emits * Disable emit in test * Shift emitting
This commit is contained in:
@@ -938,9 +938,13 @@ describe('Integration Tests', () => {
|
||||
},
|
||||
schema: schemas[schema].schema,
|
||||
});
|
||||
const response = await itemsService.updateOne(item.id, {
|
||||
items: [],
|
||||
});
|
||||
const response = await itemsService.updateOne(
|
||||
item.id,
|
||||
{
|
||||
items: [],
|
||||
},
|
||||
{ emitEvents: false }
|
||||
);
|
||||
|
||||
expect(tracker.history.select.length).toBe(4);
|
||||
expect(tracker.history.select[0].bindings).toStrictEqual([item.id, 1]);
|
||||
|
||||
@@ -9,7 +9,14 @@ import emitter from '../emitter';
|
||||
import env from '../env';
|
||||
import { ForbiddenException, InvalidPayloadException } from '../exceptions';
|
||||
import { translateDatabaseError } from '../exceptions/database/translate';
|
||||
import { AbstractService, AbstractServiceOptions, Item as AnyItem, MutationOptions, PrimaryKey } from '../types';
|
||||
import {
|
||||
AbstractService,
|
||||
AbstractServiceOptions,
|
||||
ActionEventParams,
|
||||
Item as AnyItem,
|
||||
MutationOptions,
|
||||
PrimaryKey,
|
||||
} from '../types';
|
||||
import getASTFromQuery from '../utils/get-ast-from-query';
|
||||
import { validateKeys } from '../utils/validate-keys';
|
||||
import { AuthorizationService } from './authorization';
|
||||
@@ -69,6 +76,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
.map((field) => field.field);
|
||||
|
||||
const payload: AnyItem = cloneDeep(data);
|
||||
const nestedActionEvents: ActionEventParams[] = [];
|
||||
|
||||
// By wrapping the logic in a transaction, we make sure we automatically roll back all the
|
||||
// changes in the DB if any of the parts contained within throws an error. This also means
|
||||
@@ -112,8 +120,16 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
? await authorizationService.validatePayload('create', this.collection, payloadAfterHooks)
|
||||
: payloadAfterHooks;
|
||||
|
||||
const { payload: payloadWithM2O, revisions: revisionsM2O } = await payloadService.processM2O(payloadWithPresets);
|
||||
const { payload: payloadWithA2O, revisions: revisionsA2O } = await payloadService.processA2O(payloadWithM2O);
|
||||
const {
|
||||
payload: payloadWithM2O,
|
||||
revisions: revisionsM2O,
|
||||
nestedActionEvents: nestedActionEventsM2O,
|
||||
} = await payloadService.processM2O(payloadWithPresets, opts);
|
||||
const {
|
||||
payload: payloadWithA2O,
|
||||
revisions: revisionsA2O,
|
||||
nestedActionEvents: nestedActionEventsA2O,
|
||||
} = await payloadService.processA2O(payloadWithM2O, opts);
|
||||
|
||||
const payloadWithoutAliases = pick(payloadWithA2O, without(fields, ...aliases));
|
||||
const payloadWithTypeCasting = await payloadService.processValues('create', payloadWithoutAliases);
|
||||
@@ -146,7 +162,15 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
payload[primaryKeyField] = primaryKey;
|
||||
}
|
||||
|
||||
const { revisions: revisionsO2M } = await payloadService.processO2M(payload, primaryKey);
|
||||
const { revisions: revisionsO2M, nestedActionEvents: nestedActionEventsO2M } = await payloadService.processO2M(
|
||||
payload,
|
||||
primaryKey,
|
||||
opts
|
||||
);
|
||||
|
||||
nestedActionEvents.push(...nestedActionEventsM2O);
|
||||
nestedActionEvents.push(...nestedActionEventsA2O);
|
||||
nestedActionEvents.push(...nestedActionEventsO2M);
|
||||
|
||||
// If this is an authenticated action, and accountability tracking is enabled, save activity row
|
||||
if (this.accountability && this.schema.collections[this.collection].accountability !== null) {
|
||||
@@ -196,21 +220,32 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
});
|
||||
|
||||
if (opts?.emitEvents !== false) {
|
||||
emitter.emitAction(
|
||||
this.eventScope === 'items' ? ['items.create', `${this.collection}.items.create`] : `${this.eventScope}.create`,
|
||||
{
|
||||
const actionEvent = {
|
||||
event:
|
||||
this.eventScope === 'items'
|
||||
? ['items.create', `${this.collection}.items.create`]
|
||||
: `${this.eventScope}.create`,
|
||||
meta: {
|
||||
payload,
|
||||
key: primaryKey,
|
||||
collection: this.collection,
|
||||
},
|
||||
{
|
||||
// This hook is called async. If we would pass the transaction here, the hook can be
|
||||
// called after the transaction is done #5460
|
||||
database: this.knex || getDatabase(),
|
||||
context: {
|
||||
database: getDatabase(),
|
||||
schema: this.schema,
|
||||
accountability: this.accountability,
|
||||
}
|
||||
);
|
||||
},
|
||||
};
|
||||
|
||||
if (!opts?.bypassEmitAction) {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
} else {
|
||||
opts.bypassEmitAction(actionEvent);
|
||||
}
|
||||
|
||||
for (const nestedActionEvent of nestedActionEvents) {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.cache && env.CACHE_AUTO_PURGE && opts?.autoPurgeCache !== false) {
|
||||
@@ -416,6 +451,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
.map((field) => field.field);
|
||||
|
||||
const payload: Partial<AnyItem> = cloneDeep(data);
|
||||
const nestedActionEvents: ActionEventParams[] = [];
|
||||
|
||||
const authorizationService = new AuthorizationService({
|
||||
accountability: this.accountability,
|
||||
@@ -462,8 +498,16 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
schema: this.schema,
|
||||
});
|
||||
|
||||
const { payload: payloadWithM2O, revisions: revisionsM2O } = await payloadService.processM2O(payloadWithPresets);
|
||||
const { payload: payloadWithA2O, revisions: revisionsA2O } = await payloadService.processA2O(payloadWithM2O);
|
||||
const {
|
||||
payload: payloadWithM2O,
|
||||
revisions: revisionsM2O,
|
||||
nestedActionEvents: nestedActionEventsM2O,
|
||||
} = await payloadService.processM2O(payloadWithPresets, opts);
|
||||
const {
|
||||
payload: payloadWithA2O,
|
||||
revisions: revisionsA2O,
|
||||
nestedActionEvents: nestedActionEventsA2O,
|
||||
} = await payloadService.processA2O(payloadWithM2O, opts);
|
||||
|
||||
const payloadWithoutAliasAndPK = pick(payloadWithA2O, without(fields, primaryKeyField, ...aliases));
|
||||
const payloadWithTypeCasting = await payloadService.processValues('update', payloadWithoutAliasAndPK);
|
||||
@@ -478,9 +522,17 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
|
||||
const childrenRevisions = [...revisionsM2O, ...revisionsA2O];
|
||||
|
||||
nestedActionEvents.push(...nestedActionEventsM2O);
|
||||
nestedActionEvents.push(...nestedActionEventsA2O);
|
||||
|
||||
for (const key of keys) {
|
||||
const { revisions } = await payloadService.processO2M(payload, key);
|
||||
const { revisions, nestedActionEvents: nestedActionEventsO2M } = await payloadService.processO2M(
|
||||
payload,
|
||||
key,
|
||||
opts
|
||||
);
|
||||
childrenRevisions.push(...revisions);
|
||||
nestedActionEvents.push(...nestedActionEventsO2M);
|
||||
}
|
||||
|
||||
// If this is an authenticated action, and accountability tracking is enabled, save activity row
|
||||
@@ -555,21 +607,32 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
}
|
||||
|
||||
if (opts?.emitEvents !== false) {
|
||||
emitter.emitAction(
|
||||
this.eventScope === 'items' ? ['items.update', `${this.collection}.items.update`] : `${this.eventScope}.update`,
|
||||
{
|
||||
const actionEvent = {
|
||||
event:
|
||||
this.eventScope === 'items'
|
||||
? ['items.update', `${this.collection}.items.update`]
|
||||
: `${this.eventScope}.update`,
|
||||
meta: {
|
||||
payload,
|
||||
keys,
|
||||
collection: this.collection,
|
||||
},
|
||||
{
|
||||
// This hook is called async. If we would pass the transaction here, the hook can be
|
||||
// called after the transaction is done #5460
|
||||
database: this.knex || getDatabase(),
|
||||
context: {
|
||||
database: getDatabase(),
|
||||
schema: this.schema,
|
||||
accountability: this.accountability,
|
||||
}
|
||||
);
|
||||
},
|
||||
};
|
||||
|
||||
if (!opts?.bypassEmitAction) {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
} else {
|
||||
opts.bypassEmitAction(actionEvent);
|
||||
}
|
||||
|
||||
for (const nestedActionEvent of nestedActionEvents) {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
}
|
||||
}
|
||||
|
||||
return keys;
|
||||
@@ -711,21 +774,28 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
}
|
||||
|
||||
if (opts?.emitEvents !== false) {
|
||||
emitter.emitAction(
|
||||
this.eventScope === 'items' ? ['items.delete', `${this.collection}.items.delete`] : `${this.eventScope}.delete`,
|
||||
{
|
||||
const actionEvent = {
|
||||
event:
|
||||
this.eventScope === 'items'
|
||||
? ['items.delete', `${this.collection}.items.delete`]
|
||||
: `${this.eventScope}.delete`,
|
||||
meta: {
|
||||
payload: keys,
|
||||
keys: keys,
|
||||
collection: this.collection,
|
||||
},
|
||||
{
|
||||
// This hook is called async. If we would pass the transaction here, the hook can be
|
||||
// called after the transaction is done #5460
|
||||
database: this.knex || getDatabase(),
|
||||
context: {
|
||||
database: getDatabase(),
|
||||
schema: this.schema,
|
||||
accountability: this.accountability,
|
||||
}
|
||||
);
|
||||
},
|
||||
};
|
||||
|
||||
if (!opts?.bypassEmitAction) {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
} else {
|
||||
opts.bypassEmitAction(actionEvent);
|
||||
}
|
||||
}
|
||||
|
||||
return keys;
|
||||
|
||||
@@ -10,7 +10,7 @@ import { parse as wktToGeoJSON } from 'wellknown';
|
||||
import getDatabase from '../database';
|
||||
import { getHelpers, Helpers } from '../database/helpers';
|
||||
import { ForbiddenException, InvalidPayloadException } from '../exceptions';
|
||||
import { AbstractServiceOptions, Alterations, Item, PrimaryKey } from '../types';
|
||||
import { AbstractServiceOptions, ActionEventParams, Alterations, Item, MutationOptions, PrimaryKey } from '../types';
|
||||
import { generateHash } from '../utils/generate-hash';
|
||||
import { ItemsService } from './items';
|
||||
|
||||
@@ -364,13 +364,18 @@ export class PayloadService {
|
||||
/**
|
||||
* Recursively save/update all nested related Any-to-One items
|
||||
*/
|
||||
async processA2O(data: Partial<Item>): Promise<{ payload: Partial<Item>; revisions: PrimaryKey[] }> {
|
||||
async processA2O(
|
||||
data: Partial<Item>,
|
||||
opts?: MutationOptions
|
||||
): Promise<{ payload: Partial<Item>; revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> {
|
||||
const relations = this.schema.relations.filter((relation) => {
|
||||
return relation.collection === this.collection;
|
||||
});
|
||||
|
||||
const revisions: PrimaryKey[] = [];
|
||||
|
||||
const nestedActionEvents: ActionEventParams[] = [];
|
||||
|
||||
const payload = cloneDeep(data);
|
||||
|
||||
// Only process related records that are actually in the payload
|
||||
@@ -427,11 +432,15 @@ export class PayloadService {
|
||||
if (Object.keys(fieldsToUpdate).length > 0) {
|
||||
await itemsService.updateOne(relatedPrimaryKey, relatedRecord, {
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
relatedPrimaryKey = await itemsService.createOne(relatedRecord, {
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -439,18 +448,23 @@ export class PayloadService {
|
||||
payload[relation.field] = relatedPrimaryKey;
|
||||
}
|
||||
|
||||
return { payload, revisions };
|
||||
return { payload, revisions, nestedActionEvents };
|
||||
}
|
||||
|
||||
/**
|
||||
* Save/update all nested related m2o items inside the payload
|
||||
*/
|
||||
async processM2O(data: Partial<Item>): Promise<{ payload: Partial<Item>; revisions: PrimaryKey[] }> {
|
||||
async processM2O(
|
||||
data: Partial<Item>,
|
||||
opts?: MutationOptions
|
||||
): Promise<{ payload: Partial<Item>; revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> {
|
||||
const payload = cloneDeep(data);
|
||||
|
||||
// All the revisions saved on this level
|
||||
const revisions: PrimaryKey[] = [];
|
||||
|
||||
const nestedActionEvents: ActionEventParams[] = [];
|
||||
|
||||
// Many to one relations that exist on the current collection
|
||||
const relations = this.schema.relations.filter((relation) => {
|
||||
return relation.collection === this.collection;
|
||||
@@ -495,11 +509,15 @@ export class PayloadService {
|
||||
if (Object.keys(fieldsToUpdate).length > 0) {
|
||||
await itemsService.updateOne(relatedPrimaryKey, relatedRecord, {
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
relatedPrimaryKey = await itemsService.createOne(relatedRecord, {
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -507,15 +525,21 @@ export class PayloadService {
|
||||
payload[relation.field] = relatedPrimaryKey;
|
||||
}
|
||||
|
||||
return { payload, revisions };
|
||||
return { payload, revisions, nestedActionEvents };
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively save/update all nested related o2m items
|
||||
*/
|
||||
async processO2M(data: Partial<Item>, parent: PrimaryKey): Promise<{ revisions: PrimaryKey[] }> {
|
||||
async processO2M(
|
||||
data: Partial<Item>,
|
||||
parent: PrimaryKey,
|
||||
opts?: MutationOptions
|
||||
): Promise<{ revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> {
|
||||
const revisions: PrimaryKey[] = [];
|
||||
|
||||
const nestedActionEvents: ActionEventParams[] = [];
|
||||
|
||||
const relations = this.schema.relations.filter((relation) => {
|
||||
return relation.related_collection === this.collection;
|
||||
});
|
||||
@@ -598,6 +622,8 @@ export class PayloadService {
|
||||
savedPrimaryKeys.push(
|
||||
...(await itemsService.upsertMany(recordsToUpsert, {
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
}))
|
||||
);
|
||||
|
||||
@@ -621,13 +647,18 @@ export class PayloadService {
|
||||
// Nullify all related items that aren't included in the current payload
|
||||
if (relation.meta.one_deselect_action === 'delete') {
|
||||
// There's no revision for a deletion
|
||||
await itemsService.deleteByQuery(query);
|
||||
await itemsService.deleteByQuery(query, {
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
});
|
||||
} else {
|
||||
await itemsService.updateByQuery(
|
||||
query,
|
||||
{ [relation.field]: null },
|
||||
{
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -646,6 +677,8 @@ export class PayloadService {
|
||||
})),
|
||||
{
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -662,6 +695,8 @@ export class PayloadService {
|
||||
},
|
||||
{
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -686,13 +721,18 @@ export class PayloadService {
|
||||
};
|
||||
|
||||
if (relation.meta.one_deselect_action === 'delete') {
|
||||
await itemsService.deleteByQuery(query);
|
||||
await itemsService.deleteByQuery(query, {
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
});
|
||||
} else {
|
||||
await itemsService.updateByQuery(
|
||||
query,
|
||||
{ [relation.field]: null },
|
||||
{
|
||||
onRevisionCreate: (pk) => revisions.push(pk),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
emitEvents: opts?.emitEvents,
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -700,7 +740,7 @@ export class PayloadService {
|
||||
}
|
||||
}
|
||||
|
||||
return { revisions };
|
||||
return { revisions, nestedActionEvents };
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
* expecting an item vs any other generic object.
|
||||
*/
|
||||
|
||||
import { EventContext } from '@directus/shared/types';
|
||||
|
||||
export type Item = Record<string, any>;
|
||||
|
||||
export type PrimaryKey = string | number;
|
||||
@@ -32,4 +34,16 @@ export type MutationOptions = {
|
||||
* Allow disabling the emitting of hooks. Useful if a custom hook is fired (like files.upload)
|
||||
*/
|
||||
emitEvents?: boolean;
|
||||
|
||||
/**
|
||||
* To bypass the emitting of action events if emitEvents is enabled
|
||||
* Can be used to queue up the nested events from item service's create, update and delete
|
||||
*/
|
||||
bypassEmitAction?: (params: ActionEventParams) => void;
|
||||
};
|
||||
|
||||
export type ActionEventParams = {
|
||||
event: string | string[];
|
||||
meta: Record<string, any>;
|
||||
context: EventContext;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user