Implement graceful bot teardown and api endpoint to rebuild bot

This commit is contained in:
FoxxMD
2021-08-13 15:52:18 -04:00
parent ee302ee430
commit 8d8e4405e0
4 changed files with 264 additions and 249 deletions

View File

@@ -18,7 +18,7 @@ import LoggedError from "./Utils/LoggedError";
import {ProxiedSnoowrap, RequestTrackingSnoowrap} from "./Utils/SnoowrapClients";
import {ModQueueStream, UnmoderatedStream} from "./Subreddit/Streams";
import {getLogger} from "./Utils/loggerFactory";
import {DurationString, OperatorConfig, PAUSED, RUNNING, STOPPED, SYSTEM, USER} from "./Common/interfaces";
import {DurationString, Invokee, OperatorConfig, PAUSED, RUNNING, STOPPED, SYSTEM, USER} from "./Common/interfaces";
import { Duration } from "dayjs/plugin/duration";
import {singleton} from "./Utils/SnoowrapUtils";
@@ -44,13 +44,14 @@ export class App {
heartbeatInterval: number;
nextHeartbeat?: Dayjs;
heartBeating: boolean = false;
running: boolean = false;
//apiLimitWarning: number;
softLimit: number | string = 250;
hardLimit: number | string = 50;
nannyMode?: 'soft' | 'hard';
nextExpiration!: Dayjs;
botName!: string;
botLink!: string;
nextExpiration: Dayjs = dayjs();
botName?: string;
botLink?: string;
maxWorkers: number;
startedAt: Dayjs = dayjs();
sharedModqueue: boolean = false;
@@ -62,6 +63,7 @@ export class App {
depletedInSecs: number = 0;
error: any;
emitter: EventEmitter = new EventEmitter();
constructor(config: OperatorConfig) {
const {
@@ -153,7 +155,7 @@ export class App {
this.logger.info(`If this is a first-time setup use the 'web' command for a web-based guide to configuring your application`);
this.logger.info(`Or check the USAGE section of the readme for the correct naming of these arguments/environment variables`);
this.error = `Missing credentials: ${missingCreds.join(', ')}`;
throw new LoggedError(`Missing credentials: ${missingCreds.join(', ')}`);
//throw new LoggedError(`Missing credentials: ${missingCreds.join(', ')}`);
}
this.client = proxy === undefined ? new Snoowrap(creds) : new ProxiedSnoowrap({...creds, proxy});
@@ -264,7 +266,7 @@ export class App {
// TODO don't know a way to check permissions yet
availSubs.push(sub);
}
this.logger.info(`${this.botName} is a moderator of these subreddits: ${availSubs.map(x => x.display_name_prefixed).join(', ')}`);
this.logger.info(`u/${user.name} is a moderator of these subreddits: ${availSubs.map(x => x.display_name_prefixed).join(', ')}`);
let subsToRun: Subreddit[] = [];
const subsToUse = subreddits.length > 0 ? subreddits.map(parseSubredditName) : this.subreddits;
@@ -304,56 +306,87 @@ export class App {
}
async heartbeat() {
// break up interval into 5 seconds chunks so we can interrupt on destroy()
const wholeIterations = Math.floor(this.heartbeatInterval / 5);
const remainderSecs = this.heartbeatInterval % 5;
try {
this.heartBeating = true;
while (true) {
this.nextHeartbeat = dayjs().add(this.heartbeatInterval, 'second');
await sleep(this.heartbeatInterval * 1000);
const heartbeat = `HEARTBEAT -- API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ~${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion === undefined ? 'N/A' : this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`
this.logger.info(heartbeat);
for (const s of this.subManagers) {
if(s.botState.state === STOPPED && s.botState.causedBy === USER) {
this.logger.debug('Skipping config check/restart on heartbeat due to previously being stopped by user', {subreddit: s.displayLabel});
continue;
mainLoop:
while (this.running) {
this.nextHeartbeat = dayjs().add(this.heartbeatInterval, 'second');
for(let i = 0; i < wholeIterations; i++) {
await sleep(5000);
if(!this.running) {
break mainLoop;
}
}
try {
const newConfig = await s.parseConfiguration();
if(newConfig || (s.queueState.state !== RUNNING && s.queueState.causedBy === SYSTEM))
{
await s.startQueue('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running queue'});
if(remainderSecs > 0) {
await sleep(remainderSecs * 1000);
}
if(!this.running) {
break;
}
const heartbeat = `HEARTBEAT -- API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ~${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion === undefined ? 'N/A' : this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`
this.logger.info(heartbeat);
for (const s of this.subManagers) {
if(s.botState.state === STOPPED && s.botState.causedBy === USER) {
this.logger.debug('Skipping config check/restart on heartbeat due to previously being stopped by user', {subreddit: s.displayLabel});
continue;
}
if(newConfig || (s.eventsState.state !== RUNNING && s.eventsState.causedBy === SYSTEM))
{
await s.startEvents('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running events'});
}
if(s.botState.state !== RUNNING && s.eventsState.state === RUNNING && s.queueState.state === RUNNING) {
s.botState = {
state: RUNNING,
causedBy: 'system',
try {
const newConfig = await s.parseConfiguration();
if(newConfig || (s.queueState.state !== RUNNING && s.queueState.causedBy === SYSTEM))
{
await s.startQueue('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running queue'});
}
if(newConfig || (s.eventsState.state !== RUNNING && s.eventsState.causedBy === SYSTEM))
{
await s.startEvents('system', {reason: newConfig ? 'Config updated on heartbeat triggered reload' : 'Heartbeat detected non-running events'});
}
if(s.botState.state !== RUNNING && s.eventsState.state === RUNNING && s.queueState.state === RUNNING) {
s.botState = {
state: RUNNING,
causedBy: 'system',
}
}
} catch (err) {
this.logger.info('Stopping event polling to prevent activity processing queue from backing up. Will be restarted when config update succeeds.')
await s.stopEvents('system', {reason: 'Invalid config will cause events to pile up in queue. Will be restarted when config update succeeds (next heartbeat).'});
if(!(err instanceof LoggedError)) {
this.logger.error(err, {subreddit: s.displayLabel});
}
if(this.nextHeartbeat !== undefined) {
this.logger.info(`Will retry parsing config on next heartbeat (in ${dayjs.duration(this.nextHeartbeat.diff(dayjs())).humanize()})`, {subreddit: s.displayLabel});
}
}
} catch (err) {
this.logger.info('Stopping event polling to prevent activity processing queue from backing up. Will be restarted when config update succeeds.')
await s.stopEvents('system', {reason: 'Invalid config will cause events to pile up in queue. Will be restarted when config update succeeds (next heartbeat).'});
if(!(err instanceof LoggedError)) {
this.logger.error(err, {subreddit: s.displayLabel});
}
if(this.nextHeartbeat !== undefined) {
this.logger.info(`Will retry parsing config on next heartbeat (in ${dayjs.duration(this.nextHeartbeat.diff(dayjs())).humanize()})`, {subreddit: s.displayLabel});
}
}
await this.runModStreams(true);
}
await this.runModStreams(true);
}
} catch (err) {
this.logger.error('Error occurred during heartbeat', err);
throw err;
} finally {
this.nextHeartbeat = undefined;
this.heartBeating = false;
this.logger.info('Heartbeat stopped');
this.emitter.emit('heartbeatStopped');
}
}
async destroy(causedBy: Invokee) {
this.logger.info('Stopping heartbeat and nanny processes, may take up to 5 seconds...');
const processWait = Promise.all([pEvent(this.emitter, 'heartbeatStopped'), pEvent(this.emitter, 'nannyStopped')]);
this.running = false;
await processWait;
for (const manager of this.subManagers) {
await manager.stop(causedBy, {reason: 'App rebuild'});
}
this.logger.info('Bot is stopped.');
}
async runModStreams(notify = false) {
for(const [k,v] of CacheManager.modStreams) {
if(!v.running && v.listeners('item').length > 0) {
@@ -362,7 +395,7 @@ export class App {
if(notify) {
for(const m of this.subManagers) {
if(m.modStreamCallbacks.size > 0) {
m.notificationManager.handle('runStateChanged', `${k.toUpperCase()} Polling Started`, 'Polling was successfully restarted on heartbeat.');
await m.notificationManager.handle('runStateChanged', `${k.toUpperCase()} Polling Started`, 'Polling was successfully restarted on heartbeat.');
}
}
}
@@ -373,6 +406,7 @@ export class App {
async runManagers() {
if(this.subManagers.every(x => !x.validConfigLoaded)) {
this.logger.warn('All managers have invalid configs!');
this.error = 'All managers have invalid configs';
}
for (const manager of this.subManagers) {
if (manager.validConfigLoaded && manager.botState.state !== RUNNING) {
@@ -382,135 +416,146 @@ export class App {
await this.runModStreams();
if (this.heartbeatInterval !== 0 && !this.heartBeating) {
this.heartbeat();
}
this.running = true;
this.heartbeat();
this.runApiNanny();
const emitter = new EventEmitter();
await pEvent(emitter, 'end');
}
async runApiNanny() {
while(true) {
await sleep(10000);
this.nextExpiration = dayjs(this.client.ratelimitExpiration);
const nowish = dayjs().add(10, 'second');
if(nowish.isAfter(this.nextExpiration)) {
// it's possible no api calls are being made because of a hard limit
// need to make an api call to update this
// @ts-ignore
await this.client.getMe();
this.nextExpiration = dayjs(this.client.ratelimitExpiration);
}
const rollingSample = this.apiSample.slice(0, 7)
rollingSample.unshift(this.client.ratelimitRemaining);
this.apiSample = rollingSample;
const diff = this.apiSample.reduceRight((acc: number[], curr, index) => {
if(this.apiSample[index + 1] !== undefined) {
const d = Math.abs(curr - this.apiSample[index + 1]);
if(d === 0) {
return [...acc, 0];
try {
mainLoop:
while (this.running) {
for(let i = 0; i < 2; i++) {
await sleep(5000);
if (!this.running) {
break mainLoop;
}
}
return [...acc, d/10];
}
return acc;
}, []);
this.apiRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length; // api requests per second
this.depletedInSecs = this.client.ratelimitRemaining / this.apiRollingAvg; // number of seconds until current remaining limit is 0
this.apiEstDepletion = dayjs.duration({seconds: this.depletedInSecs});
this.logger.debug(`API Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`);
let hardLimitHit = false;
if(typeof this.hardLimit === 'string') {
const hardDur = parseDuration(this.hardLimit);
hardLimitHit = hardDur.asSeconds() > this.apiEstDepletion.asSeconds();
} else {
hardLimitHit = this.hardLimit > this.client.ratelimitRemaining;
}
if(hardLimitHit) {
if(this.nannyMode === 'hard') {
continue;
}
this.logger.info(`Detected HARD LIMIT of ${this.hardLimit} remaining`, {leaf: 'Api Nanny'});
this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${this.apiRollingAvg}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'});
this.logger.info(`All subreddit event polling has been paused`, {leaf: 'Api Nanny'});
for(const m of this.subManagers) {
m.pauseEvents('system');
m.notificationManager.handle('runStateChanged', 'Hard Limit Triggered', `Hard Limit of ${this.hardLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit event polling has been paused.`, 'system', 'warn');
}
this.nannyMode = 'hard';
continue;
}
let softLimitHit = false;
if(typeof this.softLimit === 'string') {
const softDur = parseDuration(this.softLimit);
softLimitHit = softDur.asSeconds() > this.apiEstDepletion.asSeconds();
} else {
softLimitHit = this.softLimit > this.client.ratelimitRemaining;
}
if(softLimitHit) {
if(this.nannyMode === 'soft') {
continue;
}
this.logger.info(`Detected SOFT LIMIT of ${this.softLimit} remaining`, {leaf: 'Api Nanny'});
this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'});
this.logger.info('Trying to detect heavy usage subreddits...', {leaf: 'Api Nanny'});
let threshold = 0.5;
let offenders = this.subManagers.filter(x => {
const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg;
return combinedPerSec > threshold;
});
if(offenders.length === 0) {
threshold = 0.25;
// reduce threshold
offenders = this.subManagers.filter(x => {
const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg;
return combinedPerSec > threshold;
});
}
if(offenders.length > 0) {
this.logger.info(`Slowing subreddits using >- ${threshold}req/s:`, {leaf: 'Api Nanny'});
for(const m of offenders) {
m.delayBy = 1.5;
m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'});
m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn');
this.nextExpiration = dayjs(this.client.ratelimitExpiration);
const nowish = dayjs().add(10, 'second');
if (nowish.isAfter(this.nextExpiration)) {
// it's possible no api calls are being made because of a hard limit
// need to make an api call to update this
// @ts-ignore
await this.client.getMe();
this.nextExpiration = dayjs(this.client.ratelimitExpiration);
}
} else {
this.logger.info(`Couldn't detect specific offenders, slowing all...`, {leaf: 'Api Nanny'});
for(const m of this.subManagers) {
m.delayBy = 1.5;
m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'});
m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn');
const rollingSample = this.apiSample.slice(0, 7)
rollingSample.unshift(this.client.ratelimitRemaining);
this.apiSample = rollingSample;
const diff = this.apiSample.reduceRight((acc: number[], curr, index) => {
if (this.apiSample[index + 1] !== undefined) {
const d = Math.abs(curr - this.apiSample[index + 1]);
if (d === 0) {
return [...acc, 0];
}
return [...acc, d / 10];
}
return acc;
}, []);
this.apiRollingAvg = diff.reduce((acc, curr) => acc + curr, 0) / diff.length; // api requests per second
this.depletedInSecs = this.client.ratelimitRemaining / this.apiRollingAvg; // number of seconds until current remaining limit is 0
this.apiEstDepletion = dayjs.duration({seconds: this.depletedInSecs});
this.logger.debug(`API Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`);
let hardLimitHit = false;
if (typeof this.hardLimit === 'string') {
const hardDur = parseDuration(this.hardLimit);
hardLimitHit = hardDur.asSeconds() > this.apiEstDepletion.asSeconds();
} else {
hardLimitHit = this.hardLimit > this.client.ratelimitRemaining;
}
if (hardLimitHit) {
if (this.nannyMode === 'hard') {
continue;
}
this.logger.info(`Detected HARD LIMIT of ${this.hardLimit} remaining`, {leaf: 'Api Nanny'});
this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${this.apiRollingAvg}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'});
this.logger.info(`All subreddit event polling has been paused`, {leaf: 'Api Nanny'});
for (const m of this.subManagers) {
m.pauseEvents('system');
m.notificationManager.handle('runStateChanged', 'Hard Limit Triggered', `Hard Limit of ${this.hardLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit event polling has been paused.`, 'system', 'warn');
}
this.nannyMode = 'hard';
continue;
}
let softLimitHit = false;
if (typeof this.softLimit === 'string') {
const softDur = parseDuration(this.softLimit);
softLimitHit = softDur.asSeconds() > this.apiEstDepletion.asSeconds();
} else {
softLimitHit = this.softLimit > this.client.ratelimitRemaining;
}
if (softLimitHit) {
if (this.nannyMode === 'soft') {
continue;
}
this.logger.info(`Detected SOFT LIMIT of ${this.softLimit} remaining`, {leaf: 'Api Nanny'});
this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'});
this.logger.info('Trying to detect heavy usage subreddits...', {leaf: 'Api Nanny'});
let threshold = 0.5;
let offenders = this.subManagers.filter(x => {
const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg;
return combinedPerSec > threshold;
});
if (offenders.length === 0) {
threshold = 0.25;
// reduce threshold
offenders = this.subManagers.filter(x => {
const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg;
return combinedPerSec > threshold;
});
}
if (offenders.length > 0) {
this.logger.info(`Slowing subreddits using >- ${threshold}req/s:`, {leaf: 'Api Nanny'});
for (const m of offenders) {
m.delayBy = 1.5;
m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'});
m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn');
}
} else {
this.logger.info(`Couldn't detect specific offenders, slowing all...`, {leaf: 'Api Nanny'});
for (const m of this.subManagers) {
m.delayBy = 1.5;
m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'});
m.notificationManager.handle('runStateChanged', 'Soft Limit Triggered', `Soft Limit of ${this.softLimit} hit (API Remaining: ${this.client.ratelimitRemaining}). Subreddit queue processing will be slowed to 1.5 seconds per.`, 'system', 'warn');
}
}
this.nannyMode = 'soft';
continue;
}
if (this.nannyMode !== undefined) {
this.logger.info('Turning off due to better conditions...', {leaf: 'Api Nanny'});
for (const m of this.subManagers) {
if (m.delayBy !== undefined) {
m.delayBy = undefined;
m.notificationManager.handle('runStateChanged', 'Normal Processing Resumed', 'Slow Mode has been turned off due to better API conditions', 'system');
}
if (m.queueState.state === PAUSED && m.queueState.causedBy === SYSTEM) {
m.startQueue('system', {reason: 'API Nanny has been turned off due to better API conditions'});
}
if (m.eventsState.state === PAUSED && m.eventsState.causedBy === SYSTEM) {
await m.startEvents('system', {reason: 'API Nanny has been turned off due to better API conditions'});
}
}
this.nannyMode = undefined;
}
}
this.nannyMode = 'soft';
continue;
}
if(this.nannyMode !== undefined) {
this.logger.info('Turning off due to better conditions...', {leaf: 'Api Nanny'});
for(const m of this.subManagers) {
if(m.delayBy !== undefined) {
m.delayBy = undefined;
m.notificationManager.handle('runStateChanged', 'Normal Processing Resumed', 'Slow Mode has been turned off due to better API conditions', 'system');
}
if(m.queueState.state === PAUSED && m.queueState.causedBy === SYSTEM) {
m.startQueue('system', {reason: 'API Nanny has been turned off due to better API conditions'});
}
if(m.eventsState.state === PAUSED && m.eventsState.causedBy === SYSTEM) {
await m.startEvents('system', {reason: 'API Nanny has been turned off due to better API conditions'});
}
}
this.nannyMode = undefined;
}
} catch (err) {
this.logger.error('Error occurred during nanny loop', err);
throw err;
} finally {
this.logger.info('Nanny stopped');
this.emitter.emit('nannyStopped');
}
}
}

View File

@@ -730,29 +730,6 @@ export class Manager {
}
}
async handle(): Promise<void> {
if (this.submissionChecks.length === 0 && this.commentChecks.length === 0) {
this.logger.warn('No submission or comment checks to run! Bot will not run.');
return;
}
try {
for (const s of this.streams) {
s.startInterval();
}
this.startedAt = dayjs();
this.running = true;
this.manuallyStopped = false;
this.logger.info('Bot Running');
await pEvent(this.emitter, 'end');
} catch (err) {
this.logger.error('Too many request errors occurred or an unhandled error was encountered, manager is stopping');
} finally {
this.stop();
}
}
startQueue(causedBy: Invokee = 'system', options?: ManagerStateChangeOption) {
const {reason, suppressNotification = false} = options || {};
if(this.queueState.state === RUNNING) {

25
src/Utils/AbortToken.ts Normal file
View File

@@ -0,0 +1,25 @@
//https://gist.github.com/pygy/6290f78b078e22418821b07d8d63f111#gistcomment-3408351
class AbortToken {
private readonly abortSymbol = Symbol('cancelled');
private abortPromise: Promise<any>;
private resolve!: Function; // Works due to promise init
constructor() {
this.abortPromise = new Promise(res => this.resolve = res);
}
public async wrap<T>(p: PromiseLike<T>): Promise<T> {
const result = await Promise.race([p, this.abortPromise]);
if (result === this.abortSymbol) {
throw new Error('aborted');
}
return result;
}
public abort() {
this.resolve(this.abortSymbol);
}
}
export default AbortToken;

View File

@@ -2,49 +2,34 @@ import {addAsync, Router} from '@awaitjs/express';
import express, {Request, Response} from 'express';
import bodyParser from 'body-parser';
import {App} from "../../App";
import dayjs from 'dayjs';
import {Writable, Transform} from "stream";
import {Transform} from "stream";
import winston from 'winston';
import {Server as SocketServer} from 'socket.io';
import Submission from "snoowrap/dist/objects/Submission";
import EventEmitter from "events";
import {Strategy as JwtStrategy, ExtractJwt} from 'passport-jwt';
import passport from 'passport';
import tcpUsed from 'tcp-port-used';
import {
boolToString, cacheStats,
COMMENT_URL_ID, createCacheManager,
filterLogBySubreddit,
formatLogLineToHtml, formatNumber,
isLogLineMinLevel,
LogEntry, parseFromJsonOrYamlToObject,
parseLinkIdentifier,
parseSubredditLogName, parseSubredditName,
pollingInfo, SUBMISSION_URL_ID
LogEntry,
parseSubredditLogName
} from "../../util";
import {Manager} from "../../Subreddit/Manager";
import {getLogger} from "../../Utils/loggerFactory";
import LoggedError from "../../Utils/LoggedError";
import {OperatorConfig, ResourceStats, RUNNING, STOPPED, SYSTEM, USER} from "../../Common/interfaces";
import {Invokee, OperatorConfig} from "../../Common/interfaces";
import http from "http";
import SimpleError from "../../Utils/SimpleError";
import {booleanMiddle} from "../Common/middleware";
import pEvent from "p-event";
import {BotStats, BotStatusResponse} from '../Common/interfaces';
import {heartbeat} from "./routes/authenticated/applicationRoutes";
import logs from "./routes/authenticated/user/logs";
import status from './routes/authenticated/user/status';
import {actionRoute, configRoute} from "./routes/authenticated/user";
import action from "./routes/authenticated/user/action";
import {authUserCheck} from "./middleware";
import {opStats} from "../Common/util";
const app = addAsync(express());
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({extended: false}));
const commentReg = parseLinkIdentifier([COMMENT_URL_ID]);
const submissionReg = parseLinkIdentifier([SUBMISSION_URL_ID]);
declare module 'express-session' {
interface SessionData {
user: string,
@@ -61,11 +46,6 @@ const subLogMap: Map<string, LogEntry[]> = new Map();
const rcbServer = async function (options: OperatorConfig) {
const {
credentials: {
clientId,
clientSecret,
redirectUri
},
operator: {
name,
display,
@@ -109,14 +89,6 @@ const rcbServer = async function (options: OperatorConfig) {
logger.add(streamTransport);
let error: string;
// need to return App to main so that we can handle app shutdown on SIGTERM and discriminate between normal shutdown and crash on error
try {
bot = new App(options);
} catch (err) {
error = err.message;
}
if (await tcpUsed.check(port)) {
throw new SimpleError(`Specified port for API (${port}) is in use or not available. Cannot start API.`);
}
@@ -181,43 +153,39 @@ const rcbServer = async function (options: OperatorConfig) {
app.getAsync('/check', ...actionRoute);
try {
// @ts-ignore
const initBot = async (causedBy: Invokee = 'system') => {
if(bot !== undefined) {
await bot.testClient();
await bot.buildManagers();
botSubreddits = bot.subManagers.map(x => x.displayLabel);
logger.info('A bot instance already exists. Attempting to stop event/queue processing first before building new bot.');
await bot.destroy(causedBy);
}
} catch (err) {
// TODO eventually allow re-creating bot from api request
logger.error('Server is still ONLINE but bot cannot recover from this error. The server must be restarted.')
if(!err.logged || !(err instanceof LoggedError)) {
logger.error(err);
const newBot = new App(options);
if(newBot.error === undefined) {
try {
await newBot.testClient();
await newBot.buildManagers();
botSubreddits = newBot.subManagers.map(x => x.displayLabel);
await newBot.runManagers();
} catch (err) {
if(newBot.error === undefined) {
newBot.error = err.message;
}
logger.error('Server is still ONLINE but bot cannot recover from this error and must be re-built');
if(!err.logged || !(err instanceof LoggedError)) {
logger.error(err);
}
}
}
return newBot;
}
// @ts-ignore
if(bot !== undefined) {
await bot.runManagers();
}
};
const opStats = (bot: App): BotStats => {
const limitReset = dayjs(bot.client.ratelimitExpiration);
const nextHeartbeat = bot.nextHeartbeat !== undefined ? bot.nextHeartbeat.local().format('MMMM D, YYYY h:mm A Z') : 'N/A';
const nextHeartbeatHuman = bot.nextHeartbeat !== undefined ? `in ${dayjs.duration(bot.nextHeartbeat.diff(dayjs())).humanize()}` : 'N/A'
return {
startedAtHuman: `${dayjs.duration(dayjs().diff(bot.startedAt)).humanize()}`,
nextHeartbeat,
nextHeartbeatHuman,
apiLimit: bot.client.ratelimitRemaining,
apiAvg: formatNumber(bot.apiRollingAvg),
nannyMode: bot.nannyMode || 'Off',
apiDepletion: bot.apiEstDepletion === undefined ? 'Not Calculated' : bot.apiEstDepletion.humanize(),
limitReset: limitReset.format(),
limitResetHuman: `in ${dayjs.duration(limitReset.diff(dayjs())).humanize()}`,
}
}
app.postAsync('/init', authUserCheck(), async (req, res) => {
logger.info(`${(req.user as Express.User).name} requested the bot to be re-built. Starting rebuild now...`, {subreddit: (req.user as Express.User).name});
bot = await initBot('user');
});
logger.info('Beginning bot init on startup...');
bot = await initBot();
};
export default rcbServer;