Refactor polling to enable polling from multiple sources

* New polling sources: unmoderated and modqueue
* Can now poll from many sources
This commit is contained in:
FoxxMD
2021-06-22 14:47:20 -04:00
parent 8c6b18cf4d
commit db2be949b4
5 changed files with 231 additions and 169 deletions

View File

@@ -262,49 +262,71 @@ export interface JoinCondition {
condition?: JoinOperands,
}
export type PollOn = 'unmoderated' | 'modqueue' | 'newSub' | 'newComm';
export interface PollingOptionsStrong extends PollingOptions {
limit: number,
interval: number,
}
/**
* You may specify polling options independently for submissions/comments
* @examples [{"submissions": {"limit": 10, "interval": 10000}, "comments": {"limit": 15, "interval": 10000}}]
* A configuration for where, how, and when to poll Reddit for Activities to process
*
* @examples [{"pollOn": "unmoderated","limit": 25, "interval": 20000}]
* */
export interface PollingOptions {
/**
* Polling options for submission events
* @examples [{"limit": 10, "interval": 10000}]
* What source to get Activities from. The source you choose will modify how the bots behaves so choose carefully.
*
* ### unmoderated (default)
*
* Activities that have yet to be approved/removed by a mod. This includes all modqueue (reports/spam) **and new submissions**.
*
* Use this if you want the bot to act like a regular moderator and act on anything that can be seen from mod tools.
*
* **Note:** Does NOT include new comments, only comments that are reported/filtered by Automoderator. If you want to process all unmoderated AND all new comments then use some version of `polling: ["unmoderated","newComm"]`
*
* ### modqueue
*
* Activities requiring moderator review, such as reported things and items caught by the spam filter.
*
* Use this if you only want the Bot to process reported/filtered Activities.
*
* ### newSub
*
* Get only `Submissions` that show up in `/r/mySubreddit/new`
*
* Use this if you want the bot to process Submissions only when:
*
* * they are not initially filtered by Automoderator or
* * after they have been manually approved from modqueue
*
* ## newComm
*
* Get only new `Comments`
*
* Use this if you want the bot to process Comments only when:
*
* * they are not initially filtered by Automoderator or
* * after they have been manually approved from modqueue
*
* */
submissions?: {
/**
* The number of submissions to pull from /r/subreddit/new on every request
* @default 10
* @examples [10]
* */
limit?: number,
/**
* Amount of time, in milliseconds, to wait between requests to /r/subreddit/new
*
* @default 10000
* @examples [10000]
* */
interval?: number,
},
pollOn: 'unmoderated' | 'modqueue' | 'newSub' | 'newComm'
/**
* Polling options for comment events
* @examples [{"limit": 10, "interval": 10000}]
* The maximum number of Activities to get on every request
* @default 25
* @examples [25]
* */
comments?: {
/**
* The number of new comments to pull on every request
* @default 10
* @examples [10]
* */
limit?: number,
/**
* Amount of time, in milliseconds, to wait between requests for new comments
*
* @default 10000
* @examples [10000]
* */
interval?: number,
}
limit?: number
/**
* Amount of time, in milliseconds, to wait between requests
*
* @default 20000
* @examples [20000]
* */
interval?: number,
}
export interface SubredditCacheConfig {
@@ -358,7 +380,33 @@ export interface Footer {
}
export interface ManagerOptions {
polling?: PollingOptions
/**
* An array of sources to process Activities from
*
* Values in the array may be either:
*
* **A `string` representing the `pollOn` value to use**
*
* One of:
*
* * `unmoderated`
* * `modqueue`
* * `newSub`
* * `newComm`
*
* with the rest of the `PollingOptions` properties as defaults
*
* **A `PollingOptions` object**
*
* If you want to specify non-default preoperties
*
* ****
* If not specified the default is `["unmoderated"]`
*
* @default [["unmoderated"]]
* @example [["unmoderated","newComm"]]
* */
polling?: (string|PollingOptions)[]
/**
* Per-subreddit config for caching TTL values. If set to `false` caching is disabled.
@@ -404,6 +452,7 @@ export interface ManagerOptions {
* An alternate identifier to use in logs to identify your subreddit
*
* If your subreddit has a very long name it can make logging unwieldy. Specify a shorter name here to make log statements more readable (and shorter)
* @example ["shortName"]
* */
nickname?: string
}

View File

@@ -8,7 +8,7 @@ import * as schema from './Schema/App.json';
import {JSONConfig} from "./JsonConfig";
import LoggedError from "./Utils/LoggedError";
import {CheckStructuredJson} from "./Check";
import {ManagerOptions} from "./Common/interfaces";
import {PollingOptions, PollingOptionsStrong, PollOn} from "./Common/interfaces";
import {isRuleSetJSON, RuleSetJson, RuleSetObjectJson} from "./Rule/RuleSet";
import deepEqual from "fast-deep-equal";
import {ActionJson, ActionObjectJson, RuleJson, RuleObjectJson} from "./Common/types";
@@ -102,6 +102,23 @@ export class ConfigBuilder {
}
}
export const buildPollingOptions = (values: (string | PollingOptions)[]): PollingOptionsStrong[] => {
let opts: PollingOptionsStrong[] = [];
for (const v of values) {
if (typeof v === 'string') {
opts.push({pollOn: v as PollOn, interval: 10000, limit: 25});
} else {
const {
pollOn: p,
interval = 20000,
limit = 25
} = v;
opts.push({pollOn: p as PollOn, interval, limit});
}
}
return opts;
}
export const extractNamedRules = (rules: Array<RuleSetJson | RuleJson>, namedRules: Map<string, RuleObjectJson> = new Map()): Map<string, RuleObjectJson> => {
//const namedRules = new Map();
for (const r of rules) {

View File

@@ -1251,77 +1251,45 @@
"type": "object"
},
"PollingOptions": {
"description": "You may specify polling options independently for submissions/comments",
"description": "A configuration for where, how, and when to poll Reddit for Activities to process",
"examples": [
{
"comments": {
"interval": 10000,
"limit": 15
},
"submissions": {
"interval": 10000,
"limit": 10
}
"interval": 20000,
"limit": 25,
"pollOn": "unmoderated"
}
],
"properties": {
"comments": {
"description": "Polling options for comment events",
"interval": {
"default": 20000,
"description": "Amount of time, in milliseconds, to wait between requests",
"examples": [
{
"interval": 10000,
"limit": 10
}
20000
],
"properties": {
"interval": {
"default": 10000,
"description": "Amount of time, in milliseconds, to wait between requests for new comments",
"examples": [
10000
],
"type": "number"
},
"limit": {
"default": 10,
"description": "The number of new comments to pull on every request",
"examples": [
10
],
"type": "number"
}
},
"type": "object"
"type": "number"
},
"submissions": {
"description": "Polling options for submission events",
"limit": {
"default": 25,
"description": "The maximum number of Activities to get on every request",
"examples": [
{
"interval": 10000,
"limit": 10
}
25
],
"properties": {
"interval": {
"default": 10000,
"description": "Amount of time, in milliseconds, to wait between requests to /r/subreddit/new",
"examples": [
10000
],
"type": "number"
},
"limit": {
"default": 10,
"description": "The number of submissions to pull from /r/subreddit/new on every request",
"examples": [
10
],
"type": "number"
}
},
"type": "object"
"type": "number"
},
"pollOn": {
"description": "What source to get Activities from. The source you choose will modify how the bots behaves so choose carefully.\n\n### unmoderated (default)\n\nActivities that have yet to be approved/removed by a mod. This includes all modqueue (reports/spam) **and new submissions**.\n\nUse this if you want the bot to act like a regular moderator and act on anything that can be seen from mod tools.\n\n**Note:** Does NOT include new comments, only comments that are reported/filtered by Automoderator. If you want to process all unmoderated AND all new comments then use some version of `polling: [\"unmoderated\",\"newComm\"]`\n\n### modqueue\n\nActivities requiring moderator review, such as reported things and items caught by the spam filter.\n\nUse this if you only want the Bot to process reported/filtered Activities.\n\n### newSub\n\nGet only `Submissions` that show up in `/r/mySubreddit/new`\n\nUse this if you want the bot to process Submissions only when:\n\n* they are not initially filtered by Automoderator or\n* after they have been manually approved from modqueue\n\n## newComm\n\nGet only new `Comments`\n\nUse this if you want the bot to process Comments only when:\n\n* they are not initially filtered by Automoderator or\n* after they have been manually approved from modqueue",
"enum": [
"modqueue",
"newComm",
"newSub",
"unmoderated"
],
"type": "string"
}
},
"required": [
"pollOn"
],
"type": "object"
},
"RecentActivityRuleJSONConfig": {
@@ -2205,20 +2173,23 @@
"type": "string"
},
"polling": {
"$ref": "#/definitions/PollingOptions",
"description": "You may specify polling options independently for submissions/comments",
"examples": [
{
"comments": {
"interval": 10000,
"limit": 15
"default": [
[
"unmoderated"
]
],
"description": "An array of sources to process Activities from\n\nValues in the array may be either:\n\n**A `string` representing the `pollOn` value to use**\n\nOne of:\n\n* `unmoderated`\n* `modqueue`\n* `newSub`\n* `newComm`\n\nwith the rest of the `PollingOptions` properties as defaults\n\n**A `PollingOptions` object**\n\nIf you want to specify non-default preoperties\n\n****\nIf not specified the default is `[\"unmoderated\"]`",
"items": {
"anyOf": [
{
"$ref": "#/definitions/PollingOptions"
},
"submissions": {
"interval": 10000,
"limit": 10
{
"type": "string"
}
}
]
]
},
"type": "array"
}
},
"required": [

View File

@@ -6,30 +6,29 @@ import {
determineNewResults,
mergeArr,
} from "../util";
import {CommentStream, SubmissionStream} from "snoostorm";
import {CommentStream, SubmissionStream, Poll, ModQueueStream} from "snoostorm";
import pEvent from "p-event";
import {RuleResult} from "../Rule";
import {ConfigBuilder} from "../ConfigBuilder";
import {ManagerOptions, PollingOptions} from "../Common/interfaces";
import {ConfigBuilder, buildPollingOptions} from "../ConfigBuilder";
import {ManagerOptions, PollingOptionsStrong} from "../Common/interfaces";
import Submission from "snoowrap/dist/objects/Submission";
import {itemContentPeek} from "../Utils/SnoowrapUtils";
import dayjs from "dayjs";
import LoggedError from "../Utils/LoggedError";
import ResourceManager, {SubredditResourceOptions, SubredditResources} from "./SubredditResources";
import {UnmoderatedStream} from "./Streams";
import EventEmitter from "events";
export class Manager {
subreddit: Subreddit;
client: Snoowrap;
logger: Logger;
pollOptions: PollingOptions;
pollOptions: PollingOptionsStrong[];
submissionChecks: SubmissionCheck[];
commentChecks: CommentCheck[];
resources: SubredditResources;
subListedOnce = false;
streamSub?: SubmissionStream;
commentsListedOnce = false;
streamComments?: CommentStream;
streamListedOnce: string[] = [];
streams: Poll<Snoowrap.Submission | Snoowrap.Comment>[] = [];
dryRun?: boolean;
displayLabel: string;
@@ -57,8 +56,8 @@ export class Manager {
const configBuilder = new ConfigBuilder({logger: this.logger});
const validJson = configBuilder.validateJson(sourceData);
const {checks, ...configManagerOpts} = validJson;
const {polling = {}, caching, dryRun, footer, nickname} = configManagerOpts || {};
this.pollOptions = {...polling, ...opts.polling};
const {polling = [{pollOn: 'unmoderated', limit: 25, interval: 20000}], caching, dryRun, footer, nickname} = configManagerOpts || {};
this.pollOptions = buildPollingOptions(polling);
this.subreddit = sub;
this.client = client;
this.dryRun = opts.dryRun || dryRun;
@@ -167,67 +166,77 @@ export class Manager {
}
async handle(): Promise<void> {
if(this.submissionChecks.length === 0 && this.commentChecks.length === 0) {
this.logger.warn('No submission or comment checks to run! Bot will not run.');
return;
}
try {
if (this.submissionChecks.length > 0) {
const {
submissions: {
limit = 10,
interval = 10000,
} = {}
} = this.pollOptions
this.streamSub = new SubmissionStream(this.client, {
subreddit: this.subreddit.display_name,
limit,
pollTime: interval,
});
for(const pollOpt of this.pollOptions) {
let stream: Poll<Snoowrap.Submission | Snoowrap.Comment>;
this.streamSub.once('listing', async (listing) => {
this.subListedOnce = true;
switch(pollOpt.pollOn) {
case 'unmoderated':
stream = new UnmoderatedStream(this.client, {
subreddit: this.subreddit.display_name,
limit: pollOpt.limit,
pollTime: pollOpt.interval,
});
break;
case 'modqueue':
stream = new ModQueueStream(this.client, {
subreddit: this.subreddit.display_name,
limit: pollOpt.limit,
pollTime: pollOpt.interval,
});
break;
case 'newSub':
stream = new SubmissionStream(this.client, {
subreddit: this.subreddit.display_name,
limit: pollOpt.limit,
pollTime: pollOpt.interval,
});
break;
case 'newComm':
stream = new CommentStream(this.client, {
subreddit: this.subreddit.display_name,
limit: pollOpt.limit,
pollTime: pollOpt.interval,
});
break;
}
stream.once('listing', async (listing) => {
// warning if poll event could potentially miss activities
if(this.commentChecks.length === 0 && ['unmoderated','modqueue','newComm'].some(x => x === pollOpt.pollOn)) {
this.logger.warn(`Polling '${pollOpt.pollOn}' may return Comments but no comments checks were configured.`);
}
if(this.submissionChecks.length === 0 && ['unmoderated','modqueue','newSub'].some(x => x === pollOpt.pollOn)) {
this.logger.warn(`Polling '${pollOpt.pollOn}' may return Submissions but no submission checks were configured.`);
}
this.streamListedOnce.push(pollOpt.pollOn);
});
this.streamSub.on('item', async (item) => {
if (!this.subListedOnce) {
stream.on('item', async (item) => {
if (!this.streamListedOnce.includes(pollOpt.pollOn)) {
return;
}
await this.runChecks('Submission', item)
});
//this.streamSub.on('listing', (_) => this.logger.debug('Polled Submissions'));
}
if (this.commentChecks.length > 0) {
const {
comments: {
limit = 10,
interval = 10000,
} = {}
} = this.pollOptions
this.streamComments = new CommentStream(this.client, {
subreddit: this.subreddit.display_name,
limit,
pollTime: interval,
});
this.streamComments.once('listing', () => this.commentsListedOnce = true);
this.streamComments.on('item', async (item) => {
if (!this.commentsListedOnce) {
return;
if(item instanceof Submission) {
if(this.submissionChecks.length > 0) {
await this.runChecks('Submission', item);
}
} else if(this.commentChecks.length > 0) {
await this.runChecks('Comment', item)
}
await this.runChecks('Comment', item)
});
//this.streamComments.on('listing', (_) => this.logger.debug('Polled Comments'));
this.streams.push(stream);
}
this.running = true;
this.logger.info('Bot Running');
if (this.streamSub !== undefined) {
this.logger.info('Bot Running');
await pEvent(this.streamSub, 'end');
} else if (this.streamComments !== undefined) {
this.logger.info('Bot Running');
await pEvent(this.streamComments, 'end');
} else {
this.logger.warn('No submission or comment checks to run! Bot will not run.');
return;
}
const emitter = new EventEmitter();
await pEvent(emitter, 'end');
} catch (err) {
this.logger.error('Encountered unhandled error, manager is bailing out');
this.logger.error(err);

16
src/Subreddit/Streams.ts Normal file
View File

@@ -0,0 +1,16 @@
import { Poll, SnooStormOptions } from "snoostorm"
import Snoowrap from "snoowrap";
export class UnmoderatedStream extends Poll<
Snoowrap.Submission | Snoowrap.Comment
> {
constructor(
client: Snoowrap,
options: SnooStormOptions & { subreddit: string }) {
super({
frequency: options.pollTime || 20000,
get: async () => client.getSubreddit(options.subreddit).getUnmoderated(options),
identifier: "id",
});
}
}