Manager improvements

* Add configurable limit/poll time
* Wait until after first listing event before processing items so we only get new since bot start
This commit is contained in:
FoxxMD
2021-06-01 22:31:55 -04:00
parent ce0e03f310
commit bbb09bb9fb
4 changed files with 91 additions and 38 deletions

View File

@@ -49,7 +49,7 @@ export class RecentActivityRule extends Rule {
if (this.usePostAsReference) {
if (!(item instanceof Submission)) {
this.logger.debug('Cannot use post as reference because triggered item is not a Submission');
} else if (item.url === undefined) {
} else if (item.is_self) {
this.logger.debug('Cannot use post as reference because triggered Submission is not a link type');
} else {
const usableUrl = parseLink(await item.url);

View File

@@ -2,19 +2,37 @@ import Snoowrap, {Comment, Submission, Subreddit} from "snoowrap";
import {Logger} from "winston";
import {SubmissionCheck} from "../Check/SubmissionCheck";
import {CommentCheck} from "../Check/CommentCheck";
import {createLabelledLogger} from "../util";
import {createLabelledLogger, sleep} from "../util";
import {CommentStream, SubmissionStream} from "snoostorm";
import pEvent from "p-event";
export interface ManagerOptions {
submissions?: {
limit?: number,
interval?: number,
},
comments?: {
limit?: number,
interval?: number,
}
}
export class Manager {
subreddit: Subreddit;
client: Snoowrap;
logger: Logger;
pollOptions: ManagerOptions;
submissionChecks: SubmissionCheck[];
commentChecks: CommentCheck[];
constructor(sub: Subreddit, client: Snoowrap, subChecks: SubmissionCheck[], commentChecks: CommentCheck[]) {
subListedOnce = false;
streamSub?: SubmissionStream;
commentsListedOnce = false;
streamComments?: CommentStream;
constructor(sub: Subreddit, client: Snoowrap, subChecks: SubmissionCheck[], commentChecks: CommentCheck[], opts: ManagerOptions = {}) {
this.logger = createLabelledLogger(`Manager ${sub.display_name}`, `Manager ${sub.display_name}`);
this.pollOptions = opts;
this.subreddit = sub;
this.client = client;
this.submissionChecks = subChecks;
@@ -22,10 +40,10 @@ export class Manager {
this.logger.info(`Found Checks -- Submission: ${this.submissionChecks.length} | Comment: ${this.commentChecks.length}`);
}
async runChecks(checkType: ('Comment'|'Submission'), item: (Submission|Comment)): Promise<void> {
async runChecks(checkType: ('Comment' | 'Submission'), item: (Submission | Comment)): Promise<void> {
const checks = checkType === 'Comment' ? this.commentChecks : this.submissionChecks;
const itemId = await item.id;
for(const check of checks) {
for (const check of checks) {
this.logger.debug(`Running Check ${check.name} on ${checkType} (ID ${itemId})`);
let triggered = false;
try {
@@ -52,33 +70,56 @@ export class Manager {
}
async handle(): Promise<void> {
let subStream;
let cStream;
if (this.submissionChecks.length > 0) {
subStream = new SubmissionStream(this.client, {
const {
submissions: {
limit = 10,
interval = 10000,
} = {}
} = this.pollOptions
this.streamSub = new SubmissionStream(this.client, {
subreddit: this.subreddit.display_name,
limit: 10,
pollTime: 5000,
limit,
pollTime: interval,
});
// this.client.getSubmission('np85nc')
subStream.on('item', async (item) => await this.runChecks('Submission', item));
this.streamSub.once('listing', () => {
this.subListedOnce = true;
});
this.streamSub.on('item', async (item) => {
if(!this.subListedOnce) {
return;
}
await this.runChecks('Submission', item)
});
}
if (this.commentChecks.length > 0) {
cStream = new CommentStream(this.client, {
const {
comments: {
limit = 10,
interval = 10000,
} = {}
} = this.pollOptions
this.streamComments = new CommentStream(this.client, {
subreddit: this.subreddit.display_name,
limit: 10,
pollTime: 5000,
limit,
pollTime: interval,
});
this.streamComments.once('listing', () => this.commentsListedOnce = true);
this.streamComments.on('item', async (item) => {
if(!this.commentsListedOnce) {
return;
}
await this.runChecks('Comment', item)
});
cStream.on('item', async (item) => await this.runChecks('Comment', item));
}
if (subStream !== undefined) {
await pEvent(subStream, 'end');
} else if (cStream !== undefined) {
await pEvent(cStream, 'end');
if (this.streamSub !== undefined) {
await pEvent(this.streamSub, 'end');
} else if (this.streamComments !== undefined) {
await pEvent(this.streamComments, 'end');
} else {
this.logger.warn('No submission or comment checks to run!');
}

View File

@@ -61,6 +61,8 @@ winston.loggers.add('default', loggerOptions);
const logger = winston.loggers.get('default');
const version = process.env.VERSION || 'dev';
let subredditsArg = subredditsArgs;
if (subredditsArg.length === 0) {
// try to get from comma delim env variable
@@ -69,17 +71,17 @@ if (subredditsArg.length === 0) {
subredditsArg = subenv.split(',');
}
}
try {
(async function () {
const creds = {
userAgent: 'contextBot',
clientId,
clientSecret,
refreshToken,
accessToken,
};
(async function () {
const creds = {
userAgent: `web:contextBot:${version}`,
clientId,
clientSecret,
refreshToken,
accessToken,
};
try {
const client = new snoowrap(creds);
client.config({warnings: true, retryErrorCodes: [500], maxRetryAttempts: 2, debug: logLevel === 'debug'});
// const me = client.getMe().then(text => {
// console.log(text);
@@ -139,7 +141,7 @@ try {
try {
const builder = new ConfigBuilder({subreddit: sub});
const [subChecks, commentChecks] = builder.buildFromJson(json);
if(subChecks.length > 0 || commentChecks.length > 0) {
if (subChecks.length > 0 || commentChecks.length > 0) {
subSchedule.push(new Manager(sub, client, subChecks, commentChecks));
logger.info(`[${sub.display_name}] Found ${subChecks.length} submission checks and ${commentChecks.length} comment checks`);
}
@@ -150,14 +152,20 @@ try {
const emitter = new EventEmitter();
for(const manager of subSchedule) {
for (const manager of subSchedule) {
manager.handle();
}
// never hits so we can run indefinitely
await pEvent(emitter, 'end');
}());
} catch (err) {
debugger;
console.log(err);
}
} catch (err) {
if(err.name === 'StatusCodeError' && err.response !== undefined) {
const authHeader = err.response.headers['www-authenticate'];
if(authHeader !== undefined && authHeader.includes('insufficient_scope')) {
logger.error('Reddit responded with a 403 insufficient_scope, did you choose the correct scopes?');
}
}
debugger;
console.log(err);
}
}());

View File

@@ -105,3 +105,7 @@ export const parseUsableLinkIdentifier = (regexes: RegExp[] = [REGEX_YOUTUBE]) =
}
return val;
}
export function sleep(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}