misc: made partition opt-in

Sheen Capadngan
2024-10-08 17:53:53 +08:00
parent 98ed063ce6
commit 22b417b50b
13 changed files with 169 additions and 220 deletions

View File

@@ -90,7 +90,12 @@ const main = async () => {
.whereRaw("table_schema = current_schema()")
.select<{ tableName: string }[]>("table_name as tableName")
.orderBy("table_name")
).filter((el) => !el.tableName.includes("_migrations") && !el.tableName.includes("partitioned_audit_logs_"));
).filter(
(el) =>
!el.tableName.includes("_migrations") &&
!el.tableName.includes("audit_logs_") &&
el.tableName !== "intermediate_audit_logs"
);
for (let i = 0; i < tables.length; i += 1) {
const { tableName } = tables[i];
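
Besides the knex migration tables it already skipped, the filter now also excludes the date-ranged audit log partitions and the renamed intermediate table, so the loop below only processes the parent audit_logs table and the other application tables. A small sketch of what the new predicate keeps (the sample names are illustrative):

const keep = (tableName: string) =>
  !tableName.includes("_migrations") &&
  !tableName.includes("audit_logs_") &&
  tableName !== "intermediate_audit_logs";

keep("audit_logs");                    // true  – the (possibly partitioned) parent table
keep("audit_logs_default");            // false – default partition created by the manual script
keep("audit_logs_20241101_20241201");  // false – monthly range partition
keep("intermediate_audit_logs");       // false – the renamed original table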

View File

@@ -170,9 +170,6 @@ import {
TOrgRoles,
TOrgRolesInsert,
TOrgRolesUpdate,
TPartitionedAuditLogs,
TPartitionedAuditLogsInsert,
TPartitionedAuditLogsUpdate,
TPkiAlerts,
TPkiAlertsInsert,
TPkiAlertsUpdate,
@@ -718,11 +715,6 @@ declare module "knex/types/tables" {
TAuditLogStreamsInsert,
TAuditLogStreamsUpdate
>;
[TableName.PartitionedAuditLog]: KnexOriginal.CompositeTableType<
TPartitionedAuditLogs,
TPartitionedAuditLogsInsert,
TPartitionedAuditLogsUpdate
>;
[TableName.GitAppInstallSession]: KnexOriginal.CompositeTableType<
TGitAppInstallSessions,
TGitAppInstallSessionsInsert,

View File

@@ -0,0 +1,145 @@
import kx, { Knex } from "knex";
import { TableName } from "../schemas";
const INTERMEDIATE_AUDIT_LOG_TABLE = "intermediate_audit_logs";
const formatPartitionDate = (date: Date) => {
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, "0");
const day = String(date.getDate()).padStart(2, "0");
return `${year}-${month}-${day}`;
};
const createAuditLogPartition = async (knex: Knex, startDate: Date, endDate: Date) => {
const startDateStr = formatPartitionDate(startDate);
const endDateStr = formatPartitionDate(endDate);
const partitionName = `${TableName.AuditLog}_${startDateStr.replace(/-/g, "")}_${endDateStr.replace(/-/g, "")}`;
await knex.schema.raw(
`CREATE TABLE ${partitionName} PARTITION OF ${TableName.AuditLog} FOR VALUES FROM ('${startDateStr}') TO ('${endDateStr}')`
);
};
const up = async (knex: Knex): Promise<void> => {
console.info("Dropping primary key of audit log table...");
await knex.schema.alterTable(TableName.AuditLog, (t) => {
// remove existing keys
t.dropPrimary();
});
// renaming audit log to intermediate table
console.log("Renaming audit log table to the intermediate name");
await knex.schema.renameTable(TableName.AuditLog, INTERMEDIATE_AUDIT_LOG_TABLE);
if (!(await knex.schema.hasTable(TableName.AuditLog))) {
const createTableSql = knex.schema
.createTable(TableName.AuditLog, (t) => {
t.uuid("id").defaultTo(knex.fn.uuid());
t.string("actor").notNullable();
t.jsonb("actorMetadata").notNullable();
t.string("ipAddress");
t.string("eventType").notNullable();
t.jsonb("eventMetadata");
t.string("userAgent");
t.string("userAgentType");
t.datetime("expiresAt");
t.timestamps(true, true, true);
t.uuid("orgId");
t.string("projectId");
t.string("projectName");
t.primary(["id", "createdAt"]);
})
.toString();
console.info("Creating partition table...");
await knex.schema.raw(`
${createTableSql} PARTITION BY RANGE ("createdAt");
`);
console.log("Adding indices...");
await knex.schema.alterTable(TableName.AuditLog, (t) => {
t.index(["projectId", "createdAt"]);
t.index(["orgId", "createdAt"]);
t.index("expiresAt");
t.index("orgId");
t.index("projectId");
});
console.log("Adding GIN indices...");
await knex.raw(
`CREATE INDEX IF NOT EXISTS "audit_logs_actorMetadata_idx" ON ${TableName.AuditLog} USING gin("actorMetadata" jsonb_path_ops)`
);
console.log("GIN index for actorMetadata done");
await knex.raw(
`CREATE INDEX IF NOT EXISTS "audit_logs_eventMetadata_idx" ON ${TableName.AuditLog} USING gin("eventMetadata" jsonb_path_ops)`
);
console.log("GIN index for eventMetadata done");
// create default partition
console.log("Creating default partition...");
await knex.schema.raw(`CREATE TABLE ${TableName.AuditLog}_default PARTITION OF ${TableName.AuditLog} DEFAULT`);
const nextDate = new Date();
nextDate.setDate(nextDate.getDate() + 1);
const nextDateStr = formatPartitionDate(nextDate);
console.log("Attaching existing audit log table as a partition...");
await knex.schema.raw(`
ALTER TABLE ${INTERMEDIATE_AUDIT_LOG_TABLE} ADD CONSTRAINT audit_log_old
CHECK ( "createdAt" < DATE '${nextDateStr}' );
ALTER TABLE ${TableName.AuditLog} ATTACH PARTITION ${INTERMEDIATE_AUDIT_LOG_TABLE}
FOR VALUES FROM (MINVALUE) TO ('${nextDateStr}' );
`);
// create partition from now until end of month
console.log("Creating audit log partitions ahead of time... next date:", nextDateStr);
await createAuditLogPartition(knex, nextDate, new Date(nextDate.getFullYear(), nextDate.getMonth() + 1));
// create partitions 4 years ahead
const partitionMonths = 4 * 12;
const partitionPromises: Promise<void>[] = [];
for (let x = 1; x <= partitionMonths; x += 1) {
partitionPromises.push(
createAuditLogPartition(
knex,
new Date(nextDate.getFullYear(), nextDate.getMonth() + x, 1),
new Date(nextDate.getFullYear(), nextDate.getMonth() + (x + 1), 1)
)
);
}
await Promise.all(partitionPromises);
console.log("Partition migration complete");
process.exit(0);
}
};
export const executeMigration = async (url: string) => {
console.log("Executing migration to:", url);
const knex = kx({
client: "pg",
connection: url
});
await knex.transaction(async (tx) => {
await up(tx);
});
};
const args = process.argv.slice(2);
const dbUrl = args[0];
if (!dbUrl) {
console.error("Please provide a DB connection URL as the first argument.");
process.exit(1);
}
void executeMigration(dbUrl).then(() => {
console.log("Migration: partition-audit-logs DONE");
});
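
Partitioning is now opt-in: the regular knex migrations no longer create the partitioned table, and this standalone script has to be run by hand against the target database, for example (the file path and the tsx runner are assumptions; the script itself only defines the positional connection-URL argument):

npx tsx ./src/db/manual-migrations/partition-audit-logs.ts "postgresql://user:pass@localhost:5432/infisical"

The script opens its own knex connection, wraps up() in a single transaction, and calls process.exit(0) once the partitions are in place, so it is intended as a one-off command rather than a module the knex migration runner imports.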

View File

@@ -1,164 +0,0 @@
import { Knex } from "knex";
import { TableName } from "../schemas";
const formatPartitionDate = (date: Date) => {
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, "0");
const day = String(date.getDate()).padStart(2, "0");
return `${year}-${month}-${day}`;
};
const createAuditLogPartition = async (knex: Knex, startDate: Date, endDate: Date) => {
const startDateStr = formatPartitionDate(startDate);
const endDateStr = formatPartitionDate(endDate);
const partitionName = `${TableName.PartitionedAuditLog}_${startDateStr.replace(/-/g, "")}_${endDateStr.replace(
/-/g,
""
)}`;
await knex.schema.raw(
`CREATE TABLE ${partitionName} PARTITION OF ${TableName.PartitionedAuditLog} FOR VALUES FROM ('${startDateStr}') TO ('${endDateStr}')`
);
};
const isUsingDedicatedAuditLogDb = Boolean(process.env.AUDIT_LOGS_DB_CONNECTION_URI);
export async function up(knex: Knex): Promise<void> {
if (!isUsingDedicatedAuditLogDb && (await knex.schema.hasTable(TableName.AuditLog))) {
console.info("Dropping primary key of Audit Log table...");
await knex.schema.alterTable(TableName.AuditLog, (t) => {
// remove existing keys
t.dropPrimary();
});
}
// create a new partitioned table for audit logs
if (!(await knex.schema.hasTable(TableName.PartitionedAuditLog))) {
const createTableSql = knex.schema
.createTable(TableName.PartitionedAuditLog, (t) => {
t.uuid("id").defaultTo(knex.fn.uuid());
t.string("actor").notNullable();
t.jsonb("actorMetadata").notNullable();
t.string("ipAddress");
t.string("eventType").notNullable();
t.jsonb("eventMetadata");
t.string("userAgent");
t.string("userAgentType");
t.datetime("expiresAt");
t.timestamps(true, true, true);
t.uuid("orgId");
t.string("projectId");
t.string("projectName");
t.primary(["id", "createdAt"]);
})
.toString();
console.info("Creating partition table...");
await knex.schema.raw(`
${createTableSql} PARTITION BY RANGE ("createdAt");
`);
console.log("Adding indices...");
await knex.schema.alterTable(TableName.PartitionedAuditLog, (t) => {
t.index(["projectId", "createdAt"]);
t.index(["orgId", "createdAt"]);
t.index("expiresAt");
t.index("orgId");
t.index("projectId");
});
console.log("Adding GIN indices...");
await knex.raw(
`CREATE INDEX IF NOT EXISTS "audit_logs_actorMetadata_idx" ON ${TableName.PartitionedAuditLog} USING gin("actorMetadata" jsonb_path_ops)`
);
console.log("GIN index for actorMetadata done");
await knex.raw(
`CREATE INDEX IF NOT EXISTS "audit_logs_eventMetadata_idx" ON ${TableName.PartitionedAuditLog} USING gin("eventMetadata" jsonb_path_ops)`
);
console.log("GIN index for eventMetadata done");
// create default partition
console.log("Creating default partition...");
await knex.schema.raw(
`CREATE TABLE ${TableName.PartitionedAuditLog}_default PARTITION OF ${TableName.PartitionedAuditLog} DEFAULT`
);
const nextDate = new Date();
nextDate.setDate(nextDate.getDate() + 1);
const nextDateStr = formatPartitionDate(nextDate);
// attach existing audit log table as a partition ONLY if using the same DB
if (!isUsingDedicatedAuditLogDb) {
console.log("Attaching existing audit log table as a partition...");
await knex.schema.raw(`
ALTER TABLE ${TableName.AuditLog} ADD CONSTRAINT audit_log_old
CHECK ( "createdAt" < DATE '${nextDateStr}' );
ALTER TABLE ${TableName.PartitionedAuditLog} ATTACH PARTITION ${TableName.AuditLog}
FOR VALUES FROM (MINVALUE) TO ('${nextDateStr}' );
`);
}
// create partition from now until end of month
console.log("Creating audit log partitions ahead of time... next date:", nextDateStr);
await createAuditLogPartition(knex, nextDate, new Date(nextDate.getFullYear(), nextDate.getMonth() + 1));
// create partitions 4 years ahead
const partitionMonths = 4 * 12;
const partitionPromises: Promise<void>[] = [];
for (let x = 1; x <= partitionMonths; x += 1) {
partitionPromises.push(
createAuditLogPartition(
knex,
new Date(nextDate.getFullYear(), nextDate.getMonth() + x, 1),
new Date(nextDate.getFullYear(), nextDate.getMonth() + (x + 1), 1)
)
);
}
await Promise.all(partitionPromises);
console.log("Partition migration complete");
}
}
export async function down(knex: Knex): Promise<void> {
const partitionSearchResult = await knex.raw(`
SELECT inhrelid::regclass::text
FROM pg_inherits
WHERE inhparent::regclass::text = '${TableName.PartitionedAuditLog}'
AND inhrelid::regclass::text = '${TableName.AuditLog}'
`);
const isAuditLogAPartition = partitionSearchResult.rows.length > 0;
if (isAuditLogAPartition) {
// detach audit log from partition
console.log("Detaching original audit log table from new partition table...");
await knex.schema.raw(`
ALTER TABLE ${TableName.PartitionedAuditLog} DETACH PARTITION ${TableName.AuditLog};
ALTER TABLE ${TableName.AuditLog} DROP CONSTRAINT audit_log_old;
`);
// revert audit log modifications
console.log("Reverting changes made to the audit log table...");
if (await knex.schema.hasTable(TableName.AuditLog)) {
await knex.schema.alterTable(TableName.AuditLog, (t) => {
// we drop this first because adding to the partition results in a new primary key
t.dropPrimary();
// add back the original keys of the audit logs table
t.primary(["id"], {
constraintName: "audit_logs_pkey"
});
});
}
}
await knex.schema.dropTableIfExists(TableName.PartitionedAuditLog);
console.log("Partition rollback complete");
}
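
For anyone who already ran the deleted automatic migration, the pg_inherits check from its down() step is still useful on its own to confirm whether audit_logs is attached as a partition of partitioned_audit_logs; a standalone sketch adapted from the removed code (knex is any connected Knex instance, table names follow the old schema):

import { Knex } from "knex";

const isAuditLogAttached = async (knex: Knex): Promise<boolean> => {
  // pg_inherits lists the child relations attached to a partitioned parent
  const result = await knex.raw(`
    SELECT inhrelid::regclass::text
    FROM pg_inherits
    WHERE inhparent::regclass::text = 'partitioned_audit_logs'
      AND inhrelid::regclass::text = 'audit_logs'
  `);
  return result.rows.length > 0;
};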

View File

@@ -20,7 +20,8 @@ export const AuditLogsSchema = z.object({
createdAt: z.date(),
updatedAt: z.date(),
orgId: z.string().uuid().nullable().optional(),
projectId: z.string().nullable().optional()
projectId: z.string().nullable().optional(),
projectName: z.string().nullable().optional()
});
export type TAuditLogs = z.infer<typeof AuditLogsSchema>;
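
The only change to the generated schema is the optional projectName column, mirroring what the removed PartitionedAuditLogsSchema already had; an illustrative parse (all values are made up):

import { AuditLogsSchema } from "@app/db/schemas";

const log = AuditLogsSchema.parse({
  id: "1b671a64-40d5-491e-99b0-da01ff1f3341", // any valid UUID
  actor: "user",
  actorMetadata: { email: "jane@example.com" },
  eventType: "get-secrets",
  createdAt: new Date(),
  updatedAt: new Date(),
  projectId: "project-123",
  projectName: "Example Project" // may also be null or omitted entirely
});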

View File

@@ -55,7 +55,6 @@ export * from "./org-bots";
export * from "./org-memberships";
export * from "./org-roles";
export * from "./organizations";
export * from "./partitioned-audit-logs";
export * from "./pki-alerts";
export * from "./pki-collection-items";
export * from "./pki-collections";

View File

@@ -90,7 +90,6 @@ export enum TableName {
OidcConfig = "oidc_configs",
LdapGroupMap = "ldap_group_maps",
AuditLog = "audit_logs",
PartitionedAuditLog = "partitioned_audit_logs",
AuditLogStream = "audit_log_streams",
GitAppInstallSession = "git_app_install_sessions",
GitAppOrg = "git_app_org",

View File

@@ -1,29 +0,0 @@
// Code generated by automation script, DO NOT EDIT.
// Automated by pulling database and generating zod schema
// To update. Just run npm run generate:schema
// Written by akhilmhdh.
import { z } from "zod";
import { TImmutableDBKeys } from "./models";
export const PartitionedAuditLogsSchema = z.object({
id: z.string().uuid(),
actor: z.string(),
actorMetadata: z.unknown(),
ipAddress: z.string().nullable().optional(),
eventType: z.string(),
eventMetadata: z.unknown().nullable().optional(),
userAgent: z.string().nullable().optional(),
userAgentType: z.string().nullable().optional(),
expiresAt: z.date().nullable().optional(),
createdAt: z.date(),
updatedAt: z.date(),
orgId: z.string().uuid().nullable().optional(),
projectId: z.string().nullable().optional(),
projectName: z.string().nullable().optional()
});
export type TPartitionedAuditLogs = z.infer<typeof PartitionedAuditLogsSchema>;
export type TPartitionedAuditLogsInsert = Omit<z.input<typeof PartitionedAuditLogsSchema>, TImmutableDBKeys>;
export type TPartitionedAuditLogsUpdate = Partial<Omit<z.input<typeof PartitionedAuditLogsSchema>, TImmutableDBKeys>>;

View File

@@ -1,6 +1,6 @@
import { z } from "zod";
import { PartitionedAuditLogsSchema, SecretSnapshotsSchema } from "@app/db/schemas";
import { AuditLogsSchema, SecretSnapshotsSchema } from "@app/db/schemas";
import { EventType, UserAgentType } from "@app/ee/services/audit-log/audit-log-types";
import { AUDIT_LOGS, PROJECTS } from "@app/lib/api-docs";
import { getLastMidnightDateISO, removeTrailingSlash } from "@app/lib/fn";
@@ -120,7 +120,7 @@ export const registerProjectRouter = async (server: FastifyZodProvider) => {
}),
response: {
200: z.object({
auditLogs: PartitionedAuditLogsSchema.omit({
auditLogs: AuditLogsSchema.omit({
eventMetadata: true,
eventType: true,
actor: true,

View File

@@ -26,7 +26,7 @@ type TFindQuery = {
};
export const auditLogDALFactory = (db: TDbClient) => {
const auditLogOrm = ormify(db, TableName.PartitionedAuditLog);
const auditLogOrm = ormify(db, TableName.AuditLog);
const find = async (
{
@@ -55,13 +55,13 @@ export const auditLogDALFactory = (db: TDbClient) => {
try {
// Find statements
const sqlQuery = (tx || db.replicaNode())(TableName.PartitionedAuditLog)
const sqlQuery = (tx || db.replicaNode())(TableName.AuditLog)
// eslint-disable-next-line func-names
.where(function () {
if (orgId) {
void this.where(`${TableName.PartitionedAuditLog}.orgId`, orgId);
void this.where(`${TableName.AuditLog}.orgId`, orgId);
} else if (projectId) {
void this.where(`${TableName.PartitionedAuditLog}.projectId`, projectId);
void this.where(`${TableName.AuditLog}.projectId`, projectId);
}
});
@@ -71,10 +71,10 @@ export const auditLogDALFactory = (db: TDbClient) => {
// Select statements
void sqlQuery
.select(selectAllTableCols(TableName.PartitionedAuditLog))
.select(selectAllTableCols(TableName.AuditLog))
.limit(limit)
.offset(offset)
.orderBy(`${TableName.PartitionedAuditLog}.createdAt`, "desc");
.orderBy(`${TableName.AuditLog}.createdAt`, "desc");
// Special case: Filter by actor ID
if (actorId) {
@@ -100,10 +100,10 @@ export const auditLogDALFactory = (db: TDbClient) => {
// Filter by date range
if (startDate) {
void sqlQuery.where(`${TableName.PartitionedAuditLog}.createdAt`, ">=", startDate);
void sqlQuery.where(`${TableName.AuditLog}.createdAt`, ">=", startDate);
}
if (endDate) {
void sqlQuery.where(`${TableName.PartitionedAuditLog}.createdAt`, "<=", endDate);
void sqlQuery.where(`${TableName.AuditLog}.createdAt`, "<=", endDate);
}
// we timeout long running queries to prevent DB resource issues (2 minutes)
@@ -135,13 +135,13 @@ export const auditLogDALFactory = (db: TDbClient) => {
logger.info(`${QueueName.DailyResourceCleanUp}: audit log started`);
do {
try {
const findExpiredLogSubQuery = (tx || db)(TableName.PartitionedAuditLog)
const findExpiredLogSubQuery = (tx || db)(TableName.AuditLog)
.where("expiresAt", "<", today)
.select("id")
.limit(AUDIT_LOG_PRUNE_BATCH_SIZE);
// eslint-disable-next-line no-await-in-loop
deletedAuditLogIds = await (tx || db)(TableName.PartitionedAuditLog)
deletedAuditLogIds = await (tx || db)(TableName.AuditLog)
.whereIn("id", findExpiredLogSubQuery)
.del()
.returning("id");
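
With ormify and every query builder pointed back at TableName.AuditLog, org- and project-scoped reads go through the plain audit_logs table again (which the manual script can later turn into a partitioned parent). A rough usage sketch, where db is the application's TDbClient and any filter field not visible in this diff is an assumption:

const auditLogDAL = auditLogDALFactory(db);

const logs = await auditLogDAL.find({
  orgId: "1b671a64-40d5-491e-99b0-da01ff1f3341", // or projectId for project-scoped queries
  startDate: "2024-10-01T00:00:00Z",             // compared against audit_logs."createdAt"
  endDate: "2024-10-08T00:00:00Z",
  limit: 20,
  offset: 0
});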

View File

@@ -1,12 +1,12 @@
import { z } from "zod";
import {
AuditLogsSchema,
GroupsSchema,
IncidentContactsSchema,
OrganizationsSchema,
OrgMembershipsSchema,
OrgRolesSchema,
PartitionedAuditLogsSchema,
UsersSchema
} from "@app/db/schemas";
import { EventType, UserAgentType } from "@app/ee/services/audit-log/audit-log-types";
@@ -115,7 +115,7 @@ export const registerOrgRouter = async (server: FastifyZodProvider) => {
response: {
200: z.object({
auditLogs: PartitionedAuditLogsSchema.omit({
auditLogs: AuditLogsSchema.omit({
eventMetadata: true,
eventType: true,
actor: true,

View File

@@ -887,4 +887,5 @@ export type AuditLog = {
createdAt: string;
updatedAt: string;
projectName?: string;
projectId?: string;
};

View File

@@ -573,7 +573,7 @@ export const LogsTableRow = ({ auditLog, isOrgAuditLogs, showActorColumn }: Prop
<Tr className={`log-${auditLog.id} h-10 border-x-0 border-b border-t-0`}>
<Td>{formatDate(auditLog.createdAt)}</Td>
<Td>{`${eventToNameMap[auditLog.event.type]}`}</Td>
{isOrgAuditLogs && <Td>{auditLog?.projectName ?? "N/A"}</Td>}
{isOrgAuditLogs && <Td>{auditLog?.projectName ?? auditLog?.projectId ?? "N/A"}</Td>}
{showActorColumn && renderActor(auditLog.actor)}
{renderSource()}
{renderMetadata(auditLog.event)}