From 69061c12849066bcd0f65d703ccb313e8f2951ea Mon Sep 17 00:00:00 2001 From: ian Date: Tue, 2 May 2023 23:17:07 +0800 Subject: [PATCH] Add Cache option for Flows webhook trigger (#18277) Co-authored-by: Azri Kahar <42867097+azrikahar@users.noreply.github.com> Co-authored-by: rijkvanzanten Co-authored-by: Nicola Krumschmidt Co-authored-by: Pascal Jufer --- .changeset/modern-eels-judge.md | 7 + api/src/controllers/flows.ts | 6 +- api/src/flows.ts | 22 +- app/src/lang/translations/en-US.yaml | 1 + .../modules/settings/routes/flows/triggers.ts | 15 +- docs/app/flows/triggers.md | 12 +- tests/blackbox/routes/flows/webhook.test.ts | 222 ++++++++++++++++++ tests/blackbox/setup/sequentialTests.js | 1 + 8 files changed, 272 insertions(+), 14 deletions(-) create mode 100644 .changeset/modern-eels-judge.md create mode 100644 tests/blackbox/routes/flows/webhook.test.ts diff --git a/.changeset/modern-eels-judge.md b/.changeset/modern-eels-judge.md new file mode 100644 index 0000000000..eb26015773 --- /dev/null +++ b/.changeset/modern-eels-judge.md @@ -0,0 +1,7 @@ +--- +"@directus/app": minor +"@directus/api": minor +"tests-blackbox": minor +--- + +Added `Cache` option so that caching can be disabled for GET requests to Flows webhook trigger diff --git a/api/src/controllers/flows.ts b/api/src/controllers/flows.ts index e92ac306c9..837e92e208 100644 --- a/api/src/controllers/flows.ts +++ b/api/src/controllers/flows.ts @@ -18,7 +18,7 @@ router.use(useCollection('directus_flows')); const webhookFlowHandler = asyncHandler(async (req, res, next) => { const flowManager = getFlowManager(); - const result = await flowManager.runWebhookFlow( + const { result, cacheEnabled } = await flowManager.runWebhookFlow( `${req.method}-${req.params['pk']}`, { path: req.path, @@ -33,6 +33,10 @@ const webhookFlowHandler = asyncHandler(async (req, res, next) => { } ); + if (!cacheEnabled) { + res.locals['cache'] = false; + } + res.locals['payload'] = result; return next(); }); diff --git a/api/src/flows.ts b/api/src/flows.ts index 85d50a70f3..729f69e93a 100644 --- a/api/src/flows.ts +++ b/api/src/flows.ts @@ -121,7 +121,11 @@ class FlowManager { return handler(data, context); } - public async runWebhookFlow(id: string, data: unknown, context: Record): Promise { + public async runWebhookFlow( + id: string, + data: unknown, + context: Record + ): Promise<{ result: unknown; cacheEnabled: boolean }> { if (!(id in this.webhookFlowHandlers)) { logger.warn(`Couldn't find webhook or manual triggered flow with id "${id}"`); throw new exceptions.ForbiddenException(); @@ -220,17 +224,23 @@ class FlowManager { this.operationFlowHandlers[flow.id] = handler; } else if (flow.trigger === 'webhook') { - const handler = (data: unknown, context: Record) => { + const method = flow.options?.['method'] ?? 'GET'; + + const handler = async (data: unknown, context: Record) => { + let cacheEnabled = true; + + if (method === 'GET') { + cacheEnabled = flow.options['cacheEnabled'] !== false; + } + if (flow.options['async']) { this.executeFlow(flow, data, context); - return undefined; + return { result: undefined, cacheEnabled }; } else { - return this.executeFlow(flow, data, context); + return { result: await this.executeFlow(flow, data, context), cacheEnabled }; } }; - const method = flow.options?.['method'] ?? 'GET'; - // Default return to $last for webhooks flow.options['return'] = flow.options['return'] ?? '$last'; diff --git a/app/src/lang/translations/en-US.yaml b/app/src/lang/translations/en-US.yaml index 8a92773b59..18921689f4 100644 --- a/app/src/lang/translations/en-US.yaml +++ b/app/src/lang/translations/en-US.yaml @@ -2172,3 +2172,4 @@ operations: parallel: Parallel batch: Batch batch_size: Batch Size + cache: Cache diff --git a/app/src/modules/settings/routes/flows/triggers.ts b/app/src/modules/settings/routes/flows/triggers.ts index 8fdfc74676..3f394b33aa 100644 --- a/app/src/modules/settings/routes/flows/triggers.ts +++ b/app/src/modules/settings/routes/flows/triggers.ts @@ -197,7 +197,7 @@ export function getTriggers() { copyable: true, }, ], - options: ({ async }) => [ + options: ({ async, method }) => [ { field: 'method', name: t('triggers.webhook.method'), @@ -255,6 +255,19 @@ export function getTriggers() { hidden: async, }, }, + { + field: 'cacheEnabled', + name: '$t:operations.trigger.cache', + type: 'boolean', + meta: { + width: 'half', + hidden: method && method !== 'GET', + interface: 'toggle', + }, + schema: { + default_value: true, + }, + }, ], }, { diff --git a/docs/app/flows/triggers.md b/docs/app/flows/triggers.md index 69ce95adbe..f0b4119b3a 100644 --- a/docs/app/flows/triggers.md +++ b/docs/app/flows/triggers.md @@ -71,8 +71,7 @@ cancel the transaction. :::tip Cancelling Transactions To completely cancel a transaction, you'll need to throw an error within a -[script operation](/app/flows/operations#script) or end the flow on a -[failure path](/app/flows#control-flow). +[script operation](/app/flows/operations#script) or end the flow on a [failure path](/app/flows#control-flow). ::: @@ -109,6 +108,8 @@ trigger panel. - **Data of Last Operation** — Responds with value from `$last`. - **All Data** — Responds with the entire data chain. - **Other** — Responds with value from an ``. +- **Cache** — Choose whether responses to `GET` requests should be stored and served from cache or the cache should be + bypassed. :::tip Response Body @@ -151,8 +152,7 @@ This trigger enables you to create data at scheduled intervals, via ![Another Flow](https://cdn.directus.io/docs/v9/configuration/flows/triggers/triggers-20220603A/another-flow-20220602A.webp) -This trigger executes by the [trigger flow](/app/flows/operations#another-flow) operation, allowing you to -chain flows. +This trigger executes by the [trigger flow](/app/flows/operations#another-flow) operation, allowing you to chain flows. - **Response Body** — Optional. Defines data to return and append under the `` of [trigger flow](/app/flows/operations#another-flow) operation that tripped the trigger. Choose to return: @@ -185,8 +185,8 @@ automatically. These item IDs are passed in to `$trigger`. - **Collection Page (Requires Selection)** — Toggle whether a selection is required in the Collection Page to trigger. - **Require Confirmation** - Toggle whether a confirmation dialog will be shown before the flow is executed. -After the operation runs, a toast notification will appear in your [sidebar](/app/overview#4-sidebar) indicating -whether the flow ran successfully. +After the operation runs, a toast notification will appear in your [sidebar](/app/overview#4-sidebar) indicating whether +the flow ran successfully. ### Confirmation Dialog diff --git a/tests/blackbox/routes/flows/webhook.test.ts b/tests/blackbox/routes/flows/webhook.test.ts new file mode 100644 index 0000000000..ffd5f7732e --- /dev/null +++ b/tests/blackbox/routes/flows/webhook.test.ts @@ -0,0 +1,222 @@ +import config, { Env, getUrl, paths } from '@common/config'; +import vendors from '@common/get-dbs-to-test'; +import * as common from '@common/index'; +import request from 'supertest'; +import { awaitDirectusConnection } from '@utils/await-connection'; +import { ChildProcess, spawn } from 'child_process'; +import knex from 'knex'; +import type { Knex } from 'knex'; +import { cloneDeep } from 'lodash'; +import { sleep } from '@utils/sleep'; + +describe('/flows', () => { + const databases = new Map(); + const directusInstances = {} as { [vendor: string]: ChildProcess }; + const envs = {} as { [vendor: string]: Env }; + + beforeAll(async () => { + const promises = []; + + for (const vendor of vendors) { + databases.set(vendor, knex(config.knexConfig[vendor]!)); + + const env = cloneDeep(config.envs); + env[vendor].CACHE_ENABLED = 'true'; + env[vendor].CACHE_STORE = 'memory'; + + const newServerPort = Number(env[vendor]!.PORT) + 150; + env[vendor]!.PORT = String(newServerPort); + + const server = spawn('node', [paths.cli, 'start'], { cwd: paths.cwd, env: env[vendor] }); + + directusInstances[vendor] = server; + envs[vendor] = env; + + promises.push(awaitDirectusConnection(newServerPort)); + } + + // Give the server some time to start + await Promise.all(promises); + }, 180000); + + afterAll(async () => { + for (const [vendor, connection] of databases) { + directusInstances[vendor].kill(); + await connection.destroy(); + } + }); + + describe('Webhook Trigger', () => { + describe('cacheEnabled works for GET', () => { + it.each(vendors)('%s', async (vendor) => { + // Setup + const env = envs[vendor]; + + const payloadFlowCreate = { + name: 'webhook flow', + icon: 'bolt', + color: null, + description: null, + status: 'active', + accountability: null, + trigger: 'webhook', + options: {}, + }; + + const payloadOperationCreate = { + position_x: 19, + position_y: 1, + name: 'Get epoch milliseconds', + key: 'op_exev', + type: 'exec', + options: { code: 'module.exports = async function() { return { epoch: Date.now() }; }' }, + }; + + const flowId = ( + await request(getUrl(vendor, env)) + .post('/flows') + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .query({ fields: ['id'] }) + .send(payloadFlowCreate) + ).body.data.id; + + const flowCacheEnabledId = ( + await request(getUrl(vendor, env)) + .post('/flows') + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .query({ fields: ['id'] }) + .send({ + ...payloadFlowCreate, + name: 'webhook flow cache disabled', + options: { ...payloadFlowCreate.options, cacheEnabled: true }, + }) + ).body.data.id; + + const flowCacheDisabledId = ( + await request(getUrl(vendor, env)) + .post('/flows') + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .query({ fields: ['id'] }) + .send({ + ...payloadFlowCreate, + name: 'webhook flow cache enabled', + options: { ...payloadFlowCreate.options, cacheEnabled: false }, + }) + ).body.data.id; + + await request(getUrl(vendor, env)) + .patch(`/flows/${flowId}`) + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .send({ operation: { ...payloadOperationCreate, flow: flowId } }); + + await request(getUrl(vendor, env)) + .patch(`/flows/${flowCacheEnabledId}`) + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .send({ operation: { ...payloadOperationCreate, flow: flowCacheEnabledId } }); + + await request(getUrl(vendor, env)) + .patch(`/flows/${flowCacheDisabledId}`) + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .send({ operation: { ...payloadOperationCreate, flow: flowCacheDisabledId } }); + + // Action + const responseDefault = await request(getUrl(vendor, env)).get(`/flows/trigger/${flowId}`); + const responseCacheEnabled = await request(getUrl(vendor, env)).get(`/flows/trigger/${flowCacheEnabledId}`); + const responseCacheDisabled = await request(getUrl(vendor, env)).get(`/flows/trigger/${flowCacheDisabledId}`); + + await sleep(100); + + const responseDefault2 = await request(getUrl(vendor, env)).get(`/flows/trigger/${flowId}`); + const responseCacheEnabled2 = await request(getUrl(vendor, env)).get(`/flows/trigger/${flowCacheEnabledId}`); + const responseCacheDisabled2 = await request(getUrl(vendor, env)).get(`/flows/trigger/${flowCacheDisabledId}`); + + // Assert + expect(responseDefault.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + expect(responseCacheEnabled.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + expect(responseCacheDisabled.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + expect(responseDefault2.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + expect(responseCacheEnabled.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + expect(responseCacheDisabled2.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + + expect(responseDefault.body).toEqual(responseDefault2.body); + expect(responseCacheEnabled.body).toEqual(responseCacheEnabled2.body); + expect(responseCacheDisabled.body).not.toEqual(responseCacheDisabled2.body); + }); + }); + + describe('ignores cacheEnabled for POST', () => { + it.each(vendors)('%s', async (vendor) => { + // Setup + const env = envs[vendor]; + + const payloadFlowCreate = { + name: 'POST webhook flow', + icon: 'bolt', + color: null, + description: null, + status: 'active', + accountability: null, + trigger: 'webhook', + options: { method: 'POST' }, + }; + + const payloadOperationCreate = { + position_x: 19, + position_y: 1, + name: 'Get epoch milliseconds', + key: 'op_exev', + type: 'exec', + options: { code: 'module.exports = async function() { return { epoch: Date.now() }; }' }, + }; + + const flowId = ( + await request(getUrl(vendor, env)) + .post('/flows') + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .query({ fields: ['id'] }) + .send(payloadFlowCreate) + ).body.data.id; + + const flowCacheEnabledId = ( + await request(getUrl(vendor, env)) + .post('/flows') + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .query({ fields: ['id'] }) + .send({ + ...payloadFlowCreate, + name: 'POST webhook flow cache enabled', + options: { ...payloadFlowCreate.options, cacheEnabled: false }, + }) + ).body.data.id; + + await request(getUrl(vendor, env)) + .patch(`/flows/${flowId}`) + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .send({ operation: { ...payloadOperationCreate, flow: flowId } }); + + await request(getUrl(vendor, env)) + .patch(`/flows/${flowCacheEnabledId}`) + .set('Authorization', `Bearer ${common.USER.ADMIN.TOKEN}`) + .send({ operation: { ...payloadOperationCreate, flow: flowCacheEnabledId } }); + + // Action + const responseDefault = await request(getUrl(vendor, env)).post(`/flows/trigger/${flowId}`); + const responseCacheEnabled = await request(getUrl(vendor, env)).post(`/flows/trigger/${flowCacheEnabledId}`); + + await sleep(100); + + const responseDefault2 = await request(getUrl(vendor, env)).post(`/flows/trigger/${flowId}`); + const responseCacheEnabled2 = await request(getUrl(vendor, env)).post(`/flows/trigger/${flowCacheEnabledId}`); + + // Assert + expect(responseDefault.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + expect(responseCacheEnabled.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + expect(responseDefault2.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + expect(responseCacheEnabled2.body).toEqual(expect.objectContaining({ epoch: expect.any(Number) })); + + expect(responseDefault.body).not.toEqual(responseDefault2.body); + expect(responseCacheEnabled.body).not.toEqual(responseCacheEnabled2.body); + }); + }); + }); +}); diff --git a/tests/blackbox/setup/sequentialTests.js b/tests/blackbox/setup/sequentialTests.js index 1119b9ca6c..979b9eacb3 100644 --- a/tests/blackbox/setup/sequentialTests.js +++ b/tests/blackbox/setup/sequentialTests.js @@ -12,6 +12,7 @@ exports.list = { { testFilePath: '/schema/timezone/timezone-changed-node-tz-america.test.ts' }, { testFilePath: '/schema/timezone/timezone-changed-node-tz-asia.test.ts' }, { testFilePath: '/logger/redact.test.ts' }, + { testFilePath: '/routes/flows/webhook.test.ts' }, { testFilePath: '/routes/collections/schema-cache.test.ts' }, { testFilePath: '/routes/assets/concurrency.test.ts' }, ],