Mirror of https://github.com/Infisical/infisical.git (synced 2026-01-09 15:38:03 -05:00)
feat: added cache versioning to sql for geo invalidation
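The commit moves the secret DAL's cache-version counter out of Redis and into Postgres: writers bump a per-project version inside the same database transaction as the write, and readers fold the current version into every cache key, so entries cached under an older version are never read again and simply age out via TTL. Because the counter rides on the database (which replicates across regions), a bump in one region invalidates readers everywhere, which is presumably the "geo invalidation" in the commit title. A minimal sketch of the pattern, with hypothetical helper names standing in for the repo's actual API:

// Minimal sketch of version-keyed caching, assuming a SQL-backed counter.
// `readVersion` / `bumpVersion` stand in for keyStore.pgGetIntItem /
// keyStore.pgIncrementBy below; `cache` stands in for Redis.
type Cache = {
  get: (key: string) => Promise<string | undefined>;
  set: (key: string, value: string) => Promise<void>;
};

const versionedKey = (projectId: string, version: number, query: string) =>
  `secrets:${projectId}:v${version}:${query}`;

export const makeVersionedCache = (
  readVersion: (projectId: string) => Promise<number | undefined>,
  bumpVersion: (projectId: string) => Promise<number>,
  cache: Cache,
  loadFromDb: (projectId: string, query: string) => Promise<string>
) => ({
  read: async (projectId: string, query: string) => {
    const version = (await readVersion(projectId)) ?? 0;
    const key = versionedKey(projectId, version, query);
    const hit = await cache.get(key);
    if (hit !== undefined) return hit;
    const fresh = await loadFromDb(projectId, query);
    await cache.set(key, fresh); // a real cache entry would also carry a TTL
    return fresh;
  },
  // One SQL write makes every previously cached entry unreachable, in every region.
  invalidate: (projectId: string) => bumpVersion(projectId)
});

The hunks below wire this pattern through the test mocks, schema, migration, DAL, keystore, and the secret services.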
@@ -56,6 +56,15 @@ export const mockKeyStore = (): TKeyStoreFactory => {
     incrementBy: async () => {
       return 1;
     },
+    pgGetIntItem: async (key) => {
+      const value = store[key];
+      if (typeof value === "number") {
+        return Number(value);
+      }
+    },
+    pgIncrementBy: async () => {
+      return 1;
+    },
     getItems: async (keys) => {
       const values = keys.map((key) => {
         const value = store[key];
@@ -15,6 +15,7 @@ import { mockSmtpServer } from "./mocks/smtp";
 import { initDbConnection } from "@app/db";
 import { queueServiceFactory } from "@app/queue";
 import { keyStoreFactory } from "@app/keystore/keystore";
+import { keyValueStoreDALFactory } from "@app/keystore/key-value-store-dal";
 import { initializeHsmModule } from "@app/ee/services/hsm/hsm-fns";
 import { buildRedisFromConfig } from "@app/lib/config/redis";
 import { superAdminDALFactory } from "@app/services/super-admin/super-admin-dal";
@@ -62,7 +63,8 @@ export default {

   const smtp = mockSmtpServer();
   const queue = queueServiceFactory(envCfg, { dbConnectionUrl: envCfg.DB_CONNECTION_URI });
-  const keyStore = keyStoreFactory(envCfg);
+  const keyValueStoreDAL = keyValueStoreDALFactory(db);
+  const keyStore = keyStoreFactory(envCfg, keyValueStoreDAL);

   await queue.initialize();
8 backend/src/@types/knex.d.ts (vendored)
@@ -191,6 +191,9 @@ import {
   TInternalKms,
   TInternalKmsInsert,
   TInternalKmsUpdate,
+  TKeyValueStore,
+  TKeyValueStoreInsert,
+  TKeyValueStoreUpdate,
   TKmipClientCertificates,
   TKmipClientCertificatesInsert,
   TKmipClientCertificatesUpdate,
@@ -1264,5 +1267,10 @@ declare module "knex/types/tables" {
       TUserNotificationsInsert,
       TUserNotificationsUpdate
     >;
+    [TableName.KeyValueStore]: KnexOriginal.CompositeTableType<
+      TKeyValueStore,
+      TKeyValueStoreInsert,
+      TKeyValueStoreUpdate
+    >;
   }
 }
17 backend/src/db/migrations/20250908193226_sql-cache_int.ts (new file)
@@ -0,0 +1,17 @@
+import { Knex } from "knex";
+
+import { TableName } from "../schemas";
+
+export async function up(knex: Knex): Promise<void> {
+  if (!(await knex.schema.hasTable(TableName.KeyValueStore))) {
+    await knex.schema.createTable(TableName.KeyValueStore, (t) => {
+      t.text("key").primary();
+      t.bigint("integerValue");
+      t.datetime("expiresAt");
+    });
+  }
+}
+
+export async function down(knex: Knex): Promise<void> {
+  await knex.schema.dropTableIfExists(TableName.KeyValueStore);
+}
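The whole feature rests on this one table: a text primary key, a bigint counter, and an optional expiry. An upsert against the primary key gives an atomic increment without a read-modify-write race. As a sketch (hypothetical standalone connection; the real query lives in key-value-store-dal.ts below), the DAL's insert().onConflict().merge() compiles to roughly the SQL shown in the comment:

import { knex } from "knex";

// Hypothetical connection, for illustration only.
const db = knex({ client: "pg", connection: process.env.DB_CONNECTION_URI });

// Roughly the generated statement:
// INSERT INTO key_value_store ("key", "integerValue", "expiresAt") VALUES (?, ?, ?)
// ON CONFLICT ("key") DO UPDATE
//   SET "integerValue" = "key_value_store"."integerValue" + ?, "expiresAt" = ?
// RETURNING "integerValue"
const bump = (key: string, incr: number, expiresAt?: Date) =>
  db("key_value_store")
    .insert({ key, integerValue: incr, expiresAt })
    .onConflict("key")
    .merge({ integerValue: db.raw(`"key_value_store"."integerValue" + ?`, [incr]), expiresAt })
    .returning("integerValue");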
@@ -61,6 +61,7 @@ export * from "./integration-auths";
 export * from "./integrations";
 export * from "./internal-certificate-authorities";
 export * from "./internal-kms";
+export * from "./key-value-store";
 export * from "./kmip-client-certificates";
 export * from "./kmip-clients";
 export * from "./kmip-org-configs";
18 backend/src/db/schemas/key-value-store.ts (new file)
@@ -0,0 +1,18 @@
+// Code generated by automation script, DO NOT EDIT.
+// Automated by pulling database and generating zod schema
+// To update, just run npm run generate:schema
+// Written by akhilmhdh.
+
+import { z } from "zod";
+
+import { TImmutableDBKeys } from "./models";
+
+export const KeyValueStoreSchema = z.object({
+  key: z.string(),
+  integerValue: z.coerce.number().nullable().optional(),
+  expiresAt: z.date().nullable().optional()
+});
+
+export type TKeyValueStore = z.infer<typeof KeyValueStoreSchema>;
+export type TKeyValueStoreInsert = Omit<z.input<typeof KeyValueStoreSchema>, TImmutableDBKeys>;
+export type TKeyValueStoreUpdate = Partial<Omit<z.input<typeof KeyValueStoreSchema>, TImmutableDBKeys>>;
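Note the z.coerce.number() on integerValue: pg drivers commonly return bigint columns as strings, and coercion normalizes them back to a JS number at parse time. A small self-contained usage sketch:

import { z } from "zod";

const KeyValueStoreSchema = z.object({
  key: z.string(),
  integerValue: z.coerce.number().nullable().optional(),
  expiresAt: z.date().nullable().optional()
});

// node-postgres returns bigint as a string; coercion turns "42" into 42.
const row = KeyValueStoreSchema.parse({ key: "secret-dal-version:proj1", integerValue: "42" });
console.log((row.integerValue ?? 0) + 1); // 43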
@@ -179,7 +179,8 @@ export enum TableName {
   SecretScanningConfig = "secret_scanning_configs",
   // reminders
   Reminder = "reminders",
-  ReminderRecipient = "reminders_recipients"
+  ReminderRecipient = "reminders_recipients",
+  KeyValueStore = "key_value_store"
 }

 export type TImmutableDBKeys = "id" | "createdAt" | "updatedAt" | "commitId";
@@ -976,6 +976,7 @@ export const secretApprovalRequestServiceFactory = ({
         },
         tx
       );
+      await secretV2BridgeDAL.invalidateSecretCacheByProjectId(projectId, tx);
       return {
         secrets: { created: newSecrets, updated: updatedSecrets, deleted: deletedSecret },
         approval: updatedSecretApproval
@@ -983,7 +984,6 @@ export const secretApprovalRequestServiceFactory = ({
     });
   }

-  await secretV2BridgeDAL.invalidateSecretCacheByProjectId(projectId);
   await snapshotService.performSnapshot(folderId);
   const [folder] = await folderDAL.findSecretPathByFolderIds(projectId, [folderId]);
   if (!folder) {
@@ -509,9 +509,9 @@ export const secretReplicationServiceFactory = ({
         tx
       );
     }
+    await secretV2BridgeDAL.invalidateSecretCacheByProjectId(projectId, tx);
   });

-  await secretV2BridgeDAL.invalidateSecretCacheByProjectId(projectId);
   await secretQueueService.syncSecrets({
     projectId,
     orgId,
@@ -361,9 +361,8 @@ export const secretRotationQueueFactory = ({
           },
           tx
         );
+        await secretV2BridgeDAL.invalidateSecretCacheByProjectId(secretRotation.projectId, tx);
       });
-
-      await secretV2BridgeDAL.invalidateSecretCacheByProjectId(secretRotation.projectId);
     } else {
       if (!botKey)
         throw new NotFoundError({
91 backend/src/keystore/key-value-store-dal.ts (new file)
@@ -0,0 +1,91 @@
+import { Knex } from "knex";
+
+import { TDbClient } from "@app/db";
+import { TableName } from "@app/db/schemas";
+import { ormify, TOrmify } from "@app/lib/knex";
+import { logger } from "@app/lib/logger";
+import { QueueName } from "@app/queue";
+
+export interface TKeyValueStoreDALFactory extends TOrmify<TableName.KeyValueStore> {
+  incrementBy: (key: string, dto: { incr?: number; tx?: Knex; expiresAt?: Date }) => Promise<number>;
+  findOneInt: (key: string, tx?: Knex) => Promise<number | undefined>;
+  pruneExpiredKeys: () => Promise<void>;
+}
+
+const QUERY_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes
+const CACHE_KEY_PRUNE_BATCH_SIZE = 10000;
+const MAX_RETRY_ON_FAILURE = 3;
+
+export const keyValueStoreDALFactory = (db: TDbClient): TKeyValueStoreDALFactory => {
+  const keyValueStoreOrm = ormify(db, TableName.KeyValueStore);
+
+  const incrementBy: TKeyValueStoreDALFactory["incrementBy"] = async (key, { incr = 1, tx, expiresAt }) => {
+    // Upsert: seed the counter with the increment on first write, otherwise add to it atomically.
+    const [doc] = await (tx || db)(TableName.KeyValueStore)
+      .insert({ key, integerValue: incr, expiresAt })
+      .onConflict("key")
+      .merge({
+        integerValue: db.raw(`"${TableName.KeyValueStore}"."integerValue" + ?`, [incr]),
+        expiresAt
+      })
+      .returning("integerValue");
+    return Number(doc.integerValue);
+  };
+
+  const findOneInt: TKeyValueStoreDALFactory["findOneInt"] = async (key, tx) => {
+    const doc = await (tx || db.replicaNode())(TableName.KeyValueStore)
+      .where({ key })
+      .andWhere(
+        (builder) =>
+          void builder
+            .whereNull("expiresAt") // no expiry
+            .orWhere("expiresAt", ">", db.fn.now()) // or not expired
+      )
+      .first()
+      .select("integerValue");
+    if (!doc) return undefined;
+    return Number(doc.integerValue || 0);
+  };
+
+  // delete all cache keys that have expired
+  const pruneExpiredKeys: TKeyValueStoreDALFactory["pruneExpiredKeys"] = async () => {
+    let deletedIds: { key: string }[] = [];
+    let numberOfRetryOnFailure = 0;
+    let isRetrying = false;
+
+    logger.info(`${QueueName.DailyResourceCleanUp}: db key value store clean up started`);
+    do {
+      try {
+        // eslint-disable-next-line no-await-in-loop
+        deletedIds = await db.transaction(async (trx) => {
+          await trx.raw(`SET statement_timeout = ${QUERY_TIMEOUT_MS}`);
+
+          const findExpiredKeysSubQuery = trx(TableName.KeyValueStore)
+            .where("expiresAt", "<", db.fn.now())
+            .select("key")
+            .limit(CACHE_KEY_PRUNE_BATCH_SIZE);
+
+          // eslint-disable-next-line no-await-in-loop
+          const results = await trx(TableName.KeyValueStore)
+            .whereIn("key", findExpiredKeysSubQuery)
+            .del()
+            .returning("key");
+
+          return results;
+        });
+
+        numberOfRetryOnFailure = 0; // reset
+      } catch (error) {
+        numberOfRetryOnFailure += 1;
+        deletedIds = [];
+        logger.error(error, "Failed to clean up db key value");
+      } finally {
+        // eslint-disable-next-line no-await-in-loop
+        await new Promise((resolve) => {
+          setTimeout(resolve, 10); // time to breathe for db
+        });
+      }
+      isRetrying = numberOfRetryOnFailure > 0;
+    } while (deletedIds.length > 0 || (isRetrying && numberOfRetryOnFailure < MAX_RETRY_ON_FAILURE));
+    logger.info(`${QueueName.DailyResourceCleanUp}: db key value store clean up completed`);
+  };
+
+  return { ...keyValueStoreOrm, incrementBy, findOneInt, pruneExpiredKeys };
+};
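Taken together, a hedged round-trip sketch of the three DAL methods (key names are illustrative; `db` must be an initialized client):

import { TDbClient } from "@app/db";
import { keyValueStoreDALFactory } from "@app/keystore/key-value-store-dal";

export const demo = async (db: TDbClient) => {
  const keyValueStoreDAL = keyValueStoreDALFactory(db);

  // Bump the per-project version; the row is created on first use.
  await keyValueStoreDAL.incrementBy("secret-dal-version:proj1", {
    incr: 1,
    expiresAt: new Date(Date.now() + 15 * 60 * 1000) // 15 minutes out
  });

  // Readers see the latest value; expired rows read back as missing.
  const version = await keyValueStoreDAL.findOneInt("secret-dal-version:proj1");

  // The daily cleanup job deletes expired rows in batches of 10,000.
  await keyValueStoreDAL.pruneExpiredKeys();
  return version;
};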
@@ -1,11 +1,15 @@
 import { Cluster, Redis } from "ioredis";
+import { Knex } from "knex";

 import { buildRedisFromConfig, TRedisConfigKeys } from "@app/lib/config/redis";
 import { pgAdvisoryLockHashText } from "@app/lib/crypto/hashtext";
 import { applyJitter } from "@app/lib/dates";
 import { delay as delayMs } from "@app/lib/delay";
+import { ms } from "@app/lib/ms";
 import { ExecutionResult, Redlock, Settings } from "@app/lib/red-lock";

+import { TKeyValueStoreDALFactory } from "./key-value-store-dal";
+
 export const PgSqlLock = {
   BootUpMigration: 2023,
   SuperAdminInit: 2024,
@@ -95,13 +99,17 @@ export type TKeyStoreFactory = {
   deleteItemsByKeyIn: (keys: string[]) => Promise<number>;
   deleteItems: (arg: TDeleteItems) => Promise<number>;
   incrementBy: (key: string, value: number) => Promise<number>;
   getKeysByPattern: (pattern: string, limit?: number) => Promise<string[]>;
+  // pg
+  pgIncrementBy: (key: string, dto: { incr?: number; expiry?: string; tx?: Knex }) => Promise<number>;
+  pgGetIntItem: (key: string, prefix?: string) => Promise<number | undefined>;
+  // locks
   acquireLock(
     resources: string[],
     duration: number,
     settings?: Partial<Settings>
   ): Promise<{ release: () => Promise<ExecutionResult> }>;
   waitTillReady: ({ key, waitingCb, keyCheckCb, waitIteration, delay, jitter }: TWaitTillReady) => Promise<void>;
   getKeysByPattern: (pattern: string, limit?: number) => Promise<string[]>;
 };

 const pickPrimaryOrSecondaryRedis = (primary: Redis | Cluster, secondaries?: Array<Redis | Cluster>) => {
@@ -114,7 +122,10 @@ interface TKeyStoreFactoryDTO extends TRedisConfigKeys {
   REDIS_READ_REPLICAS?: { host: string; port: number }[];
 }

-export const keyStoreFactory = (redisConfigKeys: TKeyStoreFactoryDTO): TKeyStoreFactory => {
+export const keyStoreFactory = (
+  redisConfigKeys: TKeyStoreFactoryDTO,
+  keyValueStoreDAL: TKeyValueStoreDALFactory
+): TKeyStoreFactory => {
   const primaryRedis = buildRedisFromConfig(redisConfigKeys);
   const redisReadReplicas = redisConfigKeys.REDIS_READ_REPLICAS?.map((el) => {
     if (redisConfigKeys.REDIS_URL) {
@@ -189,29 +200,6 @@ export const keyStoreFactory = (redisConfigKeys: TKeyStoreFactoryDTO): TKeyStore

   const setExpiry = async (key: string, expiryInSeconds: number) => primaryRedis.expire(key, expiryInSeconds);

-  const waitTillReady = async ({
-    key,
-    waitingCb,
-    keyCheckCb,
-    waitIteration = 10,
-    delay = 1000,
-    jitter = 200
-  }: TWaitTillReady) => {
-    let attempts = 0;
-    let isReady = keyCheckCb(await getItem(key));
-    while (!isReady) {
-      if (attempts > waitIteration) return;
-      // eslint-disable-next-line
-      await new Promise((resolve) => {
-        waitingCb?.();
-        setTimeout(resolve, Math.max(0, applyJitter(delay, jitter)));
-      });
-      attempts += 1;
-      // eslint-disable-next-line
-      isReady = keyCheckCb(await getItem(key));
-    }
-  };
-
   const getKeysByPattern = async (pattern: string, limit?: number) => {
     let cursor = "0";
     const allKeys: string[] = [];
@@ -236,6 +224,37 @@ export const keyStoreFactory = (redisConfigKeys: TKeyStoreFactoryDTO): TKeyStore
     return allKeys;
   };

+  const pgIncrementBy: TKeyStoreFactory["pgIncrementBy"] = async (key, { incr = 1, tx, expiry }) => {
+    const expiresAt = expiry ? new Date(Date.now() + ms(expiry)) : undefined;
+    return keyValueStoreDAL.incrementBy(key, { incr, expiresAt, tx });
+  };
+
+  const pgGetIntItem = async (key: string, prefix?: string) =>
+    keyValueStoreDAL.findOneInt(prefix ? `${prefix}:${key}` : key);
+
+  const waitTillReady = async ({
+    key,
+    waitingCb,
+    keyCheckCb,
+    waitIteration = 10,
+    delay = 1000,
+    jitter = 200
+  }: TWaitTillReady) => {
+    let attempts = 0;
+    let isReady = keyCheckCb(await getItem(key));
+    while (!isReady) {
+      if (attempts > waitIteration) return;
+      // eslint-disable-next-line
+      await new Promise((resolve) => {
+        waitingCb?.();
+        setTimeout(resolve, Math.max(0, applyJitter(delay, jitter)));
+      });
+      attempts += 1;
+      // eslint-disable-next-line
+      isReady = keyCheckCb(await getItem(key));
+    }
+  };
+
   return {
     setItem,
     getItem,
@@ -250,6 +269,8 @@ export const keyStoreFactory = (redisConfigKeys: TKeyStoreFactoryDTO): TKeyStore
     waitTillReady,
     getKeysByPattern,
     deleteItemsByKeyIn,
-    getItems
+    getItems,
+    pgGetIntItem,
+    pgIncrementBy
   };
 };
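The factory now takes the DAL as a second argument, so every call site has to construct and pass it; the server and e2e bootstrap hunks below do exactly that. A wiring sketch (parameter types borrowed from the factories so nothing is invented):

import { keyValueStoreDALFactory } from "@app/keystore/key-value-store-dal";
import { keyStoreFactory } from "@app/keystore/keystore";

// `envConfig` and `db` come from the app bootstrap (see main.ts below).
export const wireKeyStore = (
  envConfig: Parameters<typeof keyStoreFactory>[0],
  db: Parameters<typeof keyValueStoreDALFactory>[0]
) => {
  const keyValueStoreDAL = keyValueStoreDALFactory(db);
  return keyStoreFactory(envConfig, keyValueStoreDAL);
};

// Usage: a SQL-backed counter with a human-readable expiry (parsed via ms()).
// await keyStore.pgIncrementBy("secret-dal-version:proj1", { incr: 1, expiry: "15m" });
// await keyStore.pgGetIntItem("secret-dal-version:proj1"); // -> 1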
@@ -53,6 +53,15 @@ export const inMemoryKeyStore = (): TKeyStoreFactory => {
     }
     return null;
   },
+  pgGetIntItem: async (key) => {
+    const value = store[key];
+    if (typeof value === "number") {
+      return Number(value);
+    }
+  },
+  pgIncrementBy: async () => {
+    return 1;
+  },
   incrementBy: async () => {
     return 1;
   },
@@ -410,6 +410,7 @@ const envSchema = z
       Boolean(data.INF_APP_CONNECTION_GITHUB_RADAR_APP_CLIENT_ID) &&
       Boolean(data.INF_APP_CONNECTION_GITHUB_RADAR_APP_CLIENT_SECRET) &&
       Boolean(data.INF_APP_CONNECTION_GITHUB_RADAR_APP_WEBHOOK_SECRET),
+    isSecondaryInstance: Boolean(data.INFISICAL_PRIMARY_INSTANCE_URL),
     isHsmConfigured:
       Boolean(data.HSM_LIB_PATH) && Boolean(data.HSM_PIN) && Boolean(data.HSM_KEY_LABEL) && data.HSM_SLOT !== undefined,
     samlDefaultOrgSlug: data.DEFAULT_SAML_ORG_SLUG,
@@ -250,12 +250,12 @@ export const ormify = <DbOps extends object, Tname extends keyof Tables>(
       .returning("*");
     if ($incr) {
       Object.entries($incr).forEach(([incrementField, incrementValue]) => {
-        void query.increment(incrementField, incrementValue);
+        void query.increment(incrementField, incrementValue as number);
       });
     }
     if ($decr) {
       Object.entries($decr).forEach(([incrementField, incrementValue]) => {
-        void query.decrement(incrementField, incrementValue);
+        void query.decrement(incrementField, incrementValue as number);
       });
     }
     const [docs] = await query;
@@ -273,12 +273,12 @@ export const ormify = <DbOps extends object, Tname extends keyof Tables>(
     // increment and decrement operation in update
     if ($incr) {
       Object.entries($incr).forEach(([incrementField, incrementValue]) => {
-        void query.increment(incrementField, incrementValue);
+        void query.increment(incrementField, incrementValue as number);
       });
     }
     if ($decr) {
       Object.entries($decr).forEach(([incrementField, incrementValue]) => {
-        void query.increment(incrementField, incrementValue);
+        void query.decrement(incrementField, incrementValue as number); // the $decr branch should decrement
       });
     }
     return (await query) as Tables[Tname]["base"][];
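The `as number` casts are needed because Object.entries over a generic update shape erases the value type: TypeScript types the callback value as a union, while knex's increment()/decrement() want a plain number. A reduced reproduction:

// Reduced reproduction of why the cast is required.
type UpdateShape = { integerValue?: number; otherField?: string };

declare function increment(column: string, amount: number): void;

const $incr: Partial<UpdateShape> = { integerValue: 2 };
Object.entries($incr).forEach(([field, value]) => {
  // `value` is typed `string | number | undefined` here, so this fails to compile:
  // increment(field, value);
  increment(field, value as number); // hence the cast in ormify
});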
@@ -5,6 +5,7 @@ import "./lib/telemetry/instrumentation";
 import dotenv from "dotenv";

 import { initializeHsmModule } from "@app/ee/services/hsm/hsm-fns";
+import { keyValueStoreDALFactory } from "@app/keystore/key-value-store-dal";

 import { runMigrations } from "./auto-start-migrations";
 import { initAuditLogDbConnection, initDbConnection } from "./db";
@@ -54,7 +55,8 @@ const run = async () => {

   await queue.initialize();

-  const keyStore = keyStoreFactory(envConfig);
+  const keyValueStoreDAL = keyValueStoreDALFactory(db);
+  const keyStore = keyStoreFactory(envConfig, keyValueStoreDAL);
   const redis = buildRedisFromConfig(envConfig);

   const hsmModule = initializeHsmModule(envConfig);
@@ -123,6 +123,7 @@ import { sshHostGroupMembershipDALFactory } from "@app/ee/services/ssh-host-grou
 import { sshHostGroupServiceFactory } from "@app/ee/services/ssh-host-group/ssh-host-group-service";
 import { trustedIpDALFactory } from "@app/ee/services/trusted-ip/trusted-ip-dal";
 import { trustedIpServiceFactory } from "@app/ee/services/trusted-ip/trusted-ip-service";
+import { keyValueStoreDALFactory } from "@app/keystore/key-value-store-dal";
 import { TKeyStoreFactory } from "@app/keystore/keystore";
 import { getConfig, TEnvConfig } from "@app/lib/config/env";
 import { crypto } from "@app/lib/crypto/cryptography";
@@ -507,6 +508,7 @@ export const registerRoutes = async (
   const microsoftTeamsIntegrationDAL = microsoftTeamsIntegrationDALFactory(db);
   const projectMicrosoftTeamsConfigDAL = projectMicrosoftTeamsConfigDALFactory(db);
   const secretScanningV2DAL = secretScanningV2DALFactory(db);
+  const keyValueStoreDAL = keyValueStoreDALFactory(db);

   const eventBusService = eventBusFactory(server.redis);
   const sseService = sseServiceFactory(eventBusService, server.redis);
@@ -643,6 +645,7 @@ export const registerRoutes = async (
   const folderTreeCheckpointDAL = folderTreeCheckpointDALFactory(db);
   const folderCommitDAL = folderCommitDALFactory(db);
   const folderTreeCheckpointResourcesDAL = folderTreeCheckpointResourcesDALFactory(db);

   const folderCommitQueueService = folderCommitQueueServiceFactory({
     queueService,
     folderTreeCheckpointDAL,
@@ -1682,6 +1685,7 @@ export const registerRoutes = async (
     userDAL,
     identityDAL
   });

   const dailyResourceCleanUp = dailyResourceCleanUpQueueServiceFactory({
     auditLogDAL,
     queueService,
@@ -1694,7 +1698,8 @@ export const registerRoutes = async (
     identityUniversalAuthClientSecretDAL: identityUaClientSecretDAL,
     serviceTokenService,
     orgService,
-    userNotificationDAL
+    userNotificationDAL,
+    keyValueStoreDAL
   });

   const dailyReminderQueueService = dailyReminderQueueServiceFactory({
@@ -1386,7 +1386,7 @@ export const folderCommitServiceFactory = ({
     );

     // Invalidate cache to reflect the changes
-    await secretV2BridgeDAL.invalidateSecretCacheByProjectId(projectId);
+    await secretV2BridgeDAL.invalidateSecretCacheByProjectId(projectId, tx);

     return {
       secretChangesCount: secretChanges.length,
@@ -1,5 +1,6 @@
 import { TAuditLogDALFactory } from "@app/ee/services/audit-log/audit-log-dal";
 import { TSnapshotDALFactory } from "@app/ee/services/secret-snapshot/snapshot-dal";
+import { TKeyValueStoreDALFactory } from "@app/keystore/key-value-store-dal";
 import { getConfig } from "@app/lib/config/env";
 import { logger } from "@app/lib/logger";
 import { QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue";
@@ -27,6 +28,7 @@ type TDailyResourceCleanUpQueueServiceFactoryDep = {
   queueService: TQueueServiceFactory;
   orgService: TOrgServiceFactory;
   userNotificationDAL: Pick<TUserNotificationDALFactory, "pruneNotifications">;
+  keyValueStoreDAL: Pick<TKeyValueStoreDALFactory, "pruneExpiredKeys">;
 };

 export type TDailyResourceCleanUpQueueServiceFactory = ReturnType<typeof dailyResourceCleanUpQueueServiceFactory>;
@@ -43,7 +45,8 @@ export const dailyResourceCleanUpQueueServiceFactory = ({
   identityUniversalAuthClientSecretDAL,
   serviceTokenService,
   orgService,
-  userNotificationDAL
+  userNotificationDAL,
+  keyValueStoreDAL
 }: TDailyResourceCleanUpQueueServiceFactoryDep) => {
   const appCfg = getConfig();
@@ -52,6 +55,10 @@ export const dailyResourceCleanUpQueueServiceFactory = ({
   }

   const init = async () => {
+    if (appCfg.isSecondaryInstance) {
+      return;
+    }
+
     await queueService.stopRepeatableJob(
       QueueName.AuditLogPrune,
       QueueJobs.AuditLogPrune,
@@ -82,6 +89,7 @@ export const dailyResourceCleanUpQueueServiceFactory = ({
       await orgService.notifyInvitedUsers();
       await auditLogDAL.pruneAuditLog();
       await userNotificationDAL.pruneNotifications();
+      await keyValueStoreDAL.pruneExpiredKeys();
       logger.info(`${QueueName.DailyResourceCleanUp}: queue task completed`);
     } catch (error) {
       logger.error(error, `${QueueName.DailyResourceCleanUp}: resource cleanup failed`);
@@ -50,15 +50,14 @@ interface TSecretV2DalArg {
 }

 export const SECRET_DAL_TTL = () => applyJitter(10 * 60, 2 * 60);
-export const SECRET_DAL_VERSION_TTL = 15 * 60;
+export const SECRET_DAL_VERSION_TTL = "15m";
 export const MAX_SECRET_CACHE_BYTES = 25 * 1024 * 1024;
 export const secretV2BridgeDALFactory = ({ db, keyStore }: TSecretV2DalArg) => {
   const secretOrm = ormify(db, TableName.SecretV2);

-  const invalidateSecretCacheByProjectId = async (projectId: string) => {
+  const invalidateSecretCacheByProjectId = async (projectId: string, tx?: Knex) => {
     const secretDalVersionKey = SecretServiceCacheKeys.getSecretDalVersion(projectId);
-    await keyStore.incrementBy(secretDalVersionKey, 1);
-    await keyStore.setExpiry(secretDalVersionKey, SECRET_DAL_VERSION_TTL);
+    await keyStore.pgIncrementBy(secretDalVersionKey, { incr: 1, tx, expiry: SECRET_DAL_VERSION_TTL });
   };

   const findOne = async (filter: Partial<TSecretsV2>, tx?: Knex) => {
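SECRET_DAL_VERSION_TTL changes from relative seconds (15 * 60, handed to Redis EXPIRE) to the string "15m" because the pg path converts the expiry to one absolute timestamp at write time via ms(). Roughly, assuming @app/lib/ms mirrors the npm "ms" package:

import ms from "ms"; // assumption: @app/lib/ms mirrors the npm "ms" package

// Before: relative seconds handed to Redis EXPIRE.
// await redis.expire(key, 15 * 60);

// After: a human-readable expiry becomes an absolute expiresAt column value.
const expiresAt = new Date(Date.now() + ms("15m")); // ms("15m") === 900_000
console.log(expiresAt.toISOString());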
@@ -118,7 +118,7 @@ type TSecretV2BridgeServiceFactoryDep = {
   >;
   snapshotService: Pick<TSecretSnapshotServiceFactory, "performSnapshot">;
   resourceMetadataDAL: Pick<TResourceMetadataDALFactory, "insertMany" | "delete">;
-  keyStore: Pick<TKeyStoreFactory, "getItem" | "setExpiry" | "setItemWithExpiry" | "deleteItem">;
+  keyStore: Pick<TKeyStoreFactory, "getItem" | "setExpiry" | "setItemWithExpiry" | "deleteItem" | "pgGetIntItem">;
   reminderService: Pick<TReminderServiceFactory, "createReminder" | "getReminder">;
 };
@@ -360,6 +360,7 @@ export const secretV2BridgeServiceFactory = ({
         tx
       });

+      await secretDAL.invalidateSecretCacheByProjectId(projectId, tx);
       return createdSecret;
     });

@@ -377,7 +378,6 @@ export const secretV2BridgeServiceFactory = ({
       });
     }

-    await secretDAL.invalidateSecretCacheByProjectId(projectId);
     if (inputSecret.type === SecretType.Shared) {
       await snapshotService.performSnapshot(folderId);
       await secretQueueService.syncSecrets({
@@ -566,8 +566,8 @@ export const secretV2BridgeServiceFactory = ({
       await $validateSecretReferences(projectId, permission, allSecretReferences);
     }

-    const updatedSecret = await secretDAL.transaction(async (tx) =>
-      fnSecretBulkUpdate({
+    const updatedSecret = await secretDAL.transaction(async (tx) => {
+      const modifiedSecretsInDB = await fnSecretBulkUpdate({
         folderId,
         orgId: actorOrgId,
         resourceMetadataDAL,
@@ -598,8 +598,11 @@ export const secretV2BridgeServiceFactory = ({
           actorId
         },
         tx
-      })
-    );
+      });
+
+      await secretDAL.invalidateSecretCacheByProjectId(projectId, tx);
+      return modifiedSecretsInDB;
+    });

     if (inputSecret.secretReminderRepeatDays) {
       await reminderService.createReminder({
         actor,
@@ -615,7 +618,6 @@ export const secretV2BridgeServiceFactory = ({
       });
     }

-    await secretDAL.invalidateSecretCacheByProjectId(projectId);
     if (inputSecret.type === SecretType.Shared) {
       await snapshotService.performSnapshot(folderId);
       await secretQueueService.syncSecrets({
@@ -715,8 +717,8 @@ export const secretV2BridgeServiceFactory = ({
     );

     try {
-      const deletedSecret = await secretDAL.transaction(async (tx) =>
-        fnSecretBulkDelete({
+      const deletedSecret = await secretDAL.transaction(async (tx) => {
+        const modifiedSecretsInDB = await fnSecretBulkDelete({
           projectId,
           folderId,
           actorId,
@@ -732,10 +734,11 @@ export const secretV2BridgeServiceFactory = ({
           }
         ],
         tx
-        })
-      );
+        });
+        await secretDAL.invalidateSecretCacheByProjectId(projectId, tx);
+        return modifiedSecretsInDB;
+      });

-      await secretDAL.invalidateSecretCacheByProjectId(projectId);
       if (inputSecret.type === SecretType.Shared) {
         await snapshotService.performSnapshot(folderId);
         await secretQueueService.syncSecrets({
@@ -1027,7 +1030,7 @@ export const secretV2BridgeServiceFactory = ({
     });
     throwIfMissingSecretReadValueOrDescribePermission(permission, ProjectPermissionSecretActions.DescribeSecret);

-    const cachedSecretDalVersion = await keyStore.getItem(SecretServiceCacheKeys.getSecretDalVersion(projectId));
+    const cachedSecretDalVersion = await keyStore.pgGetIntItem(SecretServiceCacheKeys.getSecretDalVersion(projectId));
     const secretDalVersion = Number(cachedSecretDalVersion || 0);
     const cacheKey = SecretServiceCacheKeys.getSecretsOfServiceLayer(projectId, secretDalVersion, {
       ...dto,
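This is the read half of the versioning scheme: the version now comes from Postgres rather than Redis, so a transactional bump in any region is visible to readers everywhere once it replicates, and the composed cacheKey (projectId plus version plus query shape) skips every entry written under an older version.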
@@ -1692,7 +1695,7 @@ export const secretV2BridgeServiceFactory = ({
       await kmsService.createCipherPairWithDataKey({ type: KmsDataKey.SecretManager, projectId });

     const executeBulkInsert = async (tx: Knex) => {
-      return fnSecretBulkInsert({
+      const modifiedSecretsInDB = await fnSecretBulkInsert({
         inputSecrets: inputSecrets.map((el) => {
           const references = secretReferencesGroupByInputSecretKey[el.secretKey]?.nestedReferences;

@@ -1728,13 +1731,14 @@ export const secretV2BridgeServiceFactory = ({
         },
         tx
       });
+      await secretDAL.invalidateSecretCacheByProjectId(projectId, tx);
+      return modifiedSecretsInDB;
     };

     const newSecrets = providedTx
       ? await executeBulkInsert(providedTx)
       : await secretDAL.transaction(executeBulkInsert);

-    await secretDAL.invalidateSecretCacheByProjectId(projectId);
     await snapshotService.performSnapshot(folderId);
     await secretQueueService.syncSecrets({
       actor,
@@ -2099,6 +2103,7 @@ export const secretV2BridgeServiceFactory = ({
         }
       }

+      await secretDAL.invalidateSecretCacheByProjectId(projectId, tx);
       return updatedSecrets;
     };

@@ -2106,7 +2111,6 @@ export const secretV2BridgeServiceFactory = ({
       ? await executeBulkUpdate(providedTx)
       : await secretDAL.transaction(executeBulkUpdate);

-    await secretDAL.invalidateSecretCacheByProjectId(projectId);
     await Promise.allSettled(folders.map((el) => (el?.id ? snapshotService.performSnapshot(el.id) : undefined)));
     await Promise.allSettled(
       folders.map((el) =>
@@ -2233,7 +2237,7 @@ export const secretV2BridgeServiceFactory = ({
     });

     const executeBulkDelete = async (tx: Knex) => {
-      return fnSecretBulkDelete({
+      const modifiedSecretsInDB = await fnSecretBulkDelete({
         secretDAL,
         secretQueueService,
         folderCommitService,
@@ -2249,6 +2253,8 @@ export const secretV2BridgeServiceFactory = ({
         commitChanges,
         tx
       });
+      await secretDAL.invalidateSecretCacheByProjectId(projectId, tx);
+      return modifiedSecretsInDB;
     };

     try {
@@ -2256,7 +2262,6 @@ export const secretV2BridgeServiceFactory = ({
       ? await executeBulkDelete(providedTx)
       : await secretDAL.transaction(executeBulkDelete);

-      await secretDAL.invalidateSecretCacheByProjectId(projectId);
       await snapshotService.performSnapshot(folderId);
       await secretQueueService.syncSecrets({
         actor,