Add synchronization across horizontally scaled instances to schedule flows and hooks (#18584)

* Add synchronization to schedule flows and hooks

Fixes #15052

* Add changeset

* Add test

* Add to sequential list

* Fix spelling in changeset

---------

Co-authored-by: Pascal Jufer <pascal-jufer@bluewin.ch>
Co-authored-by: ian <licitdev@gmail.com>
This commit is contained in:
Nicola Krumschmidt
2023-05-24 23:16:42 +02:00
committed by GitHub
parent 8401cd5a8e
commit 60be3c2b40
15 changed files with 596 additions and 82 deletions

View File

@@ -98,6 +98,7 @@
"content-disposition": "0.5.4",
"cookie-parser": "1.4.6",
"cors": "2.8.5",
"cron-parser": "4.8.1",
"csv-parser": "3.0.0",
"date-fns": "2.30.0",
"deep-diff": "1.0.2",
@@ -131,8 +132,8 @@
"mime-types": "2.1.35",
"ms": "2.1.3",
"nanoid": "4.0.2",
"node-cron": "3.0.2",
"node-machine-id": "1.1.12",
"node-schedule": "2.1.1",
"nodemailer": "6.9.2",
"object-hash": "3.0.0",
"openapi3-ts": "3.2.0",
@@ -187,8 +188,8 @@
"@types/marked": "4.3.0",
"@types/mime-types": "2.1.1",
"@types/ms": "0.7.31",
"@types/node-schedule": "2.1.0",
"@types/node": "18.16.12",
"@types/node-cron": "3.0.7",
"@types/nodemailer": "6.4.7",
"@types/object-hash": "3.0.2",
"@types/qs": "6.9.7",

View File

@@ -161,6 +161,13 @@ const allowedEnvironmentVars = [
'MESSENGER_REDIS_HOST',
'MESSENGER_REDIS_PORT',
'MESSENGER_REDIS_PASSWORD',
// synchronization
'SYNCHRONIZATION_STORE',
'SYNCHRONIZATION_NAMESPACE',
'SYNCHRONIZATION_REDIS',
'SYNCHRONIZATION_REDIS_HOST',
'SYNCHRONIZATION_REDIS_PORT',
'SYNCHRONIZATION_REDIS_PASSWORD',
// emails
'EMAIL_FROM',
'EMAIL_TRANSPORT',

View File

@@ -39,7 +39,6 @@ import virtualDefault from '@rollup/plugin-virtual';
import chokidar, { FSWatcher } from 'chokidar';
import express, { Router } from 'express';
import { clone, escapeRegExp } from 'lodash-es';
import { schedule, validate } from 'node-cron';
import { readdir } from 'node:fs/promises';
import { createRequire } from 'node:module';
import { dirname } from 'node:path';
@@ -57,6 +56,7 @@ import type { EventHandler } from './types/index.js';
import getModuleDefault from './utils/get-module-default.js';
import { getSchema } from './utils/get-schema.js';
import { JobQueue } from './utils/job-queue.js';
import { scheduleSynchronizedJob, validateCron } from './utils/schedule.js';
import { Url } from './utils/url.js';
// Workaround for https://github.com/rollup/plugins/issues/1329
@@ -281,7 +281,7 @@ class ExtensionManager {
}
private async unload(): Promise<void> {
this.unregisterApiExtensions();
await this.unregisterApiExtensions();
this.apiEmitter.offAll();
@@ -434,7 +434,7 @@ class ExtensionManager {
const config = getModuleDefault(hookInstance);
this.registerHook(config);
this.registerHook(config, hook.name);
this.apiExtensions.push({ path: hookPath });
} catch (error: any) {
@@ -517,8 +517,8 @@ class ExtensionManager {
const configs = getModuleDefault(bundleInstances);
for (const { config } of configs.hooks) {
this.registerHook(config);
for (const { config, name } of configs.hooks) {
this.registerHook(config, name);
}
for (const { config, name } of configs.endpoints) {
@@ -537,7 +537,9 @@ class ExtensionManager {
}
}
private registerHook(register: HookConfig): void {
private registerHook(register: HookConfig, name: string): void {
let scheduleIndex = 0;
const registerFunctions = {
filter: (event: string, handler: FilterHandler) => {
emitter.onFilter(event, handler);
@@ -567,8 +569,8 @@ class ExtensionManager {
});
},
schedule: (cron: string, handler: ScheduleHandler) => {
if (validate(cron)) {
const task = schedule(cron, async () => {
if (validateCron(cron)) {
const job = scheduleSynchronizedJob(`${name}:${scheduleIndex}`, cron, async () => {
if (this.options.schedule) {
try {
await handler();
@@ -578,9 +580,11 @@ class ExtensionManager {
}
});
scheduleIndex++;
this.hookEvents.push({
type: 'schedule',
task,
job,
});
} else {
logger.warn(`Couldn't register cron hook. Provided cron is invalid: ${cron}`);
@@ -639,7 +643,7 @@ class ExtensionManager {
flowManager.addOperation(config.id, config.handler);
}
private unregisterApiExtensions(): void {
private async unregisterApiExtensions(): Promise<void> {
for (const event of this.hookEvents) {
switch (event.type) {
case 'filter':
@@ -652,7 +656,7 @@ class ExtensionManager {
emitter.offInit(event.name, event.handler);
break;
case 'schedule':
event.task.stop();
await event.job.stop();
break;
}
}

View File

@@ -13,7 +13,6 @@ import { applyOptionsData, isValidJSON, parseJSON, toArray } from '@directus/uti
import type { Knex } from 'knex';
import { omit, pick } from 'lodash-es';
import { get } from 'micromustache';
import { schedule, validate } from 'node-cron';
import getDatabase from './database/index.js';
import emitter from './emitter.js';
import env from './env.js';
@@ -31,6 +30,7 @@ import { JobQueue } from './utils/job-queue.js';
import { mapValuesDeep } from './utils/map-values-deep.js';
import { redact } from './utils/redact.js';
import { sanitizeError } from './utils/sanitize-error.js';
import { scheduleSynchronizedJob, validateCron } from './utils/schedule.js';
let flowManager: FlowManager | undefined;
@@ -200,8 +200,8 @@ class FlowManager {
});
}
} else if (flow.trigger === 'schedule') {
if (validate(flow.options['cron'])) {
const task = schedule(flow.options['cron'], async () => {
if (validateCron(flow.options['cron'])) {
const job = scheduleSynchronizedJob(flow.id, flow.options['cron'], async () => {
try {
await this.executeFlow(flow);
} catch (error: any) {
@@ -209,7 +209,7 @@ class FlowManager {
}
});
this.triggerHandlers.push({ id: flow.id, events: [{ type: flow.trigger, task }] });
this.triggerHandlers.push({ id: flow.id, events: [{ type: flow.trigger, job }] });
} else {
logger.warn(`Couldn't register cron trigger. Provided cron is invalid: ${flow.options['cron']}`);
}
@@ -279,7 +279,7 @@ class FlowManager {
private async unload(): Promise<void> {
for (const trigger of this.triggerHandlers) {
trigger.events.forEach((event) => {
for (const event of trigger.events) {
switch (event.type) {
case 'filter':
emitter.offFilter(event.name, event.handler);
@@ -288,10 +288,10 @@ class FlowManager {
emitter.offAction(event.name, event.handler);
break;
case 'schedule':
event.task.stop();
await event.job.stop();
break;
}
});
}
}
this.triggerHandlers = [];

165
api/src/synchronization.ts Normal file
View File

@@ -0,0 +1,165 @@
import { Redis } from 'ioredis';
import env from './env.js';
import { getConfigFromEnv } from './utils/get-config-from-env.js';
interface SynchronizationManager {
set(key: string, value: string | number): Promise<void>;
get(key: string): Promise<string | null>;
delete(key: string): Promise<void>;
exists(key: string): Promise<boolean>;
setGreaterThan(key: string, value: number): Promise<boolean>;
}
let synchronizationManager: SynchronizationManager;
function getSynchronizationManager() {
if (synchronizationManager) return synchronizationManager;
if (env['SYNCHRONIZATION_STORE'] === 'redis') {
synchronizationManager = new SynchronizationManagerRedis();
} else {
synchronizationManager = new SynchronizationManagerMemory();
}
return synchronizationManager;
}
class SynchronizationManagerMemory implements SynchronizationManager {
private store: Record<string, string>;
constructor() {
this.store = {};
}
public async set(key: string, value: string | number): Promise<void> {
this.setSync(key, value);
}
public async get(key: string): Promise<string | null> {
return this.getSync(key);
}
public async delete(key: string): Promise<void> {
this.deleteSync(key);
}
public async exists(key: string): Promise<boolean> {
return this.existsSync(key);
}
public async setGreaterThan(key: string, value: number): Promise<boolean> {
if (this.existsSync(key)) {
const oldValue = Number(this.getSync(key));
if (value <= oldValue) {
return false;
}
}
this.setSync(key, value);
return true;
}
private setSync(key: string, value: string | number): void {
this.store[key] = String(value);
}
private getSync(key: string): string | null {
return this.store[key] ?? null;
}
private deleteSync(key: string): void {
delete this.store[key];
}
private existsSync(key: string): boolean {
return key in this.store;
}
}
const SET_GREATER_THAN_SCRIPT = `
local key = KEYS[1]
local value = tonumber(ARGV[1])
if redis.call("EXISTS", key) == 1 then
local oldValue = tonumber(redis.call('GET', key))
if value <= oldValue then
return false
end
end
redis.call('SET', key, value)
return true
`;
class SynchronizationManagerRedis implements SynchronizationManager {
private namespace: string;
private client: Redis;
constructor() {
const config = getConfigFromEnv('SYNCHRONIZATION_REDIS');
this.client = new Redis(env['SYNCHRONIZATION_REDIS'] ?? config);
this.namespace = env['SYNCHRONIZATION_NAMESPACE'] ?? 'directus';
this.client.defineCommand('setGreaterThan', {
numberOfKeys: 1,
lua: SET_GREATER_THAN_SCRIPT,
});
}
public async set(key: string, value: string | number): Promise<void> {
await this.client.set(this.getNamespacedKey(key), value);
}
public get(key: string): Promise<string | null> {
return this.client.get(this.getNamespacedKey(key));
}
public async delete(key: string): Promise<void> {
await this.client.del(this.getNamespacedKey(key));
}
public async exists(key: string): Promise<boolean> {
const doesExist = await this.client.exists(this.getNamespacedKey(key));
return doesExist === 1;
}
public async setGreaterThan(key: string, value: number): Promise<boolean> {
const client = this.client as Redis & {
setGreaterThan(key: string, value: number): Promise<number>;
};
const wasSet = await client.setGreaterThan(this.getNamespacedKey(key), value);
return wasSet === 1;
}
private getNamespacedKey(key: string): string {
return `${this.namespace}:${key}`;
}
}
export class SynchronizedClock {
private key: string;
private synchronizationManager: SynchronizationManager;
constructor(id: string) {
this.key = `clock:${id}`;
this.synchronizationManager = getSynchronizationManager();
}
public async set(timestamp: number): Promise<boolean> {
const wasSet = await this.synchronizationManager.setGreaterThan(this.key, timestamp);
return wasSet;
}
public async reset(): Promise<void> {
await this.synchronizationManager.delete(this.key);
}
}

View File

@@ -1,8 +1,8 @@
import type { ActionHandler, FilterHandler, InitHandler } from '@directus/types';
import type { ScheduledTask } from 'node-cron';
import type { ScheduledJob } from '../utils/schedule.js';
export type EventHandler =
| { type: 'filter'; name: string; handler: FilterHandler }
| { type: 'action'; name: string; handler: ActionHandler }
| { type: 'init'; name: string; handler: InitHandler }
| { type: 'schedule'; task: ScheduledTask };
| { type: 'schedule'; job: ScheduledJob };

43
api/src/utils/schedule.ts Normal file
View File

@@ -0,0 +1,43 @@
import cron from 'cron-parser';
import schedule from 'node-schedule';
import { SynchronizedClock } from '../synchronization.js';
export interface ScheduledJob {
stop(): Promise<void>;
}
export function validateCron(rule: string): boolean {
try {
cron.parseExpression(rule);
} catch {
return false;
}
return true;
}
export function scheduleSynchronizedJob(
id: string,
rule: string,
cb: (fireDate: Date) => void | Promise<void>
): ScheduledJob {
const clock = new SynchronizedClock(`${id}:${rule}`);
const job = schedule.scheduleJob(rule, async (fireDate) => {
const nextTimestamp = job.nextInvocation().getTime();
const wasSet = await clock.set(nextTimestamp);
if (wasSet) {
await cb(fireDate);
}
});
const stop = async () => {
job.cancel();
await clock.reset();
};
return { stop };
}