Fix flow triggers being registered multiple times (#13783)

Fixes #13739
This commit is contained in:
Nicola Krumschmidt
2022-06-08 16:27:00 +02:00
committed by GitHub
parent ef2b629c9b
commit 2efff9fd16
4 changed files with 146 additions and 57 deletions

View File

@@ -28,6 +28,7 @@ import { omit } from 'lodash';
import { getMessenger } from './messenger';
import fastRedact from 'fast-redact';
import { applyOperationOptions } from './utils/operation-options';
import { JobQueue } from './utils/job-queue';
let flowManager: FlowManager | undefined;
@@ -57,13 +58,74 @@ const ACCOUNTABILITY_KEY = '$accountability';
const LAST_KEY = '$last';
class FlowManager {
private isLoaded = false;
private operations: Record<string, OperationHandler> = {};
private triggerHandlers: TriggerHandler[] = [];
private operationFlowHandlers: Record<string, any> = {};
private webhookFlowHandlers: Record<string, any> = {};
private reloadQueue = new JobQueue();
constructor() {
const messenger = getMessenger();
messenger.subscribe('flows', (event) => {
if (event.type === 'reload') {
this.reloadQueue.enqueue(async () => {
if (this.isLoaded) {
await this.unload();
await this.load();
}
});
}
});
}
public async initialize(): Promise<void> {
if (!this.isLoaded) {
await this.load();
}
}
public async reload(): Promise<void> {
const messenger = getMessenger();
messenger.publish('flows', { type: 'reload' });
}
public addOperation(id: string, operation: OperationHandler): void {
this.operations[id] = operation;
}
public clearOperations(): void {
this.operations = {};
}
public async runOperationFlow(id: string, data: unknown, context: Record<string, unknown>): Promise<unknown> {
if (!(id in this.operationFlowHandlers)) {
logger.warn(`Couldn't find operation triggered flow with id "${id}"`);
return null;
}
const handler = this.operationFlowHandlers[id];
return handler(data, context);
}
public async runWebhookFlow(id: string, data: unknown, context: Record<string, unknown>): Promise<unknown> {
if (!(id in this.webhookFlowHandlers)) {
logger.warn(`Couldn't find webhook or manual triggered flow with id "${id}"`);
throw new exceptions.ForbiddenException();
}
const handler = this.webhookFlowHandlers[id];
return handler(data, context);
}
private async load(): Promise<void> {
const flowsService = new FlowsService({ knex: getDatabase(), schema: await getSchema() });
const flows = await flowsService.readByQuery({
@@ -193,14 +255,10 @@ class FlowManager {
}
}
getMessenger().subscribe('flows', (event) => {
if (event.type === 'reload') {
this.reload();
}
});
this.isLoaded = true;
}
public async reload(): Promise<void> {
private async unload(): Promise<void> {
for (const trigger of this.triggerHandlers) {
trigger.events.forEach((event) => {
switch (event.type) {
@@ -221,37 +279,7 @@ class FlowManager {
this.operationFlowHandlers = {};
this.webhookFlowHandlers = {};
await this.initialize();
}
public addOperation(id: string, operation: OperationHandler): void {
this.operations[id] = operation;
}
public clearOperations(): void {
this.operations = {};
}
public async runOperationFlow(id: string, data: unknown, context: Record<string, unknown>): Promise<unknown> {
if (!(id in this.operationFlowHandlers)) {
logger.warn(`Couldn't find operation triggered flow with id "${id}"`);
return null;
}
const handler = this.operationFlowHandlers[id];
return handler(data, context);
}
public async runWebhookFlow(id: string, data: unknown, context: Record<string, unknown>): Promise<unknown> {
if (!(id in this.webhookFlowHandlers)) {
logger.warn(`Couldn't find webhook or manual triggered flow with id "${id}"`);
throw new exceptions.ForbiddenException();
}
const handler = this.webhookFlowHandlers[id];
return handler(data, context);
this.isLoaded = false;
}
private async executeFlow(flow: Flow, data: unknown = null, context: Record<string, unknown> = {}): Promise<unknown> {

View File

@@ -1,49 +1,64 @@
import { FlowRaw } from '@directus/shared/types';
import { getMessenger, Messenger } from '../messenger';
import { getFlowManager } from '../flows';
import { AbstractServiceOptions, Item, MutationOptions, PrimaryKey } from '../types';
import { ItemsService } from './items';
export class FlowsService extends ItemsService<FlowRaw> {
messenger: Messenger;
constructor(options: AbstractServiceOptions) {
super('directus_flows', options);
this.messenger = getMessenger();
}
async createOne(data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
const flowManager = getFlowManager();
const result = await super.createOne(data, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async createMany(data: Partial<Item>[], opts?: MutationOptions): Promise<PrimaryKey[]> {
const flowManager = getFlowManager();
const result = await super.createMany(data, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async updateOne(key: PrimaryKey, data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
const flowManager = getFlowManager();
const result = await super.updateOne(key, data, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async updateMany(keys: PrimaryKey[], data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey[]> {
const flowManager = getFlowManager();
const result = await super.updateMany(keys, data, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async deleteOne(key: PrimaryKey, opts?: MutationOptions): Promise<PrimaryKey> {
const flowManager = getFlowManager();
const result = await super.deleteOne(key, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async deleteMany(keys: PrimaryKey[], opts?: MutationOptions): Promise<PrimaryKey[]> {
const flowManager = getFlowManager();
const result = await super.deleteMany(keys, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
}

View File

@@ -1,49 +1,64 @@
import { OperationRaw } from '@directus/shared/types';
import { getMessenger, Messenger } from '../messenger';
import { getFlowManager } from '../flows';
import { AbstractServiceOptions, Item, MutationOptions, PrimaryKey } from '../types';
import { ItemsService } from './items';
export class OperationsService extends ItemsService<OperationRaw> {
messenger: Messenger;
constructor(options: AbstractServiceOptions) {
super('directus_operations', options);
this.messenger = getMessenger();
}
async createOne(data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
const flowManager = getFlowManager();
const result = await super.createOne(data, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async createMany(data: Partial<Item>[], opts?: MutationOptions): Promise<PrimaryKey[]> {
const flowManager = getFlowManager();
const result = await super.createMany(data, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async updateOne(key: PrimaryKey, data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
const flowManager = getFlowManager();
const result = await super.updateOne(key, data, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async updateMany(keys: PrimaryKey[], data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey[]> {
const flowManager = getFlowManager();
const result = await super.updateMany(keys, data, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async deleteOne(key: PrimaryKey, opts?: MutationOptions): Promise<PrimaryKey> {
const flowManager = getFlowManager();
const result = await super.deleteOne(key, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
async deleteMany(keys: PrimaryKey[], opts?: MutationOptions): Promise<PrimaryKey[]> {
const flowManager = getFlowManager();
const result = await super.deleteMany(keys, opts);
this.messenger.publish('flows', { type: 'reload' });
await flowManager.reload();
return result;
}
}

View File

@@ -0,0 +1,31 @@
type Job = () => Promise<void> | void;
export class JobQueue {
private running: boolean;
private jobs: Job[];
constructor() {
this.running = false;
this.jobs = [];
}
public enqueue(job: Job): void {
this.jobs.push(job);
if (!this.running) {
this.run();
}
}
private async run(): Promise<void> {
this.running = true;
while (this.jobs.length > 0) {
const job = this.jobs.shift()!;
await job();
}
this.running = false;
}
}