mirror of
https://github.com/Infisical/infisical.git
synced 2026-01-08 23:18:05 -05:00
feat: events system implementation (#4246)
* chore: save poc * chore: save wip * fix: undo cors * fix: impl changes * fix: PR changes * fix: mocks * fix: connection tracking and auth changes * fix: PR changes * fix: revert license * feat: frontend change * fix: revert docker compose.dev * fix: duplicate publisher connection * fix: pr changes * chore: move event impl to `ee` * fix: lint errors * fix: check length of events * fix: static permissions matching * fix: secretPath * fix: remove source prefix in bus event name * fix: license check
This commit is contained in:
4
backend/src/@types/fastify.d.ts
vendored
4
backend/src/@types/fastify.d.ts
vendored
@@ -12,6 +12,8 @@ import { TCertificateAuthorityCrlServiceFactory } from "@app/ee/services/certifi
|
|||||||
import { TCertificateEstServiceFactory } from "@app/ee/services/certificate-est/certificate-est-service";
|
import { TCertificateEstServiceFactory } from "@app/ee/services/certificate-est/certificate-est-service";
|
||||||
import { TDynamicSecretServiceFactory } from "@app/ee/services/dynamic-secret/dynamic-secret-types";
|
import { TDynamicSecretServiceFactory } from "@app/ee/services/dynamic-secret/dynamic-secret-types";
|
||||||
import { TDynamicSecretLeaseServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-types";
|
import { TDynamicSecretLeaseServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-types";
|
||||||
|
import { TEventBusService } from "@app/ee/services/event/event-bus-service";
|
||||||
|
import { TServerSentEventsService } from "@app/ee/services/event/event-sse-service";
|
||||||
import { TExternalKmsServiceFactory } from "@app/ee/services/external-kms/external-kms-service";
|
import { TExternalKmsServiceFactory } from "@app/ee/services/external-kms/external-kms-service";
|
||||||
import { TGatewayServiceFactory } from "@app/ee/services/gateway/gateway-service";
|
import { TGatewayServiceFactory } from "@app/ee/services/gateway/gateway-service";
|
||||||
import { TGithubOrgSyncServiceFactory } from "@app/ee/services/github-org-sync/github-org-sync-service";
|
import { TGithubOrgSyncServiceFactory } from "@app/ee/services/github-org-sync/github-org-sync-service";
|
||||||
@@ -296,6 +298,8 @@ declare module "fastify" {
|
|||||||
internalCertificateAuthority: TInternalCertificateAuthorityServiceFactory;
|
internalCertificateAuthority: TInternalCertificateAuthorityServiceFactory;
|
||||||
pkiTemplate: TPkiTemplatesServiceFactory;
|
pkiTemplate: TPkiTemplatesServiceFactory;
|
||||||
reminder: TReminderServiceFactory;
|
reminder: TReminderServiceFactory;
|
||||||
|
bus: TEventBusService;
|
||||||
|
sse: TServerSentEventsService;
|
||||||
};
|
};
|
||||||
// this is exclusive use for middlewares in which we need to inject data
|
// this is exclusive use for middlewares in which we need to inject data
|
||||||
// everywhere else access using service layer
|
// everywhere else access using service layer
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
import { AxiosError, RawAxiosRequestHeaders } from "axios";
|
import { AxiosError, RawAxiosRequestHeaders } from "axios";
|
||||||
|
|
||||||
import { SecretKeyEncoding } from "@app/db/schemas";
|
import { ProjectType, SecretKeyEncoding } from "@app/db/schemas";
|
||||||
import { getConfig } from "@app/lib/config/env";
|
import { TEventBusService } from "@app/ee/services/event/event-bus-service";
|
||||||
|
import { TopicName, toPublishableEvent } from "@app/ee/services/event/types";
|
||||||
import { request } from "@app/lib/config/request";
|
import { request } from "@app/lib/config/request";
|
||||||
import { crypto } from "@app/lib/crypto/cryptography";
|
import { crypto } from "@app/lib/crypto/cryptography";
|
||||||
import { logger } from "@app/lib/logger";
|
import { logger } from "@app/lib/logger";
|
||||||
@@ -21,6 +22,7 @@ type TAuditLogQueueServiceFactoryDep = {
|
|||||||
queueService: TQueueServiceFactory;
|
queueService: TQueueServiceFactory;
|
||||||
projectDAL: Pick<TProjectDALFactory, "findById">;
|
projectDAL: Pick<TProjectDALFactory, "findById">;
|
||||||
licenseService: Pick<TLicenseServiceFactory, "getPlan">;
|
licenseService: Pick<TLicenseServiceFactory, "getPlan">;
|
||||||
|
eventBusService: TEventBusService;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type TAuditLogQueueServiceFactory = {
|
export type TAuditLogQueueServiceFactory = {
|
||||||
@@ -36,134 +38,18 @@ export const auditLogQueueServiceFactory = async ({
|
|||||||
queueService,
|
queueService,
|
||||||
projectDAL,
|
projectDAL,
|
||||||
licenseService,
|
licenseService,
|
||||||
auditLogStreamDAL
|
auditLogStreamDAL,
|
||||||
|
eventBusService
|
||||||
}: TAuditLogQueueServiceFactoryDep): Promise<TAuditLogQueueServiceFactory> => {
|
}: TAuditLogQueueServiceFactoryDep): Promise<TAuditLogQueueServiceFactory> => {
|
||||||
const appCfg = getConfig();
|
|
||||||
|
|
||||||
const pushToLog = async (data: TCreateAuditLogDTO) => {
|
const pushToLog = async (data: TCreateAuditLogDTO) => {
|
||||||
if (appCfg.USE_PG_QUEUE && appCfg.SHOULD_INIT_PG_QUEUE) {
|
|
||||||
await queueService.queuePg<QueueName.AuditLog>(QueueJobs.AuditLog, data, {
|
|
||||||
retryLimit: 10,
|
|
||||||
retryBackoff: true
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
await queueService.queue<QueueName.AuditLog>(QueueName.AuditLog, QueueJobs.AuditLog, data, {
|
await queueService.queue<QueueName.AuditLog>(QueueName.AuditLog, QueueJobs.AuditLog, data, {
|
||||||
removeOnFail: {
|
removeOnFail: {
|
||||||
count: 3
|
count: 3
|
||||||
},
|
},
|
||||||
removeOnComplete: true
|
removeOnComplete: true
|
||||||
});
|
});
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (appCfg.SHOULD_INIT_PG_QUEUE) {
|
|
||||||
await queueService.startPg<QueueName.AuditLog>(
|
|
||||||
QueueJobs.AuditLog,
|
|
||||||
async ([job]) => {
|
|
||||||
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
|
|
||||||
let { orgId } = job.data;
|
|
||||||
const MS_IN_DAY = 24 * 60 * 60 * 1000;
|
|
||||||
let project;
|
|
||||||
|
|
||||||
if (!orgId) {
|
|
||||||
// it will never be undefined for both org and project id
|
|
||||||
// TODO(akhilmhdh): use caching here in dal to avoid db calls
|
|
||||||
project = await projectDAL.findById(projectId as string);
|
|
||||||
orgId = project.orgId;
|
|
||||||
}
|
|
||||||
|
|
||||||
const plan = await licenseService.getPlan(orgId);
|
|
||||||
if (plan.auditLogsRetentionDays === 0) {
|
|
||||||
// skip inserting if audit log retention is 0 meaning its not supported
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// For project actions, set TTL to project-level audit log retention config
|
|
||||||
// This condition ensures that the plan's audit log retention days cannot be bypassed
|
|
||||||
const ttlInDays =
|
|
||||||
project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays
|
|
||||||
? project.auditLogsRetentionDays
|
|
||||||
: plan.auditLogsRetentionDays;
|
|
||||||
|
|
||||||
const ttl = ttlInDays * MS_IN_DAY;
|
|
||||||
|
|
||||||
const auditLog = await auditLogDAL.create({
|
|
||||||
actor: actor.type,
|
|
||||||
actorMetadata: actor.metadata,
|
|
||||||
userAgent,
|
|
||||||
projectId,
|
|
||||||
projectName: project?.name,
|
|
||||||
ipAddress,
|
|
||||||
orgId,
|
|
||||||
eventType: event.type,
|
|
||||||
expiresAt: new Date(Date.now() + ttl),
|
|
||||||
eventMetadata: event.metadata,
|
|
||||||
userAgentType
|
|
||||||
});
|
|
||||||
|
|
||||||
const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : [];
|
|
||||||
await Promise.allSettled(
|
|
||||||
logStreams.map(
|
|
||||||
async ({
|
|
||||||
url,
|
|
||||||
encryptedHeadersTag,
|
|
||||||
encryptedHeadersIV,
|
|
||||||
encryptedHeadersKeyEncoding,
|
|
||||||
encryptedHeadersCiphertext
|
|
||||||
}) => {
|
|
||||||
const streamHeaders =
|
|
||||||
encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag
|
|
||||||
? (JSON.parse(
|
|
||||||
crypto
|
|
||||||
.encryption()
|
|
||||||
.symmetric()
|
|
||||||
.decryptWithRootEncryptionKey({
|
|
||||||
keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding,
|
|
||||||
iv: encryptedHeadersIV,
|
|
||||||
tag: encryptedHeadersTag,
|
|
||||||
ciphertext: encryptedHeadersCiphertext
|
|
||||||
})
|
|
||||||
) as LogStreamHeaders[])
|
|
||||||
: [];
|
|
||||||
|
|
||||||
const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" };
|
|
||||||
|
|
||||||
if (streamHeaders.length)
|
|
||||||
streamHeaders.forEach(({ key, value }) => {
|
|
||||||
headers[key] = value;
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
const response = await request.post(
|
|
||||||
url,
|
|
||||||
{ ...providerSpecificPayload(url), ...auditLog },
|
|
||||||
{
|
|
||||||
headers,
|
|
||||||
// request timeout
|
|
||||||
timeout: AUDIT_LOG_STREAM_TIMEOUT,
|
|
||||||
// connection timeout
|
|
||||||
signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT)
|
|
||||||
}
|
|
||||||
);
|
|
||||||
return response;
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to stream audit log [url=${url}] for org [orgId=${orgId}] [error=${(error as AxiosError).message}]`
|
|
||||||
);
|
|
||||||
return error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
);
|
|
||||||
},
|
|
||||||
{
|
|
||||||
batchSize: 1,
|
|
||||||
workerCount: 30,
|
|
||||||
pollingIntervalSeconds: 0.5
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
queueService.start(QueueName.AuditLog, async (job) => {
|
queueService.start(QueueName.AuditLog, async (job) => {
|
||||||
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
|
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
|
||||||
let { orgId } = job.data;
|
let { orgId } = job.data;
|
||||||
@@ -178,11 +64,9 @@ export const auditLogQueueServiceFactory = async ({
|
|||||||
}
|
}
|
||||||
|
|
||||||
const plan = await licenseService.getPlan(orgId);
|
const plan = await licenseService.getPlan(orgId);
|
||||||
if (plan.auditLogsRetentionDays === 0) {
|
|
||||||
// skip inserting if audit log retention is 0 meaning its not supported
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// skip inserting if audit log retention is 0 meaning its not supported
|
||||||
|
if (plan.auditLogsRetentionDays !== 0) {
|
||||||
// For project actions, set TTL to project-level audit log retention config
|
// For project actions, set TTL to project-level audit log retention config
|
||||||
// This condition ensures that the plan's audit log retention days cannot be bypassed
|
// This condition ensures that the plan's audit log retention days cannot be bypassed
|
||||||
const ttlInDays =
|
const ttlInDays =
|
||||||
@@ -260,6 +144,17 @@ export const auditLogQueueServiceFactory = async ({
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const publishable = toPublishableEvent(event);
|
||||||
|
|
||||||
|
if (publishable) {
|
||||||
|
await eventBusService.publish(TopicName.CoreServers, {
|
||||||
|
type: ProjectType.SecretManager,
|
||||||
|
source: "infiscal",
|
||||||
|
data: publishable.data
|
||||||
|
});
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
83
backend/src/ee/services/event/event-bus-service.ts
Normal file
83
backend/src/ee/services/event/event-bus-service.ts
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
import Redis from "ioredis";
|
||||||
|
import { z } from "zod";
|
||||||
|
|
||||||
|
import { logger } from "@app/lib/logger";
|
||||||
|
|
||||||
|
import { EventSchema, TopicName } from "./types";
|
||||||
|
|
||||||
|
export const eventBusFactory = (redis: Redis) => {
|
||||||
|
const publisher = redis.duplicate();
|
||||||
|
// Duplicate the publisher to create a subscriber.
|
||||||
|
// This is necessary because Redis does not allow a single connection to both publish and subscribe.
|
||||||
|
const subscriber = publisher.duplicate();
|
||||||
|
|
||||||
|
const init = async (topics: TopicName[] = Object.values(TopicName)) => {
|
||||||
|
subscriber.on("error", (e) => {
|
||||||
|
logger.error(e, "Event Bus subscriber error");
|
||||||
|
});
|
||||||
|
|
||||||
|
publisher.on("error", (e) => {
|
||||||
|
logger.error(e, "Event Bus publisher error");
|
||||||
|
});
|
||||||
|
|
||||||
|
await subscriber.subscribe(...topics);
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publishes an event to the specified topic.
|
||||||
|
* @param topic - The topic to publish the event to.
|
||||||
|
* @param event - The event data to publish.
|
||||||
|
*/
|
||||||
|
const publish = async <T extends z.input<typeof EventSchema>>(topic: TopicName, event: T) => {
|
||||||
|
const json = JSON.stringify(event);
|
||||||
|
|
||||||
|
return publisher.publish(topic, json, (err) => {
|
||||||
|
if (err) {
|
||||||
|
return logger.error(err, `Error publishing to channel ${topic}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param fn - The function to call when a message is received.
|
||||||
|
* It should accept the parsed event data as an argument.
|
||||||
|
* @template T - The type of the event data, which should match the schema defined in EventSchema.
|
||||||
|
* @returns A function that can be called to unsubscribe from the event bus.
|
||||||
|
*/
|
||||||
|
const subscribe = <T extends z.infer<typeof EventSchema>>(fn: (data: T) => Promise<void> | void) => {
|
||||||
|
// Not using async await cause redis client's `on` method does not expect async listeners.
|
||||||
|
const listener = (channel: string, message: string) => {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(message) as T;
|
||||||
|
const thenable = fn(parsed);
|
||||||
|
|
||||||
|
// If the function returns a Promise, catch any errors that occur during processing.
|
||||||
|
if (thenable instanceof Promise) {
|
||||||
|
thenable.catch((error) => {
|
||||||
|
logger.error(error, `Error processing message from channel ${channel}`);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error, `Error parsing message data from channel ${channel}`);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
subscriber.on("message", listener);
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
subscriber.off("message", listener);
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
const close = async () => {
|
||||||
|
try {
|
||||||
|
await publisher.quit();
|
||||||
|
await subscriber.quit();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error, "Error closing event bus connections");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return { init, publish, subscribe, close };
|
||||||
|
};
|
||||||
|
|
||||||
|
export type TEventBusService = ReturnType<typeof eventBusFactory>;
|
||||||
164
backend/src/ee/services/event/event-sse-service.ts
Normal file
164
backend/src/ee/services/event/event-sse-service.ts
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
/* eslint-disable no-continue */
|
||||||
|
import { subject } from "@casl/ability";
|
||||||
|
import Redis from "ioredis";
|
||||||
|
|
||||||
|
import { KeyStorePrefixes } from "@app/keystore/keystore";
|
||||||
|
import { logger } from "@app/lib/logger";
|
||||||
|
|
||||||
|
import { TEventBusService } from "./event-bus-service";
|
||||||
|
import { createEventStreamClient, EventStreamClient, IEventStreamClientOpts } from "./event-sse-stream";
|
||||||
|
import { EventData, RegisteredEvent, toBusEventName } from "./types";
|
||||||
|
|
||||||
|
const AUTH_REFRESH_INTERVAL = 60 * 1000;
|
||||||
|
const HEART_BEAT_INTERVAL = 15 * 1000;
|
||||||
|
|
||||||
|
export const sseServiceFactory = (bus: TEventBusService, redis: Redis) => {
|
||||||
|
let heartbeatInterval: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
|
const clients = new Set<EventStreamClient>();
|
||||||
|
|
||||||
|
heartbeatInterval = setInterval(() => {
|
||||||
|
for (const client of clients) {
|
||||||
|
if (client.stream.closed) continue;
|
||||||
|
void client.ping();
|
||||||
|
}
|
||||||
|
}, HEART_BEAT_INTERVAL);
|
||||||
|
|
||||||
|
const refreshInterval = setInterval(() => {
|
||||||
|
for (const client of clients) {
|
||||||
|
if (client.stream.closed) continue;
|
||||||
|
void client.refresh();
|
||||||
|
}
|
||||||
|
}, AUTH_REFRESH_INTERVAL);
|
||||||
|
|
||||||
|
const removeActiveConnection = async (projectId: string, identityId: string, connectionId: string) => {
|
||||||
|
const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, identityId);
|
||||||
|
const key = KeyStorePrefixes.ActiveSSEConnections(projectId, identityId, connectionId);
|
||||||
|
|
||||||
|
await Promise.all([redis.lrem(set, 0, connectionId), redis.del(key)]);
|
||||||
|
};
|
||||||
|
|
||||||
|
const getActiveConnectionsCount = async (projectId: string, identityId: string) => {
|
||||||
|
const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, identityId);
|
||||||
|
const connections = await redis.lrange(set, 0, -1);
|
||||||
|
|
||||||
|
if (connections.length === 0) {
|
||||||
|
return 0; // No active connections
|
||||||
|
}
|
||||||
|
|
||||||
|
const keys = connections.map((c) => KeyStorePrefixes.ActiveSSEConnections(projectId, identityId, c));
|
||||||
|
|
||||||
|
const values = await redis.mget(...keys);
|
||||||
|
|
||||||
|
// eslint-disable-next-line no-plusplus
|
||||||
|
for (let i = 0; i < values.length; i++) {
|
||||||
|
if (values[i] === null) {
|
||||||
|
// eslint-disable-next-line no-await-in-loop
|
||||||
|
await removeActiveConnection(projectId, identityId, connections[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return redis.llen(set);
|
||||||
|
};
|
||||||
|
|
||||||
|
const onDisconnect = async (client: EventStreamClient) => {
|
||||||
|
try {
|
||||||
|
client.close();
|
||||||
|
clients.delete(client);
|
||||||
|
await removeActiveConnection(client.auth.projectId, client.auth.actorId, client.id);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error, "Error during SSE stream disconnection");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
function filterEventsForClient(client: EventStreamClient, event: EventData, registered: RegisteredEvent[]) {
|
||||||
|
const eventType = toBusEventName(event.data.eventType);
|
||||||
|
const match = registered.find((r) => r.event === eventType);
|
||||||
|
if (!match) return;
|
||||||
|
|
||||||
|
const item = event.data.payload;
|
||||||
|
|
||||||
|
if (Array.isArray(item)) {
|
||||||
|
if (item.length === 0) return;
|
||||||
|
|
||||||
|
const baseSubject = {
|
||||||
|
eventType,
|
||||||
|
environment: undefined as string | undefined,
|
||||||
|
secretPath: undefined as string | undefined
|
||||||
|
};
|
||||||
|
|
||||||
|
const filtered = item.filter((ev) => {
|
||||||
|
baseSubject.secretPath = ev.secretPath ?? "/";
|
||||||
|
baseSubject.environment = ev.environment;
|
||||||
|
|
||||||
|
return client.matcher.can("subscribe", subject(event.type, baseSubject));
|
||||||
|
});
|
||||||
|
|
||||||
|
if (filtered.length === 0) return;
|
||||||
|
|
||||||
|
return client.send({
|
||||||
|
...event,
|
||||||
|
data: {
|
||||||
|
...event.data,
|
||||||
|
payload: filtered
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// For single item
|
||||||
|
const baseSubject = {
|
||||||
|
eventType,
|
||||||
|
secretPath: item.secretPath ?? "/",
|
||||||
|
environment: item.environment
|
||||||
|
};
|
||||||
|
|
||||||
|
if (client.matcher.can("subscribe", subject(event.type, baseSubject))) {
|
||||||
|
client.send(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const subscribe = async (
|
||||||
|
opts: IEventStreamClientOpts & {
|
||||||
|
onClose?: () => void;
|
||||||
|
}
|
||||||
|
) => {
|
||||||
|
const client = createEventStreamClient(redis, opts);
|
||||||
|
|
||||||
|
// Set up event listener on event bus
|
||||||
|
const unsubscribe = bus.subscribe((event) => {
|
||||||
|
if (event.type !== opts.type) return;
|
||||||
|
filterEventsForClient(client, event, opts.registered);
|
||||||
|
});
|
||||||
|
|
||||||
|
client.stream.on("close", () => {
|
||||||
|
unsubscribe();
|
||||||
|
void onDisconnect(client); // This will never throw
|
||||||
|
});
|
||||||
|
|
||||||
|
await client.open();
|
||||||
|
|
||||||
|
clients.add(client);
|
||||||
|
|
||||||
|
return client;
|
||||||
|
};
|
||||||
|
|
||||||
|
const close = () => {
|
||||||
|
if (heartbeatInterval) {
|
||||||
|
clearInterval(heartbeatInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (refreshInterval) {
|
||||||
|
clearInterval(refreshInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const client of clients) {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
clients.clear();
|
||||||
|
};
|
||||||
|
|
||||||
|
return { subscribe, close, getActiveConnectionsCount };
|
||||||
|
};
|
||||||
|
|
||||||
|
export type TServerSentEventsService = ReturnType<typeof sseServiceFactory>;
|
||||||
178
backend/src/ee/services/event/event-sse-stream.ts
Normal file
178
backend/src/ee/services/event/event-sse-stream.ts
Normal file
@@ -0,0 +1,178 @@
|
|||||||
|
/* eslint-disable no-underscore-dangle */
|
||||||
|
import { Readable } from "node:stream";
|
||||||
|
|
||||||
|
import { MongoAbility, PureAbility } from "@casl/ability";
|
||||||
|
import { MongoQuery } from "@ucast/mongo2js";
|
||||||
|
import Redis from "ioredis";
|
||||||
|
import { nanoid } from "nanoid";
|
||||||
|
|
||||||
|
import { ProjectType } from "@app/db/schemas";
|
||||||
|
import { ProjectPermissionSet } from "@app/ee/services/permission/project-permission";
|
||||||
|
import { KeyStorePrefixes } from "@app/keystore/keystore";
|
||||||
|
import { conditionsMatcher } from "@app/lib/casl";
|
||||||
|
import { logger } from "@app/lib/logger";
|
||||||
|
|
||||||
|
import { EventData, RegisteredEvent } from "./types";
|
||||||
|
|
||||||
|
export const getServerSentEventsHeaders = () =>
|
||||||
|
({
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Content-Type": "text/event-stream",
|
||||||
|
Connection: "keep-alive",
|
||||||
|
"X-Accel-Buffering": "no"
|
||||||
|
}) as const;
|
||||||
|
|
||||||
|
type TAuthInfo = {
|
||||||
|
actorId: string;
|
||||||
|
projectId: string;
|
||||||
|
permission: MongoAbility<ProjectPermissionSet, MongoQuery>;
|
||||||
|
};
|
||||||
|
|
||||||
|
export interface IEventStreamClientOpts {
|
||||||
|
type: ProjectType;
|
||||||
|
registered: RegisteredEvent[];
|
||||||
|
onAuthRefresh: (info: TAuthInfo) => Promise<void> | void;
|
||||||
|
getAuthInfo: () => Promise<TAuthInfo> | TAuthInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface EventMessage {
|
||||||
|
time?: string | number;
|
||||||
|
type: string;
|
||||||
|
data?: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
function serializeSseEvent(chunk: EventMessage): string {
|
||||||
|
let payload = "";
|
||||||
|
|
||||||
|
if (chunk.time) payload += `id: ${chunk.time}\n`;
|
||||||
|
if (chunk.type) payload += `event: ${chunk.type}\n`;
|
||||||
|
if (chunk.data) payload += `data: ${JSON.stringify(chunk)}\n`;
|
||||||
|
|
||||||
|
return `${payload}\n`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type EventStreamClient = {
|
||||||
|
id: string;
|
||||||
|
stream: Readable;
|
||||||
|
open: () => Promise<void>;
|
||||||
|
send: (data: EventMessage | EventData) => void;
|
||||||
|
ping: () => Promise<void>;
|
||||||
|
refresh: () => Promise<void>;
|
||||||
|
close: () => void;
|
||||||
|
get auth(): TAuthInfo;
|
||||||
|
signal: AbortSignal;
|
||||||
|
abort: () => void;
|
||||||
|
matcher: PureAbility;
|
||||||
|
};
|
||||||
|
|
||||||
|
export function createEventStreamClient(redis: Redis, options: IEventStreamClientOpts): EventStreamClient {
|
||||||
|
const rules = options.registered.map((r) => ({
|
||||||
|
subject: options.type,
|
||||||
|
action: "subscribe",
|
||||||
|
conditions: {
|
||||||
|
eventType: r.event,
|
||||||
|
secretPath: r.conditions?.secretPath ?? "/",
|
||||||
|
environment: r.conditions?.environmentSlug
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
const id = `sse-${nanoid()}`;
|
||||||
|
const control = new AbortController();
|
||||||
|
const matcher = new PureAbility(rules, { conditionsMatcher });
|
||||||
|
|
||||||
|
let auth: TAuthInfo | undefined;
|
||||||
|
|
||||||
|
const stream = new Readable({
|
||||||
|
objectMode: true
|
||||||
|
});
|
||||||
|
|
||||||
|
// We will manually push data to the stream
|
||||||
|
stream._read = () => {};
|
||||||
|
|
||||||
|
const send = (data: EventMessage | EventData) => {
|
||||||
|
const chunk = serializeSseEvent(data);
|
||||||
|
if (!stream.push(chunk)) {
|
||||||
|
logger.debug("Backpressure detected: dropped manual event");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
stream.on("error", (error: Error) => stream.destroy(error));
|
||||||
|
|
||||||
|
const open = async () => {
|
||||||
|
auth = await options.getAuthInfo();
|
||||||
|
await options.onAuthRefresh(auth);
|
||||||
|
|
||||||
|
const { actorId, projectId } = auth;
|
||||||
|
const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, actorId);
|
||||||
|
const key = KeyStorePrefixes.ActiveSSEConnections(projectId, actorId, id);
|
||||||
|
|
||||||
|
await Promise.all([redis.rpush(set, id), redis.set(key, "1", "EX", 60)]);
|
||||||
|
};
|
||||||
|
|
||||||
|
const ping = async () => {
|
||||||
|
if (!auth) return; // Avoid race condition if ping is called before open
|
||||||
|
|
||||||
|
const { actorId, projectId } = auth;
|
||||||
|
const key = KeyStorePrefixes.ActiveSSEConnections(projectId, actorId, id);
|
||||||
|
|
||||||
|
await redis.set(key, "1", "EX", 60);
|
||||||
|
|
||||||
|
stream.push("1");
|
||||||
|
};
|
||||||
|
|
||||||
|
const close = () => {
|
||||||
|
if (stream.closed) return;
|
||||||
|
stream.push(null);
|
||||||
|
stream.destroy();
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Refreshes the connection's auth permissions
|
||||||
|
* Must be called atleast once when connection is opened
|
||||||
|
*/
|
||||||
|
const refresh = async () => {
|
||||||
|
try {
|
||||||
|
auth = await options.getAuthInfo();
|
||||||
|
await options.onAuthRefresh(auth);
|
||||||
|
} catch (error) {
|
||||||
|
if (error instanceof Error) {
|
||||||
|
send({
|
||||||
|
type: "error",
|
||||||
|
data: {
|
||||||
|
...error
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return close();
|
||||||
|
}
|
||||||
|
stream.emit("error", error);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const abort = () => {
|
||||||
|
try {
|
||||||
|
control.abort();
|
||||||
|
} catch (error) {
|
||||||
|
logger.debug(error, "Error aborting SSE stream");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
stream,
|
||||||
|
open,
|
||||||
|
send,
|
||||||
|
ping,
|
||||||
|
refresh,
|
||||||
|
close,
|
||||||
|
signal: control.signal,
|
||||||
|
abort,
|
||||||
|
matcher,
|
||||||
|
get auth() {
|
||||||
|
if (!auth) {
|
||||||
|
throw new Error("Auth info not set");
|
||||||
|
}
|
||||||
|
|
||||||
|
return auth;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
125
backend/src/ee/services/event/types.ts
Normal file
125
backend/src/ee/services/event/types.ts
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
import { z } from "zod";
|
||||||
|
|
||||||
|
import { ProjectType } from "@app/db/schemas";
|
||||||
|
import { Event, EventType } from "@app/ee/services/audit-log/audit-log-types";
|
||||||
|
|
||||||
|
export enum TopicName {
|
||||||
|
CoreServers = "infisical::core-servers"
|
||||||
|
}
|
||||||
|
|
||||||
|
export enum BusEventName {
|
||||||
|
CreateSecret = "secret:create",
|
||||||
|
UpdateSecret = "secret:update",
|
||||||
|
DeleteSecret = "secret:delete"
|
||||||
|
}
|
||||||
|
|
||||||
|
type PublisableEventTypes =
|
||||||
|
| EventType.CREATE_SECRET
|
||||||
|
| EventType.CREATE_SECRETS
|
||||||
|
| EventType.DELETE_SECRET
|
||||||
|
| EventType.DELETE_SECRETS
|
||||||
|
| EventType.UPDATE_SECRETS
|
||||||
|
| EventType.UPDATE_SECRET;
|
||||||
|
|
||||||
|
export function toBusEventName(input: EventType) {
|
||||||
|
switch (input) {
|
||||||
|
case EventType.CREATE_SECRET:
|
||||||
|
case EventType.CREATE_SECRETS:
|
||||||
|
return BusEventName.CreateSecret;
|
||||||
|
case EventType.UPDATE_SECRET:
|
||||||
|
case EventType.UPDATE_SECRETS:
|
||||||
|
return BusEventName.UpdateSecret;
|
||||||
|
case EventType.DELETE_SECRET:
|
||||||
|
case EventType.DELETE_SECRETS:
|
||||||
|
return BusEventName.DeleteSecret;
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const isBulkEvent = (event: Event): event is Extract<Event, { metadata: { secrets: Array<unknown> } }> => {
|
||||||
|
return event.type.endsWith("-secrets"); // Feels so wrong
|
||||||
|
};
|
||||||
|
|
||||||
|
export const toPublishableEvent = (event: Event) => {
|
||||||
|
const name = toBusEventName(event.type);
|
||||||
|
|
||||||
|
if (!name) return null;
|
||||||
|
|
||||||
|
const e = event as Extract<Event, { type: PublisableEventTypes }>;
|
||||||
|
|
||||||
|
if (isBulkEvent(e)) {
|
||||||
|
return {
|
||||||
|
name,
|
||||||
|
isBulk: true,
|
||||||
|
data: {
|
||||||
|
eventType: e.type,
|
||||||
|
payload: e.metadata.secrets.map((s) => ({
|
||||||
|
environment: e.metadata.environment,
|
||||||
|
secretPath: e.metadata.secretPath,
|
||||||
|
...s
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
} as const;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
name,
|
||||||
|
isBulk: false,
|
||||||
|
data: {
|
||||||
|
eventType: e.type,
|
||||||
|
payload: {
|
||||||
|
...e.metadata,
|
||||||
|
environment: e.metadata.environment
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} as const;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const EventName = z.nativeEnum(BusEventName);
|
||||||
|
|
||||||
|
const EventSecretPayload = z.object({
|
||||||
|
secretPath: z.string().optional(),
|
||||||
|
secretId: z.string(),
|
||||||
|
secretKey: z.string(),
|
||||||
|
environment: z.string()
|
||||||
|
});
|
||||||
|
|
||||||
|
export type EventSecret = z.infer<typeof EventSecretPayload>;
|
||||||
|
|
||||||
|
export const EventSchema = z.object({
|
||||||
|
datacontenttype: z.literal("application/json").optional().default("application/json"),
|
||||||
|
type: z.nativeEnum(ProjectType),
|
||||||
|
source: z.string(),
|
||||||
|
time: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.default(() => new Date().toISOString()),
|
||||||
|
data: z.discriminatedUnion("eventType", [
|
||||||
|
z.object({
|
||||||
|
specversion: z.number().optional().default(1),
|
||||||
|
eventType: z.enum([EventType.CREATE_SECRET, EventType.UPDATE_SECRET, EventType.DELETE_SECRET]),
|
||||||
|
payload: EventSecretPayload
|
||||||
|
}),
|
||||||
|
z.object({
|
||||||
|
specversion: z.number().optional().default(1),
|
||||||
|
eventType: z.enum([EventType.CREATE_SECRETS, EventType.UPDATE_SECRETS, EventType.DELETE_SECRETS]),
|
||||||
|
payload: EventSecretPayload.array()
|
||||||
|
})
|
||||||
|
// Add more event types as needed
|
||||||
|
])
|
||||||
|
});
|
||||||
|
|
||||||
|
export type EventData = z.infer<typeof EventSchema>;
|
||||||
|
|
||||||
|
export const EventRegisterSchema = z.object({
|
||||||
|
event: EventName,
|
||||||
|
conditions: z
|
||||||
|
.object({
|
||||||
|
secretPath: z.string().optional().default("/"),
|
||||||
|
environmentSlug: z.string()
|
||||||
|
})
|
||||||
|
.optional()
|
||||||
|
});
|
||||||
|
|
||||||
|
export type RegisteredEvent = z.infer<typeof EventRegisterSchema>;
|
||||||
@@ -59,7 +59,8 @@ export const getDefaultOnPremFeatures = (): TFeatureSet => ({
|
|||||||
secretScanning: false,
|
secretScanning: false,
|
||||||
enterpriseSecretSyncs: false,
|
enterpriseSecretSyncs: false,
|
||||||
enterpriseAppConnections: false,
|
enterpriseAppConnections: false,
|
||||||
fips: false
|
fips: false,
|
||||||
|
eventSubscriptions: false
|
||||||
});
|
});
|
||||||
|
|
||||||
export const setupLicenseRequestWithStore = (
|
export const setupLicenseRequestWithStore = (
|
||||||
|
|||||||
@@ -76,6 +76,7 @@ export type TFeatureSet = {
|
|||||||
enterpriseSecretSyncs: false;
|
enterpriseSecretSyncs: false;
|
||||||
enterpriseAppConnections: false;
|
enterpriseAppConnections: false;
|
||||||
fips: false;
|
fips: false;
|
||||||
|
eventSubscriptions: false;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type TOrgPlansTableDTO = {
|
export type TOrgPlansTableDTO = {
|
||||||
|
|||||||
@@ -161,7 +161,8 @@ const buildAdminPermissionRules = () => {
|
|||||||
ProjectPermissionSecretActions.ReadValue,
|
ProjectPermissionSecretActions.ReadValue,
|
||||||
ProjectPermissionSecretActions.Create,
|
ProjectPermissionSecretActions.Create,
|
||||||
ProjectPermissionSecretActions.Edit,
|
ProjectPermissionSecretActions.Edit,
|
||||||
ProjectPermissionSecretActions.Delete
|
ProjectPermissionSecretActions.Delete,
|
||||||
|
ProjectPermissionSecretActions.Subscribe
|
||||||
],
|
],
|
||||||
ProjectPermissionSub.Secrets
|
ProjectPermissionSub.Secrets
|
||||||
);
|
);
|
||||||
@@ -265,7 +266,8 @@ const buildMemberPermissionRules = () => {
|
|||||||
ProjectPermissionSecretActions.ReadValue,
|
ProjectPermissionSecretActions.ReadValue,
|
||||||
ProjectPermissionSecretActions.Edit,
|
ProjectPermissionSecretActions.Edit,
|
||||||
ProjectPermissionSecretActions.Create,
|
ProjectPermissionSecretActions.Create,
|
||||||
ProjectPermissionSecretActions.Delete
|
ProjectPermissionSecretActions.Delete,
|
||||||
|
ProjectPermissionSecretActions.Subscribe
|
||||||
],
|
],
|
||||||
ProjectPermissionSub.Secrets
|
ProjectPermissionSub.Secrets
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -36,7 +36,8 @@ export enum ProjectPermissionSecretActions {
|
|||||||
ReadValue = "readValue",
|
ReadValue = "readValue",
|
||||||
Create = "create",
|
Create = "create",
|
||||||
Edit = "edit",
|
Edit = "edit",
|
||||||
Delete = "delete"
|
Delete = "delete",
|
||||||
|
Subscribe = "subscribe"
|
||||||
}
|
}
|
||||||
|
|
||||||
export enum ProjectPermissionCmekActions {
|
export enum ProjectPermissionCmekActions {
|
||||||
@@ -204,6 +205,7 @@ export type SecretSubjectFields = {
|
|||||||
secretPath: string;
|
secretPath: string;
|
||||||
secretName?: string;
|
secretName?: string;
|
||||||
secretTags?: string[];
|
secretTags?: string[];
|
||||||
|
eventType?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type SecretFolderSubjectFields = {
|
export type SecretFolderSubjectFields = {
|
||||||
@@ -483,7 +485,17 @@ const SecretConditionV2Schema = z
|
|||||||
.object({
|
.object({
|
||||||
[PermissionConditionOperators.$IN]: PermissionConditionSchema[PermissionConditionOperators.$IN]
|
[PermissionConditionOperators.$IN]: PermissionConditionSchema[PermissionConditionOperators.$IN]
|
||||||
})
|
})
|
||||||
|
.partial(),
|
||||||
|
eventType: z.union([
|
||||||
|
z.string(),
|
||||||
|
z
|
||||||
|
.object({
|
||||||
|
[PermissionConditionOperators.$EQ]: PermissionConditionSchema[PermissionConditionOperators.$EQ],
|
||||||
|
[PermissionConditionOperators.$NEQ]: PermissionConditionSchema[PermissionConditionOperators.$NEQ],
|
||||||
|
[PermissionConditionOperators.$IN]: PermissionConditionSchema[PermissionConditionOperators.$IN]
|
||||||
|
})
|
||||||
.partial()
|
.partial()
|
||||||
|
])
|
||||||
})
|
})
|
||||||
.partial();
|
.partial();
|
||||||
|
|
||||||
|
|||||||
@@ -46,7 +46,11 @@ export const KeyStorePrefixes = {
|
|||||||
IdentityAccessTokenStatusUpdate: (identityAccessTokenId: string) =>
|
IdentityAccessTokenStatusUpdate: (identityAccessTokenId: string) =>
|
||||||
`identity-access-token-status:${identityAccessTokenId}`,
|
`identity-access-token-status:${identityAccessTokenId}`,
|
||||||
ServiceTokenStatusUpdate: (serviceTokenId: string) => `service-token-status:${serviceTokenId}`,
|
ServiceTokenStatusUpdate: (serviceTokenId: string) => `service-token-status:${serviceTokenId}`,
|
||||||
GatewayIdentityCredential: (identityId: string) => `gateway-credentials:${identityId}`
|
GatewayIdentityCredential: (identityId: string) => `gateway-credentials:${identityId}`,
|
||||||
|
ActiveSSEConnectionsSet: (projectId: string, identityId: string) =>
|
||||||
|
`sse-connections:${projectId}:${identityId}` as const,
|
||||||
|
ActiveSSEConnections: (projectId: string, identityId: string, connectionId: string) =>
|
||||||
|
`sse-connections:${projectId}:${identityId}:${connectionId}` as const
|
||||||
};
|
};
|
||||||
|
|
||||||
export const KeyStoreTtls = {
|
export const KeyStoreTtls = {
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ export type TAuthMode =
|
|||||||
orgId: string;
|
orgId: string;
|
||||||
authMethod: AuthMethod;
|
authMethod: AuthMethod;
|
||||||
isMfaVerified?: boolean;
|
isMfaVerified?: boolean;
|
||||||
|
token: AuthModeJwtTokenPayload;
|
||||||
}
|
}
|
||||||
| {
|
| {
|
||||||
authMode: AuthMode.API_KEY;
|
authMode: AuthMode.API_KEY;
|
||||||
@@ -30,6 +31,7 @@ export type TAuthMode =
|
|||||||
userId: string;
|
userId: string;
|
||||||
user: TUsers;
|
user: TUsers;
|
||||||
orgId: string;
|
orgId: string;
|
||||||
|
token: string;
|
||||||
}
|
}
|
||||||
| {
|
| {
|
||||||
authMode: AuthMode.SERVICE_TOKEN;
|
authMode: AuthMode.SERVICE_TOKEN;
|
||||||
@@ -38,6 +40,7 @@ export type TAuthMode =
|
|||||||
serviceTokenId: string;
|
serviceTokenId: string;
|
||||||
orgId: string;
|
orgId: string;
|
||||||
authMethod: null;
|
authMethod: null;
|
||||||
|
token: string;
|
||||||
}
|
}
|
||||||
| {
|
| {
|
||||||
authMode: AuthMode.IDENTITY_ACCESS_TOKEN;
|
authMode: AuthMode.IDENTITY_ACCESS_TOKEN;
|
||||||
@@ -47,6 +50,7 @@ export type TAuthMode =
|
|||||||
orgId: string;
|
orgId: string;
|
||||||
authMethod: null;
|
authMethod: null;
|
||||||
isInstanceAdmin?: boolean;
|
isInstanceAdmin?: boolean;
|
||||||
|
token: TIdentityAccessTokenJwtPayload;
|
||||||
}
|
}
|
||||||
| {
|
| {
|
||||||
authMode: AuthMode.SCIM_TOKEN;
|
authMode: AuthMode.SCIM_TOKEN;
|
||||||
@@ -56,7 +60,7 @@ export type TAuthMode =
|
|||||||
authMethod: null;
|
authMethod: null;
|
||||||
};
|
};
|
||||||
|
|
||||||
const extractAuth = async (req: FastifyRequest, jwtSecret: string) => {
|
export const extractAuth = async (req: FastifyRequest, jwtSecret: string) => {
|
||||||
const apiKey = req.headers?.["x-api-key"];
|
const apiKey = req.headers?.["x-api-key"];
|
||||||
if (apiKey) {
|
if (apiKey) {
|
||||||
return { authMode: AuthMode.API_KEY, token: apiKey, actor: ActorType.USER } as const;
|
return { authMode: AuthMode.API_KEY, token: apiKey, actor: ActorType.USER } as const;
|
||||||
@@ -133,7 +137,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => {
|
|||||||
actor,
|
actor,
|
||||||
orgId: orgId as string,
|
orgId: orgId as string,
|
||||||
authMethod: token.authMethod,
|
authMethod: token.authMethod,
|
||||||
isMfaVerified: token.isMfaVerified
|
isMfaVerified: token.isMfaVerified,
|
||||||
|
token
|
||||||
};
|
};
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -148,7 +153,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => {
|
|||||||
identityId: identity.identityId,
|
identityId: identity.identityId,
|
||||||
identityName: identity.name,
|
identityName: identity.name,
|
||||||
authMethod: null,
|
authMethod: null,
|
||||||
isInstanceAdmin: serverCfg?.adminIdentityIds?.includes(identity.identityId)
|
isInstanceAdmin: serverCfg?.adminIdentityIds?.includes(identity.identityId),
|
||||||
|
token
|
||||||
};
|
};
|
||||||
if (token?.identityAuth?.oidc) {
|
if (token?.identityAuth?.oidc) {
|
||||||
requestContext.set("identityAuthInfo", {
|
requestContext.set("identityAuthInfo", {
|
||||||
@@ -179,7 +185,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => {
|
|||||||
serviceToken,
|
serviceToken,
|
||||||
serviceTokenId: serviceToken.id,
|
serviceTokenId: serviceToken.id,
|
||||||
actor,
|
actor,
|
||||||
authMethod: null
|
authMethod: null,
|
||||||
|
token
|
||||||
};
|
};
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -191,7 +198,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => {
|
|||||||
actor,
|
actor,
|
||||||
user,
|
user,
|
||||||
orgId: "API_KEY", // We set the orgId to an arbitrary value, since we can't link an API key to a specific org. We have to deprecate API keys soon!
|
orgId: "API_KEY", // We set the orgId to an arbitrary value, since we can't link an API key to a specific org. We have to deprecate API keys soon!
|
||||||
authMethod: null
|
authMethod: null,
|
||||||
|
token: token as string
|
||||||
};
|
};
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ import { buildDynamicSecretProviders } from "@app/ee/services/dynamic-secret/pro
|
|||||||
import { dynamicSecretLeaseDALFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-dal";
|
import { dynamicSecretLeaseDALFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-dal";
|
||||||
import { dynamicSecretLeaseQueueServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-queue";
|
import { dynamicSecretLeaseQueueServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-queue";
|
||||||
import { dynamicSecretLeaseServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-service";
|
import { dynamicSecretLeaseServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-service";
|
||||||
|
import { eventBusFactory } from "@app/ee/services/event/event-bus-service";
|
||||||
|
import { sseServiceFactory } from "@app/ee/services/event/event-sse-service";
|
||||||
import { externalKmsDALFactory } from "@app/ee/services/external-kms/external-kms-dal";
|
import { externalKmsDALFactory } from "@app/ee/services/external-kms/external-kms-dal";
|
||||||
import { externalKmsServiceFactory } from "@app/ee/services/external-kms/external-kms-service";
|
import { externalKmsServiceFactory } from "@app/ee/services/external-kms/external-kms-service";
|
||||||
import { gatewayDALFactory } from "@app/ee/services/gateway/gateway-dal";
|
import { gatewayDALFactory } from "@app/ee/services/gateway/gateway-dal";
|
||||||
@@ -495,6 +497,9 @@ export const registerRoutes = async (
|
|||||||
const projectMicrosoftTeamsConfigDAL = projectMicrosoftTeamsConfigDALFactory(db);
|
const projectMicrosoftTeamsConfigDAL = projectMicrosoftTeamsConfigDALFactory(db);
|
||||||
const secretScanningV2DAL = secretScanningV2DALFactory(db);
|
const secretScanningV2DAL = secretScanningV2DALFactory(db);
|
||||||
|
|
||||||
|
const eventBusService = eventBusFactory(server.redis);
|
||||||
|
const sseService = sseServiceFactory(eventBusService, server.redis);
|
||||||
|
|
||||||
const permissionService = permissionServiceFactory({
|
const permissionService = permissionServiceFactory({
|
||||||
permissionDAL,
|
permissionDAL,
|
||||||
orgRoleDAL,
|
orgRoleDAL,
|
||||||
@@ -552,7 +557,8 @@ export const registerRoutes = async (
|
|||||||
queueService,
|
queueService,
|
||||||
projectDAL,
|
projectDAL,
|
||||||
licenseService,
|
licenseService,
|
||||||
auditLogStreamDAL
|
auditLogStreamDAL,
|
||||||
|
eventBusService
|
||||||
});
|
});
|
||||||
|
|
||||||
const auditLogService = auditLogServiceFactory({ auditLogDAL, permissionService, auditLogQueue });
|
const auditLogService = auditLogServiceFactory({ auditLogDAL, permissionService, auditLogQueue });
|
||||||
@@ -1968,6 +1974,7 @@ export const registerRoutes = async (
|
|||||||
await kmsService.startService();
|
await kmsService.startService();
|
||||||
await microsoftTeamsService.start();
|
await microsoftTeamsService.start();
|
||||||
await dynamicSecretQueueService.init();
|
await dynamicSecretQueueService.init();
|
||||||
|
await eventBusService.init();
|
||||||
|
|
||||||
// inject all services
|
// inject all services
|
||||||
server.decorate<FastifyZodProvider["services"]>("services", {
|
server.decorate<FastifyZodProvider["services"]>("services", {
|
||||||
@@ -2074,7 +2081,9 @@ export const registerRoutes = async (
|
|||||||
githubOrgSync: githubOrgSyncConfigService,
|
githubOrgSync: githubOrgSyncConfigService,
|
||||||
folderCommit: folderCommitService,
|
folderCommit: folderCommitService,
|
||||||
secretScanningV2: secretScanningV2Service,
|
secretScanningV2: secretScanningV2Service,
|
||||||
reminder: reminderService
|
reminder: reminderService,
|
||||||
|
bus: eventBusService,
|
||||||
|
sse: sseService
|
||||||
});
|
});
|
||||||
|
|
||||||
const cronJobs: CronJob[] = [];
|
const cronJobs: CronJob[] = [];
|
||||||
@@ -2190,5 +2199,7 @@ export const registerRoutes = async (
|
|||||||
server.addHook("onClose", async () => {
|
server.addHook("onClose", async () => {
|
||||||
cronJobs.forEach((job) => job.stop());
|
cronJobs.forEach((job) => job.stop());
|
||||||
await telemetryService.flushAll();
|
await telemetryService.flushAll();
|
||||||
|
await eventBusService.close();
|
||||||
|
sseService.close();
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|||||||
118
backend/src/server/routes/v1/event-router.ts
Normal file
118
backend/src/server/routes/v1/event-router.ts
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
/* eslint-disable @typescript-eslint/no-floating-promises */
|
||||||
|
import { subject } from "@casl/ability";
|
||||||
|
import { pipeline } from "stream/promises";
|
||||||
|
import { z } from "zod";
|
||||||
|
|
||||||
|
import { ActionProjectType, ProjectType } from "@app/db/schemas";
|
||||||
|
import { getServerSentEventsHeaders } from "@app/ee/services/event/event-sse-stream";
|
||||||
|
import { EventRegisterSchema } from "@app/ee/services/event/types";
|
||||||
|
import { ProjectPermissionSecretActions, ProjectPermissionSub } from "@app/ee/services/permission/project-permission";
|
||||||
|
import { BadRequestError, ForbiddenRequestError, RateLimitError } from "@app/lib/errors";
|
||||||
|
import { readLimit } from "@app/server/config/rateLimiter";
|
||||||
|
import { verifyAuth } from "@app/server/plugins/auth/verify-auth";
|
||||||
|
import { AuthMode } from "@app/services/auth/auth-type";
|
||||||
|
|
||||||
|
export const registerEventRouter = async (server: FastifyZodProvider) => {
|
||||||
|
server.route({
|
||||||
|
method: "POST",
|
||||||
|
url: "/subscribe/project-events",
|
||||||
|
config: {
|
||||||
|
rateLimit: readLimit
|
||||||
|
},
|
||||||
|
schema: {
|
||||||
|
body: z.object({
|
||||||
|
projectId: z.string().trim(),
|
||||||
|
register: z.array(EventRegisterSchema).max(10)
|
||||||
|
})
|
||||||
|
},
|
||||||
|
onRequest: verifyAuth([AuthMode.JWT, AuthMode.IDENTITY_ACCESS_TOKEN]),
|
||||||
|
handler: async (req, reply) => {
|
||||||
|
try {
|
||||||
|
const { sse, permission, identityAccessToken, authToken, license } = req.server.services;
|
||||||
|
|
||||||
|
const plan = await license.getPlan(req.auth.orgId);
|
||||||
|
|
||||||
|
if (!plan.eventSubscriptions) {
|
||||||
|
throw new BadRequestError({
|
||||||
|
message:
|
||||||
|
"Failed to use event subscriptions due to plan restriction. Upgrade plan to access enterprise event subscriptions."
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const count = await sse.getActiveConnectionsCount(req.body.projectId, req.permission.id);
|
||||||
|
|
||||||
|
if (count >= 5) {
|
||||||
|
throw new RateLimitError({
|
||||||
|
message: `Too many active connections for project ${req.body.projectId}. Please close some connections before opening a new one.`
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const client = await sse.subscribe({
|
||||||
|
type: ProjectType.SecretManager,
|
||||||
|
registered: req.body.register,
|
||||||
|
async getAuthInfo() {
|
||||||
|
const ability = await permission.getProjectPermission({
|
||||||
|
actor: req.auth.actor,
|
||||||
|
projectId: req.body.projectId,
|
||||||
|
actionProjectType: ActionProjectType.Any,
|
||||||
|
actorAuthMethod: req.auth.authMethod,
|
||||||
|
actorId: req.permission.id,
|
||||||
|
actorOrgId: req.permission.orgId
|
||||||
|
});
|
||||||
|
|
||||||
|
return { permission: ability.permission, actorId: req.permission.id, projectId: req.body.projectId };
|
||||||
|
},
|
||||||
|
async onAuthRefresh(info) {
|
||||||
|
switch (req.auth.authMode) {
|
||||||
|
case AuthMode.JWT:
|
||||||
|
await authToken.fnValidateJwtIdentity(req.auth.token);
|
||||||
|
break;
|
||||||
|
case AuthMode.IDENTITY_ACCESS_TOKEN:
|
||||||
|
await identityAccessToken.fnValidateIdentityAccessToken(req.auth.token, req.realIp);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error("Unsupported authentication method");
|
||||||
|
}
|
||||||
|
|
||||||
|
req.body.register.forEach((r) => {
|
||||||
|
const allowed = info.permission.can(
|
||||||
|
ProjectPermissionSecretActions.Subscribe,
|
||||||
|
subject(ProjectPermissionSub.Secrets, {
|
||||||
|
environment: r.conditions?.environmentSlug ?? "",
|
||||||
|
secretPath: r.conditions?.secretPath ?? "/",
|
||||||
|
eventType: r.event
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!allowed) {
|
||||||
|
throw new ForbiddenRequestError({
|
||||||
|
name: "PermissionDenied",
|
||||||
|
message: `You are not allowed to subscribe on secrets`,
|
||||||
|
details: {
|
||||||
|
event: r.event,
|
||||||
|
environmentSlug: r.conditions?.environmentSlug,
|
||||||
|
secretPath: r.conditions?.secretPath ?? "/"
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Switches to manual response and enable SSE streaming
|
||||||
|
reply.hijack();
|
||||||
|
reply.raw.writeHead(200, getServerSentEventsHeaders()).flushHeaders();
|
||||||
|
reply.raw.on("close", client.abort);
|
||||||
|
|
||||||
|
await pipeline(client.stream, reply.raw, { signal: client.signal });
|
||||||
|
} catch (error) {
|
||||||
|
if (error instanceof Error && error.name === "AbortError") {
|
||||||
|
// If the stream is aborted, we don't need to do anything
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
@@ -13,6 +13,7 @@ import { registerCaRouter } from "./certificate-authority-router";
|
|||||||
import { CERTIFICATE_AUTHORITY_REGISTER_ROUTER_MAP } from "./certificate-authority-routers";
|
import { CERTIFICATE_AUTHORITY_REGISTER_ROUTER_MAP } from "./certificate-authority-routers";
|
||||||
import { registerCertRouter } from "./certificate-router";
|
import { registerCertRouter } from "./certificate-router";
|
||||||
import { registerCertificateTemplateRouter } from "./certificate-template-router";
|
import { registerCertificateTemplateRouter } from "./certificate-template-router";
|
||||||
|
import { registerEventRouter } from "./event-router";
|
||||||
import { registerExternalGroupOrgRoleMappingRouter } from "./external-group-org-role-mapping-router";
|
import { registerExternalGroupOrgRoleMappingRouter } from "./external-group-org-role-mapping-router";
|
||||||
import { registerIdentityAccessTokenRouter } from "./identity-access-token-router";
|
import { registerIdentityAccessTokenRouter } from "./identity-access-token-router";
|
||||||
import { registerIdentityAliCloudAuthRouter } from "./identity-alicloud-auth-router";
|
import { registerIdentityAliCloudAuthRouter } from "./identity-alicloud-auth-router";
|
||||||
@@ -183,4 +184,6 @@ export const registerV1Routes = async (server: FastifyZodProvider) => {
|
|||||||
},
|
},
|
||||||
{ prefix: "/reminders" }
|
{ prefix: "/reminders" }
|
||||||
);
|
);
|
||||||
|
|
||||||
|
await server.register(registerEventRouter, { prefix: "/events" });
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -21,7 +21,8 @@ export enum ProjectPermissionSecretActions {
|
|||||||
ReadValue = "readValue",
|
ReadValue = "readValue",
|
||||||
Create = "create",
|
Create = "create",
|
||||||
Edit = "edit",
|
Edit = "edit",
|
||||||
Delete = "delete"
|
Delete = "delete",
|
||||||
|
Subscribe = "subscribe"
|
||||||
}
|
}
|
||||||
|
|
||||||
export enum ProjectPermissionDynamicSecretActions {
|
export enum ProjectPermissionDynamicSecretActions {
|
||||||
|
|||||||
@@ -39,6 +39,14 @@ export const renderOperatorSelectItems = (type: string) => {
|
|||||||
<SelectItem value={PermissionConditionOperators.$IN}>In</SelectItem>
|
<SelectItem value={PermissionConditionOperators.$IN}>In</SelectItem>
|
||||||
</>
|
</>
|
||||||
);
|
);
|
||||||
|
case "eventType":
|
||||||
|
return (
|
||||||
|
<>
|
||||||
|
<SelectItem value={PermissionConditionOperators.$EQ}>Equal</SelectItem>
|
||||||
|
<SelectItem value={PermissionConditionOperators.$NEQ}>Not Equal</SelectItem>
|
||||||
|
<SelectItem value={PermissionConditionOperators.$IN}>In</SelectItem>
|
||||||
|
</>
|
||||||
|
);
|
||||||
default:
|
default:
|
||||||
return (
|
return (
|
||||||
<>
|
<>
|
||||||
|
|||||||
@@ -54,7 +54,8 @@ const SecretPolicyActionSchema = z.object({
|
|||||||
[ProjectPermissionSecretActions.ReadValue]: z.boolean().optional(),
|
[ProjectPermissionSecretActions.ReadValue]: z.boolean().optional(),
|
||||||
[ProjectPermissionSecretActions.Edit]: z.boolean().optional(),
|
[ProjectPermissionSecretActions.Edit]: z.boolean().optional(),
|
||||||
[ProjectPermissionSecretActions.Delete]: z.boolean().optional(),
|
[ProjectPermissionSecretActions.Delete]: z.boolean().optional(),
|
||||||
[ProjectPermissionSecretActions.Create]: z.boolean().optional()
|
[ProjectPermissionSecretActions.Create]: z.boolean().optional(),
|
||||||
|
[ProjectPermissionSecretActions.Subscribe]: z.boolean().optional()
|
||||||
});
|
});
|
||||||
|
|
||||||
const ApprovalPolicyActionSchema = z.object({
|
const ApprovalPolicyActionSchema = z.object({
|
||||||
@@ -588,6 +589,7 @@ export const rolePermission2Form = (permissions: TProjectPermission[] = []) => {
|
|||||||
const canEdit = action.includes(ProjectPermissionSecretActions.Edit);
|
const canEdit = action.includes(ProjectPermissionSecretActions.Edit);
|
||||||
const canDelete = action.includes(ProjectPermissionSecretActions.Delete);
|
const canDelete = action.includes(ProjectPermissionSecretActions.Delete);
|
||||||
const canCreate = action.includes(ProjectPermissionSecretActions.Create);
|
const canCreate = action.includes(ProjectPermissionSecretActions.Create);
|
||||||
|
const canSubscribe = action.includes(ProjectPermissionSecretActions.Subscribe);
|
||||||
|
|
||||||
// from above statement we are sure it won't be undefined
|
// from above statement we are sure it won't be undefined
|
||||||
formVal[subject]!.push({
|
formVal[subject]!.push({
|
||||||
@@ -597,6 +599,7 @@ export const rolePermission2Form = (permissions: TProjectPermission[] = []) => {
|
|||||||
create: canCreate,
|
create: canCreate,
|
||||||
edit: canEdit,
|
edit: canEdit,
|
||||||
delete: canDelete,
|
delete: canDelete,
|
||||||
|
subscribe: canSubscribe,
|
||||||
conditions: conditions ? convertCaslConditionToFormOperator(conditions) : [],
|
conditions: conditions ? convertCaslConditionToFormOperator(conditions) : [],
|
||||||
inverted
|
inverted
|
||||||
});
|
});
|
||||||
@@ -1111,7 +1114,8 @@ export const PROJECT_PERMISSION_OBJECT: TProjectPermissionObject = {
|
|||||||
{ label: "Read Value", value: ProjectPermissionSecretActions.ReadValue },
|
{ label: "Read Value", value: ProjectPermissionSecretActions.ReadValue },
|
||||||
{ label: "Modify", value: ProjectPermissionSecretActions.Edit },
|
{ label: "Modify", value: ProjectPermissionSecretActions.Edit },
|
||||||
{ label: "Remove", value: ProjectPermissionSecretActions.Delete },
|
{ label: "Remove", value: ProjectPermissionSecretActions.Delete },
|
||||||
{ label: "Create", value: ProjectPermissionSecretActions.Create }
|
{ label: "Create", value: ProjectPermissionSecretActions.Create },
|
||||||
|
{ label: "Subscribe", value: ProjectPermissionSecretActions.Subscribe }
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
[ProjectPermissionSub.SecretFolders]: {
|
[ProjectPermissionSub.SecretFolders]: {
|
||||||
|
|||||||
@@ -17,7 +17,8 @@ export const SecretPermissionConditions = ({ position = 0, isDisabled }: Props)
|
|||||||
{ value: "environment", label: "Environment Slug" },
|
{ value: "environment", label: "Environment Slug" },
|
||||||
{ value: "secretPath", label: "Secret Path" },
|
{ value: "secretPath", label: "Secret Path" },
|
||||||
{ value: "secretName", label: "Secret Name" },
|
{ value: "secretName", label: "Secret Name" },
|
||||||
{ value: "secretTags", label: "Secret Tags" }
|
{ value: "secretTags", label: "Secret Tags" },
|
||||||
|
{ value: "eventType", label: "Event Type" }
|
||||||
]}
|
]}
|
||||||
/>
|
/>
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,3 +1,7 @@
|
|||||||
|
upstream api {
|
||||||
|
server backend:4000;
|
||||||
|
}
|
||||||
|
|
||||||
server {
|
server {
|
||||||
listen 80;
|
listen 80;
|
||||||
|
|
||||||
@@ -11,7 +15,7 @@ server {
|
|||||||
proxy_set_header Host $http_host;
|
proxy_set_header Host $http_host;
|
||||||
proxy_set_header X-NginX-Proxy true;
|
proxy_set_header X-NginX-Proxy true;
|
||||||
|
|
||||||
proxy_pass http://backend:4000;
|
proxy_pass http://api;
|
||||||
proxy_redirect off;
|
proxy_redirect off;
|
||||||
|
|
||||||
proxy_cookie_path / "/; SameSite=strict";
|
proxy_cookie_path / "/; SameSite=strict";
|
||||||
@@ -24,7 +28,7 @@ server {
|
|||||||
proxy_set_header Host $http_host;
|
proxy_set_header Host $http_host;
|
||||||
proxy_set_header X-NginX-Proxy true;
|
proxy_set_header X-NginX-Proxy true;
|
||||||
|
|
||||||
proxy_pass http://backend:4000;
|
proxy_pass http://api;
|
||||||
proxy_redirect off;
|
proxy_redirect off;
|
||||||
|
|
||||||
proxy_cookie_path / "/; HttpOnly; SameSite=strict";
|
proxy_cookie_path / "/; HttpOnly; SameSite=strict";
|
||||||
@@ -39,7 +43,7 @@ server {
|
|||||||
proxy_set_header Host $http_host;
|
proxy_set_header Host $http_host;
|
||||||
proxy_set_header X-NginX-Proxy true;
|
proxy_set_header X-NginX-Proxy true;
|
||||||
|
|
||||||
proxy_pass http://backend:4000;
|
proxy_pass http://api;
|
||||||
proxy_redirect off;
|
proxy_redirect off;
|
||||||
|
|
||||||
proxy_cookie_path / "/; HttpOnly; SameSite=strict";
|
proxy_cookie_path / "/; HttpOnly; SameSite=strict";
|
||||||
@@ -57,7 +61,7 @@ server {
|
|||||||
# proxy_set_header X-SSL-Client-Cert $http_x_ssl_client_cert;
|
# proxy_set_header X-SSL-Client-Cert $http_x_ssl_client_cert;
|
||||||
# proxy_pass_request_headers on;
|
# proxy_pass_request_headers on;
|
||||||
|
|
||||||
proxy_pass http://backend:4000;
|
proxy_pass http://api;
|
||||||
proxy_redirect off;
|
proxy_redirect off;
|
||||||
|
|
||||||
# proxy_cookie_path / "/; secure; HttpOnly; SameSite=strict";
|
# proxy_cookie_path / "/; secure; HttpOnly; SameSite=strict";
|
||||||
|
|||||||
89
sink/index.html
Normal file
89
sink/index.html
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8" />
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||||
|
<title>Document</title>
|
||||||
|
</head>
|
||||||
|
|
||||||
|
<style>
|
||||||
|
body {
|
||||||
|
background-color: black;
|
||||||
|
color: lime;
|
||||||
|
font-family: "Hack";
|
||||||
|
}
|
||||||
|
</style>
|
||||||
|
<body>
|
||||||
|
<h1>EventSource Example</h1>
|
||||||
|
|
||||||
|
<div>
|
||||||
|
<p>This page listens for server-sent events from the backend.</p>
|
||||||
|
<p>Open your browser's console to see the received events.</p>
|
||||||
|
|
||||||
|
<ul id="event-list">
|
||||||
|
<!-- Events will be dynamically added here -->
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
const list = document.getElementById("event-list");
|
||||||
|
|
||||||
|
async function stream() {
|
||||||
|
const response = await fetch("http://localhost:8080/api/v1/events/subscribe", {
|
||||||
|
method: "POST",
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
Accept: "text/event-stream",
|
||||||
|
Connection: "keep-alive",
|
||||||
|
},
|
||||||
|
body: JSON.stringify({
|
||||||
|
projectId: "project-id-123",
|
||||||
|
events: [
|
||||||
|
{
|
||||||
|
type: "secret.created",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
const reader = response.body.getReader();
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
|
||||||
|
let buffer = "";
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const { value, done } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
|
||||||
|
buffer += decoder.decode(value, { stream: true });
|
||||||
|
|
||||||
|
const lines = buffer.split("\n");
|
||||||
|
buffer = lines.pop() ?? "";
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
console.log("Received line:", line);
|
||||||
|
if (line.startsWith("data:")) {
|
||||||
|
const data = line.slice(5).trim();
|
||||||
|
const listItem = document.createElement("li");
|
||||||
|
listItem.textContent = `Event(${event.id}): ${data.event}, Data: ${JSON.stringify(data.data)}`;
|
||||||
|
list.appendChild(listItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stream().catch((error) => {
|
||||||
|
console.error("Error in streaming:", error);
|
||||||
|
});
|
||||||
|
|
||||||
|
// source.onmessage = function (event) {
|
||||||
|
// const data = JSON.parse(event.data);
|
||||||
|
// console.log("Received data:", data);
|
||||||
|
// const listItem = document.createElement("li");
|
||||||
|
// listItem.textContent = `Event(${event.id}): ${data.event}, Data: ${JSON.stringify(data.data)}`;
|
||||||
|
// list.appendChild(listItem);
|
||||||
|
// // You can update the UI or perform actions based on the received data
|
||||||
|
// };
|
||||||
|
</script>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
Reference in New Issue
Block a user