From 8d8e4405e0942073b0095577cef82ce66e0986f2 Mon Sep 17 00:00:00 2001 From: FoxxMD Date: Fri, 13 Aug 2021 15:52:18 -0400 Subject: [PATCH] Implement graceful bot teardown and api endpoint to rebuild bot --- src/App.ts | 365 ++++++++++++++++++++++----------------- src/Subreddit/Manager.ts | 23 --- src/Utils/AbortToken.ts | 25 +++ src/Web/Server/server.ts | 100 ++++------- 4 files changed, 264 insertions(+), 249 deletions(-) create mode 100644 src/Utils/AbortToken.ts diff --git a/src/App.ts b/src/App.ts index 7ca4b9e..548c233 100644 --- a/src/App.ts +++ b/src/App.ts @@ -18,7 +18,7 @@ import LoggedError from "./Utils/LoggedError"; import {ProxiedSnoowrap, RequestTrackingSnoowrap} from "./Utils/SnoowrapClients"; import {ModQueueStream, UnmoderatedStream} from "./Subreddit/Streams"; import {getLogger} from "./Utils/loggerFactory"; -import {DurationString, OperatorConfig, PAUSED, RUNNING, STOPPED, SYSTEM, USER} from "./Common/interfaces"; +import {DurationString, Invokee, OperatorConfig, PAUSED, RUNNING, STOPPED, SYSTEM, USER} from "./Common/interfaces"; import { Duration } from "dayjs/plugin/duration"; import {singleton} from "./Utils/SnoowrapUtils"; @@ -44,13 +44,14 @@ export class App { heartbeatInterval: number; nextHeartbeat?: Dayjs; heartBeating: boolean = false; + running: boolean = false; //apiLimitWarning: number; softLimit: number | string = 250; hardLimit: number | string = 50; nannyMode?: 'soft' | 'hard'; - nextExpiration!: Dayjs; - botName!: string; - botLink!: string; + nextExpiration: Dayjs = dayjs(); + botName?: string; + botLink?: string; maxWorkers: number; startedAt: Dayjs = dayjs(); sharedModqueue: boolean = false; @@ -62,6 +63,7 @@ export class App { depletedInSecs: number = 0; error: any; + emitter: EventEmitter = new EventEmitter(); constructor(config: OperatorConfig) { const { @@ -153,7 +155,7 @@ export class App { this.logger.info(`If this is a first-time setup use the 'web' command for a web-based guide to configuring your application`); this.logger.info(`Or check the USAGE section of the readme for the correct naming of these arguments/environment variables`); this.error = `Missing credentials: ${missingCreds.join(', ')}`; - throw new LoggedError(`Missing credentials: ${missingCreds.join(', ')}`); + //throw new LoggedError(`Missing credentials: ${missingCreds.join(', ')}`); } this.client = proxy === undefined ? new Snoowrap(creds) : new ProxiedSnoowrap({...creds, proxy}); @@ -264,7 +266,7 @@ export class App { // TODO don't know a way to check permissions yet availSubs.push(sub); } - this.logger.info(`${this.botName} is a moderator of these subreddits: ${availSubs.map(x => x.display_name_prefixed).join(', ')}`); + this.logger.info(`u/${user.name} is a moderator of these subreddits: ${availSubs.map(x => x.display_name_prefixed).join(', ')}`); let subsToRun: Subreddit[] = []; const subsToUse = subreddits.length > 0 ? subreddits.map(parseSubredditName) : this.subreddits; @@ -304,56 +306,87 @@ export class App { } async heartbeat() { + // break up interval into 5 seconds chunks so we can interrupt on destroy() + const wholeIterations = Math.floor(this.heartbeatInterval / 5); + const remainderSecs = this.heartbeatInterval % 5; + try { this.heartBeating = true; - while (true) { - this.nextHeartbeat = dayjs().add(this.heartbeatInterval, 'second'); - await sleep(this.heartbeatInterval * 1000); - const heartbeat = `HEARTBEAT -- API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ~${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion === undefined ? 'N/A' : this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)` - this.logger.info(heartbeat); - for (const s of this.subManagers) { - if(s.botState.state === STOPPED && s.botState.causedBy === USER) { - this.logger.debug('Skipping config check/restart on heartbeat due to previously being stopped by user', {subreddit: s.displayLabel}); - continue; + mainLoop: + while (this.running) { + this.nextHeartbeat = dayjs().add(this.heartbeatInterval, 'second'); + + for(let i = 0; i < wholeIterations; i++) { + await sleep(5000); + if(!this.running) { + break mainLoop; + } } - try { - const newConfig = await s.parseConfiguration(); - if(newConfig || (s.queueState.state !== RUNNING && s.queueState.causedBy === SYSTEM)) - { - await s.startQueue('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running queue'}); + if(remainderSecs > 0) { + await sleep(remainderSecs * 1000); + } + if(!this.running) { + break; + } + + const heartbeat = `HEARTBEAT -- API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ~${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion === undefined ? 'N/A' : this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)` + this.logger.info(heartbeat); + for (const s of this.subManagers) { + if(s.botState.state === STOPPED && s.botState.causedBy === USER) { + this.logger.debug('Skipping config check/restart on heartbeat due to previously being stopped by user', {subreddit: s.displayLabel}); + continue; } - if(newConfig || (s.eventsState.state !== RUNNING && s.eventsState.causedBy === SYSTEM)) - { - await s.startEvents('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running events'}); - } - if(s.botState.state !== RUNNING && s.eventsState.state === RUNNING && s.queueState.state === RUNNING) { - s.botState = { - state: RUNNING, - causedBy: 'system', + try { + const newConfig = await s.parseConfiguration(); + if(newConfig || (s.queueState.state !== RUNNING && s.queueState.causedBy === SYSTEM)) + { + await s.startQueue('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running queue'}); + } + if(newConfig || (s.eventsState.state !== RUNNING && s.eventsState.causedBy === SYSTEM)) + { + await s.startEvents('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running events'}); + } + if(s.botState.state !== RUNNING && s.eventsState.state === RUNNING && s.queueState.state === RUNNING) { + s.botState = { + state: RUNNING, + causedBy: 'system', + } + } + } catch (err) { + this.logger.info('Stopping event polling to prevent activity processing queue from backing up. Will be restarted when config update succeeds.') + await s.stopEvents('system', {reason: 'Invalid config will cause events to pile up in queue. Will be restarted when config update succeeds (next heartbeat).'}); + if(!(err instanceof LoggedError)) { + this.logger.error(err, {subreddit: s.displayLabel}); + } + if(this.nextHeartbeat !== undefined) { + this.logger.info(`Will retry parsing config on next heartbeat (in ${dayjs.duration(this.nextHeartbeat.diff(dayjs())).humanize()})`, {subreddit: s.displayLabel}); } } - } catch (err) { - this.logger.info('Stopping event polling to prevent activity processing queue from backing up. Will be restarted when config update succeeds.') - await s.stopEvents('system', {reason: 'Invalid config will cause events to pile up in queue. Will be restarted when config update succeeds (next heartbeat).'}); - if(!(err instanceof LoggedError)) { - this.logger.error(err, {subreddit: s.displayLabel}); - } - if(this.nextHeartbeat !== undefined) { - this.logger.info(`Will retry parsing config on next heartbeat (in ${dayjs.duration(this.nextHeartbeat.diff(dayjs())).humanize()})`, {subreddit: s.displayLabel}); - } } + await this.runModStreams(true); } - await this.runModStreams(true); - } } catch (err) { this.logger.error('Error occurred during heartbeat', err); throw err; } finally { this.nextHeartbeat = undefined; this.heartBeating = false; + this.logger.info('Heartbeat stopped'); + this.emitter.emit('heartbeatStopped'); } } + async destroy(causedBy: Invokee) { + this.logger.info('Stopping heartbeat and nanny processes, may take up to 5 seconds...'); + const processWait = Promise.all([pEvent(this.emitter, 'heartbeatStopped'), pEvent(this.emitter, 'nannyStopped')]); + this.running = false; + await processWait; + for (const manager of this.subManagers) { + await manager.stop(causedBy, {reason: 'App rebuild'}); + } + this.logger.info('Bot is stopped.'); + } + async runModStreams(notify = false) { for(const [k,v] of CacheManager.modStreams) { if(!v.running && v.listeners('item').length > 0) { @@ -362,7 +395,7 @@ export class App { if(notify) { for(const m of this.subManagers) { if(m.modStreamCallbacks.size > 0) { - m.notificationManager.handle('runStateChanged', `${k.toUpperCase()} Polling Started`, 'Polling was successfully restarted on heartbeat.'); + await m.notificationManager.handle('runStateChanged', `${k.toUpperCase()} Polling Started`, 'Polling was successfully restarted on heartbeat.'); } } } @@ -373,6 +406,7 @@ export class App { async runManagers() { if(this.subManagers.every(x => !x.validConfigLoaded)) { this.logger.warn('All managers have invalid configs!'); + this.error = 'All managers have invalid configs'; } for (const manager of this.subManagers) { if (manager.validConfigLoaded && manager.botState.state !== RUNNING) { @@ -382,135 +416,146 @@ export class App { await this.runModStreams(); - if (this.heartbeatInterval !== 0 && !this.heartBeating) { - this.heartbeat(); - } + this.running = true; + this.heartbeat(); this.runApiNanny(); - - const emitter = new EventEmitter(); - await pEvent(emitter, 'end'); } async runApiNanny() { - while(true) { - await sleep(10000); - this.nextExpiration = dayjs(this.client.ratelimitExpiration); - const nowish = dayjs().add(10, 'second'); - if(nowish.isAfter(this.nextExpiration)) { - // it's possible no api calls are being made because of a hard limit - // need to make an api call to update this - // @ts-ignore - await this.client.getMe(); - this.nextExpiration = dayjs(this.client.ratelimitExpiration); - } - const rollingSample = this.apiSample.slice(0, 7) - rollingSample.unshift(this.client.ratelimitRemaining); - this.apiSample = rollingSample; - const diff = this.apiSample.reduceRight((acc: number[], curr, index) => { - if(this.apiSample[index + 1] !== undefined) { - const d = Math.abs(curr - this.apiSample[index + 1]); - if(d === 0) { - return [...acc, 0]; + try { + mainLoop: + while (this.running) { + for(let i = 0; i < 2; i++) { + await sleep(5000); + if (!this.running) { + break mainLoop; + } } - return [...acc, d/10]; - } - return acc; - }, []); - this.apiRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length; // api requests per second - this.depletedInSecs = this.client.ratelimitRemaining / this.apiRollingAvg; // number of seconds until current remaining limit is 0 - this.apiEstDepletion = dayjs.duration({seconds: this.depletedInSecs}); - this.logger.debug(`API Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`); - - let hardLimitHit = false; - if(typeof this.hardLimit === 'string') { - const hardDur = parseDuration(this.hardLimit); - hardLimitHit = hardDur.asSeconds() > this.apiEstDepletion.asSeconds(); - } else { - hardLimitHit = this.hardLimit > this.client.ratelimitRemaining; - } - - if(hardLimitHit) { - if(this.nannyMode === 'hard') { - continue; - } - this.logger.info(`Detected HARD LIMIT of ${this.hardLimit} remaining`, {leaf: 'Api Nanny'}); - this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${this.apiRollingAvg}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'}); - this.logger.info(`All subreddit event polling has been paused`, {leaf: 'Api Nanny'}); - - for(const m of this.subManagers) { - m.pauseEvents('system'); - m.notificationManager.handle('runStateChanged', 'Hard Limit Triggered', `Hard Limit of ${this.hardLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit event polling has been paused.`, 'system', 'warn'); - } - - this.nannyMode = 'hard'; - continue; - } - - let softLimitHit = false; - if(typeof this.softLimit === 'string') { - const softDur = parseDuration(this.softLimit); - softLimitHit = softDur.asSeconds() > this.apiEstDepletion.asSeconds(); - } else { - softLimitHit = this.softLimit > this.client.ratelimitRemaining; - } - - if(softLimitHit) { - if(this.nannyMode === 'soft') { - continue; - } - this.logger.info(`Detected SOFT LIMIT of ${this.softLimit} remaining`, {leaf: 'Api Nanny'}); - this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'}); - this.logger.info('Trying to detect heavy usage subreddits...', {leaf: 'Api Nanny'}); - let threshold = 0.5; - let offenders = this.subManagers.filter(x => { - const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg; - return combinedPerSec > threshold; - }); - if(offenders.length === 0) { - threshold = 0.25; - // reduce threshold - offenders = this.subManagers.filter(x => { - const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg; - return combinedPerSec > threshold; - }); - } - - if(offenders.length > 0) { - this.logger.info(`Slowing subreddits using >- ${threshold}req/s:`, {leaf: 'Api Nanny'}); - for(const m of offenders) { - m.delayBy = 1.5; - m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'}); - m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn'); + this.nextExpiration = dayjs(this.client.ratelimitExpiration); + const nowish = dayjs().add(10, 'second'); + if (nowish.isAfter(this.nextExpiration)) { + // it's possible no api calls are being made because of a hard limit + // need to make an api call to update this + // @ts-ignore + await this.client.getMe(); + this.nextExpiration = dayjs(this.client.ratelimitExpiration); } - } else { - this.logger.info(`Couldn't detect specific offenders, slowing all...`, {leaf: 'Api Nanny'}); - for(const m of this.subManagers) { - m.delayBy = 1.5; - m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'}); - m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn'); + const rollingSample = this.apiSample.slice(0, 7) + rollingSample.unshift(this.client.ratelimitRemaining); + this.apiSample = rollingSample; + const diff = this.apiSample.reduceRight((acc: number[], curr, index) => { + if (this.apiSample[index + 1] !== undefined) { + const d = Math.abs(curr - this.apiSample[index + 1]); + if (d === 0) { + return [...acc, 0]; + } + return [...acc, d / 10]; + } + return acc; + }, []); + this.apiRollingAvg = diff.reduce((acc, curr) => acc + curr, 0) / diff.length; // api requests per second + this.depletedInSecs = this.client.ratelimitRemaining / this.apiRollingAvg; // number of seconds until current remaining limit is 0 + this.apiEstDepletion = dayjs.duration({seconds: this.depletedInSecs}); + this.logger.debug(`API Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`); + + + let hardLimitHit = false; + if (typeof this.hardLimit === 'string') { + const hardDur = parseDuration(this.hardLimit); + hardLimitHit = hardDur.asSeconds() > this.apiEstDepletion.asSeconds(); + } else { + hardLimitHit = this.hardLimit > this.client.ratelimitRemaining; + } + + if (hardLimitHit) { + if (this.nannyMode === 'hard') { + continue; + } + this.logger.info(`Detected HARD LIMIT of ${this.hardLimit} remaining`, {leaf: 'Api Nanny'}); + this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${this.apiRollingAvg}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'}); + this.logger.info(`All subreddit event polling has been paused`, {leaf: 'Api Nanny'}); + + for (const m of this.subManagers) { + m.pauseEvents('system'); + m.notificationManager.handle('runStateChanged', 'Hard Limit Triggered', `Hard Limit of ${this.hardLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit event polling has been paused.`, 'system', 'warn'); + } + + this.nannyMode = 'hard'; + continue; + } + + let softLimitHit = false; + if (typeof this.softLimit === 'string') { + const softDur = parseDuration(this.softLimit); + softLimitHit = softDur.asSeconds() > this.apiEstDepletion.asSeconds(); + } else { + softLimitHit = this.softLimit > this.client.ratelimitRemaining; + } + + if (softLimitHit) { + if (this.nannyMode === 'soft') { + continue; + } + this.logger.info(`Detected SOFT LIMIT of ${this.softLimit} remaining`, {leaf: 'Api Nanny'}); + this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'}); + this.logger.info('Trying to detect heavy usage subreddits...', {leaf: 'Api Nanny'}); + let threshold = 0.5; + let offenders = this.subManagers.filter(x => { + const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg; + return combinedPerSec > threshold; + }); + if (offenders.length === 0) { + threshold = 0.25; + // reduce threshold + offenders = this.subManagers.filter(x => { + const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg; + return combinedPerSec > threshold; + }); + } + + if (offenders.length > 0) { + this.logger.info(`Slowing subreddits using >- ${threshold}req/s:`, {leaf: 'Api Nanny'}); + for (const m of offenders) { + m.delayBy = 1.5; + m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'}); + m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn'); + } + } else { + this.logger.info(`Couldn't detect specific offenders, slowing all...`, {leaf: 'Api Nanny'}); + for (const m of this.subManagers) { + m.delayBy = 1.5; + m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'}); + m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn'); + } + } + this.nannyMode = 'soft'; + continue; + } + + if (this.nannyMode !== undefined) { + this.logger.info('Turning off due to better conditions...', {leaf: 'Api Nanny'}); + for (const m of this.subManagers) { + if (m.delayBy !== undefined) { + m.delayBy = undefined; + m.notificationManager.handle('runStateChanged', 'Normal Processing Resumed', 'Slow Mode has been turned off due to better API conditions', 'system'); + } + if (m.queueState.state === PAUSED && m.queueState.causedBy === SYSTEM) { + m.startQueue('system', {reason: 'API Nanny has been turned off due to better API conditions'}); + } + if (m.eventsState.state === PAUSED && m.eventsState.causedBy === SYSTEM) { + await m.startEvents('system', {reason: 'API Nanny has been turned off due to better API conditions'}); + } + } + this.nannyMode = undefined; } } - this.nannyMode = 'soft'; - continue; - } - - if(this.nannyMode !== undefined) { - this.logger.info('Turning off due to better conditions...', {leaf: 'Api Nanny'}); - for(const m of this.subManagers) { - if(m.delayBy !== undefined) { - m.delayBy = undefined; - m.notificationManager.handle('runStateChanged', 'Normal Processing Resumed', 'Slow Mode has been turned off due to better API conditions', 'system'); - } - if(m.queueState.state === PAUSED && m.queueState.causedBy === SYSTEM) { - m.startQueue('system', {reason: 'API Nanny has been turned off due to better API conditions'}); - } - if(m.eventsState.state === PAUSED && m.eventsState.causedBy === SYSTEM) { - await m.startEvents('system', {reason: 'API Nanny has been turned off due to better API conditions'}); - } - } - this.nannyMode = undefined; - } + } catch (err) { + this.logger.error('Error occurred during nanny loop', err); + throw err; + } finally { + this.logger.info('Nanny stopped'); + this.emitter.emit('nannyStopped'); } } } diff --git a/src/Subreddit/Manager.ts b/src/Subreddit/Manager.ts index 062a852..63b69e8 100644 --- a/src/Subreddit/Manager.ts +++ b/src/Subreddit/Manager.ts @@ -730,29 +730,6 @@ export class Manager { } } - async handle(): Promise { - if (this.submissionChecks.length === 0 && this.commentChecks.length === 0) { - this.logger.warn('No submission or comment checks to run! Bot will not run.'); - return; - } - - try { - for (const s of this.streams) { - s.startInterval(); - } - this.startedAt = dayjs(); - this.running = true; - this.manuallyStopped = false; - this.logger.info('Bot Running'); - - await pEvent(this.emitter, 'end'); - } catch (err) { - this.logger.error('Too many request errors occurred or an unhandled error was encountered, manager is stopping'); - } finally { - this.stop(); - } - } - startQueue(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) { const {reason, suppressNotification = false} = options || {}; if(this.queueState.state === RUNNING) { diff --git a/src/Utils/AbortToken.ts b/src/Utils/AbortToken.ts new file mode 100644 index 0000000..a914462 --- /dev/null +++ b/src/Utils/AbortToken.ts @@ -0,0 +1,25 @@ +//https://gist.github.com/pygy/6290f78b078e22418821b07d8d63f111#gistcomment-3408351 +class AbortToken { + private readonly abortSymbol = Symbol('cancelled'); + private abortPromise: Promise; + private resolve!: Function; // Works due to promise init + + constructor() { + this.abortPromise = new Promise(res => this.resolve = res); + } + + public async wrap(p: PromiseLike): Promise { + const result = await Promise.race([p, this.abortPromise]); + if (result === this.abortSymbol) { + throw new Error('aborted'); + } + + return result; + } + + public abort() { + this.resolve(this.abortSymbol); + } +} + +export default AbortToken; diff --git a/src/Web/Server/server.ts b/src/Web/Server/server.ts index f98bb25..dddf206 100644 --- a/src/Web/Server/server.ts +++ b/src/Web/Server/server.ts @@ -2,49 +2,34 @@ import {addAsync, Router} from '@awaitjs/express'; import express, {Request, Response} from 'express'; import bodyParser from 'body-parser'; import {App} from "../../App"; -import dayjs from 'dayjs'; -import {Writable, Transform} from "stream"; +import {Transform} from "stream"; import winston from 'winston'; import {Server as SocketServer} from 'socket.io'; -import Submission from "snoowrap/dist/objects/Submission"; -import EventEmitter from "events"; import {Strategy as JwtStrategy, ExtractJwt} from 'passport-jwt'; import passport from 'passport'; import tcpUsed from 'tcp-port-used'; import { - boolToString, cacheStats, - COMMENT_URL_ID, createCacheManager, - filterLogBySubreddit, - formatLogLineToHtml, formatNumber, - isLogLineMinLevel, - LogEntry, parseFromJsonOrYamlToObject, - parseLinkIdentifier, - parseSubredditLogName, parseSubredditName, - pollingInfo, SUBMISSION_URL_ID + LogEntry, + parseSubredditLogName } from "../../util"; -import {Manager} from "../../Subreddit/Manager"; import {getLogger} from "../../Utils/loggerFactory"; import LoggedError from "../../Utils/LoggedError"; -import {OperatorConfig, ResourceStats, RUNNING, STOPPED, SYSTEM, USER} from "../../Common/interfaces"; +import {Invokee, OperatorConfig} from "../../Common/interfaces"; import http from "http"; import SimpleError from "../../Utils/SimpleError"; -import {booleanMiddle} from "../Common/middleware"; -import pEvent from "p-event"; -import {BotStats, BotStatusResponse} from '../Common/interfaces'; import {heartbeat} from "./routes/authenticated/applicationRoutes"; import logs from "./routes/authenticated/user/logs"; import status from './routes/authenticated/user/status'; import {actionRoute, configRoute} from "./routes/authenticated/user"; import action from "./routes/authenticated/user/action"; +import {authUserCheck} from "./middleware"; +import {opStats} from "../Common/util"; const app = addAsync(express()); app.use(bodyParser.json()); app.use(bodyParser.urlencoded({extended: false})); -const commentReg = parseLinkIdentifier([COMMENT_URL_ID]); -const submissionReg = parseLinkIdentifier([SUBMISSION_URL_ID]); - declare module 'express-session' { interface SessionData { user: string, @@ -61,11 +46,6 @@ const subLogMap: Map = new Map(); const rcbServer = async function (options: OperatorConfig) { const { - credentials: { - clientId, - clientSecret, - redirectUri - }, operator: { name, display, @@ -109,14 +89,6 @@ const rcbServer = async function (options: OperatorConfig) { logger.add(streamTransport); - let error: string; - // need to return App to main so that we can handle app shutdown on SIGTERM and discriminate between normal shutdown and crash on error - try { - bot = new App(options); - } catch (err) { - error = err.message; - } - if (await tcpUsed.check(port)) { throw new SimpleError(`Specified port for API (${port}) is in use or not available. Cannot start API.`); } @@ -181,43 +153,39 @@ const rcbServer = async function (options: OperatorConfig) { app.getAsync('/check', ...actionRoute); - - try { - // @ts-ignore + const initBot = async (causedBy: Invokee = 'system') => { if(bot !== undefined) { - await bot.testClient(); - await bot.buildManagers(); - botSubreddits = bot.subManagers.map(x => x.displayLabel); + logger.info('A bot instance already exists. Attempting to stop event/queue processing first before building new bot.'); + await bot.destroy(causedBy); } - } catch (err) { - // TODO eventually allow re-creating bot from api request - logger.error('Server is still ONLINE but bot cannot recover from this error. The server must be restarted.') - if(!err.logged || !(err instanceof LoggedError)) { - logger.error(err); + const newBot = new App(options); + if(newBot.error === undefined) { + try { + await newBot.testClient(); + await newBot.buildManagers(); + botSubreddits = newBot.subManagers.map(x => x.displayLabel); + await newBot.runManagers(); + } catch (err) { + if(newBot.error === undefined) { + newBot.error = err.message; + } + logger.error('Server is still ONLINE but bot cannot recover from this error and must be re-built'); + if(!err.logged || !(err instanceof LoggedError)) { + logger.error(err); + } + } } + return newBot; } - // @ts-ignore - if(bot !== undefined) { - await bot.runManagers(); - } -}; -const opStats = (bot: App): BotStats => { - const limitReset = dayjs(bot.client.ratelimitExpiration); - const nextHeartbeat = bot.nextHeartbeat !== undefined ? bot.nextHeartbeat.local().format('MMMM D, YYYY h:mm A Z') : 'N/A'; - const nextHeartbeatHuman = bot.nextHeartbeat !== undefined ? `in ${dayjs.duration(bot.nextHeartbeat.diff(dayjs())).humanize()}` : 'N/A' - return { - startedAtHuman: `${dayjs.duration(dayjs().diff(bot.startedAt)).humanize()}`, - nextHeartbeat, - nextHeartbeatHuman, - apiLimit: bot.client.ratelimitRemaining, - apiAvg: formatNumber(bot.apiRollingAvg), - nannyMode: bot.nannyMode || 'Off', - apiDepletion: bot.apiEstDepletion === undefined ? 'Not Calculated' : bot.apiEstDepletion.humanize(), - limitReset: limitReset.format(), - limitResetHuman: `in ${dayjs.duration(limitReset.diff(dayjs())).humanize()}`, - } -} + app.postAsync('/init', authUserCheck(), async (req, res) => { + logger.info(`${(req.user as Express.User).name} requested the bot to be re-built. Starting rebuild now...`, {subreddit: (req.user as Express.User).name}); + bot = await initBot('user'); + }); + + logger.info('Beginning bot init on startup...'); + bot = await initBot(); +}; export default rcbServer;