Refactor app/manager building for in-situ updates

* Separate manager instantiation from configuration flow so config can be reloaded
* Move wiki page parsing into manager for better encapsulation
* Check for wiki revision date on heartbeat and on checks if older than one minute
* Catch config parsing issues and retry on next heartbeat
This commit is contained in:
FoxxMD
2021-07-06 16:28:18 -04:00
parent 00e38b5560
commit ed8be6dda2
4 changed files with 183 additions and 47 deletions

View File

@@ -1,4 +1,4 @@
import Snoowrap from "snoowrap";
import Snoowrap, { Subreddit } from "snoowrap";
import {Manager} from "./Subreddit/Manager";
import winston, {Logger} from "winston";
import {argParseInt, labelledFormat, parseBool, parseFromJsonOrYamlToObject, parseSubredditName, sleep} from "./util";
@@ -8,6 +8,7 @@ import EventEmitter from "events";
import CacheManager from './Subreddit/SubredditResources';
import dayjs, {Dayjs} from "dayjs";
import LoggedError from "./Utils/LoggedError";
import ConfigParseError from "./Utils/ConfigParseError";
const {transports} = winston;
@@ -149,7 +150,7 @@ export class App {
}
this.logger.info(`/u/${name} is a moderator of these subreddits: ${availSubs.map(x => x.display_name_prefixed).join(', ')}`);
let subsToRun = [];
let subsToRun: Subreddit[] = [];
const subsToUse = subreddits.length > 0 ? subreddits.map(parseSubredditName) : this.subreddits;
if (subsToUse.length > 0) {
this.logger.info(`User-defined subreddit constraints detected (CLI argument or environmental variable), will try to run on: ${subsToUse.join(', ')}`);
@@ -173,9 +174,11 @@ export class App {
// get configs for subs we want to run on and build/validate them
for (const sub of subsToRun) {
let content = undefined;
let wiki;
try {
const wiki = sub.getWikiPage(this.wikiLocation);
content = await wiki.content_md;
// @ts-ignore
wiki = await sub.getWikiPage(this.wikiLocation).fetch();
content = wiki.content_md;
} catch (err) {
this.logger.error(`[${sub.display_name_prefixed}] Could not read wiki configuration. Please ensure the page https://reddit.com${sub.url}wiki/${this.wikiLocation} exists and is readable -- error: ${err.message}`);
continue;
@@ -196,7 +199,10 @@ export class App {
}
try {
subSchedule.push(new Manager(sub, this.client, this.logger, configObj, {dryRun: this.dryRun}));
const manager = new Manager(sub, this.client, this.logger, configObj, {dryRun: this.dryRun});
manager.lastWikiCheck = dayjs();
manager.lastWikiRevision = dayjs.unix(wiki.revision_date);
subSchedule.push(manager);
} catch (err) {
if(!(err instanceof LoggedError)) {
this.logger.error(`[${sub.display_name_prefixed}] Config was not valid`, err);
@@ -210,13 +216,24 @@ export class App {
try {
this.heartBeating = true;
while (true) {
await sleep(this.heartbeatInterval * 1000);
await sleep(60 * 1000);
const heartbeat = `HEARTBEAT -- Reddit API Rate Limit remaining: ${this.client.ratelimitRemaining}`
if (this.apiLimitWarning >= this.client.ratelimitRemaining) {
this.logger.warn(heartbeat);
} else {
this.logger.info(heartbeat);
}
for(const s of this.subManagers) {
try {
await s.parseConfiguration();
if(!s.running) {
s.handle();
}
} catch (err) {
s.stop();
this.logger.info('Will retry parsing config on next heartbeat...');
}
}
}
} finally {
this.heartBeating = false;

View File

@@ -4,7 +4,7 @@ import {SubmissionCheck} from "../Check/SubmissionCheck";
import {CommentCheck} from "../Check/CommentCheck";
import {
determineNewResults,
mergeArr,
mergeArr, parseFromJsonOrYamlToObject, sleep,
} from "../util";
import {CommentStream, SubmissionStream, Poll, ModQueueStream} from "snoostorm";
import pEvent from "p-event";
@@ -14,22 +14,34 @@ import {ManagerOptions, PollingOptionsStrong} from "../Common/interfaces";
import Submission from "snoowrap/dist/objects/Submission";
import {itemContentPeek} from "../Utils/SnoowrapUtils";
import LoggedError from "../Utils/LoggedError";
import ResourceManager, {SubredditResourceOptions, SubredditResources} from "./SubredditResources";
import ResourceManager, {
SubredditResourceOptions,
SubredditResources,
SubredditResourceSetOptions
} from "./SubredditResources";
import {UnmoderatedStream} from "./Streams";
import EventEmitter from "events";
import ConfigParseError from "../Utils/ConfigParseError";
import dayjs, { Dayjs as DayjsObj } from "dayjs";
export class Manager {
subreddit: Subreddit;
client: Snoowrap;
logger: Logger;
pollOptions: PollingOptionsStrong[];
submissionChecks: SubmissionCheck[];
commentChecks: CommentCheck[];
resources: SubredditResources;
pollOptions!: PollingOptionsStrong[];
submissionChecks!: SubmissionCheck[];
commentChecks!: CommentCheck[];
resources!: SubredditResources;
wikiLocation: string = 'botconfig/contextbot';
lastWikiRevision?: DayjsObj
lastWikiCheck: DayjsObj = dayjs();
wikiUpdateRunning: boolean = false;
streamListedOnce: string[] = [];
streams: Poll<Snoowrap.Submission | Snoowrap.Comment>[] = [];
dryRun?: boolean;
globalDryRun?: boolean;
emitter: EventEmitter = new EventEmitter();
displayLabel: string;
currentLabels?: string[];
@@ -52,35 +64,54 @@ export class Manager {
return getLabels()
}
}, mergeArr);
const configBuilder = new ConfigBuilder({logger: this.logger});
const validJson = configBuilder.validateJson(sourceData);
const {checks, ...configManagerOpts} = validJson;
const {polling = [{pollOn: 'unmoderated', limit: 25, interval: 20000}], caching, dryRun, footer, nickname} = configManagerOpts || {};
this.pollOptions = buildPollingOptions(polling);
this.subreddit = sub;
this.client = client;
this.dryRun = opts.dryRun || dryRun;
this.parseConfigurationFromObject(sourceData);
}
protected parseConfigurationFromObject(configObj: object) {
const configBuilder = new ConfigBuilder({logger: this.logger});
const validJson = configBuilder.validateJson(configObj);
const {checks, ...configManagerOpts} = validJson;
const {
polling = [{pollOn: 'unmoderated', limit: 25, interval: 20000}],
caching,
dryRun,
footer,
nickname
} = configManagerOpts || {};
this.pollOptions = buildPollingOptions(polling);
this.dryRun = this.globalDryRun || dryRun;
if(nickname !== undefined) {
this.displayLabel = nickname;
this.currentLabels = [this.displayLabel];
}
let resourceConfig: SubredditResourceOptions = {
logger: this.logger,
subreddit: sub,
if(footer !== undefined) {
this.resources.footer = footer;
}
let resourceConfig: SubredditResourceSetOptions = {
footer,
enabled: true
};
if(caching === false) {
resourceConfig.enabled = false;
} else {
resourceConfig = {...resourceConfig, ...caching};
}
if(this.resources === undefined) {
this.resources = ResourceManager.set(this.subreddit.display_name, {
...resourceConfig,
logger: this.logger,
subreddit: this.subreddit
});
}
this.resources.setOptions(resourceConfig);
this.resources = ResourceManager.set(sub.display_name, resourceConfig);
this.logger.info('Subreddit-specific options updated');
this.logger.info('Building Checks...');
const commentChecks: Array<CommentCheck> = [];
const subChecks: Array<SubmissionCheck> = [];
@@ -90,7 +121,7 @@ export class Manager {
...jCheck,
dryRun: this.dryRun || jCheck.dryRun,
logger: this.logger,
subredditName: sub.display_name
subredditName: this.subreddit.display_name
};
if (jCheck.kind === 'comment') {
commentChecks.push(new CommentCheck(checkConfig));
@@ -109,6 +140,54 @@ export class Manager {
}
}
async parseConfiguration(force: boolean = false) {
this.wikiUpdateRunning = true;
this.lastWikiCheck = dayjs();
let sourceData: string;
try {
// @ts-ignore
const wiki = await this.subreddit.getWikiPage(this.wikiLocation).fetch();
const revisionDate = dayjs.unix(wiki.revision_date);
if (!force && (this.lastWikiRevision !== undefined && this.lastWikiRevision.isSame(revisionDate))) {
// nothing to do, we already have this revision
this.wikiUpdateRunning = false;
this.logger.verbose('Config is up to date');
return;
}
if (this.lastWikiRevision !== undefined) {
this.logger.info(`Updating config due to stale wiki page (${dayjs.duration(dayjs().diff(revisionDate)).humanize()} old)`)
}
this.lastWikiRevision = revisionDate;
sourceData = await wiki.content_md;
} catch (err) {
const msg = `Could not read wiki configuration. Please ensure the page https://reddit.com${this.subreddit.url}wiki/${this.wikiLocation} exists and is readable -- error: ${err.message}`;
this.logger.error(msg);
this.wikiUpdateRunning = false;
throw new ConfigParseError(msg);
}
if (sourceData === '') {
this.logger.error(`Wiki page contents was empty`);
this.wikiUpdateRunning = false;
throw new ConfigParseError('Wiki page contents was empty');
}
const [configObj, jsonErr, yamlErr] = parseFromJsonOrYamlToObject(sourceData);
if (configObj === undefined) {
this.logger.error(`Could not parse wiki page contents as JSON or YAML:`);
this.logger.error(jsonErr);
this.logger.error(yamlErr);
this.wikiUpdateRunning = false;
throw new ConfigParseError('Could not parse wiki page contents as JSON or YAML:')
}
this.wikiUpdateRunning = false;
this.parseConfigurationFromObject(configObj);
this.logger.info('Checks updated');
}
async runChecks(checkType: ('Comment' | 'Submission'), item: (Submission | Comment), checkNames: string[] = []): Promise<void> {
const checks = checkType === 'Comment' ? this.commentChecks : this.submissionChecks;
const itemId = await item.id;
@@ -117,6 +196,17 @@ export class Manager {
this.currentLabels = [this.displayLabel, itemIdentifier];
const [peek, _] = await itemContentPeek(item);
this.logger.info(`<EVENT> ${peek}`);
while(this.wikiUpdateRunning) {
// sleep for a few seconds while we get new config zzzz
this.logger.verbose('A wiki config update is running, delaying checks by 3 seconds');
await sleep(3000);
}
if(this.lastWikiCheck.diff(dayjs(), 's') > 60) {
// last checked more than 60 seconds ago for config, try and update
await this.parseConfiguration();
}
const startingApiLimit = this.client.ratelimitRemaining;
if(item instanceof Submission) {
@@ -247,12 +337,21 @@ export class Manager {
this.running = true;
this.logger.info('Bot Running');
const emitter = new EventEmitter();
await pEvent(emitter, 'end');
await pEvent(this.emitter, 'end');
} catch (err) {
this.logger.error('Encountered unhandled error, manager is bailing out');
this.logger.error(err);
} finally {
this.stop();
}
}
stop() {
if(this.running) {
for(const s of this.streams) {
s.end();
}
this.emitter.emit('end');
this.running = false;
this.logger.info('Bot Stopped');
}

View File

@@ -26,29 +26,51 @@ export interface SubredditResourceOptions extends SubredditCacheConfig, Footer {
logger: Logger;
}
export class SubredditResources {
export interface SubredditResourceSetOptions extends SubredditCacheConfig, Footer {
enabled: boolean;
protected authorTTL: number;
protected useSubredditAuthorCache: boolean;
protected wikiTTL: number;
}
export class SubredditResources {
enabled!: boolean;
protected authorTTL!: number;
protected useSubredditAuthorCache!: boolean;
protected wikiTTL!: number;
name: string;
protected logger: Logger;
userNotes: UserNotes;
footer: false | string;
footer!: false | string;
subreddit: Subreddit
constructor(name: string, options: SubredditResourceOptions) {
const {
enabled = true,
authorTTL,
subreddit,
userNotesTTL = 60000,
wikiTTL = 300000, // 5 minutes
logger,
footer = DEFAULT_FOOTER
enabled = true,
userNotesTTL = 60000,
} = options || {};
this.subreddit = subreddit;
this.name = name;
if (logger === undefined) {
const alogger = winston.loggers.get('default')
this.logger = alogger.child({labels: [this.name, 'Resource Cache']}, mergeArr);
} else {
this.logger = logger.child({labels: ['Resource Cache']}, mergeArr);
}
this.userNotes = new UserNotes(enabled ? userNotesTTL : 0, this.subreddit, this.logger)
this.setOptions(options);
}
setOptions (options: SubredditResourceSetOptions) {
const {
enabled = true,
authorTTL,
userNotesTTL = 60000,
wikiTTL = 300000, // 5 minutes
footer = DEFAULT_FOOTER
} = options || {};
this.footer = footer;
this.enabled = manager.enabled ? enabled : false;
if (authorTTL === undefined) {
@@ -59,16 +81,7 @@ export class SubredditResources {
this.authorTTL = authorTTL;
}
this.wikiTTL = wikiTTL;
this.userNotes = new UserNotes(enabled ? userNotesTTL : 0, subreddit, logger);
this.name = name;
if (logger === undefined) {
const alogger = winston.loggers.get('default')
this.logger = alogger.child({labels: [this.name, 'Resource Cache']}, mergeArr);
} else {
this.logger = logger.child({labels: ['Resource Cache']}, mergeArr);
}
this.userNotes.notesTTL = enabled ? userNotesTTL : 0;
}
async getAuthorActivities(user: RedditUser, options: AuthorTypedActivitiesOptions): Promise<Array<Submission | Comment>> {

View File

@@ -0,0 +1,7 @@
import LoggedError from "./LoggedError";
class ConfigParseError extends LoggedError {
}
export default ConfigParseError