Files
directus/api/src/flows.ts
Rijk van Zanten 32dd709778 Insights 2.0 (#14096)
* query function added to list

* dashboard reading query, adding to object

* typecasting of filter vals needed still

* numbers accepting strings too

* json-to-graphql-query => devD

* fixed unneeded return in list index.ts

* stitching and calling but not actually calling

* calls on panel change

* query object += new panel before dashboard save

* uuid generated in app not api

* fixed panel ids in query

* fixed the tests I just wrote

* passing the query data down!

* list showing data

* objDiff test moved to test

* metric bug fixes + data

* dashboard logic

* time series conversion started

* timeseries GQL query almost there

* query querying

* chart loading

* aggregate handling improved

* error handling for aggregate+filter errors

* removed query on empty queryObj

* maybe more error handling

* more error handling working

* improvements to error handling

* stitchGQL() error return type corrected

* added string fields to COUNT

* pushing up but needs work

* not an endless recursion

* it's not pretty but it works.

* throws an error

* system collections supported

* refactor to solve some errors

* loading correct

* metric function fixed

* data loading but not blocking rendering

* removed redundant code.

* relational fields

* deep nesting relations

* options.precision has a default

* relational fields fix. (thanks azri)

* the limit

* limit and time series

* range has a default

* data to workspace

* v-if

* panels loading

* workspaces don't get data anymore

* package.json

* requested changes

* loading

* get groups util

* timeseries => script setup

* list => script setup

* metric => script setup

* label => script setup

* declare optional props

* loadingPanels: only loading spinner on loading panels

* remove unneeded parseDate!!

* applyDataToPanels tests

* -.only

* remove unneeded steps

* processQuery tests

* tests

* removed unused var

* jest.config and some queryCaller tests

* one more test

* query tests

* typo

* clean up

* fix some but not all bugs

* bugs from merge fixed

* Start cleaning up 🧹

* Refactor custom input type

* Small tweaks in list index

* Cleanup imports

* Require Query object to be returned from query prop

* Tweak return statement

* Fix imports

* Cleanup metric watch effect

* Tweaks tweaks tweaks

* Don't rely on options, simplify fetch logic

* Add paths to validation errors

* [WIP] Start handling things in the store

* Rework query fetching logic into store

* Clean up data passing

* Use composition setup for insights store

* Remove outdated

* Fix missing return

* Allow batch updating in REST API

Allows sending an array of partial items to the endpoints, updating all to their own values

* Add batch update to graphql

* Start integrating edits

* Readd clear

* Add deletion

* Add duplication

* Finish create flow

* Resolve cache refresh on panel config

* Prevent warnings about component name

* Improve loading state

* Finalize dashboard overhaul

* Add auto-refresh sidebar detail

* Add efficient panel reloading

* Set/remove errors on succeeded requests

* Move options rendering to shared

* Fix wrong imports, render options in app

* Selectively reload panels with changed variables

* Ensure newly added panels don't lose data

* Only refresh panel if data query changed

* Never use empty filter object in metric query

* Add default value support to variable panel

* Centralize no-data state

* Only reload data on var change when query is altered

* Fix build

* Fix time series order

* Remove unused utils

* Remove no-longer-used logic

* Mark batch update result as non-nullable in GraphQL schema

* Interim flows fix

* Skip parsing undefined keys

* Refresh insights dashboard when discarding changes

* Don't submit primary key when updating batch

* Handle null prop field better

* Tweak panel padding

Co-authored-by: jaycammarano <jay.cammarano@gmail.com>
Co-authored-by: Azri Kahar <42867097+azrikahar@users.noreply.github.com>
Co-authored-by: ian <licitdev@gmail.com>
2022-06-27 15:26:42 -04:00

400 lines
11 KiB
TypeScript

import * as sharedExceptions from '@directus/shared/exceptions';
import {
Accountability,
Action,
ActionHandler,
FilterHandler,
Flow,
Operation,
OperationHandler,
SchemaOverview,
} from '@directus/shared/types';
import { applyOptionsData } from '@directus/shared/utils';
import fastRedact from 'fast-redact';
import { Knex } from 'knex';
import { omit } from 'lodash';
import { get } from 'micromustache';
import { schedule, validate } from 'node-cron';
import getDatabase from './database';
import emitter from './emitter';
import env from './env';
import * as exceptions from './exceptions';
import logger from './logger';
import { getMessenger } from './messenger';
import * as services from './services';
import { FlowsService } from './services';
import { ActivityService } from './services/activity';
import { RevisionsService } from './services/revisions';
import { EventHandler } from './types';
import { constructFlowTree } from './utils/construct-flow-tree';
import { getSchema } from './utils/get-schema';
import { JobQueue } from './utils/job-queue';
// Module-level singleton slot; stays undefined until getFlowManager() creates the instance.
let flowManager: FlowManager | undefined;
// Redactor applied to a flow run's collected data before it is written to a revision
// (see FlowManager.executeFlow). Matching values are replaced with '--redacted--'.
const redactLogs = fastRedact({
censor: '--redacted--',
// Authorization headers, cookies and access tokens found one level below any top-level key.
paths: ['*.headers.authorization', '*.access_token', '*.headers.cookie'],
// NOTE(review): per fast-redact, `serialize: false` returns the redacted object instead of a
// JSON string (and mutates the input in place) — confirm against the installed version.
serialize: false,
});
/**
 * Return the shared FlowManager, constructing it the first time this is called.
 *
 * @returns The single FlowManager instance held by this module.
 */
export function getFlowManager(): FlowManager {
	if (!flowManager) {
		flowManager = new FlowManager();
	}

	return flowManager;
}
// Record of the triggers registered for one flow, kept so that unload() can detach them again.
type TriggerHandler = {
// Id of the flow the events belong to.
id: string;
// Registered filter/action listeners or scheduled tasks for that flow.
events: EventHandler[];
};
// Reserved keys of the per-run data object built in FlowManager.executeFlow.
// Holds the data the trigger passed into the flow.
const TRIGGER_KEY = '$trigger';
// Holds the accountability taken from the execution context (null when absent).
const ACCOUNTABILITY_KEY = '$accountability';
// Holds the result of the most recently executed operation (initially the trigger data).
const LAST_KEY = '$last';
/**
 * Loads the active flows, wires each one to its trigger and runs its chain of operations
 * whenever that trigger fires.
 *
 * Triggers handled in `load`:
 * - `event`: filter or action listeners registered on the emitter
 * - `schedule`: node-cron tasks
 * - `operation`: invoked through `runOperationFlow`
 * - `webhook` / `manual`: invoked through `runWebhookFlow`
 */
class FlowManager {
	/** True once `load` has finished; reset by `unload`. */
	private isLoaded = false;
	/** Operation handlers keyed by operation type (see `addOperation`). */
	private operations: Record<string, OperationHandler> = {};
	/** Emitter listeners and cron tasks registered by `load`, kept so `unload` can detach them. */
	private triggerHandlers: TriggerHandler[] = [];
	/** Handlers of `operation`-triggered flows, keyed by flow id. */
	private operationFlowHandlers: Record<string, any> = {};
	/** Handlers of `webhook` and `manual` flows, keyed by `<HTTP method>-<flow id>`. */
	private webhookFlowHandlers: Record<string, any> = {};
	/** Queue on which reload jobs are enqueued. NOTE(review): presumably runs jobs one at a time — confirm in JobQueue. */
	private reloadQueue: JobQueue;

	/**
	 * Subscribe to the `flows` messenger channel so that a published `reload` event
	 * unloads and re-loads all flows on this instance.
	 */
	constructor() {
		this.reloadQueue = new JobQueue();

		const messenger = getMessenger();

		messenger.subscribe('flows', (event) => {
			if (event.type === 'reload') {
				this.reloadQueue.enqueue(async () => {
					if (this.isLoaded) {
						await this.unload();
						await this.load();
					} else {
						logger.warn('Flows have to be loaded before they can be reloaded');
					}
				});
			}
		});
	}

	/** Load all active flows unless they have been loaded already. */
	public async initialize(): Promise<void> {
		if (!this.isLoaded) {
			await this.load();
		}
	}

	/**
	 * Request a reload by publishing a `reload` event on the `flows` channel.
	 * The actual work happens in the subscriber set up in the constructor.
	 */
	public async reload(): Promise<void> {
		const messenger = getMessenger();

		messenger.publish('flows', { type: 'reload' });
	}

	/**
	 * Register (or replace) the handler used for operations of the given type.
	 *
	 * @param id - Operation type the handler is responsible for.
	 * @param operation - Handler invoked by `executeOperation`.
	 */
	public addOperation(id: string, operation: OperationHandler): void {
		this.operations[id] = operation;
	}

	/** Drop every registered operation handler. */
	public clearOperations(): void {
		this.operations = {};
	}

	/**
	 * Run a flow that uses the `operation` trigger.
	 *
	 * @param id - Id of the flow to run.
	 * @param data - Trigger data handed to the flow.
	 * @param context - Execution context forwarded to the flow.
	 * @returns The flow's result, or `null` (after a warning) when no such flow is registered.
	 */
	public async runOperationFlow(id: string, data: unknown, context: Record<string, unknown>): Promise<unknown> {
		if (!(id in this.operationFlowHandlers)) {
			logger.warn(`Couldn't find operation triggered flow with id "${id}"`);
			return null;
		}

		const handler = this.operationFlowHandlers[id];

		return handler(data, context);
	}

	/**
	 * Run a flow that uses the `webhook` or `manual` trigger.
	 *
	 * @param id - Handler key in the form `<HTTP method>-<flow id>`.
	 * @param data - Trigger data handed to the flow.
	 * @param context - Execution context forwarded to the flow.
	 * @returns Whatever the registered handler returns (undefined for async flows).
	 * @throws ForbiddenException when no handler is registered under `id`.
	 */
	public async runWebhookFlow(id: string, data: unknown, context: Record<string, unknown>): Promise<unknown> {
		if (!(id in this.webhookFlowHandlers)) {
			logger.warn(`Couldn't find webhook or manual triggered flow with id "${id}"`);
			throw new exceptions.ForbiddenException();
		}

		const handler = this.webhookFlowHandlers[id];

		return handler(data, context);
	}

	/**
	 * Read every active flow (including its operations) and register a handler for its trigger.
	 * Sets `isLoaded` once all flows have been processed.
	 */
	private async load(): Promise<void> {
		const flowsService = new FlowsService({ knex: getDatabase(), schema: await getSchema() });

		const flows = await flowsService.readByQuery({
			filter: { status: { _eq: 'active' } },
			fields: ['*', 'operations.*'],
		});

		const flowTrees = flows.map((flow) => constructFlowTree(flow));

		for (const flow of flowTrees) {
			if (flow.trigger === 'event') {
				// Expand the configured scopes into emitter event names. The generic item scopes
				// are expanded once per configured collection; `directus_*` collections drop the
				// prefix and the `items` segment (e.g. `directus_users` + `items.create` => `users.create`).
				const events: string[] =
					flow.options?.scope
						?.map((scope: string) => {
							if (['items.create', 'items.update', 'items.delete'].includes(scope)) {
								return (
									flow.options?.collections?.map((collection: string) => {
										if (collection.startsWith('directus_')) {
											const action = scope.split('.')[1];
											return collection.substring(9) + '.' + action;
										}

										return `${collection}.${scope}`;
									}) ?? []
								);
							}

							return scope;
						})
						?.flat() ?? [];

				if (flow.options.type === 'filter') {
					// Filter flows run on the database handle supplied with the event.
					const handler: FilterHandler = (payload, meta, context) =>
						this.executeFlow(
							flow,
							{ payload, ...meta },
							{
								accountability: context.accountability,
								database: context.database,
								getSchema: context.schema ? () => context.schema : getSchema,
							}
						);

					events.forEach((event) => emitter.onFilter(event, handler));

					this.triggerHandlers.push({
						id: flow.id,
						events: events.map((event) => ({ type: 'filter', name: event, handler })),
					});
				} else if (flow.options.type === 'action') {
					// Action flows use a fresh handle from getDatabase() rather than the event's one.
					const handler: ActionHandler = (meta, context) =>
						this.executeFlow(flow, meta, {
							accountability: context.accountability,
							database: getDatabase(),
							getSchema: context.schema ? () => context.schema : getSchema,
						});

					events.forEach((event) => emitter.onAction(event, handler));

					this.triggerHandlers.push({
						id: flow.id,
						events: events.map((event) => ({ type: 'action', name: event, handler })),
					});
				}
			} else if (flow.trigger === 'schedule') {
				if (validate(flow.options.cron)) {
					const task = schedule(flow.options.cron, async () => {
						try {
							await this.executeFlow(flow);
						} catch (error: any) {
							logger.error(error);
						}
					});

					this.triggerHandlers.push({ id: flow.id, events: [{ type: flow.trigger, task }] });
				} else {
					logger.warn(`Couldn't register cron trigger. Provided cron is invalid: ${flow.options.cron}`);
				}
			} else if (flow.trigger === 'operation') {
				const handler = (data: unknown, context: Record<string, unknown>) => this.executeFlow(flow, data, context);

				this.operationFlowHandlers[flow.id] = handler;
			} else if (flow.trigger === 'webhook') {
				const handler = (data: unknown, context: Record<string, unknown>) => {
					if (flow.options.async) {
						// Async flows answer immediately; the run continues in the background.
						this.executeFlowDetached(flow, data, context);
						return undefined;
					}

					return this.executeFlow(flow, data, context);
				};

				const method = flow.options?.method ?? 'GET';

				// Default return to $last for webhooks
				flow.options.return = flow.options.return ?? '$last';

				this.webhookFlowHandlers[`${method}-${flow.id}`] = handler;
			} else if (flow.trigger === 'manual') {
				const handler = (data: unknown, context: Record<string, unknown>) => {
					const enabledCollections = flow.options?.collections ?? [];
					// Guard `body` as well as `data`: a payload without a body must end up in the
					// "collection required" rejection below instead of throwing a TypeError.
					const targetCollection = (data as Record<string, any>)?.body?.collection;

					if (!targetCollection) {
						logger.warn(`Manual trigger requires "collection" to be specified in the payload`);
						throw new exceptions.ForbiddenException();
					}

					if (enabledCollections.length === 0) {
						logger.warn(`There is no collections configured for this manual trigger`);
						throw new exceptions.ForbiddenException();
					}

					if (!enabledCollections.includes(targetCollection)) {
						logger.warn(`Specified collection must be one of: ${enabledCollections.join(', ')}.`);
						throw new exceptions.ForbiddenException();
					}

					if (flow.options.async) {
						// Async flows answer immediately; the run continues in the background.
						this.executeFlowDetached(flow, data, context);
						return undefined;
					}

					return this.executeFlow(flow, data, context);
				};

				// Default return to $last for manual
				flow.options.return = '$last';

				// Manual triggers are always registered under POST.
				this.webhookFlowHandlers[`POST-${flow.id}`] = handler;
			}
		}

		this.isLoaded = true;
	}

	/**
	 * Detach every emitter listener, stop every scheduled task and forget all
	 * operation/webhook handlers. Clears `isLoaded`.
	 */
	private async unload(): Promise<void> {
		for (const trigger of this.triggerHandlers) {
			trigger.events.forEach((event) => {
				switch (event.type) {
					case 'filter':
						emitter.offFilter(event.name, event.handler);
						break;
					case 'action':
						emitter.offAction(event.name, event.handler);
						break;
					case 'schedule':
						event.task.stop();
						break;
				}
			});
		}

		this.triggerHandlers = [];
		this.operationFlowHandlers = {};
		this.webhookFlowHandlers = {};

		this.isLoaded = false;
	}

	/**
	 * Start a flow without awaiting it. Because no caller observes the returned promise,
	 * a rejection is logged here so it cannot surface as an unhandled promise rejection.
	 */
	private executeFlowDetached(flow: Flow, data: unknown, context: Record<string, unknown>): void {
		this.executeFlow(flow, data, context).catch((error) => {
			logger.error(error);
		});
	}

	/**
	 * Run a flow's operations one after another, following each operation's resolve/reject
	 * successor until there is none, then record the run according to `flow.accountability`.
	 *
	 * @param flow - Flow (tree) to execute.
	 * @param data - Trigger data; exposed to operations as `$trigger` and, initially, `$last`.
	 * @param context - Execution context; `database`, `schema` and `accountability` are read
	 *   here and the whole object is forwarded to every operation.
	 * @returns All collected data when `flow.options.return` is `$all`, the value found at the
	 *   `flow.options.return` path when it is set, otherwise `undefined`.
	 */
	private async executeFlow(flow: Flow, data: unknown = null, context: Record<string, unknown> = {}): Promise<unknown> {
		const database = (context.database as Knex) ?? getDatabase();
		const schema = (context.schema as SchemaOverview) ?? (await getSchema({ database }));

		const keyedData: Record<string, unknown> = {
			[TRIGGER_KEY]: data,
			[LAST_KEY]: data,
			[ACCOUNTABILITY_KEY]: context?.accountability ?? null,
		};

		let nextOperation = flow.operation;

		const steps: {
			operation: string;
			key: string;
			status: 'resolve' | 'reject' | 'unknown';
			options: Record<string, any> | null;
		}[] = [];

		while (nextOperation !== null) {
			const { successor, data, status, options } = await this.executeOperation(nextOperation, keyedData, context);

			// Each result is stored under the operation's own key and also becomes `$last`.
			keyedData[nextOperation.key] = data;
			keyedData[LAST_KEY] = data;
			steps.push({ operation: nextOperation.id, key: nextOperation.key, status, options });

			nextOperation = successor;
		}

		if (flow.accountability !== null) {
			const activityService = new ActivityService({
				knex: database,
				schema: schema,
			});

			const accountability = context?.accountability as Accountability | undefined;

			const activity = await activityService.createOne({
				action: Action.RUN,
				user: accountability?.user ?? null,
				collection: 'directus_flows',
				ip: accountability?.ip ?? null,
				user_agent: accountability?.userAgent ?? null,
				item: flow.id,
			});

			// `all` additionally stores the executed steps and the (redacted) data as a revision.
			if (flow.accountability === 'all') {
				const revisionsService = new RevisionsService({
					knex: database,
					schema: schema,
				});

				await revisionsService.createOne({
					activity: activity,
					collection: 'directus_flows',
					item: flow.id,
					data: {
						steps: steps,
						// Permissions is a ton of data, and is just a copy of what's in the directus_permissions table
						data: redactLogs(omit(keyedData, `${ACCOUNTABILITY_KEY}.permissions`)),
					},
				});
			}
		}

		if (flow.options.return === '$all') {
			return keyedData;
		} else if (flow.options.return) {
			return get(keyedData, flow.options.return);
		}

		return undefined;
	}

	/**
	 * Execute a single operation with its options resolved against the data collected so far.
	 *
	 * @param operation - Operation to run.
	 * @param keyedData - Data collected during the current run; used to resolve the options
	 *   and passed to the handler as `data`.
	 * @param context - Extra context spread over the defaults handed to the handler, so any
	 *   key it contains (e.g. `database`, `getSchema`, `accountability`) overrides the default.
	 * @returns The next operation to run (`resolve` on success, `reject` when the handler
	 *   throws, `null` when the operation type is unknown), the outcome status, the
	 *   result/error as `data` (`null` when nullish), and the resolved options.
	 */
	private async executeOperation(
		operation: Operation,
		keyedData: Record<string, unknown>,
		context: Record<string, unknown> = {}
	): Promise<{
		successor: Operation | null;
		status: 'resolve' | 'reject' | 'unknown';
		data: unknown;
		options: Record<string, any> | null;
	}> {
		if (!(operation.type in this.operations)) {
			logger.warn(`Couldn't find operation ${operation.type}`);

			// An unknown operation type ends the chain.
			return { successor: null, status: 'unknown', data: null, options: null };
		}

		const handler = this.operations[operation.type];

		const options = applyOptionsData(operation.options, keyedData);

		try {
			const result = await handler(options, {
				services,
				exceptions: { ...exceptions, ...sharedExceptions },
				env,
				database: getDatabase(),
				logger,
				getSchema,
				data: keyedData,
				accountability: null,
				...context,
			});

			return { successor: operation.resolve, status: 'resolve', data: result ?? null, options };
		} catch (error: unknown) {
			// A throwing handler is not fatal: the flow continues along the reject path
			// with the thrown value as that step's data.
			return { successor: operation.reject, status: 'reject', data: error ?? null, options };
		}
	}
}