Implement event hooks for notifications

* Implemented a generic notification manager with several event types
* Implemented discord notification provider
This commit is contained in:
FoxxMD
2021-07-27 12:24:36 -04:00
parent e4f18e8f06
commit 6b6124d76e
10 changed files with 552 additions and 44 deletions

28
package-lock.json generated
View File

@@ -39,6 +39,7 @@
"socket.io": "^4.1.3",
"tcp-port-used": "^1.0.2",
"typescript": "^4.3.4",
"webhook-discord": "^3.7.7",
"winston": "FoxxMD/winston#fbab8de969ecee578981c77846156c7f43b5f01e",
"winston-daily-rotate-file": "^4.5.5",
"zlib": "^1.0.5"
@@ -2764,6 +2765,12 @@
"is-arrayish": "^0.3.1"
}
},
"node_modules/snekfetch": {
"version": "3.6.4",
"resolved": "https://registry.npmjs.org/snekfetch/-/snekfetch-3.6.4.tgz",
"integrity": "sha512-NjxjITIj04Ffqid5lqr7XdgwM7X61c/Dns073Ly170bPQHLm6jkmelye/eglS++1nfTWktpP6Y2bFXjdPlQqdw==",
"deprecated": "use node-fetch instead"
},
"node_modules/snoostorm": {
"version": "1.5.2",
"resolved": "https://registry.npmjs.org/snoostorm/-/snoostorm-1.5.2.tgz",
@@ -3365,6 +3372,14 @@
"extsprintf": "^1.2.0"
}
},
"node_modules/webhook-discord": {
"version": "3.7.7",
"resolved": "https://registry.npmjs.org/webhook-discord/-/webhook-discord-3.7.7.tgz",
"integrity": "sha512-qlEucVfun4ZzIhIJGQt8g9ytlSMpnaXiZeOiJCuuGUSSnkmBW5oFFcJHxN0KzC3VgUI0Brkl/i5X0Wf/ljKVXQ==",
"dependencies": {
"snekfetch": "^3.6.4"
}
},
"node_modules/which": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz",
@@ -5731,6 +5746,11 @@
"is-arrayish": "^0.3.1"
}
},
"snekfetch": {
"version": "3.6.4",
"resolved": "https://registry.npmjs.org/snekfetch/-/snekfetch-3.6.4.tgz",
"integrity": "sha512-NjxjITIj04Ffqid5lqr7XdgwM7X61c/Dns073Ly170bPQHLm6jkmelye/eglS++1nfTWktpP6Y2bFXjdPlQqdw=="
},
"snoostorm": {
"version": "1.5.2",
"resolved": "https://registry.npmjs.org/snoostorm/-/snoostorm-1.5.2.tgz",
@@ -6191,6 +6211,14 @@
"extsprintf": "^1.2.0"
}
},
"webhook-discord": {
"version": "3.7.7",
"resolved": "https://registry.npmjs.org/webhook-discord/-/webhook-discord-3.7.7.tgz",
"integrity": "sha512-qlEucVfun4ZzIhIJGQt8g9ytlSMpnaXiZeOiJCuuGUSSnkmBW5oFFcJHxN0KzC3VgUI0Brkl/i5X0Wf/ljKVXQ==",
"requires": {
"snekfetch": "^3.6.4"
}
},
"which": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz",

View File

@@ -56,6 +56,7 @@
"socket.io": "^4.1.3",
"tcp-port-used": "^1.0.2",
"typescript": "^4.3.4",
"webhook-discord": "^3.7.7",
"winston": "FoxxMD/winston#fbab8de969ecee578981c77846156c7f43b5f01e",
"winston-daily-rotate-file": "^4.5.5",
"zlib": "^1.0.5"

View File

@@ -59,24 +59,6 @@ export class App {
depletedInSecs: number = 0;
constructor(config: OperatorConfig) {
// const {
// subreddits = [],
// clientId = process.env.CLIENT_ID,
// clientSecret = process.env.CLIENT_SECRET,
// accessToken = process.env.ACCESS_TOKEN,
// refreshToken = process.env.REFRESH_TOKEN,
// wikiConfig = process.env.WIKI_CONFIG || 'botconfig/contextbot',
// snooDebug = process.env.SNOO_DEBUG || false,
// dryRun = process.env.DRYRUN || false,
// heartbeat = process.env.HEARTBEAT || 300,
// apiLimitWarning = process.env.API_REMAINING || 250,
// version,
// authorTTL = process.env.AUTHOR_TTL || 10000,
// disableCache = process.env.DISABLE_CACHE || false,
// proxy = process.env.PROXY,
// sharedModqueue = false,
// } = options;
const {
subreddits: {
names = [],
@@ -166,6 +148,11 @@ export class App {
if(shouldRetry) {
defaultUnmoderatedStream.startInterval();
} else {
for(const m of this.subManagers) {
if(m.modStreamCallbacks.size > 0) {
m.notificationManager.handle('runStateChanged', `${name.toUpperCase()} Polling Stopped`, 'Encountered too many errors from Reddit while polling. Will try to restart on next heartbeat.');
}
}
this.logger.error(`Mod stream ${name.toUpperCase()} encountered too many errors while polling. Will try to restart on next heartbeat.`);
}
}
@@ -178,6 +165,16 @@ export class App {
defaultModqueueStream.on('error', modStreamErrorListener('modqueue'));
CacheManager.modStreams.set('unmoderated', defaultUnmoderatedStream);
CacheManager.modStreams.set('modqueue', defaultModqueueStream);
const onTerm = () => {
for(const m of this.subManagers) {
m.notificationManager.handle('runStateChanged', 'Application Shutdown', 'The application was shutdown unexpectedly due to an error.');
}
}
process.on('SIGTERM', () => {
onTerm();
});
}
async testClient() {
@@ -240,7 +237,7 @@ export class App {
for (const sub of subsToRun) {
const manager = new Manager(sub, this.client, this.logger, {dryRun: this.dryRun, sharedModqueue: this.sharedModqueue});
try {
await manager.parseConfiguration('system', true);
await manager.parseConfiguration('system', true, {suppressNotification: true});
} catch (err) {
if (!(err instanceof LoggedError)) {
this.logger.error(`Config was not valid:`, {subreddit: sub.display_name_prefixed});
@@ -269,11 +266,11 @@ export class App {
const newConfig = await s.parseConfiguration();
if(newConfig || (s.queueState.state !== RUNNING && s.queueState.causedBy === SYSTEM))
{
await s.startQueue();
await s.startQueue('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running queue'});
}
if(newConfig || (s.eventsState.state !== RUNNING && s.eventsState.causedBy === SYSTEM))
{
await s.startEvents();
await s.startEvents('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running events'});
}
if(s.botState.state !== RUNNING && s.eventsState.state === RUNNING && s.queueState.state === RUNNING) {
s.botState = {
@@ -283,7 +280,7 @@ export class App {
}
} catch (err) {
this.logger.info('Stopping event polling to prevent activity processing queue from backing up. Will be restarted when config update succeeds.')
await s.stopEvents();
await s.stopEvents('system', {reason: 'Invalid config will cause events to pile up in queue. Will be restarted when config update succeeds (next heartbeat).'});
if(!(err instanceof LoggedError)) {
this.logger.error(err, {subreddit: s.displayLabel});
}
@@ -292,7 +289,7 @@ export class App {
}
}
}
await this.runModStreams();
await this.runModStreams(true);
}
} catch (err) {
this.logger.error('Error occurred during heartbeat', err);
@@ -303,11 +300,18 @@ export class App {
}
}
async runModStreams() {
async runModStreams(notify = false) {
for(const [k,v] of CacheManager.modStreams) {
if(!v.running && v.listeners('item').length > 0) {
v.startInterval();
this.logger.info(`Starting default ${k.toUpperCase()} mod stream`);
if(notify) {
for(const m of this.subManagers) {
if(m.modStreamCallbacks.size > 0) {
m.notificationManager.handle('runStateChanged', `${k.toUpperCase()} Polling Started`, 'Polling was successfully restarted on heartbeat.');
}
}
}
}
}
}
@@ -318,7 +322,7 @@ export class App {
}
for (const manager of this.subManagers) {
if (manager.validConfigLoaded && manager.botState.state !== RUNNING) {
await manager.start();
await manager.start('system', {reason: 'Caused by application startup'});
}
}
@@ -382,6 +386,7 @@ export class App {
for(const m of this.subManagers) {
m.pauseEvents('system');
m.notificationManager.handle('runStateChanged', 'Hard Limit Triggered', `Hard Limit of ${this.hardLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit event polling has been paused.`, 'system', 'warn');
}
this.nannyMode = 'hard';
@@ -422,12 +427,14 @@ export class App {
for(const m of offenders) {
m.delayBy = 1.5;
m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'});
m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn');
}
} else {
this.logger.info(`Couldn't detect specific offenders, slowing all...`, {leaf: 'Api Nanny'});
for(const m of this.subManagers) {
m.delayBy = 1.5;
m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'});
m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn');
}
}
this.nannyMode = 'soft';
@@ -437,12 +444,15 @@ export class App {
if(this.nannyMode !== undefined) {
this.logger.info('Turning off due to better conditions...', {leaf: 'Api Nanny'});
for(const m of this.subManagers) {
m.delayBy = undefined;
if(m.delayBy !== undefined) {
m.delayBy = undefined;
m.notificationManager.handle('runStateChanged', 'Normal Processing Resumed', 'Slow Mode has been turned off due to better API conditions', 'system');
}
if(m.queueState.state === PAUSED && m.queueState.causedBy === SYSTEM) {
m.startQueue();
m.startQueue('system', {reason: 'API Nanny has been turned off due to better API conditions'});
}
if(m.eventsState.state === PAUSED && m.eventsState.causedBy === SYSTEM) {
await m.startEvents();
await m.startEvents('system', {reason: 'API Nanny has been turned off due to better API conditions'});
}
}
this.nannyMode = undefined;

View File

@@ -45,6 +45,7 @@ export class Check implements ICheck {
exclude: AuthorCriteria[]
};
dryRun?: boolean;
notifyOnTrigger: boolean;
resources: SubredditResources;
constructor(options: CheckOptions) {
@@ -54,6 +55,7 @@ export class Check implements ICheck {
condition = 'AND',
rules = [],
actions = [],
notifyOnTrigger = false,
subredditName,
itemIs = [],
authorIs: {
@@ -71,6 +73,7 @@ export class Check implements ICheck {
this.name = name;
this.description = description;
this.notifyOnTrigger = notifyOnTrigger;
this.condition = condition;
this.itemIs = itemIs;
this.authorIs = {
@@ -139,7 +142,7 @@ export class Check implements ICheck {
}
runStats.push(`${this.actions.length} Actions`);
// not sure if this should be info or verbose
this.logger.info(`${type.toUpperCase()} (${this.condition}) => ${runStats.join(' | ')}${this.description !== undefined ? ` => ${this.description}` : ''}`);
this.logger.info(`${type.toUpperCase()} (${this.condition})${this.notifyOnTrigger ? ' ||Notify on Trigger|| ' : ''} => ${runStats.join(' | ')}${this.description !== undefined ? ` => ${this.description}` : ''}`);
if (this.rules.length === 0 && this.itemIs.length === 0 && this.authorIs.exclude.length === 0 && this.authorIs.include.length === 0) {
this.logger.warn('No rules, item tests, or author test found -- this check will ALWAYS PASS!');
}
@@ -300,6 +303,7 @@ export interface CheckOptions extends ICheck {
actions: ActionConfig[]
logger: Logger
subredditName: string
notifyOnTrigger?: boolean
}
export interface CheckJson extends ICheck {
@@ -327,6 +331,13 @@ export interface CheckJson extends ICheck {
* @examples [[{"kind": "comment", "content": "this is the content of the comment", "distinguish": true}, {"kind": "lock"}]]
* */
actions: Array<ActionTypeJson>
/**
* If notifications are configured and this is `true` then an `eventActioned` event will be sent when this check is triggered.
*
* @default false
* */
notifyOnTrigger?: boolean,
}
export interface SubmissionCheckJson extends CheckJson {

View File

@@ -507,6 +507,8 @@ export interface ManagerOptions {
* @example ["shortName"]
* */
nickname?: string
notifications?: NotificationConfig
}
/**
@@ -652,6 +654,51 @@ export interface CacheOptions {
max?: number
}
export type NotificationProvider = 'discord';
export type NotificationEventType = 'runStateChanged' | 'pollingError' | 'eventActioned' | 'configUpdated'
export interface NotificationProviderConfig {
name: string
type: NotificationProvider
}
export interface DiscordProviderConfig extends NotificationProviderConfig {
url: string
}
export type NotificationProviders = DiscordProviderConfig;
export interface NotificationEventConfig {
types: NotificationEventType[]
providers: string[]
}
export interface NotificationContent {
logLevel?: string
title: string
body?: string
footer?: string
}
export type NotificationEvents = (NotificationEventType[] | NotificationEventConfig)[];
export interface NotificationConfig {
providers: NotificationProviders[],
events: NotificationEvents
}
export interface Notifier {
name: string
type: string;
handle: Function
}
export interface ManagerStateChangeOption {
reason?: string
suppressNotification?: boolean
}
export interface OperatorJsonConfig {
operator?: {
name?: string,
@@ -664,6 +711,7 @@ export interface OperatorJsonConfig {
accessToken?: string,
refreshToken?: string
},
notifications?: NotificationConfig
logging?: {
level?: LogLevel,
path?: string,
@@ -736,6 +784,7 @@ export interface OperatorConfig extends OperatorJsonConfig {
accessToken?: string,
refreshToken?: string
},
notifications?: NotificationConfig
logging: {
level: LogLevel,
path?: string,

View File

@@ -0,0 +1,46 @@
import webhook from 'webhook-discord';
import {NotificationContent} from "../Common/interfaces";
class DiscordNotifier {
name: string
type: string = 'Discord';
url: string;
constructor(name: string, url: string) {
this.name = name;
this.url = url;
}
handle(val: NotificationContent) {
const h = new webhook.Webhook(this.url);
const hook = new webhook.MessageBuilder();
const {logLevel, title, footer, body = ''} = val;
hook.setName('RCB')
.setTitle(title)
.setDescription(body)
if (footer !== undefined) {
// @ts-ignore
hook.setFooter(footer, false);
}
switch (logLevel) {
case 'error':
hook.setColor("##ff0000");
break;
case 'warn':
hook.setColor("#ffe900");
break;
default:
hook.setColor("#00fffa");
break;
}
h.send(hook);
}
}
export default DiscordNotifier;

View File

@@ -0,0 +1,120 @@
import {
NotificationConfig,
NotificationEventConfig,
NotificationEvents,
NotificationEventType,
Notifier
} from "../Common/interfaces";
import DiscordNotifier from "./DiscordNotifier";
import {Logger} from "winston";
import {mergeArr} from "../util";
import Subreddit from "snoowrap/dist/objects/Subreddit";
class NotificationManager {
notifiers: Notifier[] = [];
events: NotificationEvents = [];
logger: Logger;
subreddit: Subreddit;
name: string;
constructor(logger: Logger, subreddit: Subreddit, displayName: string, config?: NotificationConfig) {
this.logger = logger.child({leaf: 'Notifications'}, mergeArr);
this.subreddit = subreddit;
this.name = displayName;
if (config !== undefined) {
const {events = [], providers = []} = config;
this.events = events;
for (const p of providers) {
switch (p.type) {
case 'discord':
this.notifiers.push(new DiscordNotifier(p.name, p.url));
break;
default:
this.logger.warn(`Notification provider type of ${p.type} not recognized.`);
break;
}
}
if (this.events.length > 0 && this.notifiers.length === 0) {
this.logger.warn(`Config specified ${this.events.length} event hooks but not notification providers were setup!`);
}
}
}
getStats() {
let notifiers: string[] = [];
if (this.notifiers.length > 0) {
notifiers = this.notifiers.map(x => `${x.name} (${x.type})`);
}
let events: string[] = [];
if (this.events.length > 0) {
events = this.events.reduce((acc: string[], curr) => {
const e = Array.isArray(curr) ? curr : curr.types;
for (const ev of e) {
if (!acc.includes(ev)) {
acc.push(ev);
}
}
return acc;
}, []);
}
return {
notifiers,
events,
}
}
handle(name: NotificationEventType, title: string, body?: string, causedBy?: string, logLevel?: string) {
if (this.notifiers.length === 0 || this.events.length === 0) {
return;
}
let notifiers: Notifier[] = [];
for (const e of this.events) {
// array of event NotificationEventType
if (Array.isArray(e)) {
const ev = e as NotificationEventType[];
for (const v of ev) {
if (v === name) {
// if we find the event here then we want to sent the event to all configured notifiers
notifiers = notifiers.concat(this.notifiers);
}
}
} else {
// e is a NotificationEventConfig
const ev = e as NotificationEventConfig;
const hasEvent = ev.types.some(x => x === name);
if (hasEvent) {
const p = ev.providers.map(y => y.toLowerCase());
const validNotifiers = this.notifiers.filter(x => p.includes(x.name.toLowerCase()));
notifiers = notifiers.concat(validNotifiers);
}
}
}
// remove dups
notifiers = notifiers.reduce((acc: Notifier[], curr: Notifier) => {
if (!acc.some(x => x.name === curr.name)) {
return acc.concat(curr);
}
return acc;
}, []);
let footer = [];
if (causedBy !== undefined) {
footer.push(`* Performed by "${causedBy}"`);
}
footer.push(`* Notification triggered by "${name}"`);
for (const n of notifiers) {
n.handle({
title: `${title} (${this.name})`,
body: body || '',
footer: footer.length > 0 ? footer.join('\n') : undefined,
logLevel
});
}
}
}
export default NotificationManager;

View File

@@ -924,6 +924,11 @@
"pattern": "^[a-zA-Z]([\\w -]*[\\w])?$",
"type": "string"
},
"notifyOnTrigger": {
"default": false,
"description": "If notifications are configured and this is `true` then an `eventActioned` event will be sent when this check is triggered.",
"type": "boolean"
},
"rules": {
"description": "A list of Rules to run.\n\nIf `Rule` objects are triggered based on `condition` then `actions` will be performed.\n\nCan be `Rule`, `RuleSet`, or the `name` of any **named** `Rule` in your subreddit's configuration.\n\n**If `rules` is an empty array or not present then `actions` are performed immediately.**",
"items": {
@@ -1003,6 +1008,28 @@
},
"type": "object"
},
"DiscordProviderConfig": {
"properties": {
"name": {
"type": "string"
},
"type": {
"enum": [
"discord"
],
"type": "string"
},
"url": {
"type": "string"
}
},
"required": [
"name",
"type",
"url"
],
"type": "object"
},
"DurationObject": {
"additionalProperties": false,
"description": "A [Day.js duration object](https://day.js.org/docs/en/durations/creating)",
@@ -1353,6 +1380,70 @@
],
"type": "object"
},
"NotificationConfig": {
"properties": {
"events": {
"items": {
"anyOf": [
{
"$ref": "#/definitions/NotificationEventConfig"
},
{
"items": {
"enum": [
"configUpdated",
"eventActioned",
"pollingError",
"runStateChanged"
],
"type": "string"
},
"type": "array"
}
]
},
"type": "array"
},
"providers": {
"items": {
"$ref": "#/definitions/DiscordProviderConfig"
},
"type": "array"
}
},
"required": [
"events",
"providers"
],
"type": "object"
},
"NotificationEventConfig": {
"properties": {
"providers": {
"items": {
"type": "string"
},
"type": "array"
},
"types": {
"items": {
"enum": [
"configUpdated",
"eventActioned",
"pollingError",
"runStateChanged"
],
"type": "string"
},
"type": "array"
}
},
"required": [
"providers",
"types"
],
"type": "object"
},
"PollingOptions": {
"description": "A configuration for where, how, and when to poll Reddit for Activities to process",
"examples": [
@@ -2180,6 +2271,11 @@
"pattern": "^[a-zA-Z]([\\w -]*[\\w])?$",
"type": "string"
},
"notifyOnTrigger": {
"default": false,
"description": "If notifications are configured and this is `true` then an `eventActioned` event will be sent when this check is triggered.",
"type": "boolean"
},
"rules": {
"description": "A list of Rules to run.\n\nIf `Rule` objects are triggered based on `condition` then `actions` will be performed.\n\nCan be `Rule`, `RuleSet`, or the `name` of any **named** `Rule` in your subreddit's configuration.\n\n**If `rules` is an empty array or not present then `actions` are performed immediately.**",
"items": {
@@ -2492,6 +2588,9 @@
"nickname": {
"type": "string"
},
"notifications": {
"$ref": "#/definitions/NotificationConfig"
},
"polling": {
"default": [
[

View File

@@ -38,6 +38,92 @@
],
"type": "string"
},
"DiscordProviderConfig": {
"properties": {
"name": {
"type": "string"
},
"type": {
"enum": [
"discord"
],
"type": "string"
},
"url": {
"type": "string"
}
},
"required": [
"name",
"type",
"url"
],
"type": "object"
},
"NotificationConfig": {
"properties": {
"events": {
"items": {
"anyOf": [
{
"$ref": "#/definitions/NotificationEventConfig"
},
{
"items": {
"enum": [
"configUpdated",
"eventActioned",
"pollingError",
"runStateChanged"
],
"type": "string"
},
"type": "array"
}
]
},
"type": "array"
},
"providers": {
"items": {
"$ref": "#/definitions/DiscordProviderConfig"
},
"type": "array"
}
},
"required": [
"events",
"providers"
],
"type": "object"
},
"NotificationEventConfig": {
"properties": {
"providers": {
"items": {
"type": "string"
},
"type": "array"
},
"types": {
"items": {
"enum": [
"configUpdated",
"eventActioned",
"pollingError",
"runStateChanged"
],
"type": "string"
},
"type": "array"
}
},
"required": [
"providers",
"types"
],
"type": "object"
},
"PollingDefaults": {
"properties": {
"delayUntil": {
@@ -170,6 +256,9 @@
},
"type": "object"
},
"notifications": {
"$ref": "#/definitions/NotificationConfig"
},
"operator": {
"properties": {
"display": {

View File

@@ -15,7 +15,7 @@ import {ConfigBuilder, buildPollingOptions} from "../ConfigBuilder";
import {
DEFAULT_POLLING_INTERVAL,
DEFAULT_POLLING_LIMIT, Invokee,
ManagerOptions, PAUSED,
ManagerOptions, ManagerStateChangeOption, PAUSED,
PollingOptionsStrong, RUNNING, RunState, STOPPED, SYSTEM, USER
} from "../Common/interfaces";
import Submission from "snoowrap/dist/objects/Submission";
@@ -34,6 +34,7 @@ import Action from "../Action";
import {queue, QueueObject} from 'async';
import {JSONConfig} from "../JsonConfig";
import {CheckStructuredJson} from "../Check";
import NotificationManager from "../Notification/NotificationManager";
export interface RunningState {
state: RunState,
@@ -98,6 +99,8 @@ export class Manager {
causedBy: SYSTEM
}
notificationManager!: NotificationManager;
// use by api nanny to slow event consumption
delayBy?: number;
@@ -262,7 +265,8 @@ export class Manager {
caching,
dryRun,
footer,
nickname
nickname,
notifications,
} = configManagerOpts || {};
this.pollOptions = buildPollingOptions(polling);
this.dryRun = this.globalDryRun || dryRun;
@@ -278,6 +282,12 @@ export class Manager {
this.logger.info(`Polling Info => ${pollingInfo(p)}`)
}
this.notificationManager = new NotificationManager(this.logger, this.subreddit, this.displayLabel, notifications);
const {events, notifiers} = this.notificationManager.getStats();
const notifierContent = notifiers.length === 0 ? 'None' : notifiers.join(', ');
const eventContent = events.length === 0 ? 'None' : events.join(', ');
this.logger.info(`Notification Info => Providers: ${notifierContent} | Events: ${eventContent}`);
let resourceConfig: SubredditResourceConfig = {
footer,
logger: this.logger,
@@ -321,7 +331,8 @@ export class Manager {
}
}
async parseConfiguration(causedBy: Invokee = 'system', force: boolean = false) {
async parseConfiguration(causedBy: Invokee = 'system', force: boolean = false, options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
//this.wikiUpdateRunning = true;
this.lastWikiCheck = dayjs();
@@ -377,6 +388,11 @@ export class Manager {
this.parseConfigurationFromObject(configObj);
this.logger.info('Checks updated');
if(!suppressNotification) {
this.notificationManager.handle('configUpdated', 'Configuration Updated', reason, causedBy)
}
return true;
} catch (err) {
this.validConfigLoaded = false;
@@ -393,8 +409,10 @@ export class Manager {
let allRuleResults: RuleResult[] = [];
const itemIdentifier = `${checkType === 'Submission' ? 'SUB' : 'COM'} ${itemId}`;
this.currentLabels = [itemIdentifier];
let ePeek = '';
try {
const [peek, _] = await itemContentPeek(item);
ePeek = peek;
this.logger.info(`<EVENT> ${peek}`);
} catch (err) {
this.logger.error(`Error occurred while generate item peek for ${checkType} Activity ${itemId}`, err);
@@ -461,6 +479,11 @@ export class Manager {
this.checksTriggeredSinceStart.set(check.name, (this.checksTriggeredSinceStart.get(check.name) || 0) + 1);
runActions = await check.runActions(item, currentResults.filter(x => x.triggered), dryRun);
actionsRun = runActions.length;
if(!check.notifyOnTrigger) {
const ar = runActions.map(x => x.getActionUniqueName()).join(', ');
this.notificationManager.handle('eventActioned', 'Check Triggered', `Check "${check.name}" was triggered on Event: \n ${ePeek} \n\n with the following actions run: ${ar}`);
}
break;
}
}
@@ -641,7 +664,8 @@ export class Manager {
}
}
startQueue(causedBy: Invokee = 'system') {
startQueue(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
if(this.queueState.state === RUNNING) {
this.logger.info(`Activity processing queue is already RUNNING with (${this.queue.length()} queued activities)`);
} else if (!this.validConfigLoaded) {
@@ -653,10 +677,14 @@ export class Manager {
state: RUNNING,
causedBy
}
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Queue Started', reason, causedBy)
}
}
}
async pauseQueue(causedBy: Invokee = 'system') {
async pauseQueue(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
if(this.queueState.state === PAUSED) {
if(this.queueState.causedBy !== causedBy) {
this.logger.info(`Activity processing queue state set to PAUSED by ${causedBy}`);
@@ -686,10 +714,14 @@ export class Manager {
state: PAUSED,
causedBy
}
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Queue Paused', reason, causedBy)
}
}
}
async stopQueue(causedBy: Invokee = 'system') {
async stopQueue(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
if(this.queueState.state === STOPPED) {
if(this.queueState.causedBy !== causedBy) {
this.logger.info(`Activity processing queue state set to STOPPED by ${causedBy}`);
@@ -715,11 +747,15 @@ export class Manager {
state: STOPPED,
causedBy
}
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Queue Stopped', reason, causedBy)
}
}
}
async startEvents(causedBy: Invokee = 'system') {
async startEvents(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
if(!this.validConfigLoaded) {
this.logger.warn('Cannot start event polling while manager has an invalid configuration');
return;
@@ -748,9 +784,13 @@ export class Manager {
state: RUNNING,
causedBy
}
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Events Polling Started', reason, causedBy)
}
}
pauseEvents(causedBy: Invokee = 'system') {
pauseEvents(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
if(this.eventsState.state !== RUNNING) {
this.logger.warn('Events must be in RUNNING state in order to be paused.');
} else {
@@ -766,10 +806,14 @@ export class Manager {
} else {
this.logger.info('Event polling is PAUSED.');
}
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Events Polling Paused', reason, causedBy)
}
}
}
stopEvents(causedBy: Invokee = 'system') {
stopEvents(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
if(this.eventsState.state !== STOPPED) {
for (const s of this.streams) {
s.end();
@@ -793,6 +837,9 @@ export class Manager {
causedBy
}
this.logger.info('Note: Polling behavior will be re-built from configuration when next started');
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Events Polling Stopped', reason, causedBy)
}
} else if(causedBy !== this.eventsState.causedBy) {
this.logger.info(`Events STOPPED by ${causedBy}`);
this.logger.info('Note: Polling behavior will be re-built from configuration when next started');
@@ -802,25 +849,33 @@ export class Manager {
}
}
async start(causedBy: Invokee = 'system') {
async start(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
if(!this.validConfigLoaded) {
this.logger.warn('Cannot put bot in RUNNING state while manager has an invalid configuration');
return;
}
await this.startEvents(causedBy);
this.startQueue(causedBy);
await this.startEvents(causedBy, {suppressNotification: true});
this.startQueue(causedBy, {suppressNotification: true});
this.botState = {
state: RUNNING,
causedBy
}
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Bot Started', reason, causedBy)
}
}
async stop(causedBy: Invokee = 'system') {
this.stopEvents(causedBy);
await this.stopQueue(causedBy);
async stop(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
this.stopEvents(causedBy, {suppressNotification: true});
await this.stopQueue(causedBy, {suppressNotification: true});
this.botState = {
state: STOPPED,
causedBy
}
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Bot Stopped', reason, causedBy)
}
}
}