From a63191e11d8056332fe1a57da766f7bba75f253b Mon Sep 17 00:00:00 2001 From: Sheen Capadngan Date: Wed, 4 Dec 2024 22:22:34 +0800 Subject: [PATCH 1/5] misc: use pg queue for audit logs when enabled --- backend/package-lock.json | 74 ++++++----- backend/package.json | 1 + .../ee/services/audit-log/audit-log-queue.ts | 117 ++++++++++++++++-- backend/src/lib/config/env.ts | 4 +- backend/src/main.ts | 9 +- backend/src/queue/queue-service.ts | 64 +++++++++- backend/src/server/routes/index.ts | 3 +- 7 files changed, 228 insertions(+), 44 deletions(-) diff --git a/backend/package-lock.json b/backend/package-lock.json index 2113d21a67..78a24d436e 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -92,6 +92,7 @@ "passport-google-oauth20": "^2.0.0", "passport-ldapauth": "^3.0.1", "pg": "^8.11.3", + "pg-boss": "^10.1.5", "pg-query-stream": "^4.5.3", "picomatch": "^3.0.1", "pino": "^8.16.2", @@ -12259,14 +12260,6 @@ "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", "integrity": "sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==" }, - "node_modules/buffer-writer": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", - "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==", - "engines": { - "node": ">=4" - } - }, "node_modules/bullmq": { "version": "5.4.2", "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.4.2.tgz", @@ -18185,11 +18178,6 @@ "integrity": "sha512-UEZIS3/by4OC8vL3P2dTXRETpebLI2NiI5vIrjaD/5UtrkFX/tNbwjTSRAGC/+7CAo2pIcBaRgWmcBBHcsaCIw==", "license": "BlueOak-1.0.0" }, - "node_modules/packet-reader": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/packet-reader/-/packet-reader-1.0.0.tgz", - "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ==" - }, "node_modules/parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", @@ -18408,15 +18396,13 @@ "integrity": "sha512-KG8UEiEVkR3wGEb4m5yZkVCzigAD+cVEJck2CzYZO37ZGJfctvVptVO192MwrtPhzONn6go8ylnOdMhKqi4nfg==" }, "node_modules/pg": { - "version": "8.11.3", - "resolved": "https://registry.npmjs.org/pg/-/pg-8.11.3.tgz", - "integrity": "sha512-+9iuvG8QfaaUrrph+kpF24cXkH1YOOUeArRNYIxq1viYHZagBxrTno7cecY1Fa44tJeZvaoG+Djpkc3JwehN5g==", + "version": "8.13.1", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.13.1.tgz", + "integrity": "sha512-OUir1A0rPNZlX//c7ksiu7crsGZTKSOXJPgtNiHGIlC9H0lO+NC6ZDYksSgBYY/thSWhnSRBv8w1lieNNGATNQ==", "dependencies": { - "buffer-writer": "2.0.0", - "packet-reader": "1.0.0", - "pg-connection-string": "^2.6.2", - "pg-pool": "^3.6.1", - "pg-protocol": "^1.6.0", + "pg-connection-string": "^2.7.0", + "pg-pool": "^3.7.0", + "pg-protocol": "^1.7.0", "pg-types": "^2.1.0", "pgpass": "1.x" }, @@ -18435,6 +18421,19 @@ } } }, + "node_modules/pg-boss": { + "version": "10.1.5", + "resolved": "https://registry.npmjs.org/pg-boss/-/pg-boss-10.1.5.tgz", + "integrity": "sha512-H87NL6c7N6nTCSCePh16EaSQVSFevNXWdJuzY6PZz4rw+W/nuMKPfI/vYyXS0AdT1g1Q3S3EgeOYOHcB7ZVToQ==", + "dependencies": { + "cron-parser": "^4.9.0", + "pg": "^8.13.0", + "serialize-error": "^8.1.0" + }, + "engines": { + "node": ">=20" + } + }, "node_modules/pg-cloudflare": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.1.1.tgz", @@ -18471,17 +18470,17 @@ } }, "node_modules/pg-pool": { - "version": "3.6.1", - "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.6.1.tgz", - "integrity": "sha512-jizsIzhkIitxCGfPRzJn1ZdcosIt3pz9Sh3V01fm1vZnbnCMgmGl5wvGGdNN2EL9Rmb0EcFoCkixH4Pu+sP9Og==", + "version": "3.7.0", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.7.0.tgz", + "integrity": "sha512-ZOBQForurqh4zZWjrgSwwAtzJ7QiRX0ovFkZr2klsen3Nm0aoh33Ls0fzfv3imeH/nw/O27cjdz5kzYJfeGp/g==", "peerDependencies": { "pg": ">=8.0" } }, "node_modules/pg-protocol": { - "version": "1.6.0", - "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.6.0.tgz", - "integrity": "sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q==" + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.7.0.tgz", + "integrity": "sha512-hTK/mE36i8fDDhgDFjy6xNOG+LCorxLG3WO17tku+ij6sVHXh1jQUJ8hYAnRhNla4QVD2H8er/FOjc/+EgC6yQ==" }, "node_modules/pg-query-stream": { "version": "4.5.3", @@ -18510,9 +18509,9 @@ } }, "node_modules/pg/node_modules/pg-connection-string": { - "version": "2.6.2", - "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.6.2.tgz", - "integrity": "sha512-ch6OwaeaPYcova4kKZ15sbJ2hKb/VP48ZD2gE7i1J+L4MspCtBMAx8nMgz7bksc7IojCIIWuEhHibSMFH8m8oA==" + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.7.0.tgz", + "integrity": "sha512-PI2W9mv53rXJQEOb8xNR8lH7Hr+EKa6oJa38zsK0S/ky2er16ios1wLKhZyxzD7jUReiWokc9WK5nxSnC7W1TA==" }, "node_modules/pgpass": { "version": "1.0.5", @@ -20111,6 +20110,20 @@ "resolved": "https://registry.npmjs.org/seq-queue/-/seq-queue-0.0.5.tgz", "integrity": "sha512-hr3Wtp/GZIc/6DAGPDcV4/9WoZhjrkXsi5B/07QgX8tsdc6ilr7BFM6PM6rbdAX1kFSDYeZGLipIZZKyQP0O5Q==" }, + "node_modules/serialize-error": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/serialize-error/-/serialize-error-8.1.0.tgz", + "integrity": "sha512-3NnuWfM6vBYoy5gZFvHiYsVbafvI9vZv/+jlIigFn4oP4zjNPK3LhcY0xSCgeb1a5L8jO71Mit9LlNoi2UfDDQ==", + "dependencies": { + "type-fest": "^0.20.2" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/serve-static": { "version": "1.16.2", "resolved": "https://registry.npmjs.org/serve-static/-/serve-static-1.16.2.tgz", @@ -22130,7 +22143,6 @@ "version": "0.20.2", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", - "dev": true, "engines": { "node": ">=10" }, diff --git a/backend/package.json b/backend/package.json index 1aabdde2e5..29530cc3a9 100644 --- a/backend/package.json +++ b/backend/package.json @@ -200,6 +200,7 @@ "passport-google-oauth20": "^2.0.0", "passport-ldapauth": "^3.0.1", "pg": "^8.11.3", + "pg-boss": "^10.1.5", "pg-query-stream": "^4.5.3", "picomatch": "^3.0.1", "pino": "^8.16.2", diff --git a/backend/src/ee/services/audit-log/audit-log-queue.ts b/backend/src/ee/services/audit-log/audit-log-queue.ts index 83a2fafa6f..50813317cb 100644 --- a/backend/src/ee/services/audit-log/audit-log-queue.ts +++ b/backend/src/ee/services/audit-log/audit-log-queue.ts @@ -1,6 +1,7 @@ import { RawAxiosRequestHeaders } from "axios"; import { SecretKeyEncoding } from "@app/db/schemas"; +import { getConfig } from "@app/lib/config/env"; import { request } from "@app/lib/config/request"; import { infisicalSymmetricDecrypt } from "@app/lib/crypto/encryption"; import { QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue"; @@ -20,12 +21,13 @@ type TAuditLogQueueServiceFactoryDep = { licenseService: Pick; }; -export type TAuditLogQueueServiceFactory = ReturnType; +export type TAuditLogQueueServiceFactory = Awaited>; // keep this timeout 5s it must be fast because else the queue will take time to finish // audit log is a crowded queue thus needs to be fast export const AUDIT_LOG_STREAM_TIMEOUT = 5 * 1000; -export const auditLogQueueServiceFactory = ({ + +export const auditLogQueueServiceFactory = async ({ auditLogDAL, queueService, projectDAL, @@ -33,14 +35,113 @@ export const auditLogQueueServiceFactory = ({ auditLogStreamDAL }: TAuditLogQueueServiceFactoryDep) => { const pushToLog = async (data: TCreateAuditLogDTO) => { - await queueService.queue(QueueName.AuditLog, QueueJobs.AuditLog, data, { - removeOnFail: { - count: 3 - }, - removeOnComplete: true - }); + const appCfg = getConfig(); + if (appCfg.USE_PG_QUEUE) { + await queueService.queuePg(QueueJobs.AuditLog, data, { + retryLimit: 10, + retryBackoff: true + }); + } else { + await queueService.queue(QueueName.AuditLog, QueueJobs.AuditLog, data, { + removeOnFail: { + count: 3 + }, + removeOnComplete: true + }); + } }; + await queueService.startPg( + QueueJobs.AuditLog, + async ([job]) => { + const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; + let { orgId } = job.data; + const MS_IN_DAY = 24 * 60 * 60 * 1000; + let project; + + if (!orgId) { + // it will never be undefined for both org and project id + // TODO(akhilmhdh): use caching here in dal to avoid db calls + project = await projectDAL.findById(projectId as string); + orgId = project.orgId; + } + + const plan = await licenseService.getPlan(orgId); + if (plan.auditLogsRetentionDays === 0) { + // skip inserting if audit log retention is 0 meaning its not supported + return; + } + + // For project actions, set TTL to project-level audit log retention config + // This condition ensures that the plan's audit log retention days cannot be bypassed + const ttlInDays = + project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays + ? project.auditLogsRetentionDays + : plan.auditLogsRetentionDays; + + const ttl = ttlInDays * MS_IN_DAY; + + const auditLog = await auditLogDAL.create({ + actor: actor.type, + actorMetadata: actor.metadata, + userAgent, + projectId, + projectName: project?.name, + ipAddress, + orgId, + eventType: event.type, + expiresAt: new Date(Date.now() + ttl), + eventMetadata: event.metadata, + userAgentType + }); + + const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : []; + await Promise.allSettled( + logStreams.map( + async ({ + url, + encryptedHeadersTag, + encryptedHeadersIV, + encryptedHeadersKeyEncoding, + encryptedHeadersCiphertext + }) => { + const streamHeaders = + encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag + ? (JSON.parse( + infisicalSymmetricDecrypt({ + keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding, + iv: encryptedHeadersIV, + tag: encryptedHeadersTag, + ciphertext: encryptedHeadersCiphertext + }) + ) as LogStreamHeaders[]) + : []; + + const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" }; + + if (streamHeaders.length) + streamHeaders.forEach(({ key, value }) => { + headers[key] = value; + }); + + return request.post(url, auditLog, { + headers, + // request timeout + timeout: AUDIT_LOG_STREAM_TIMEOUT, + // connection timeout + signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT) + }); + } + ) + ); + }, + { + batchSize: 1, + workerCount: 30, + pollingIntervalSeconds: 0.5 + } + ); + queueService.start(QueueName.AuditLog, async (job) => { const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; let { orgId } = job.data; diff --git a/backend/src/lib/config/env.ts b/backend/src/lib/config/env.ts index a9014a72b8..8a4cf07b30 100644 --- a/backend/src/lib/config/env.ts +++ b/backend/src/lib/config/env.ts @@ -178,7 +178,9 @@ const envSchema = z HSM_LIB_PATH: zpStr(z.string().optional()), HSM_PIN: zpStr(z.string().optional()), HSM_KEY_LABEL: zpStr(z.string().optional()), - HSM_SLOT: z.coerce.number().optional().default(0) + HSM_SLOT: z.coerce.number().optional().default(0), + + USE_PG_QUEUE: zodStrBool.default("false") }) // To ensure that basic encryption is always possible. .refine( diff --git a/backend/src/main.ts b/backend/src/main.ts index 7f62d6b1e8..1d105ebc75 100644 --- a/backend/src/main.ts +++ b/backend/src/main.ts @@ -55,7 +55,14 @@ const run = async () => { } const smtp = smtpServiceFactory(formatSmtpConfig()); - const queue = queueServiceFactory(appCfg.REDIS_URL); + + const queue = queueServiceFactory(appCfg.REDIS_URL, appCfg.DB_CONNECTION_URI); + + if (appCfg.USE_PG_QUEUE) { + logger.info("Initializing PG queue..."); + await queue.initialize(); + } + const keyStore = keyStoreFactory(appCfg.REDIS_URL); const hsmModule = initializeHsmModule(); diff --git a/backend/src/queue/queue-service.ts b/backend/src/queue/queue-service.ts index 457eebcc1c..85322efea9 100644 --- a/backend/src/queue/queue-service.ts +++ b/backend/src/queue/queue-service.ts @@ -1,5 +1,6 @@ import { Job, JobsOptions, Queue, QueueOptions, RepeatOptions, Worker, WorkerListener } from "bullmq"; import Redis from "ioredis"; +import PgBoss, { WorkOptions } from "pg-boss"; import { SecretEncryptionAlgo, SecretKeyEncoding } from "@app/db/schemas"; import { TCreateAuditLogDTO } from "@app/ee/services/audit-log/audit-log-types"; @@ -184,17 +185,31 @@ export type TQueueJobTypes = { }; export type TQueueServiceFactory = ReturnType; -export const queueServiceFactory = (redisUrl: string) => { +export const queueServiceFactory = (redisUrl: string, dbConnectionUrl: string) => { const connection = new Redis(redisUrl, { maxRetriesPerRequest: null }); const queueContainer = {} as Record< QueueName, Queue >; + + const pgBoss = new PgBoss({ + connectionString: dbConnectionUrl, + archiveCompletedAfterSeconds: 30, + maintenanceIntervalSeconds: 30, + deleteAfterSeconds: 30 + }); + + const queueContainerPg = {} as Record; + const workerContainer = {} as Record< QueueName, Worker >; + const initialize = async () => { + return pgBoss.start(); + }; + const start = ( name: T, jobFn: (job: Job, token?: string) => Promise, @@ -215,6 +230,27 @@ export const queueServiceFactory = (redisUrl: string) => { }); }; + const startPg = async ( + jobName: QueueJobs, + jobsFn: (jobs: PgBoss.Job[]) => Promise, + options: WorkOptions & { + workerCount: number; + } + ) => { + if (queueContainerPg[jobName]) { + throw new Error(`${jobName} queue is already initialized`); + } + + await pgBoss.createQueue(jobName); + queueContainerPg[jobName] = true; + + await Promise.all( + Array.from({ length: options.workerCount }).map(() => + pgBoss.work(jobName, options, jobsFn) + ) + ); + }; + const listen = < T extends QueueName, U extends keyof WorkerListener @@ -238,6 +274,18 @@ export const queueServiceFactory = (redisUrl: string) => { await q.add(job, data, opts); }; + const queuePg = async ( + job: TQueueJobTypes[T]["name"], + data: TQueueJobTypes[T]["payload"], + opts?: PgBoss.SendOptions & { jobId?: string } + ) => { + await pgBoss.send({ + name: job, + data, + options: opts + }); + }; + const stopRepeatableJob = async ( name: T, job: TQueueJobTypes[T]["name"], @@ -274,5 +322,17 @@ export const queueServiceFactory = (redisUrl: string) => { await Promise.all(Object.values(workerContainer).map((worker) => worker.close())); }; - return { start, listen, queue, shutdown, stopRepeatableJob, stopRepeatableJobByJobId, clearQueue, stopJobById }; + return { + initialize, + start, + listen, + queue, + shutdown, + stopRepeatableJob, + stopRepeatableJobByJobId, + clearQueue, + stopJobById, + startPg, + queuePg + }; }; diff --git a/backend/src/server/routes/index.ts b/backend/src/server/routes/index.ts index b9f46627bb..4f07579bd6 100644 --- a/backend/src/server/routes/index.ts +++ b/backend/src/server/routes/index.ts @@ -394,13 +394,14 @@ export const registerRoutes = async ( permissionService }); - const auditLogQueue = auditLogQueueServiceFactory({ + const auditLogQueue = await auditLogQueueServiceFactory({ auditLogDAL, queueService, projectDAL, licenseService, auditLogStreamDAL }); + const auditLogService = auditLogServiceFactory({ auditLogDAL, permissionService, auditLogQueue }); const auditLogStreamService = auditLogStreamServiceFactory({ licenseService, From 11c96245a709c4be922b186b745ac91bd7cade44 Mon Sep 17 00:00:00 2001 From: Sheen Capadngan Date: Wed, 4 Dec 2024 22:27:07 +0800 Subject: [PATCH 2/5] misc: added error listener --- backend/src/queue/queue-service.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/backend/src/queue/queue-service.ts b/backend/src/queue/queue-service.ts index 85322efea9..123fc78097 100644 --- a/backend/src/queue/queue-service.ts +++ b/backend/src/queue/queue-service.ts @@ -8,6 +8,7 @@ import { TScanFullRepoEventPayload, TScanPushEventPayload } from "@app/ee/services/secret-scanning/secret-scanning-queue/secret-scanning-queue-types"; +import { logger } from "@app/lib/logger"; import { TFailedIntegrationSyncEmailsPayload, TIntegrationSyncPayload, @@ -207,7 +208,11 @@ export const queueServiceFactory = (redisUrl: string, dbConnectionUrl: string) = >; const initialize = async () => { - return pgBoss.start(); + await pgBoss.start(); + + pgBoss.on("error", (error) => { + logger.error(error, "pg-queue error"); + }); }; const start = ( From a750f48922e8fb30c860c419f4a4a18e340d46b5 Mon Sep 17 00:00:00 2001 From: Sheen Capadngan Date: Wed, 4 Dec 2024 22:49:28 +0800 Subject: [PATCH 3/5] misc: finalized structure --- backend/e2e-test/mocks/queue.ts | 3 + .../ee/services/audit-log/audit-log-queue.ts | 167 +++++++++--------- backend/src/main.ts | 6 +- backend/src/queue/queue-service.ts | 17 +- 4 files changed, 100 insertions(+), 93 deletions(-) diff --git a/backend/e2e-test/mocks/queue.ts b/backend/e2e-test/mocks/queue.ts index c694979db8..0028381bde 100644 --- a/backend/e2e-test/mocks/queue.ts +++ b/backend/e2e-test/mocks/queue.ts @@ -10,12 +10,15 @@ export const mockQueue = (): TQueueServiceFactory => { queue: async (name, jobData) => { job[name] = jobData; }, + queuePg: async () => {}, + initialize: async () => {}, shutdown: async () => undefined, stopRepeatableJob: async () => true, start: (name, jobFn) => { queues[name] = jobFn; workers[name] = jobFn; }, + startPg: async () => {}, listen: (name, event) => { events[name] = event; }, diff --git a/backend/src/ee/services/audit-log/audit-log-queue.ts b/backend/src/ee/services/audit-log/audit-log-queue.ts index 50813317cb..a9e3229624 100644 --- a/backend/src/ee/services/audit-log/audit-log-queue.ts +++ b/backend/src/ee/services/audit-log/audit-log-queue.ts @@ -34,8 +34,9 @@ export const auditLogQueueServiceFactory = async ({ licenseService, auditLogStreamDAL }: TAuditLogQueueServiceFactoryDep) => { + const appCfg = getConfig(); + const pushToLog = async (data: TCreateAuditLogDTO) => { - const appCfg = getConfig(); if (appCfg.USE_PG_QUEUE) { await queueService.queuePg(QueueJobs.AuditLog, data, { retryLimit: 10, @@ -51,96 +52,98 @@ export const auditLogQueueServiceFactory = async ({ } }; - await queueService.startPg( - QueueJobs.AuditLog, - async ([job]) => { - const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; - let { orgId } = job.data; - const MS_IN_DAY = 24 * 60 * 60 * 1000; - let project; + if (appCfg.USE_PG_QUEUE) { + await queueService.startPg( + QueueJobs.AuditLog, + async ([job]) => { + const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; + let { orgId } = job.data; + const MS_IN_DAY = 24 * 60 * 60 * 1000; + let project; - if (!orgId) { - // it will never be undefined for both org and project id - // TODO(akhilmhdh): use caching here in dal to avoid db calls - project = await projectDAL.findById(projectId as string); - orgId = project.orgId; - } + if (!orgId) { + // it will never be undefined for both org and project id + // TODO(akhilmhdh): use caching here in dal to avoid db calls + project = await projectDAL.findById(projectId as string); + orgId = project.orgId; + } - const plan = await licenseService.getPlan(orgId); - if (plan.auditLogsRetentionDays === 0) { - // skip inserting if audit log retention is 0 meaning its not supported - return; - } + const plan = await licenseService.getPlan(orgId); + if (plan.auditLogsRetentionDays === 0) { + // skip inserting if audit log retention is 0 meaning its not supported + return; + } - // For project actions, set TTL to project-level audit log retention config - // This condition ensures that the plan's audit log retention days cannot be bypassed - const ttlInDays = - project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays - ? project.auditLogsRetentionDays - : plan.auditLogsRetentionDays; + // For project actions, set TTL to project-level audit log retention config + // This condition ensures that the plan's audit log retention days cannot be bypassed + const ttlInDays = + project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays + ? project.auditLogsRetentionDays + : plan.auditLogsRetentionDays; - const ttl = ttlInDays * MS_IN_DAY; + const ttl = ttlInDays * MS_IN_DAY; - const auditLog = await auditLogDAL.create({ - actor: actor.type, - actorMetadata: actor.metadata, - userAgent, - projectId, - projectName: project?.name, - ipAddress, - orgId, - eventType: event.type, - expiresAt: new Date(Date.now() + ttl), - eventMetadata: event.metadata, - userAgentType - }); + const auditLog = await auditLogDAL.create({ + actor: actor.type, + actorMetadata: actor.metadata, + userAgent, + projectId, + projectName: project?.name, + ipAddress, + orgId, + eventType: event.type, + expiresAt: new Date(Date.now() + ttl), + eventMetadata: event.metadata, + userAgentType + }); - const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : []; - await Promise.allSettled( - logStreams.map( - async ({ - url, - encryptedHeadersTag, - encryptedHeadersIV, - encryptedHeadersKeyEncoding, - encryptedHeadersCiphertext - }) => { - const streamHeaders = - encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag - ? (JSON.parse( - infisicalSymmetricDecrypt({ - keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding, - iv: encryptedHeadersIV, - tag: encryptedHeadersTag, - ciphertext: encryptedHeadersCiphertext - }) - ) as LogStreamHeaders[]) - : []; + const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : []; + await Promise.allSettled( + logStreams.map( + async ({ + url, + encryptedHeadersTag, + encryptedHeadersIV, + encryptedHeadersKeyEncoding, + encryptedHeadersCiphertext + }) => { + const streamHeaders = + encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag + ? (JSON.parse( + infisicalSymmetricDecrypt({ + keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding, + iv: encryptedHeadersIV, + tag: encryptedHeadersTag, + ciphertext: encryptedHeadersCiphertext + }) + ) as LogStreamHeaders[]) + : []; - const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" }; + const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" }; - if (streamHeaders.length) - streamHeaders.forEach(({ key, value }) => { - headers[key] = value; + if (streamHeaders.length) + streamHeaders.forEach(({ key, value }) => { + headers[key] = value; + }); + + return request.post(url, auditLog, { + headers, + // request timeout + timeout: AUDIT_LOG_STREAM_TIMEOUT, + // connection timeout + signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT) }); - - return request.post(url, auditLog, { - headers, - // request timeout - timeout: AUDIT_LOG_STREAM_TIMEOUT, - // connection timeout - signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT) - }); - } - ) - ); - }, - { - batchSize: 1, - workerCount: 30, - pollingIntervalSeconds: 0.5 - } - ); + } + ) + ); + }, + { + batchSize: 1, + workerCount: 30, + pollingIntervalSeconds: 0.5 + } + ); + } queueService.start(QueueName.AuditLog, async (job) => { const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; diff --git a/backend/src/main.ts b/backend/src/main.ts index 1d105ebc75..8e36029747 100644 --- a/backend/src/main.ts +++ b/backend/src/main.ts @@ -57,11 +57,7 @@ const run = async () => { const smtp = smtpServiceFactory(formatSmtpConfig()); const queue = queueServiceFactory(appCfg.REDIS_URL, appCfg.DB_CONNECTION_URI); - - if (appCfg.USE_PG_QUEUE) { - logger.info("Initializing PG queue..."); - await queue.initialize(); - } + await queue.initialize(); const keyStore = keyStoreFactory(appCfg.REDIS_URL); diff --git a/backend/src/queue/queue-service.ts b/backend/src/queue/queue-service.ts index 123fc78097..3205c8c945 100644 --- a/backend/src/queue/queue-service.ts +++ b/backend/src/queue/queue-service.ts @@ -8,6 +8,7 @@ import { TScanFullRepoEventPayload, TScanPushEventPayload } from "@app/ee/services/secret-scanning/secret-scanning-queue/secret-scanning-queue-types"; +import { getConfig } from "@app/lib/config/env"; import { logger } from "@app/lib/logger"; import { TFailedIntegrationSyncEmailsPayload, @@ -195,8 +196,8 @@ export const queueServiceFactory = (redisUrl: string, dbConnectionUrl: string) = const pgBoss = new PgBoss({ connectionString: dbConnectionUrl, - archiveCompletedAfterSeconds: 30, - maintenanceIntervalSeconds: 30, + archiveCompletedAfterSeconds: 60, + archiveFailedAfterSeconds: 1000, // we want to keep failed jobs for a longer time so that it can be retried deleteAfterSeconds: 30 }); @@ -208,11 +209,15 @@ export const queueServiceFactory = (redisUrl: string, dbConnectionUrl: string) = >; const initialize = async () => { - await pgBoss.start(); + const appCfg = getConfig(); + if (appCfg.USE_PG_QUEUE) { + logger.info("Initializing PG queue..."); + await pgBoss.start(); - pgBoss.on("error", (error) => { - logger.error(error, "pg-queue error"); - }); + pgBoss.on("error", (error) => { + logger.error(error, "pg-queue error"); + }); + } }; const start = ( From 32d6826ade15bf1f17ef07a1173014291ab01d24 Mon Sep 17 00:00:00 2001 From: Sheen Capadngan Date: Wed, 4 Dec 2024 22:52:30 +0800 Subject: [PATCH 4/5] fix: resolve e2e --- backend/e2e-test/vitest-environment-knex.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/e2e-test/vitest-environment-knex.ts b/backend/e2e-test/vitest-environment-knex.ts index 866b0f45ff..9ca485236e 100644 --- a/backend/e2e-test/vitest-environment-knex.ts +++ b/backend/e2e-test/vitest-environment-knex.ts @@ -53,7 +53,7 @@ export default { extension: "ts" }); const smtp = mockSmtpServer(); - const queue = queueServiceFactory(cfg.REDIS_URL); + const queue = queueServiceFactory(cfg.REDIS_URL, cfg.DB_CONNECTION_URI); const keyStore = keyStoreFactory(cfg.REDIS_URL); const hsmModule = initializeHsmModule(); From 0842901d4f24894b5599fd59fc00d592950c091d Mon Sep 17 00:00:00 2001 From: Sheen Capadngan Date: Wed, 4 Dec 2024 23:21:37 +0800 Subject: [PATCH 5/5] misc: always initialize pg-boss --- .../ee/services/audit-log/audit-log-queue.ts | 178 +++++++++--------- backend/src/queue/queue-service.ts | 13 +- 2 files changed, 92 insertions(+), 99 deletions(-) diff --git a/backend/src/ee/services/audit-log/audit-log-queue.ts b/backend/src/ee/services/audit-log/audit-log-queue.ts index a9e3229624..a1c35bc403 100644 --- a/backend/src/ee/services/audit-log/audit-log-queue.ts +++ b/backend/src/ee/services/audit-log/audit-log-queue.ts @@ -52,98 +52,96 @@ export const auditLogQueueServiceFactory = async ({ } }; - if (appCfg.USE_PG_QUEUE) { - await queueService.startPg( - QueueJobs.AuditLog, - async ([job]) => { - const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; - let { orgId } = job.data; - const MS_IN_DAY = 24 * 60 * 60 * 1000; - let project; + await queueService.startPg( + QueueJobs.AuditLog, + async ([job]) => { + const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; + let { orgId } = job.data; + const MS_IN_DAY = 24 * 60 * 60 * 1000; + let project; - if (!orgId) { - // it will never be undefined for both org and project id - // TODO(akhilmhdh): use caching here in dal to avoid db calls - project = await projectDAL.findById(projectId as string); - orgId = project.orgId; - } - - const plan = await licenseService.getPlan(orgId); - if (plan.auditLogsRetentionDays === 0) { - // skip inserting if audit log retention is 0 meaning its not supported - return; - } - - // For project actions, set TTL to project-level audit log retention config - // This condition ensures that the plan's audit log retention days cannot be bypassed - const ttlInDays = - project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays - ? project.auditLogsRetentionDays - : plan.auditLogsRetentionDays; - - const ttl = ttlInDays * MS_IN_DAY; - - const auditLog = await auditLogDAL.create({ - actor: actor.type, - actorMetadata: actor.metadata, - userAgent, - projectId, - projectName: project?.name, - ipAddress, - orgId, - eventType: event.type, - expiresAt: new Date(Date.now() + ttl), - eventMetadata: event.metadata, - userAgentType - }); - - const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : []; - await Promise.allSettled( - logStreams.map( - async ({ - url, - encryptedHeadersTag, - encryptedHeadersIV, - encryptedHeadersKeyEncoding, - encryptedHeadersCiphertext - }) => { - const streamHeaders = - encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag - ? (JSON.parse( - infisicalSymmetricDecrypt({ - keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding, - iv: encryptedHeadersIV, - tag: encryptedHeadersTag, - ciphertext: encryptedHeadersCiphertext - }) - ) as LogStreamHeaders[]) - : []; - - const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" }; - - if (streamHeaders.length) - streamHeaders.forEach(({ key, value }) => { - headers[key] = value; - }); - - return request.post(url, auditLog, { - headers, - // request timeout - timeout: AUDIT_LOG_STREAM_TIMEOUT, - // connection timeout - signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT) - }); - } - ) - ); - }, - { - batchSize: 1, - workerCount: 30, - pollingIntervalSeconds: 0.5 + if (!orgId) { + // it will never be undefined for both org and project id + // TODO(akhilmhdh): use caching here in dal to avoid db calls + project = await projectDAL.findById(projectId as string); + orgId = project.orgId; } - ); - } + + const plan = await licenseService.getPlan(orgId); + if (plan.auditLogsRetentionDays === 0) { + // skip inserting if audit log retention is 0 meaning its not supported + return; + } + + // For project actions, set TTL to project-level audit log retention config + // This condition ensures that the plan's audit log retention days cannot be bypassed + const ttlInDays = + project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays + ? project.auditLogsRetentionDays + : plan.auditLogsRetentionDays; + + const ttl = ttlInDays * MS_IN_DAY; + + const auditLog = await auditLogDAL.create({ + actor: actor.type, + actorMetadata: actor.metadata, + userAgent, + projectId, + projectName: project?.name, + ipAddress, + orgId, + eventType: event.type, + expiresAt: new Date(Date.now() + ttl), + eventMetadata: event.metadata, + userAgentType + }); + + const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : []; + await Promise.allSettled( + logStreams.map( + async ({ + url, + encryptedHeadersTag, + encryptedHeadersIV, + encryptedHeadersKeyEncoding, + encryptedHeadersCiphertext + }) => { + const streamHeaders = + encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag + ? (JSON.parse( + infisicalSymmetricDecrypt({ + keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding, + iv: encryptedHeadersIV, + tag: encryptedHeadersTag, + ciphertext: encryptedHeadersCiphertext + }) + ) as LogStreamHeaders[]) + : []; + + const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" }; + + if (streamHeaders.length) + streamHeaders.forEach(({ key, value }) => { + headers[key] = value; + }); + + return request.post(url, auditLog, { + headers, + // request timeout + timeout: AUDIT_LOG_STREAM_TIMEOUT, + // connection timeout + signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT) + }); + } + ) + ); + }, + { + batchSize: 1, + workerCount: 30, + pollingIntervalSeconds: 0.5 + } + ); queueService.start(QueueName.AuditLog, async (job) => { const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; diff --git a/backend/src/queue/queue-service.ts b/backend/src/queue/queue-service.ts index 3205c8c945..f18562513c 100644 --- a/backend/src/queue/queue-service.ts +++ b/backend/src/queue/queue-service.ts @@ -8,7 +8,6 @@ import { TScanFullRepoEventPayload, TScanPushEventPayload } from "@app/ee/services/secret-scanning/secret-scanning-queue/secret-scanning-queue-types"; -import { getConfig } from "@app/lib/config/env"; import { logger } from "@app/lib/logger"; import { TFailedIntegrationSyncEmailsPayload, @@ -209,15 +208,11 @@ export const queueServiceFactory = (redisUrl: string, dbConnectionUrl: string) = >; const initialize = async () => { - const appCfg = getConfig(); - if (appCfg.USE_PG_QUEUE) { - logger.info("Initializing PG queue..."); - await pgBoss.start(); + await pgBoss.start(); - pgBoss.on("error", (error) => { - logger.error(error, "pg-queue error"); - }); - } + pgBoss.on("error", (error) => { + logger.error(error, "pg-queue error"); + }); }; const start = (