diff --git a/api/src/flows.ts b/api/src/flows.ts index 9c9690bd5d..7e3fcfa764 100644 --- a/api/src/flows.ts +++ b/api/src/flows.ts @@ -28,6 +28,7 @@ import { omit } from 'lodash'; import { getMessenger } from './messenger'; import fastRedact from 'fast-redact'; import { applyOperationOptions } from './utils/operation-options'; +import { JobQueue } from './utils/job-queue'; let flowManager: FlowManager | undefined; @@ -57,13 +58,74 @@ const ACCOUNTABILITY_KEY = '$accountability'; const LAST_KEY = '$last'; class FlowManager { + private isLoaded = false; + private operations: Record = {}; private triggerHandlers: TriggerHandler[] = []; private operationFlowHandlers: Record = {}; private webhookFlowHandlers: Record = {}; + private reloadQueue = new JobQueue(); + + constructor() { + const messenger = getMessenger(); + + messenger.subscribe('flows', (event) => { + if (event.type === 'reload') { + this.reloadQueue.enqueue(async () => { + if (this.isLoaded) { + await this.unload(); + await this.load(); + } + }); + } + }); + } + public async initialize(): Promise { + if (!this.isLoaded) { + await this.load(); + } + } + + public async reload(): Promise { + const messenger = getMessenger(); + + messenger.publish('flows', { type: 'reload' }); + } + + public addOperation(id: string, operation: OperationHandler): void { + this.operations[id] = operation; + } + + public clearOperations(): void { + this.operations = {}; + } + + public async runOperationFlow(id: string, data: unknown, context: Record): Promise { + if (!(id in this.operationFlowHandlers)) { + logger.warn(`Couldn't find operation triggered flow with id "${id}"`); + return null; + } + + const handler = this.operationFlowHandlers[id]; + + return handler(data, context); + } + + public async runWebhookFlow(id: string, data: unknown, context: Record): Promise { + if (!(id in this.webhookFlowHandlers)) { + logger.warn(`Couldn't find webhook or manual triggered flow with id "${id}"`); + throw new exceptions.ForbiddenException(); + } + + const handler = this.webhookFlowHandlers[id]; + + return handler(data, context); + } + + private async load(): Promise { const flowsService = new FlowsService({ knex: getDatabase(), schema: await getSchema() }); const flows = await flowsService.readByQuery({ @@ -193,14 +255,10 @@ class FlowManager { } } - getMessenger().subscribe('flows', (event) => { - if (event.type === 'reload') { - this.reload(); - } - }); + this.isLoaded = true; } - public async reload(): Promise { + private async unload(): Promise { for (const trigger of this.triggerHandlers) { trigger.events.forEach((event) => { switch (event.type) { @@ -221,37 +279,7 @@ class FlowManager { this.operationFlowHandlers = {}; this.webhookFlowHandlers = {}; - await this.initialize(); - } - - public addOperation(id: string, operation: OperationHandler): void { - this.operations[id] = operation; - } - - public clearOperations(): void { - this.operations = {}; - } - - public async runOperationFlow(id: string, data: unknown, context: Record): Promise { - if (!(id in this.operationFlowHandlers)) { - logger.warn(`Couldn't find operation triggered flow with id "${id}"`); - return null; - } - - const handler = this.operationFlowHandlers[id]; - - return handler(data, context); - } - - public async runWebhookFlow(id: string, data: unknown, context: Record): Promise { - if (!(id in this.webhookFlowHandlers)) { - logger.warn(`Couldn't find webhook or manual triggered flow with id "${id}"`); - throw new exceptions.ForbiddenException(); - } - - const handler = this.webhookFlowHandlers[id]; - - return handler(data, context); + this.isLoaded = false; } private async executeFlow(flow: Flow, data: unknown = null, context: Record = {}): Promise { diff --git a/api/src/services/flows.ts b/api/src/services/flows.ts index a1b5559fd6..5c0e749f68 100644 --- a/api/src/services/flows.ts +++ b/api/src/services/flows.ts @@ -1,49 +1,64 @@ import { FlowRaw } from '@directus/shared/types'; -import { getMessenger, Messenger } from '../messenger'; +import { getFlowManager } from '../flows'; import { AbstractServiceOptions, Item, MutationOptions, PrimaryKey } from '../types'; import { ItemsService } from './items'; export class FlowsService extends ItemsService { - messenger: Messenger; - constructor(options: AbstractServiceOptions) { super('directus_flows', options); - this.messenger = getMessenger(); } async createOne(data: Partial, opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.createOne(data, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async createMany(data: Partial[], opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.createMany(data, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async updateOne(key: PrimaryKey, data: Partial, opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.updateOne(key, data, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async updateMany(keys: PrimaryKey[], data: Partial, opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.updateMany(keys, data, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async deleteOne(key: PrimaryKey, opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.deleteOne(key, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async deleteMany(keys: PrimaryKey[], opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.deleteMany(keys, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } } diff --git a/api/src/services/operations.ts b/api/src/services/operations.ts index c5ad2abe36..deb85b646f 100644 --- a/api/src/services/operations.ts +++ b/api/src/services/operations.ts @@ -1,49 +1,64 @@ import { OperationRaw } from '@directus/shared/types'; -import { getMessenger, Messenger } from '../messenger'; +import { getFlowManager } from '../flows'; import { AbstractServiceOptions, Item, MutationOptions, PrimaryKey } from '../types'; import { ItemsService } from './items'; export class OperationsService extends ItemsService { - messenger: Messenger; - constructor(options: AbstractServiceOptions) { super('directus_operations', options); - this.messenger = getMessenger(); } async createOne(data: Partial, opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.createOne(data, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async createMany(data: Partial[], opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.createMany(data, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async updateOne(key: PrimaryKey, data: Partial, opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.updateOne(key, data, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async updateMany(keys: PrimaryKey[], data: Partial, opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.updateMany(keys, data, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async deleteOne(key: PrimaryKey, opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.deleteOne(key, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } async deleteMany(keys: PrimaryKey[], opts?: MutationOptions): Promise { + const flowManager = getFlowManager(); + const result = await super.deleteMany(keys, opts); - this.messenger.publish('flows', { type: 'reload' }); + await flowManager.reload(); + return result; } } diff --git a/api/src/utils/job-queue.ts b/api/src/utils/job-queue.ts new file mode 100644 index 0000000000..bfdfc45747 --- /dev/null +++ b/api/src/utils/job-queue.ts @@ -0,0 +1,31 @@ +type Job = () => Promise | void; + +export class JobQueue { + private running: boolean; + private jobs: Job[]; + + constructor() { + this.running = false; + this.jobs = []; + } + + public enqueue(job: Job): void { + this.jobs.push(job); + + if (!this.running) { + this.run(); + } + } + + private async run(): Promise { + this.running = true; + + while (this.jobs.length > 0) { + const job = this.jobs.shift()!; + + await job(); + } + + this.running = false; + } +}