From 52f773c647df1db8e649361bb63078f854a5064e Mon Sep 17 00:00:00 2001 From: Sid <58144379+sidwebworks@users.noreply.github.com> Date: Fri, 1 Aug 2025 01:20:45 +0530 Subject: [PATCH] feat: events system implementation (#4246) * chore: save poc * chore: save wip * fix: undo cors * fix: impl changes * fix: PR changes * fix: mocks * fix: connection tracking and auth changes * fix: PR changes * fix: revert license * feat: frontend change * fix: revert docker compose.dev * fix: duplicate publisher connection * fix: pr changes * chore: move event impl to `ee` * fix: lint errors * fix: check length of events * fix: static permissions matching * fix: secretPath * fix: remove source prefix in bus event name * fix: license check --- backend/src/@types/fastify.d.ts | 4 + .../ee/services/audit-log/audit-log-queue.ts | 305 ++++++------------ .../ee/services/event/event-bus-service.ts | 83 +++++ .../ee/services/event/event-sse-service.ts | 164 ++++++++++ .../src/ee/services/event/event-sse-stream.ts | 178 ++++++++++ backend/src/ee/services/event/types.ts | 125 +++++++ .../src/ee/services/license/license-fns.ts | 3 +- .../src/ee/services/license/license-types.ts | 1 + .../ee/services/permission/default-roles.ts | 6 +- .../services/permission/project-permission.ts | 16 +- backend/src/keystore/keystore.ts | 6 +- .../server/plugins/auth/inject-identity.ts | 18 +- backend/src/server/routes/index.ts | 15 +- backend/src/server/routes/v1/event-router.ts | 118 +++++++ backend/src/server/routes/v1/index.ts | 3 + docker-compose.dev.yml | 2 +- .../context/ProjectPermissionContext/types.ts | 3 +- .../components/PermissionConditionHelpers.tsx | 8 + .../ProjectRoleModifySection.utils.tsx | 8 +- .../components/SecretPermissionConditions.tsx | 3 +- nginx/default.dev.conf | 12 +- sink/index.html | 89 +++++ 22 files changed, 943 insertions(+), 227 deletions(-) create mode 100644 backend/src/ee/services/event/event-bus-service.ts create mode 100644 backend/src/ee/services/event/event-sse-service.ts create mode 100644 backend/src/ee/services/event/event-sse-stream.ts create mode 100644 backend/src/ee/services/event/types.ts create mode 100644 backend/src/server/routes/v1/event-router.ts create mode 100644 sink/index.html diff --git a/backend/src/@types/fastify.d.ts b/backend/src/@types/fastify.d.ts index bcc50ee6cf..15e9679481 100644 --- a/backend/src/@types/fastify.d.ts +++ b/backend/src/@types/fastify.d.ts @@ -12,6 +12,8 @@ import { TCertificateAuthorityCrlServiceFactory } from "@app/ee/services/certifi import { TCertificateEstServiceFactory } from "@app/ee/services/certificate-est/certificate-est-service"; import { TDynamicSecretServiceFactory } from "@app/ee/services/dynamic-secret/dynamic-secret-types"; import { TDynamicSecretLeaseServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-types"; +import { TEventBusService } from "@app/ee/services/event/event-bus-service"; +import { TServerSentEventsService } from "@app/ee/services/event/event-sse-service"; import { TExternalKmsServiceFactory } from "@app/ee/services/external-kms/external-kms-service"; import { TGatewayServiceFactory } from "@app/ee/services/gateway/gateway-service"; import { TGithubOrgSyncServiceFactory } from "@app/ee/services/github-org-sync/github-org-sync-service"; @@ -296,6 +298,8 @@ declare module "fastify" { internalCertificateAuthority: TInternalCertificateAuthorityServiceFactory; pkiTemplate: TPkiTemplatesServiceFactory; reminder: TReminderServiceFactory; + bus: TEventBusService; + sse: TServerSentEventsService; }; // this is exclusive use for middlewares in which we need to inject data // everywhere else access using service layer diff --git a/backend/src/ee/services/audit-log/audit-log-queue.ts b/backend/src/ee/services/audit-log/audit-log-queue.ts index be58c4c437..9e0f5f9989 100644 --- a/backend/src/ee/services/audit-log/audit-log-queue.ts +++ b/backend/src/ee/services/audit-log/audit-log-queue.ts @@ -1,7 +1,8 @@ import { AxiosError, RawAxiosRequestHeaders } from "axios"; -import { SecretKeyEncoding } from "@app/db/schemas"; -import { getConfig } from "@app/lib/config/env"; +import { ProjectType, SecretKeyEncoding } from "@app/db/schemas"; +import { TEventBusService } from "@app/ee/services/event/event-bus-service"; +import { TopicName, toPublishableEvent } from "@app/ee/services/event/types"; import { request } from "@app/lib/config/request"; import { crypto } from "@app/lib/crypto/cryptography"; import { logger } from "@app/lib/logger"; @@ -21,6 +22,7 @@ type TAuditLogQueueServiceFactoryDep = { queueService: TQueueServiceFactory; projectDAL: Pick; licenseService: Pick; + eventBusService: TEventBusService; }; export type TAuditLogQueueServiceFactory = { @@ -36,133 +38,17 @@ export const auditLogQueueServiceFactory = async ({ queueService, projectDAL, licenseService, - auditLogStreamDAL + auditLogStreamDAL, + eventBusService }: TAuditLogQueueServiceFactoryDep): Promise => { - const appCfg = getConfig(); - const pushToLog = async (data: TCreateAuditLogDTO) => { - if (appCfg.USE_PG_QUEUE && appCfg.SHOULD_INIT_PG_QUEUE) { - await queueService.queuePg(QueueJobs.AuditLog, data, { - retryLimit: 10, - retryBackoff: true - }); - } else { - await queueService.queue(QueueName.AuditLog, QueueJobs.AuditLog, data, { - removeOnFail: { - count: 3 - }, - removeOnComplete: true - }); - } - }; - - if (appCfg.SHOULD_INIT_PG_QUEUE) { - await queueService.startPg( - QueueJobs.AuditLog, - async ([job]) => { - const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; - let { orgId } = job.data; - const MS_IN_DAY = 24 * 60 * 60 * 1000; - let project; - - if (!orgId) { - // it will never be undefined for both org and project id - // TODO(akhilmhdh): use caching here in dal to avoid db calls - project = await projectDAL.findById(projectId as string); - orgId = project.orgId; - } - - const plan = await licenseService.getPlan(orgId); - if (plan.auditLogsRetentionDays === 0) { - // skip inserting if audit log retention is 0 meaning its not supported - return; - } - - // For project actions, set TTL to project-level audit log retention config - // This condition ensures that the plan's audit log retention days cannot be bypassed - const ttlInDays = - project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays - ? project.auditLogsRetentionDays - : plan.auditLogsRetentionDays; - - const ttl = ttlInDays * MS_IN_DAY; - - const auditLog = await auditLogDAL.create({ - actor: actor.type, - actorMetadata: actor.metadata, - userAgent, - projectId, - projectName: project?.name, - ipAddress, - orgId, - eventType: event.type, - expiresAt: new Date(Date.now() + ttl), - eventMetadata: event.metadata, - userAgentType - }); - - const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : []; - await Promise.allSettled( - logStreams.map( - async ({ - url, - encryptedHeadersTag, - encryptedHeadersIV, - encryptedHeadersKeyEncoding, - encryptedHeadersCiphertext - }) => { - const streamHeaders = - encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag - ? (JSON.parse( - crypto - .encryption() - .symmetric() - .decryptWithRootEncryptionKey({ - keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding, - iv: encryptedHeadersIV, - tag: encryptedHeadersTag, - ciphertext: encryptedHeadersCiphertext - }) - ) as LogStreamHeaders[]) - : []; - - const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" }; - - if (streamHeaders.length) - streamHeaders.forEach(({ key, value }) => { - headers[key] = value; - }); - - try { - const response = await request.post( - url, - { ...providerSpecificPayload(url), ...auditLog }, - { - headers, - // request timeout - timeout: AUDIT_LOG_STREAM_TIMEOUT, - // connection timeout - signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT) - } - ); - return response; - } catch (error) { - logger.error( - `Failed to stream audit log [url=${url}] for org [orgId=${orgId}] [error=${(error as AxiosError).message}]` - ); - return error; - } - } - ) - ); + await queueService.queue(QueueName.AuditLog, QueueJobs.AuditLog, data, { + removeOnFail: { + count: 3 }, - { - batchSize: 1, - workerCount: 30, - pollingIntervalSeconds: 0.5 - } - ); - } + removeOnComplete: true + }); + }; queueService.start(QueueName.AuditLog, async (job) => { const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data; @@ -178,88 +64,97 @@ export const auditLogQueueServiceFactory = async ({ } const plan = await licenseService.getPlan(orgId); - if (plan.auditLogsRetentionDays === 0) { - // skip inserting if audit log retention is 0 meaning its not supported - return; + + // skip inserting if audit log retention is 0 meaning its not supported + if (plan.auditLogsRetentionDays !== 0) { + // For project actions, set TTL to project-level audit log retention config + // This condition ensures that the plan's audit log retention days cannot be bypassed + const ttlInDays = + project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays + ? project.auditLogsRetentionDays + : plan.auditLogsRetentionDays; + + const ttl = ttlInDays * MS_IN_DAY; + + const auditLog = await auditLogDAL.create({ + actor: actor.type, + actorMetadata: actor.metadata, + userAgent, + projectId, + projectName: project?.name, + ipAddress, + orgId, + eventType: event.type, + expiresAt: new Date(Date.now() + ttl), + eventMetadata: event.metadata, + userAgentType + }); + + const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : []; + await Promise.allSettled( + logStreams.map( + async ({ + url, + encryptedHeadersTag, + encryptedHeadersIV, + encryptedHeadersKeyEncoding, + encryptedHeadersCiphertext + }) => { + const streamHeaders = + encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag + ? (JSON.parse( + crypto + .encryption() + .symmetric() + .decryptWithRootEncryptionKey({ + keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding, + iv: encryptedHeadersIV, + tag: encryptedHeadersTag, + ciphertext: encryptedHeadersCiphertext + }) + ) as LogStreamHeaders[]) + : []; + + const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" }; + + if (streamHeaders.length) + streamHeaders.forEach(({ key, value }) => { + headers[key] = value; + }); + + try { + const response = await request.post( + url, + { ...providerSpecificPayload(url), ...auditLog }, + { + headers, + // request timeout + timeout: AUDIT_LOG_STREAM_TIMEOUT, + // connection timeout + signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT) + } + ); + return response; + } catch (error) { + logger.error( + `Failed to stream audit log [url=${url}] for org [orgId=${orgId}] [error=${(error as AxiosError).message}]` + ); + return error; + } + } + ) + ); } - // For project actions, set TTL to project-level audit log retention config - // This condition ensures that the plan's audit log retention days cannot be bypassed - const ttlInDays = - project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays - ? project.auditLogsRetentionDays - : plan.auditLogsRetentionDays; + const publishable = toPublishableEvent(event); - const ttl = ttlInDays * MS_IN_DAY; - - const auditLog = await auditLogDAL.create({ - actor: actor.type, - actorMetadata: actor.metadata, - userAgent, - projectId, - projectName: project?.name, - ipAddress, - orgId, - eventType: event.type, - expiresAt: new Date(Date.now() + ttl), - eventMetadata: event.metadata, - userAgentType - }); - - const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : []; - await Promise.allSettled( - logStreams.map( - async ({ - url, - encryptedHeadersTag, - encryptedHeadersIV, - encryptedHeadersKeyEncoding, - encryptedHeadersCiphertext - }) => { - const streamHeaders = - encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag - ? (JSON.parse( - crypto - .encryption() - .symmetric() - .decryptWithRootEncryptionKey({ - keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding, - iv: encryptedHeadersIV, - tag: encryptedHeadersTag, - ciphertext: encryptedHeadersCiphertext - }) - ) as LogStreamHeaders[]) - : []; - - const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" }; - - if (streamHeaders.length) - streamHeaders.forEach(({ key, value }) => { - headers[key] = value; - }); - - try { - const response = await request.post( - url, - { ...providerSpecificPayload(url), ...auditLog }, - { - headers, - // request timeout - timeout: AUDIT_LOG_STREAM_TIMEOUT, - // connection timeout - signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT) - } - ); - return response; - } catch (error) { - logger.error( - `Failed to stream audit log [url=${url}] for org [orgId=${orgId}] [error=${(error as AxiosError).message}]` - ); - return error; - } - } - ) - ); + if (publishable) { + await eventBusService.publish(TopicName.CoreServers, { + type: ProjectType.SecretManager, + source: "infiscal", + data: publishable.data + }); + } }); return { diff --git a/backend/src/ee/services/event/event-bus-service.ts b/backend/src/ee/services/event/event-bus-service.ts new file mode 100644 index 0000000000..63f3b9fae1 --- /dev/null +++ b/backend/src/ee/services/event/event-bus-service.ts @@ -0,0 +1,83 @@ +import Redis from "ioredis"; +import { z } from "zod"; + +import { logger } from "@app/lib/logger"; + +import { EventSchema, TopicName } from "./types"; + +export const eventBusFactory = (redis: Redis) => { + const publisher = redis.duplicate(); + // Duplicate the publisher to create a subscriber. + // This is necessary because Redis does not allow a single connection to both publish and subscribe. + const subscriber = publisher.duplicate(); + + const init = async (topics: TopicName[] = Object.values(TopicName)) => { + subscriber.on("error", (e) => { + logger.error(e, "Event Bus subscriber error"); + }); + + publisher.on("error", (e) => { + logger.error(e, "Event Bus publisher error"); + }); + + await subscriber.subscribe(...topics); + }; + + /** + * Publishes an event to the specified topic. + * @param topic - The topic to publish the event to. + * @param event - The event data to publish. + */ + const publish = async >(topic: TopicName, event: T) => { + const json = JSON.stringify(event); + + return publisher.publish(topic, json, (err) => { + if (err) { + return logger.error(err, `Error publishing to channel ${topic}`); + } + }); + }; + + /** + * @param fn - The function to call when a message is received. + * It should accept the parsed event data as an argument. + * @template T - The type of the event data, which should match the schema defined in EventSchema. + * @returns A function that can be called to unsubscribe from the event bus. + */ + const subscribe = >(fn: (data: T) => Promise | void) => { + // Not using async await cause redis client's `on` method does not expect async listeners. + const listener = (channel: string, message: string) => { + try { + const parsed = JSON.parse(message) as T; + const thenable = fn(parsed); + + // If the function returns a Promise, catch any errors that occur during processing. + if (thenable instanceof Promise) { + thenable.catch((error) => { + logger.error(error, `Error processing message from channel ${channel}`); + }); + } + } catch (error) { + logger.error(error, `Error parsing message data from channel ${channel}`); + } + }; + subscriber.on("message", listener); + + return () => { + subscriber.off("message", listener); + }; + }; + + const close = async () => { + try { + await publisher.quit(); + await subscriber.quit(); + } catch (error) { + logger.error(error, "Error closing event bus connections"); + } + }; + + return { init, publish, subscribe, close }; +}; + +export type TEventBusService = ReturnType; diff --git a/backend/src/ee/services/event/event-sse-service.ts b/backend/src/ee/services/event/event-sse-service.ts new file mode 100644 index 0000000000..f8c025e77f --- /dev/null +++ b/backend/src/ee/services/event/event-sse-service.ts @@ -0,0 +1,164 @@ +/* eslint-disable no-continue */ +import { subject } from "@casl/ability"; +import Redis from "ioredis"; + +import { KeyStorePrefixes } from "@app/keystore/keystore"; +import { logger } from "@app/lib/logger"; + +import { TEventBusService } from "./event-bus-service"; +import { createEventStreamClient, EventStreamClient, IEventStreamClientOpts } from "./event-sse-stream"; +import { EventData, RegisteredEvent, toBusEventName } from "./types"; + +const AUTH_REFRESH_INTERVAL = 60 * 1000; +const HEART_BEAT_INTERVAL = 15 * 1000; + +export const sseServiceFactory = (bus: TEventBusService, redis: Redis) => { + let heartbeatInterval: NodeJS.Timeout | null = null; + + const clients = new Set(); + + heartbeatInterval = setInterval(() => { + for (const client of clients) { + if (client.stream.closed) continue; + void client.ping(); + } + }, HEART_BEAT_INTERVAL); + + const refreshInterval = setInterval(() => { + for (const client of clients) { + if (client.stream.closed) continue; + void client.refresh(); + } + }, AUTH_REFRESH_INTERVAL); + + const removeActiveConnection = async (projectId: string, identityId: string, connectionId: string) => { + const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, identityId); + const key = KeyStorePrefixes.ActiveSSEConnections(projectId, identityId, connectionId); + + await Promise.all([redis.lrem(set, 0, connectionId), redis.del(key)]); + }; + + const getActiveConnectionsCount = async (projectId: string, identityId: string) => { + const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, identityId); + const connections = await redis.lrange(set, 0, -1); + + if (connections.length === 0) { + return 0; // No active connections + } + + const keys = connections.map((c) => KeyStorePrefixes.ActiveSSEConnections(projectId, identityId, c)); + + const values = await redis.mget(...keys); + + // eslint-disable-next-line no-plusplus + for (let i = 0; i < values.length; i++) { + if (values[i] === null) { + // eslint-disable-next-line no-await-in-loop + await removeActiveConnection(projectId, identityId, connections[i]); + } + } + + return redis.llen(set); + }; + + const onDisconnect = async (client: EventStreamClient) => { + try { + client.close(); + clients.delete(client); + await removeActiveConnection(client.auth.projectId, client.auth.actorId, client.id); + } catch (error) { + logger.error(error, "Error during SSE stream disconnection"); + } + }; + + function filterEventsForClient(client: EventStreamClient, event: EventData, registered: RegisteredEvent[]) { + const eventType = toBusEventName(event.data.eventType); + const match = registered.find((r) => r.event === eventType); + if (!match) return; + + const item = event.data.payload; + + if (Array.isArray(item)) { + if (item.length === 0) return; + + const baseSubject = { + eventType, + environment: undefined as string | undefined, + secretPath: undefined as string | undefined + }; + + const filtered = item.filter((ev) => { + baseSubject.secretPath = ev.secretPath ?? "/"; + baseSubject.environment = ev.environment; + + return client.matcher.can("subscribe", subject(event.type, baseSubject)); + }); + + if (filtered.length === 0) return; + + return client.send({ + ...event, + data: { + ...event.data, + payload: filtered + } + }); + } + + // For single item + const baseSubject = { + eventType, + secretPath: item.secretPath ?? "/", + environment: item.environment + }; + + if (client.matcher.can("subscribe", subject(event.type, baseSubject))) { + client.send(event); + } + } + + const subscribe = async ( + opts: IEventStreamClientOpts & { + onClose?: () => void; + } + ) => { + const client = createEventStreamClient(redis, opts); + + // Set up event listener on event bus + const unsubscribe = bus.subscribe((event) => { + if (event.type !== opts.type) return; + filterEventsForClient(client, event, opts.registered); + }); + + client.stream.on("close", () => { + unsubscribe(); + void onDisconnect(client); // This will never throw + }); + + await client.open(); + + clients.add(client); + + return client; + }; + + const close = () => { + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + } + + if (refreshInterval) { + clearInterval(refreshInterval); + } + + for (const client of clients) { + client.close(); + } + + clients.clear(); + }; + + return { subscribe, close, getActiveConnectionsCount }; +}; + +export type TServerSentEventsService = ReturnType; diff --git a/backend/src/ee/services/event/event-sse-stream.ts b/backend/src/ee/services/event/event-sse-stream.ts new file mode 100644 index 0000000000..d73eccb42c --- /dev/null +++ b/backend/src/ee/services/event/event-sse-stream.ts @@ -0,0 +1,178 @@ +/* eslint-disable no-underscore-dangle */ +import { Readable } from "node:stream"; + +import { MongoAbility, PureAbility } from "@casl/ability"; +import { MongoQuery } from "@ucast/mongo2js"; +import Redis from "ioredis"; +import { nanoid } from "nanoid"; + +import { ProjectType } from "@app/db/schemas"; +import { ProjectPermissionSet } from "@app/ee/services/permission/project-permission"; +import { KeyStorePrefixes } from "@app/keystore/keystore"; +import { conditionsMatcher } from "@app/lib/casl"; +import { logger } from "@app/lib/logger"; + +import { EventData, RegisteredEvent } from "./types"; + +export const getServerSentEventsHeaders = () => + ({ + "Cache-Control": "no-cache", + "Content-Type": "text/event-stream", + Connection: "keep-alive", + "X-Accel-Buffering": "no" + }) as const; + +type TAuthInfo = { + actorId: string; + projectId: string; + permission: MongoAbility; +}; + +export interface IEventStreamClientOpts { + type: ProjectType; + registered: RegisteredEvent[]; + onAuthRefresh: (info: TAuthInfo) => Promise | void; + getAuthInfo: () => Promise | TAuthInfo; +} + +interface EventMessage { + time?: string | number; + type: string; + data?: unknown; +} + +function serializeSseEvent(chunk: EventMessage): string { + let payload = ""; + + if (chunk.time) payload += `id: ${chunk.time}\n`; + if (chunk.type) payload += `event: ${chunk.type}\n`; + if (chunk.data) payload += `data: ${JSON.stringify(chunk)}\n`; + + return `${payload}\n`; +} + +export type EventStreamClient = { + id: string; + stream: Readable; + open: () => Promise; + send: (data: EventMessage | EventData) => void; + ping: () => Promise; + refresh: () => Promise; + close: () => void; + get auth(): TAuthInfo; + signal: AbortSignal; + abort: () => void; + matcher: PureAbility; +}; + +export function createEventStreamClient(redis: Redis, options: IEventStreamClientOpts): EventStreamClient { + const rules = options.registered.map((r) => ({ + subject: options.type, + action: "subscribe", + conditions: { + eventType: r.event, + secretPath: r.conditions?.secretPath ?? "/", + environment: r.conditions?.environmentSlug + } + })); + + const id = `sse-${nanoid()}`; + const control = new AbortController(); + const matcher = new PureAbility(rules, { conditionsMatcher }); + + let auth: TAuthInfo | undefined; + + const stream = new Readable({ + objectMode: true + }); + + // We will manually push data to the stream + stream._read = () => {}; + + const send = (data: EventMessage | EventData) => { + const chunk = serializeSseEvent(data); + if (!stream.push(chunk)) { + logger.debug("Backpressure detected: dropped manual event"); + } + }; + + stream.on("error", (error: Error) => stream.destroy(error)); + + const open = async () => { + auth = await options.getAuthInfo(); + await options.onAuthRefresh(auth); + + const { actorId, projectId } = auth; + const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, actorId); + const key = KeyStorePrefixes.ActiveSSEConnections(projectId, actorId, id); + + await Promise.all([redis.rpush(set, id), redis.set(key, "1", "EX", 60)]); + }; + + const ping = async () => { + if (!auth) return; // Avoid race condition if ping is called before open + + const { actorId, projectId } = auth; + const key = KeyStorePrefixes.ActiveSSEConnections(projectId, actorId, id); + + await redis.set(key, "1", "EX", 60); + + stream.push("1"); + }; + + const close = () => { + if (stream.closed) return; + stream.push(null); + stream.destroy(); + }; + + /** + * Refreshes the connection's auth permissions + * Must be called atleast once when connection is opened + */ + const refresh = async () => { + try { + auth = await options.getAuthInfo(); + await options.onAuthRefresh(auth); + } catch (error) { + if (error instanceof Error) { + send({ + type: "error", + data: { + ...error + } + }); + return close(); + } + stream.emit("error", error); + } + }; + + const abort = () => { + try { + control.abort(); + } catch (error) { + logger.debug(error, "Error aborting SSE stream"); + } + }; + + return { + id, + stream, + open, + send, + ping, + refresh, + close, + signal: control.signal, + abort, + matcher, + get auth() { + if (!auth) { + throw new Error("Auth info not set"); + } + + return auth; + } + }; +} diff --git a/backend/src/ee/services/event/types.ts b/backend/src/ee/services/event/types.ts new file mode 100644 index 0000000000..721e6ea13b --- /dev/null +++ b/backend/src/ee/services/event/types.ts @@ -0,0 +1,125 @@ +import { z } from "zod"; + +import { ProjectType } from "@app/db/schemas"; +import { Event, EventType } from "@app/ee/services/audit-log/audit-log-types"; + +export enum TopicName { + CoreServers = "infisical::core-servers" +} + +export enum BusEventName { + CreateSecret = "secret:create", + UpdateSecret = "secret:update", + DeleteSecret = "secret:delete" +} + +type PublisableEventTypes = + | EventType.CREATE_SECRET + | EventType.CREATE_SECRETS + | EventType.DELETE_SECRET + | EventType.DELETE_SECRETS + | EventType.UPDATE_SECRETS + | EventType.UPDATE_SECRET; + +export function toBusEventName(input: EventType) { + switch (input) { + case EventType.CREATE_SECRET: + case EventType.CREATE_SECRETS: + return BusEventName.CreateSecret; + case EventType.UPDATE_SECRET: + case EventType.UPDATE_SECRETS: + return BusEventName.UpdateSecret; + case EventType.DELETE_SECRET: + case EventType.DELETE_SECRETS: + return BusEventName.DeleteSecret; + default: + return null; + } +} + +const isBulkEvent = (event: Event): event is Extract } }> => { + return event.type.endsWith("-secrets"); // Feels so wrong +}; + +export const toPublishableEvent = (event: Event) => { + const name = toBusEventName(event.type); + + if (!name) return null; + + const e = event as Extract; + + if (isBulkEvent(e)) { + return { + name, + isBulk: true, + data: { + eventType: e.type, + payload: e.metadata.secrets.map((s) => ({ + environment: e.metadata.environment, + secretPath: e.metadata.secretPath, + ...s + })) + } + } as const; + } + + return { + name, + isBulk: false, + data: { + eventType: e.type, + payload: { + ...e.metadata, + environment: e.metadata.environment + } + } + } as const; +}; + +export const EventName = z.nativeEnum(BusEventName); + +const EventSecretPayload = z.object({ + secretPath: z.string().optional(), + secretId: z.string(), + secretKey: z.string(), + environment: z.string() +}); + +export type EventSecret = z.infer; + +export const EventSchema = z.object({ + datacontenttype: z.literal("application/json").optional().default("application/json"), + type: z.nativeEnum(ProjectType), + source: z.string(), + time: z + .string() + .optional() + .default(() => new Date().toISOString()), + data: z.discriminatedUnion("eventType", [ + z.object({ + specversion: z.number().optional().default(1), + eventType: z.enum([EventType.CREATE_SECRET, EventType.UPDATE_SECRET, EventType.DELETE_SECRET]), + payload: EventSecretPayload + }), + z.object({ + specversion: z.number().optional().default(1), + eventType: z.enum([EventType.CREATE_SECRETS, EventType.UPDATE_SECRETS, EventType.DELETE_SECRETS]), + payload: EventSecretPayload.array() + }) + // Add more event types as needed + ]) +}); + +export type EventData = z.infer; + +export const EventRegisterSchema = z.object({ + event: EventName, + conditions: z + .object({ + secretPath: z.string().optional().default("/"), + environmentSlug: z.string() + }) + .optional() +}); + +export type RegisteredEvent = z.infer; diff --git a/backend/src/ee/services/license/license-fns.ts b/backend/src/ee/services/license/license-fns.ts index 5b755567b3..fecba7ba73 100644 --- a/backend/src/ee/services/license/license-fns.ts +++ b/backend/src/ee/services/license/license-fns.ts @@ -59,7 +59,8 @@ export const getDefaultOnPremFeatures = (): TFeatureSet => ({ secretScanning: false, enterpriseSecretSyncs: false, enterpriseAppConnections: false, - fips: false + fips: false, + eventSubscriptions: false }); export const setupLicenseRequestWithStore = ( diff --git a/backend/src/ee/services/license/license-types.ts b/backend/src/ee/services/license/license-types.ts index a3412f5747..84a652c468 100644 --- a/backend/src/ee/services/license/license-types.ts +++ b/backend/src/ee/services/license/license-types.ts @@ -76,6 +76,7 @@ export type TFeatureSet = { enterpriseSecretSyncs: false; enterpriseAppConnections: false; fips: false; + eventSubscriptions: false; }; export type TOrgPlansTableDTO = { diff --git a/backend/src/ee/services/permission/default-roles.ts b/backend/src/ee/services/permission/default-roles.ts index cca4efaf24..11d8e05181 100644 --- a/backend/src/ee/services/permission/default-roles.ts +++ b/backend/src/ee/services/permission/default-roles.ts @@ -161,7 +161,8 @@ const buildAdminPermissionRules = () => { ProjectPermissionSecretActions.ReadValue, ProjectPermissionSecretActions.Create, ProjectPermissionSecretActions.Edit, - ProjectPermissionSecretActions.Delete + ProjectPermissionSecretActions.Delete, + ProjectPermissionSecretActions.Subscribe ], ProjectPermissionSub.Secrets ); @@ -265,7 +266,8 @@ const buildMemberPermissionRules = () => { ProjectPermissionSecretActions.ReadValue, ProjectPermissionSecretActions.Edit, ProjectPermissionSecretActions.Create, - ProjectPermissionSecretActions.Delete + ProjectPermissionSecretActions.Delete, + ProjectPermissionSecretActions.Subscribe ], ProjectPermissionSub.Secrets ); diff --git a/backend/src/ee/services/permission/project-permission.ts b/backend/src/ee/services/permission/project-permission.ts index 966146c27b..899d364ebc 100644 --- a/backend/src/ee/services/permission/project-permission.ts +++ b/backend/src/ee/services/permission/project-permission.ts @@ -36,7 +36,8 @@ export enum ProjectPermissionSecretActions { ReadValue = "readValue", Create = "create", Edit = "edit", - Delete = "delete" + Delete = "delete", + Subscribe = "subscribe" } export enum ProjectPermissionCmekActions { @@ -204,6 +205,7 @@ export type SecretSubjectFields = { secretPath: string; secretName?: string; secretTags?: string[]; + eventType?: string; }; export type SecretFolderSubjectFields = { @@ -483,7 +485,17 @@ const SecretConditionV2Schema = z .object({ [PermissionConditionOperators.$IN]: PermissionConditionSchema[PermissionConditionOperators.$IN] }) - .partial() + .partial(), + eventType: z.union([ + z.string(), + z + .object({ + [PermissionConditionOperators.$EQ]: PermissionConditionSchema[PermissionConditionOperators.$EQ], + [PermissionConditionOperators.$NEQ]: PermissionConditionSchema[PermissionConditionOperators.$NEQ], + [PermissionConditionOperators.$IN]: PermissionConditionSchema[PermissionConditionOperators.$IN] + }) + .partial() + ]) }) .partial(); diff --git a/backend/src/keystore/keystore.ts b/backend/src/keystore/keystore.ts index 79679d256c..26aff767e6 100644 --- a/backend/src/keystore/keystore.ts +++ b/backend/src/keystore/keystore.ts @@ -46,7 +46,11 @@ export const KeyStorePrefixes = { IdentityAccessTokenStatusUpdate: (identityAccessTokenId: string) => `identity-access-token-status:${identityAccessTokenId}`, ServiceTokenStatusUpdate: (serviceTokenId: string) => `service-token-status:${serviceTokenId}`, - GatewayIdentityCredential: (identityId: string) => `gateway-credentials:${identityId}` + GatewayIdentityCredential: (identityId: string) => `gateway-credentials:${identityId}`, + ActiveSSEConnectionsSet: (projectId: string, identityId: string) => + `sse-connections:${projectId}:${identityId}` as const, + ActiveSSEConnections: (projectId: string, identityId: string, connectionId: string) => + `sse-connections:${projectId}:${identityId}:${connectionId}` as const }; export const KeyStoreTtls = { diff --git a/backend/src/server/plugins/auth/inject-identity.ts b/backend/src/server/plugins/auth/inject-identity.ts index 4fe639510e..97c62b545d 100644 --- a/backend/src/server/plugins/auth/inject-identity.ts +++ b/backend/src/server/plugins/auth/inject-identity.ts @@ -22,6 +22,7 @@ export type TAuthMode = orgId: string; authMethod: AuthMethod; isMfaVerified?: boolean; + token: AuthModeJwtTokenPayload; } | { authMode: AuthMode.API_KEY; @@ -30,6 +31,7 @@ export type TAuthMode = userId: string; user: TUsers; orgId: string; + token: string; } | { authMode: AuthMode.SERVICE_TOKEN; @@ -38,6 +40,7 @@ export type TAuthMode = serviceTokenId: string; orgId: string; authMethod: null; + token: string; } | { authMode: AuthMode.IDENTITY_ACCESS_TOKEN; @@ -47,6 +50,7 @@ export type TAuthMode = orgId: string; authMethod: null; isInstanceAdmin?: boolean; + token: TIdentityAccessTokenJwtPayload; } | { authMode: AuthMode.SCIM_TOKEN; @@ -56,7 +60,7 @@ export type TAuthMode = authMethod: null; }; -const extractAuth = async (req: FastifyRequest, jwtSecret: string) => { +export const extractAuth = async (req: FastifyRequest, jwtSecret: string) => { const apiKey = req.headers?.["x-api-key"]; if (apiKey) { return { authMode: AuthMode.API_KEY, token: apiKey, actor: ActorType.USER } as const; @@ -133,7 +137,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => { actor, orgId: orgId as string, authMethod: token.authMethod, - isMfaVerified: token.isMfaVerified + isMfaVerified: token.isMfaVerified, + token }; break; } @@ -148,7 +153,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => { identityId: identity.identityId, identityName: identity.name, authMethod: null, - isInstanceAdmin: serverCfg?.adminIdentityIds?.includes(identity.identityId) + isInstanceAdmin: serverCfg?.adminIdentityIds?.includes(identity.identityId), + token }; if (token?.identityAuth?.oidc) { requestContext.set("identityAuthInfo", { @@ -179,7 +185,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => { serviceToken, serviceTokenId: serviceToken.id, actor, - authMethod: null + authMethod: null, + token }; break; } @@ -191,7 +198,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => { actor, user, orgId: "API_KEY", // We set the orgId to an arbitrary value, since we can't link an API key to a specific org. We have to deprecate API keys soon! - authMethod: null + authMethod: null, + token: token as string }; break; } diff --git a/backend/src/server/routes/index.ts b/backend/src/server/routes/index.ts index ff779df5dc..256c622f95 100644 --- a/backend/src/server/routes/index.ts +++ b/backend/src/server/routes/index.ts @@ -31,6 +31,8 @@ import { buildDynamicSecretProviders } from "@app/ee/services/dynamic-secret/pro import { dynamicSecretLeaseDALFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-dal"; import { dynamicSecretLeaseQueueServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-queue"; import { dynamicSecretLeaseServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-service"; +import { eventBusFactory } from "@app/ee/services/event/event-bus-service"; +import { sseServiceFactory } from "@app/ee/services/event/event-sse-service"; import { externalKmsDALFactory } from "@app/ee/services/external-kms/external-kms-dal"; import { externalKmsServiceFactory } from "@app/ee/services/external-kms/external-kms-service"; import { gatewayDALFactory } from "@app/ee/services/gateway/gateway-dal"; @@ -495,6 +497,9 @@ export const registerRoutes = async ( const projectMicrosoftTeamsConfigDAL = projectMicrosoftTeamsConfigDALFactory(db); const secretScanningV2DAL = secretScanningV2DALFactory(db); + const eventBusService = eventBusFactory(server.redis); + const sseService = sseServiceFactory(eventBusService, server.redis); + const permissionService = permissionServiceFactory({ permissionDAL, orgRoleDAL, @@ -552,7 +557,8 @@ export const registerRoutes = async ( queueService, projectDAL, licenseService, - auditLogStreamDAL + auditLogStreamDAL, + eventBusService }); const auditLogService = auditLogServiceFactory({ auditLogDAL, permissionService, auditLogQueue }); @@ -1968,6 +1974,7 @@ export const registerRoutes = async ( await kmsService.startService(); await microsoftTeamsService.start(); await dynamicSecretQueueService.init(); + await eventBusService.init(); // inject all services server.decorate("services", { @@ -2074,7 +2081,9 @@ export const registerRoutes = async ( githubOrgSync: githubOrgSyncConfigService, folderCommit: folderCommitService, secretScanningV2: secretScanningV2Service, - reminder: reminderService + reminder: reminderService, + bus: eventBusService, + sse: sseService }); const cronJobs: CronJob[] = []; @@ -2190,5 +2199,7 @@ export const registerRoutes = async ( server.addHook("onClose", async () => { cronJobs.forEach((job) => job.stop()); await telemetryService.flushAll(); + await eventBusService.close(); + sseService.close(); }); }; diff --git a/backend/src/server/routes/v1/event-router.ts b/backend/src/server/routes/v1/event-router.ts new file mode 100644 index 0000000000..b8a043db55 --- /dev/null +++ b/backend/src/server/routes/v1/event-router.ts @@ -0,0 +1,118 @@ +/* eslint-disable @typescript-eslint/no-floating-promises */ +import { subject } from "@casl/ability"; +import { pipeline } from "stream/promises"; +import { z } from "zod"; + +import { ActionProjectType, ProjectType } from "@app/db/schemas"; +import { getServerSentEventsHeaders } from "@app/ee/services/event/event-sse-stream"; +import { EventRegisterSchema } from "@app/ee/services/event/types"; +import { ProjectPermissionSecretActions, ProjectPermissionSub } from "@app/ee/services/permission/project-permission"; +import { BadRequestError, ForbiddenRequestError, RateLimitError } from "@app/lib/errors"; +import { readLimit } from "@app/server/config/rateLimiter"; +import { verifyAuth } from "@app/server/plugins/auth/verify-auth"; +import { AuthMode } from "@app/services/auth/auth-type"; + +export const registerEventRouter = async (server: FastifyZodProvider) => { + server.route({ + method: "POST", + url: "/subscribe/project-events", + config: { + rateLimit: readLimit + }, + schema: { + body: z.object({ + projectId: z.string().trim(), + register: z.array(EventRegisterSchema).max(10) + }) + }, + onRequest: verifyAuth([AuthMode.JWT, AuthMode.IDENTITY_ACCESS_TOKEN]), + handler: async (req, reply) => { + try { + const { sse, permission, identityAccessToken, authToken, license } = req.server.services; + + const plan = await license.getPlan(req.auth.orgId); + + if (!plan.eventSubscriptions) { + throw new BadRequestError({ + message: + "Failed to use event subscriptions due to plan restriction. Upgrade plan to access enterprise event subscriptions." + }); + } + + const count = await sse.getActiveConnectionsCount(req.body.projectId, req.permission.id); + + if (count >= 5) { + throw new RateLimitError({ + message: `Too many active connections for project ${req.body.projectId}. Please close some connections before opening a new one.` + }); + } + + const client = await sse.subscribe({ + type: ProjectType.SecretManager, + registered: req.body.register, + async getAuthInfo() { + const ability = await permission.getProjectPermission({ + actor: req.auth.actor, + projectId: req.body.projectId, + actionProjectType: ActionProjectType.Any, + actorAuthMethod: req.auth.authMethod, + actorId: req.permission.id, + actorOrgId: req.permission.orgId + }); + + return { permission: ability.permission, actorId: req.permission.id, projectId: req.body.projectId }; + }, + async onAuthRefresh(info) { + switch (req.auth.authMode) { + case AuthMode.JWT: + await authToken.fnValidateJwtIdentity(req.auth.token); + break; + case AuthMode.IDENTITY_ACCESS_TOKEN: + await identityAccessToken.fnValidateIdentityAccessToken(req.auth.token, req.realIp); + break; + default: + throw new Error("Unsupported authentication method"); + } + + req.body.register.forEach((r) => { + const allowed = info.permission.can( + ProjectPermissionSecretActions.Subscribe, + subject(ProjectPermissionSub.Secrets, { + environment: r.conditions?.environmentSlug ?? "", + secretPath: r.conditions?.secretPath ?? "/", + eventType: r.event + }) + ); + + if (!allowed) { + throw new ForbiddenRequestError({ + name: "PermissionDenied", + message: `You are not allowed to subscribe on secrets`, + details: { + event: r.event, + environmentSlug: r.conditions?.environmentSlug, + secretPath: r.conditions?.secretPath ?? "/" + } + }); + } + }); + } + }); + + // Switches to manual response and enable SSE streaming + reply.hijack(); + reply.raw.writeHead(200, getServerSentEventsHeaders()).flushHeaders(); + reply.raw.on("close", client.abort); + + await pipeline(client.stream, reply.raw, { signal: client.signal }); + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + // If the stream is aborted, we don't need to do anything + return; + } + + throw error; + } + } + }); +}; diff --git a/backend/src/server/routes/v1/index.ts b/backend/src/server/routes/v1/index.ts index 848bd4f314..458efd1941 100644 --- a/backend/src/server/routes/v1/index.ts +++ b/backend/src/server/routes/v1/index.ts @@ -13,6 +13,7 @@ import { registerCaRouter } from "./certificate-authority-router"; import { CERTIFICATE_AUTHORITY_REGISTER_ROUTER_MAP } from "./certificate-authority-routers"; import { registerCertRouter } from "./certificate-router"; import { registerCertificateTemplateRouter } from "./certificate-template-router"; +import { registerEventRouter } from "./event-router"; import { registerExternalGroupOrgRoleMappingRouter } from "./external-group-org-role-mapping-router"; import { registerIdentityAccessTokenRouter } from "./identity-access-token-router"; import { registerIdentityAliCloudAuthRouter } from "./identity-alicloud-auth-router"; @@ -183,4 +184,6 @@ export const registerV1Routes = async (server: FastifyZodProvider) => { }, { prefix: "/reminders" } ); + + await server.register(registerEventRouter, { prefix: "/events" }); }; diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 00dc19a46b..2de271180b 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -197,4 +197,4 @@ volumes: driver: local ldap_data: ldap_config: - grafana_storage: + grafana_storage: \ No newline at end of file diff --git a/frontend/src/context/ProjectPermissionContext/types.ts b/frontend/src/context/ProjectPermissionContext/types.ts index 3f301942bb..3c0f9df321 100644 --- a/frontend/src/context/ProjectPermissionContext/types.ts +++ b/frontend/src/context/ProjectPermissionContext/types.ts @@ -21,7 +21,8 @@ export enum ProjectPermissionSecretActions { ReadValue = "readValue", Create = "create", Edit = "edit", - Delete = "delete" + Delete = "delete", + Subscribe = "subscribe" } export enum ProjectPermissionDynamicSecretActions { diff --git a/frontend/src/pages/project/RoleDetailsBySlugPage/components/PermissionConditionHelpers.tsx b/frontend/src/pages/project/RoleDetailsBySlugPage/components/PermissionConditionHelpers.tsx index d124c4dffc..a39ae8d640 100644 --- a/frontend/src/pages/project/RoleDetailsBySlugPage/components/PermissionConditionHelpers.tsx +++ b/frontend/src/pages/project/RoleDetailsBySlugPage/components/PermissionConditionHelpers.tsx @@ -39,6 +39,14 @@ export const renderOperatorSelectItems = (type: string) => { In ); + case "eventType": + return ( + <> + Equal + Not Equal + In + + ); default: return ( <> diff --git a/frontend/src/pages/project/RoleDetailsBySlugPage/components/ProjectRoleModifySection.utils.tsx b/frontend/src/pages/project/RoleDetailsBySlugPage/components/ProjectRoleModifySection.utils.tsx index 471246d04b..602ce457ee 100644 --- a/frontend/src/pages/project/RoleDetailsBySlugPage/components/ProjectRoleModifySection.utils.tsx +++ b/frontend/src/pages/project/RoleDetailsBySlugPage/components/ProjectRoleModifySection.utils.tsx @@ -54,7 +54,8 @@ const SecretPolicyActionSchema = z.object({ [ProjectPermissionSecretActions.ReadValue]: z.boolean().optional(), [ProjectPermissionSecretActions.Edit]: z.boolean().optional(), [ProjectPermissionSecretActions.Delete]: z.boolean().optional(), - [ProjectPermissionSecretActions.Create]: z.boolean().optional() + [ProjectPermissionSecretActions.Create]: z.boolean().optional(), + [ProjectPermissionSecretActions.Subscribe]: z.boolean().optional() }); const ApprovalPolicyActionSchema = z.object({ @@ -588,6 +589,7 @@ export const rolePermission2Form = (permissions: TProjectPermission[] = []) => { const canEdit = action.includes(ProjectPermissionSecretActions.Edit); const canDelete = action.includes(ProjectPermissionSecretActions.Delete); const canCreate = action.includes(ProjectPermissionSecretActions.Create); + const canSubscribe = action.includes(ProjectPermissionSecretActions.Subscribe); // from above statement we are sure it won't be undefined formVal[subject]!.push({ @@ -597,6 +599,7 @@ export const rolePermission2Form = (permissions: TProjectPermission[] = []) => { create: canCreate, edit: canEdit, delete: canDelete, + subscribe: canSubscribe, conditions: conditions ? convertCaslConditionToFormOperator(conditions) : [], inverted }); @@ -1111,7 +1114,8 @@ export const PROJECT_PERMISSION_OBJECT: TProjectPermissionObject = { { label: "Read Value", value: ProjectPermissionSecretActions.ReadValue }, { label: "Modify", value: ProjectPermissionSecretActions.Edit }, { label: "Remove", value: ProjectPermissionSecretActions.Delete }, - { label: "Create", value: ProjectPermissionSecretActions.Create } + { label: "Create", value: ProjectPermissionSecretActions.Create }, + { label: "Subscribe", value: ProjectPermissionSecretActions.Subscribe } ] }, [ProjectPermissionSub.SecretFolders]: { diff --git a/frontend/src/pages/project/RoleDetailsBySlugPage/components/SecretPermissionConditions.tsx b/frontend/src/pages/project/RoleDetailsBySlugPage/components/SecretPermissionConditions.tsx index 9978b3e458..b7f846bbd2 100644 --- a/frontend/src/pages/project/RoleDetailsBySlugPage/components/SecretPermissionConditions.tsx +++ b/frontend/src/pages/project/RoleDetailsBySlugPage/components/SecretPermissionConditions.tsx @@ -17,7 +17,8 @@ export const SecretPermissionConditions = ({ position = 0, isDisabled }: Props) { value: "environment", label: "Environment Slug" }, { value: "secretPath", label: "Secret Path" }, { value: "secretName", label: "Secret Name" }, - { value: "secretTags", label: "Secret Tags" } + { value: "secretTags", label: "Secret Tags" }, + { value: "eventType", label: "Event Type" } ]} /> ); diff --git a/nginx/default.dev.conf b/nginx/default.dev.conf index f1de1790c5..c92199a6ee 100644 --- a/nginx/default.dev.conf +++ b/nginx/default.dev.conf @@ -1,3 +1,7 @@ +upstream api { + server backend:4000; +} + server { listen 80; @@ -11,7 +15,7 @@ server { proxy_set_header Host $http_host; proxy_set_header X-NginX-Proxy true; - proxy_pass http://backend:4000; + proxy_pass http://api; proxy_redirect off; proxy_cookie_path / "/; SameSite=strict"; @@ -24,7 +28,7 @@ server { proxy_set_header Host $http_host; proxy_set_header X-NginX-Proxy true; - proxy_pass http://backend:4000; + proxy_pass http://api; proxy_redirect off; proxy_cookie_path / "/; HttpOnly; SameSite=strict"; @@ -39,7 +43,7 @@ server { proxy_set_header Host $http_host; proxy_set_header X-NginX-Proxy true; - proxy_pass http://backend:4000; + proxy_pass http://api; proxy_redirect off; proxy_cookie_path / "/; HttpOnly; SameSite=strict"; @@ -57,7 +61,7 @@ server { # proxy_set_header X-SSL-Client-Cert $http_x_ssl_client_cert; # proxy_pass_request_headers on; - proxy_pass http://backend:4000; + proxy_pass http://api; proxy_redirect off; # proxy_cookie_path / "/; secure; HttpOnly; SameSite=strict"; diff --git a/sink/index.html b/sink/index.html new file mode 100644 index 0000000000..f140d1c3b1 --- /dev/null +++ b/sink/index.html @@ -0,0 +1,89 @@ + + + + + + Document + + + + +

EventSource Example

+ +
+

This page listens for server-sent events from the backend.

+

Open your browser's console to see the received events.

+ +
    + +
+
+ + + +