mirror of
https://github.com/directus/directus.git
synced 2026-04-25 03:00:53 -04:00
Fix CreateMany action hooks (#17066)
* queue action events, emit after transaction * also queue nested action events * renamed vars for consistency * fix import csv/json hooks triggering * Add unit tests * Add action verify create tests * Flip check to improve legibility --------- Co-authored-by: Ewout Stortenbeker <me@appy.one> Co-authored-by: rijkvanzanten <rijkvanzanten@me.com>
This commit is contained in:
119
api/src/services/import-export.test.ts
Normal file
119
api/src/services/import-export.test.ts
Normal file
@@ -0,0 +1,119 @@
|
||||
import knex, { Knex } from 'knex';
|
||||
import { MockClient, Tracker, getTracker } from 'knex-mock-client';
|
||||
import { ImportService } from '.';
|
||||
import { describe, beforeAll, afterEach, it, expect, vi, beforeEach, MockedFunction } from 'vitest';
|
||||
import { Readable } from 'stream';
|
||||
import emitter from '../emitter';
|
||||
import { parse } from 'json2csv';
|
||||
|
||||
vi.mock('../../src/database/index', () => ({
|
||||
default: vi.fn(),
|
||||
getDatabaseClient: vi.fn().mockReturnValue('postgres'),
|
||||
}));
|
||||
|
||||
describe('Integration Tests', () => {
|
||||
let db: MockedFunction<Knex>;
|
||||
let tracker: Tracker;
|
||||
|
||||
beforeAll(async () => {
|
||||
db = vi.mocked(knex({ client: MockClient }));
|
||||
tracker = getTracker();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
tracker.reset();
|
||||
});
|
||||
|
||||
describe('Services / ImportService', () => {
|
||||
let service: ImportService;
|
||||
let insertedId = 1;
|
||||
const collection = 'test_coll';
|
||||
|
||||
beforeEach(() => {
|
||||
service = new ImportService({
|
||||
knex: db,
|
||||
schema: {
|
||||
collections: {
|
||||
[collection]: {
|
||||
collection,
|
||||
primary: 'id',
|
||||
singleton: false,
|
||||
sortField: null,
|
||||
note: null,
|
||||
accountability: null,
|
||||
fields: {
|
||||
id: {
|
||||
field: 'id',
|
||||
defaultValue: null,
|
||||
nullable: false,
|
||||
generated: true,
|
||||
type: 'integer',
|
||||
dbType: 'integer',
|
||||
precision: null,
|
||||
scale: null,
|
||||
special: [],
|
||||
note: null,
|
||||
validation: null,
|
||||
alias: false,
|
||||
},
|
||||
name: {
|
||||
field: 'name',
|
||||
defaultValue: null,
|
||||
nullable: true,
|
||||
generated: false,
|
||||
type: 'string',
|
||||
dbType: 'string',
|
||||
precision: null,
|
||||
scale: null,
|
||||
special: [],
|
||||
note: null,
|
||||
validation: null,
|
||||
alias: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
relations: [],
|
||||
},
|
||||
});
|
||||
|
||||
insertedId = 1;
|
||||
});
|
||||
|
||||
describe('importJSON', () => {
|
||||
it('Emits action for correct number of times', async () => {
|
||||
const emitActionSpy = vi.spyOn(emitter, 'emitAction');
|
||||
const data = [{ name: 'aaa' }, { name: 'bbb' }, { name: 'ccc' }];
|
||||
const stream = new Readable({
|
||||
read() {
|
||||
this.push(JSON.stringify(data));
|
||||
this.push(null);
|
||||
},
|
||||
});
|
||||
tracker.on.insert(collection).response(() => [insertedId++]);
|
||||
|
||||
await service.importJSON(collection, stream);
|
||||
|
||||
expect(emitActionSpy).toBeCalledTimes(insertedId - 1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('importCSV', () => {
|
||||
it('Emits action for correct number of times', async () => {
|
||||
const emitActionSpy = vi.spyOn(emitter, 'emitAction');
|
||||
const data = [{ name: 'ddd' }, { name: 'eee' }, { name: 'fff' }];
|
||||
const stream = new Readable({
|
||||
read() {
|
||||
this.push(parse(data));
|
||||
this.push(null);
|
||||
},
|
||||
});
|
||||
tracker.on.insert(collection).response(() => [insertedId++]);
|
||||
|
||||
await service.importCSV(collection, stream);
|
||||
|
||||
expect(emitActionSpy).toBeCalledTimes(insertedId - 1);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -20,11 +20,12 @@ import {
|
||||
UnsupportedMediaTypeException,
|
||||
} from '../exceptions';
|
||||
import logger from '../logger';
|
||||
import { AbstractServiceOptions, File } from '../types';
|
||||
import { AbstractServiceOptions, ActionEventParams, File } from '../types';
|
||||
import { getDateFormatted } from '../utils/get-date-formatted';
|
||||
import { FilesService } from './files';
|
||||
import { ItemsService } from './items';
|
||||
import { NotificationsService } from './notifications';
|
||||
import emitter from '../emitter';
|
||||
import type { Readable } from 'node:stream';
|
||||
|
||||
export class ImportService {
|
||||
@@ -66,6 +67,7 @@ export class ImportService {
|
||||
|
||||
importJSON(collection: string, stream: Readable): Promise<void> {
|
||||
const extractJSON = StreamArray.withParser();
|
||||
const nestedActionEvents: ActionEventParams[] = [];
|
||||
|
||||
return this.knex.transaction((trx) => {
|
||||
const service = new ItemsService(collection, {
|
||||
@@ -75,7 +77,7 @@ export class ImportService {
|
||||
});
|
||||
|
||||
const saveQueue = queue(async (value: Record<string, unknown>) => {
|
||||
return await service.upsertOne(value);
|
||||
return await service.upsertOne(value, { bypassEmitAction: (params) => nestedActionEvents.push(params) });
|
||||
});
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
@@ -98,6 +100,10 @@ export class ImportService {
|
||||
|
||||
extractJSON.on('end', () => {
|
||||
saveQueue.drain(() => {
|
||||
for (const nestedActionEvent of nestedActionEvents) {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
}
|
||||
|
||||
return resolve();
|
||||
});
|
||||
});
|
||||
@@ -106,6 +112,8 @@ export class ImportService {
|
||||
}
|
||||
|
||||
importCSV(collection: string, stream: Readable): Promise<void> {
|
||||
const nestedActionEvents: ActionEventParams[] = [];
|
||||
|
||||
return this.knex.transaction((trx) => {
|
||||
const service = new ItemsService(collection, {
|
||||
knex: trx,
|
||||
@@ -114,7 +122,7 @@ export class ImportService {
|
||||
});
|
||||
|
||||
const saveQueue = queue(async (value: Record<string, unknown>) => {
|
||||
return await service.upsertOne(value);
|
||||
return await service.upsertOne(value, { bypassEmitAction: (action) => nestedActionEvents.push(action) });
|
||||
});
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
@@ -147,6 +155,10 @@ export class ImportService {
|
||||
})
|
||||
.on('end', () => {
|
||||
saveQueue.drain(() => {
|
||||
for (const nestedActionEvent of nestedActionEvents) {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
}
|
||||
|
||||
return resolve();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -244,14 +244,18 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
},
|
||||
};
|
||||
|
||||
if (!opts?.bypassEmitAction) {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
} else {
|
||||
if (opts?.bypassEmitAction) {
|
||||
opts.bypassEmitAction(actionEvent);
|
||||
} else {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
}
|
||||
|
||||
for (const nestedActionEvent of nestedActionEvents) {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
if (opts?.bypassEmitAction) {
|
||||
opts.bypassEmitAction(nestedActionEvent);
|
||||
} else {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -280,8 +284,7 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
const primaryKey = await service.createOne(payload, {
|
||||
...(opts || {}),
|
||||
autoPurgeCache: false,
|
||||
bypassEmitAction: (params) =>
|
||||
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
|
||||
bypassEmitAction: (params) => nestedActionEvents.push(params),
|
||||
});
|
||||
primaryKeys.push(primaryKey);
|
||||
}
|
||||
@@ -291,10 +294,10 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
|
||||
if (opts?.emitEvents !== false) {
|
||||
for (const nestedActionEvent of nestedActionEvents) {
|
||||
if (!opts?.bypassEmitAction) {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
} else {
|
||||
if (opts?.bypassEmitAction) {
|
||||
opts.bypassEmitAction(nestedActionEvent);
|
||||
} else {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -677,14 +680,18 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
},
|
||||
};
|
||||
|
||||
if (!opts?.bypassEmitAction) {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
} else {
|
||||
if (opts?.bypassEmitAction) {
|
||||
opts.bypassEmitAction(actionEvent);
|
||||
} else {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
}
|
||||
|
||||
for (const nestedActionEvent of nestedActionEvents) {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
if (opts?.bypassEmitAction) {
|
||||
opts.bypassEmitAction(nestedActionEvent);
|
||||
} else {
|
||||
emitter.emitAction(nestedActionEvent.event, nestedActionEvent.meta, nestedActionEvent.context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -845,10 +852,10 @@ export class ItemsService<Item extends AnyItem = AnyItem> implements AbstractSer
|
||||
},
|
||||
};
|
||||
|
||||
if (!opts?.bypassEmitAction) {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
} else {
|
||||
if (opts?.bypassEmitAction) {
|
||||
opts.bypassEmitAction(actionEvent);
|
||||
} else {
|
||||
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user