Add cross-instance messenger pubsub setup (#13651)

* Add cross-instance messenger pubsub setup

* Rely on messenger for reload activity

* Organize imports

* Undo unrelated change

* Add messenger env var reference
This commit is contained in:
Rijk van Zanten
2022-06-01 14:22:39 -04:00
committed by GitHub
parent a98df4066d
commit f89d98130b
7 changed files with 141 additions and 11 deletions

View File

@@ -185,6 +185,7 @@
"@types/flat": "5.0.2",
"@types/fs-extra": "9.0.13",
"@types/inquirer": "8.1.3",
"@types/ioredis": "^4.28.10",
"@types/jest": "27.4.1",
"@types/js-yaml": "4.0.5",
"@types/json2csv": "5.0.3",

View File

@@ -50,7 +50,7 @@ import schema from './middleware/schema';
import { track } from './utils/track';
import { validateEnv } from './utils/validate-env';
import { validateStorage } from './utils/validate-storage';
import { register as registerWebhooks } from './webhooks';
import { init as initWebhooks } from './webhooks';
import { flushCaches } from './cache';
import { registerAuthProviders } from './auth';
import { Url } from './utils/url';
@@ -242,7 +242,7 @@ export default async function createApp(): Promise<express.Application> {
await emitter.emitInit('routes.after', { app });
// Register all webhooks
await registerWebhooks();
await initWebhooks();
track('serverStarted');

80
api/src/messenger.ts Normal file
View File

@@ -0,0 +1,80 @@
import IORedis from 'ioredis';
import env from './env';
import { getConfigFromEnv } from './utils/get-config-from-env';
import { parseJSON } from './utils/parse-json';
export type MessengerSubscriptionCallback = (payload: Record<string, any>) => void;
export interface Messenger {
publish: (channel: string, payload: Record<string, any>) => void;
subscribe: (channel: string, callback: MessengerSubscriptionCallback) => void;
unsubscribe: (channel: string) => void;
}
export class MessengerMemory implements Messenger {
handlers: Record<string, MessengerSubscriptionCallback>;
constructor() {
this.handlers = {};
}
publish(channel: string, payload: Record<string, any>) {
this.handlers[channel]?.(payload);
}
subscribe(channel: string, callback: MessengerSubscriptionCallback) {
this.handlers[channel] = callback;
}
unsubscribe(channel: string) {
delete this.handlers[channel];
}
}
export class MessengerRedis implements Messenger {
namespace: string;
pub: IORedis.Redis;
sub: IORedis.Redis;
constructor() {
const config = getConfigFromEnv('MESSENGER_REDIS');
this.pub = new IORedis(env.MESSENGER_REDIS ?? config);
this.sub = new IORedis(env.MESSENGER_REDIS ?? config);
this.namespace = env.MESSENGER_NAMESPACE ?? 'directus';
}
publish(channel: string, payload: Record<string, any>) {
this.pub.publish(`${this.namespace}:${channel}`, JSON.stringify(payload));
}
subscribe(channel: string, callback: MessengerSubscriptionCallback) {
this.sub.subscribe(`${this.namespace}:${channel}`);
this.sub.on('message', (messageChannel, payloadString) => {
const payload = parseJSON(payloadString);
if (messageChannel === `${this.namespace}:${channel}`) {
callback(payload);
}
});
}
unsubscribe(channel: string) {
this.sub.unsubscribe(`${this.namespace}:${channel}`);
}
}
let messenger: Messenger;
export function getMessenger() {
if (messenger) return messenger;
if (env.MESSENGER_STORE === 'redis') {
messenger = new MessengerRedis();
} else {
messenger = new MessengerMemory();
}
return messenger;
}

View File

@@ -1,45 +1,48 @@
import { AbstractServiceOptions, Item, PrimaryKey, Webhook, MutationOptions } from '../types';
import { register } from '../webhooks';
import { ItemsService } from './items';
import { getMessenger, Messenger } from '../messenger';
export class WebhooksService extends ItemsService<Webhook> {
messenger: Messenger;
constructor(options: AbstractServiceOptions) {
super('directus_webhooks', options);
this.messenger = getMessenger();
}
async createOne(data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
const result = await super.createOne(data, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}
async createMany(data: Partial<Item>[], opts?: MutationOptions): Promise<PrimaryKey[]> {
const result = await super.createMany(data, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}
async updateOne(key: PrimaryKey, data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
const result = await super.updateOne(key, data, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}
async updateMany(keys: PrimaryKey[], data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey[]> {
const result = await super.updateMany(keys, data, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}
async deleteOne(key: PrimaryKey, opts?: MutationOptions): Promise<PrimaryKey> {
const result = await super.deleteOne(key, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}
async deleteMany(keys: PrimaryKey[], opts?: MutationOptions): Promise<PrimaryKey[]> {
const result = await super.deleteMany(keys, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}
}

View File

@@ -6,15 +6,31 @@ import { Webhook, WebhookHeader } from './types';
import { WebhooksService } from './services';
import { getSchema } from './utils/get-schema';
import { ActionHandler } from '@directus/shared/types';
import { getMessenger } from './messenger';
let registered: { event: string; handler: ActionHandler }[] = [];
export async function register(): Promise<void> {
unregister();
export async function init(): Promise<void> {
await register();
const messenger = getMessenger();
messenger.subscribe('webhooks', (event) => {
if (event.type === 'reload') {
reload();
}
});
}
export async function reload(): Promise<void> {
unregister();
await register();
}
export async function register(): Promise<void> {
const webhookService = new WebhooksService({ knex: getDatabase(), schema: await getSchema() });
const webhooks = await webhookService.readByQuery({ filter: { status: { _eq: 'active' } } });
for (const webhook of webhooks) {
for (const action of webhook.actions) {
const event = `items.${action}`;

View File

@@ -779,6 +779,16 @@ AUTH_FACEBOOK_ICON="facebook"
| `EXTENSIONS_PATH` | Path to your local extensions folder. | `./extensions` |
| `EXTENSIONS_AUTO_RELOAD` | Automatically reload extensions when they have changed. | `false` |
## Messenger
| Variable | Description | Default Value |
| --------------------- | ------------------------------------------------- | ------------- |
| `MESSENGER_STORE` | One of `memory`, `redis`<sup>[1]</sup> | `memory` |
| `MESSENGER_NAMESPACE` | How to scope the channels in Redis | `directus` |
| `MESSENGER_REDIS_*` | The Redis configuration for the pubsub connection | -- |
<sup>[1]</sup> `redis` should be used in load-balanced installations of Directus
## Email
| Variable | Description | Default Value |

20
package-lock.json generated
View File

@@ -155,6 +155,7 @@
"@types/flat": "5.0.2",
"@types/fs-extra": "9.0.13",
"@types/inquirer": "8.1.3",
"@types/ioredis": "^4.28.10",
"@types/jest": "27.4.1",
"@types/js-yaml": "4.0.5",
"@types/json2csv": "5.0.3",
@@ -13089,6 +13090,15 @@
"rxjs": "^7.2.0"
}
},
"node_modules/@types/ioredis": {
"version": "4.28.10",
"resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.28.10.tgz",
"integrity": "sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ==",
"dev": true,
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/istanbul-lib-coverage": {
"version": "2.0.4",
"resolved": "https://registry.npmjs.org/@types/istanbul-lib-coverage/-/istanbul-lib-coverage-2.0.4.tgz",
@@ -64522,6 +64532,15 @@
"rxjs": "^7.2.0"
}
},
"@types/ioredis": {
"version": "4.28.10",
"resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.28.10.tgz",
"integrity": "sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ==",
"dev": true,
"requires": {
"@types/node": "*"
}
},
"@types/istanbul-lib-coverage": {
"version": "2.0.4",
"resolved": "https://registry.npmjs.org/@types/istanbul-lib-coverage/-/istanbul-lib-coverage-2.0.4.tgz",
@@ -71792,6 +71811,7 @@
"@types/flat": "5.0.2",
"@types/fs-extra": "9.0.13",
"@types/inquirer": "8.1.3",
"@types/ioredis": "*",
"@types/jest": "27.4.1",
"@types/js-yaml": "4.0.5",
"@types/json2csv": "5.0.3",