Files
directus/api/src/messenger.ts
Rijk van Zanten 63069bd688 Consolidate Redis environment variables (#19010)
* Remove individual redis configurations

* Use shared redis config

* Update stub

* Add changeset

* Update .changeset/grumpy-rice-film.md

Co-authored-by: Pascal Jufer <pascal-jufer@bluewin.ch>

* Remove RATE_LIMITER_REDIS_ ref in docs

* Update REDIS config in docker compose example

* Readd missing param

* Fix mention of namespace ind ocs

* Undo moving of cache store env

* Update blackbox tests & docs

* Update api/src/env.ts

Co-authored-by: ian <licitdev@gmail.com>

* Fix casing of wEbSoCkEtS

* Remove redis namespace from env check

* Add global store back in

* Tweak phrasing

* Readd supplementary note 5

* Bring back CACHE_STORE in docs / tests

* Forgot one CACHE_STORE

* Consistent order

---------

Co-authored-by: Pascal Jufer <pascal-jufer@bluewin.ch>
Co-authored-by: ian <licitdev@gmail.com>
2023-06-27 13:34:49 -04:00

87 lines
2.3 KiB
TypeScript

import { parseJSON } from '@directus/utils';
import { Redis } from 'ioredis';
import { getEnv } from './env.js';
import { getConfigFromEnv } from './utils/get-config-from-env.js';
export type MessengerSubscriptionCallback = (payload: Record<string, any>) => void;
export interface Messenger {
publish: (channel: string, payload: Record<string, any>) => void;
subscribe: (channel: string, callback: MessengerSubscriptionCallback) => void;
unsubscribe: (channel: string, callback?: MessengerSubscriptionCallback) => void;
}
export class MessengerMemory implements Messenger {
handlers: Record<string, Set<MessengerSubscriptionCallback>>;
constructor() {
this.handlers = {};
}
publish(channel: string, payload: Record<string, any>) {
this.handlers[channel]?.forEach((callback) => callback(payload));
}
subscribe(channel: string, callback: MessengerSubscriptionCallback) {
if (!this.handlers[channel]) this.handlers[channel] = new Set();
this.handlers[channel]?.add(callback);
}
unsubscribe(channel: string, callback?: MessengerSubscriptionCallback) {
if (!callback) {
delete this.handlers[channel];
} else {
this.handlers[channel]?.delete(callback);
}
}
}
export class MessengerRedis implements Messenger {
namespace: string;
pub: Redis;
sub: Redis;
constructor() {
const config = getConfigFromEnv('REDIS');
const env = getEnv();
this.pub = new Redis(env['REDIS'] ?? config);
this.sub = new Redis(env['REDIS'] ?? config);
this.namespace = env['MESSENGER_NAMESPACE'] ?? 'directus-messenger';
}
publish(channel: string, payload: Record<string, any>) {
this.pub.publish(`${this.namespace}:${channel}`, JSON.stringify(payload));
}
subscribe(channel: string, callback: MessengerSubscriptionCallback) {
this.sub.subscribe(`${this.namespace}:${channel}`);
this.sub.on('message', (messageChannel: string, payloadString: string) => {
const payload = parseJSON(payloadString);
if (messageChannel === `${this.namespace}:${channel}`) {
callback(payload);
}
});
}
unsubscribe(channel: string) {
this.sub.unsubscribe(`${this.namespace}:${channel}`);
}
}
let messenger: Messenger;
export function getMessenger() {
if (messenger) return messenger;
const env = getEnv();
if (env['MESSENGER_STORE'] === 'redis') {
messenger = new MessengerRedis();
} else {
messenger = new MessengerMemory();
}
return messenger;
}