Use winston stream for log streaming

This commit is contained in:
FoxxMD
2021-08-18 16:42:19 -04:00
parent c687ddbe57
commit 67a04c6cc6
2 changed files with 17 additions and 33 deletions

View File

@@ -6,7 +6,7 @@ import cookieParser from 'cookie-parser';
import CacheManagerStore from 'express-session-cache-manager'
import passport from 'passport';
import {Strategy as CustomStrategy} from 'passport-custom';
import {OperatorConfig, BotConnection, LogInfo} from "../../Common/interfaces";
import {OperatorConfig, BotConnection, LogInfo, StreamedLogInfo} from "../../Common/interfaces";
import {
createCacheManager, filterLogBySubreddit,
formatLogLineToHtml,
@@ -534,8 +534,7 @@ const webClient = async (options: OperatorConfig) => {
});
delim.on('data', (c: any) => {
const chunk = c.toString();
io.to(sessionId).emit('log', formatLogLineToHtml(chunk));
io.to(sessionId).emit('log', formatLogLineToHtml(c.toString()));
});
gotStream.once('retry', retryFn);

View File

@@ -7,6 +7,8 @@ import pEvent from "p-event";
import {getLogger} from "../../../../../Utils/loggerFactory";
import {booleanMiddle} from "../../../../Common/middleware";
import {authUserCheck, botRoute} from "../../../middleware";
import {LogInfo} from "../../../../../Common/interfaces";
import {MESSAGE} from "triple-beam";
// TODO update logs api
const logs = (subLogMap: Map<string, LogEntry[]>) => {
@@ -23,40 +25,25 @@ const logs = (subLogMap: Map<string, LogEntry[]>) => {
const logger = winston.loggers.get('app');
const {name: userName, realManagers = [], isOperator} = req.user as Express.User;
const {level = 'verbose', stream, limit = 200, sort = 'descending'} = req.query;
const {level = 'verbose', stream, limit = 200, sort = 'descending', streamObjects = false} = req.query;
if (stream) {
const userStream = new Transform({
transform(chunk, encoding, callback) {
const log = chunk.toString().slice(0, -1);
if (isLogLineMinLevel(log, level as string)) {
const subName = parseSubredditLogName(log);
if (isOperator || (subName !== undefined && (realManagers.includes(subName) || subName.includes(userName)))) {
callback(null, `${log}\r\n`);
} else {
callback(null);
}
} else {
callback(null);
}
}
});
userStream.on('end', () => {
console.log('user end');
});
const currTransport = new winston.transports.Stream({
stream: userStream,
});
logger.add(currTransport);
const origin = req.header('X-Forwarded-For') ?? req.header('host');
try {
//winstonStream.pipe(userStream, {end: false});
//logStream.pipe(userStream, {end: false});
logger.stream().on('log', (log: LogInfo) => {
if (isLogLineMinLevel(log, level as string)) {
const {subreddit: subName} = log;
if (isOperator || (subName !== undefined && (realManagers.includes(subName) || subName.includes(userName)))) {
if(streamObjects) {
res.write(`${JSON.stringify(log)}\r\n`);
} else {
res.write(`${log[MESSAGE]}\r\n`)
}
}
}
});
logger.info(`${userName} from ${origin} => CONNECTED`);
userStream.pipe(res, {end: false});
await pEvent(req, 'close');
console.log('Request closed detected with "close" listener');
userStream.end();
res.destroy();
return;
} catch (e) {
@@ -65,8 +52,6 @@ const logs = (subLogMap: Map<string, LogEntry[]>) => {
}
} finally {
logger.info(`${userName} from ${origin} => DISCONNECTED`);
logger.remove(currTransport);
userStream.end();
res.destroy();
}
} else {