Feat: Add metrics to postman (#871)

* feat: add metrics to postman

* fix: issue with get message to claim query

* fix: import issues

* fix: update unit tests and fix minor issues

* fix: refator metrics

* fix: update typeorm query

* fix: update postman docker config
This commit is contained in:
Victorien Gauch
2025-04-22 11:44:53 +02:00
committed by GitHub
parent 9a16c5e152
commit b2990e9a3c
22 changed files with 1263 additions and 124 deletions

View File

@@ -251,6 +251,8 @@ services:
profiles: [ "l2", "debug" ]
platform: linux/amd64
restart: on-failure
ports:
- "9090:3000"
depends_on:
sequencer:
condition: service_healthy

View File

@@ -39,6 +39,7 @@ POSTGRES_PASSWORD=postgres
POSTGRES_DB=postman_db
DB_CLEANER_ENABLED=false
ENABLE_LINEA_ESTIMATE_GAS=false
API_PORT=3000
L1_L2_ENABLE_POSTMAN_SPONSORING=true
L2_L1_ENABLE_POSTMAN_SPONSORING=false
MAX_POSTMAN_SPONSOR_GAS_LIMIT=250000
MAX_POSTMAN_SPONSOR_GAS_LIMIT=250000

635
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -42,6 +42,7 @@ DB_CLEANER_ENABLED=false
DB_CLEANING_INTERVAL=10000
DB_DAYS_BEFORE_NOW_TO_DELETE=1
ENABLE_LINEA_ESTIMATE_GAS=false
API_PORT=3000
L1_L2_ENABLE_POSTMAN_SPONSORING=true
L2_L1_ENABLE_POSTMAN_SPONSORING=false
MAX_POSTMAN_SPONSOR_GAS_LIMIT=250000

View File

@@ -11,6 +11,7 @@ module.exports = {
"src/clients/blockchain/typechain",
"src/application/postman/persistence/migrations/",
"src/application/postman/persistence/repositories/",
"src/application/postman/persistence/subscribers/",
"src/index.ts",
"src/utils/WinstonLogger.ts",
],
@@ -18,6 +19,7 @@ module.exports = {
"src/clients/blockchain/typechain",
"src/application/postman/persistence/migrations/",
"src/application/postman/persistence/repositories/",
"src/application/postman/persistence/subscribers/",
"src/index.ts",
"src/utils/WinstonLogger.ts",
"src/utils/testing/",

View File

@@ -24,14 +24,17 @@
"class-validator": "0.14.1",
"dotenv": "16.4.5",
"ethers": "6.13.4",
"express": "5.1.0",
"filtrex": "3.1.0",
"pg": "8.13.1",
"prom-client": "15.1.3",
"typeorm": "0.3.20",
"typeorm-naming-strategies": "4.1.0",
"winston": "3.17.0"
},
"devDependencies": {
"@jest/globals": "29.7.0",
"@types/express": "5.0.1",
"@types/jest": "29.5.14",
"@types/yargs": "17.0.33",
"jest": "29.7.0",

View File

@@ -153,8 +153,11 @@ async function main() {
? parseInt(process.env.DB_DAYS_BEFORE_NOW_TO_DELETE)
: undefined,
},
apiOptions: {
port: process.env.API_PORT ? parseInt(process.env.API_PORT) : undefined,
},
});
await client.connectDatabase();
await client.connectServices();
client.startAllServices();
}

View File

@@ -0,0 +1,65 @@
import express, { Express, Request, Response } from "express";
import { IMetricsService } from "../../../core/metrics/IMetricsService";
import { ILogger } from "../../../core/utils/logging/ILogger";
type ApiConfig = {
port: number;
};
export class Api {
private readonly app: Express;
private server?: ReturnType<Express["listen"]>;
constructor(
private readonly config: ApiConfig,
private readonly metricsService: IMetricsService,
private readonly logger: ILogger,
) {
this.app = express();
this.setupMiddleware();
this.setupRoutes();
}
private setupMiddleware(): void {
this.app.use(express.json());
}
private setupRoutes(): void {
this.app.get("/metrics", this.handleMetrics.bind(this));
}
private async handleMetrics(_req: Request, res: Response): Promise<void> {
try {
const registry = this.metricsService.getRegistry();
res.set("Content-Type", registry.contentType);
res.end(await registry.metrics());
} catch (error) {
res.status(500).json({ error: "Failed to collect metrics" });
}
}
public async start(): Promise<void> {
this.server = this.app.listen(this.config.port);
await new Promise<void>((resolve) => {
this.server?.on("listening", () => {
this.logger.info(`Listening on port ${this.config.port}`);
resolve();
});
});
}
public async stop(): Promise<void> {
if (!this.server) return;
await new Promise<void>((resolve, reject) => {
this.server?.close((err) => {
if (err) return reject(err);
this.logger.info(`Closing API server on port ${this.config.port}`);
this.server = undefined;
resolve();
});
});
}
}

View File

@@ -0,0 +1,47 @@
import { Registry } from "prom-client";
import { mock } from "jest-mock-extended";
import { Api } from "../Api";
import { IMetricsService } from "../../../../core/metrics/IMetricsService";
import { ILogger } from "../../../../core/utils/logging/ILogger";
describe("Api", () => {
let api: Api;
const mockConfig = { port: 3000 };
const mockMetricService = mock<IMetricsService>();
const mockLogger = mock<ILogger>();
beforeEach(async () => {
mockMetricService.getRegistry.mockReturnValue({
contentType: "text/plain; version=0.0.4; charset=utf-8",
metrics: async () => "mocked metrics",
} as Registry);
api = new Api(mockConfig, mockMetricService, mockLogger);
});
afterEach(async () => {
await api.stop();
});
it("should initialize the API", () => {
expect(api).toBeDefined();
});
it("should return metrics from the metric service", async () => {
await api.start();
const registry = api["metricsService"].getRegistry();
expect(registry.contentType).toBe("text/plain; version=0.0.4; charset=utf-8");
expect(await registry.metrics()).toBe("mocked metrics");
});
it("should start the server", async () => {
await api.start();
expect(api["server"]).toBeDefined();
});
it("should stop the server", async () => {
await api.start();
await api.stop();
expect(api["server"]).toBeUndefined();
});
});

View File

@@ -0,0 +1,71 @@
import { EntityManager } from "typeorm";
import { Direction } from "@consensys/linea-sdk";
import { MetricsService } from "./MetricsService";
import { MessageEntity } from "../../persistence/entities/Message.entity";
import { MessageStatus } from "../../../../core/enums";
import { LineaPostmanMetrics } from "../../../../core/metrics/IMetricsService";
export class MessageMetricsService extends MetricsService {
constructor(private readonly entityManager: EntityManager) {
super();
this.createGauge(LineaPostmanMetrics.Messages, "Current number of messages by status and direction", [
"status",
"direction",
]);
}
public async initialize(): Promise<void> {
const fullResult = await this.getMessagesCountFromDatabase();
this.initializeGaugeValues(fullResult);
}
private async getMessagesCountFromDatabase(): Promise<
{ status: MessageStatus; direction: Direction; count: number }[]
> {
const totalNumberOfMessagesByStatusAndDirection = await this.entityManager
.createQueryBuilder(MessageEntity, "message")
.select("message.status", "status")
.addSelect("message.direction", "direction")
.addSelect("COUNT(message.id)", "count")
.groupBy("message.status")
.addGroupBy("message.direction")
.getRawMany();
// MessageStatus => MessageDirection => Count
const resultMap = new Map<string, Map<string, number>>();
totalNumberOfMessagesByStatusAndDirection.forEach((r) => {
if (!resultMap.has(r.status)) {
resultMap.set(r.status, new Map());
}
resultMap.get(r.status)!.set(r.direction, Number(r.count));
});
const results: { status: MessageStatus; direction: Direction; count: number }[] = [];
for (const status of Object.values(MessageStatus)) {
for (const direction of Object.values(Direction)) {
results.push({
status,
direction,
count: resultMap.get(status)?.get(direction) || 0,
});
}
}
return results;
}
private initializeGaugeValues(fullResult: { status: MessageStatus; direction: Direction; count: number }[]): void {
for (const { status, count, direction } of fullResult) {
this.incrementGauge(
LineaPostmanMetrics.Messages,
{
status,
direction,
},
count,
);
}
}
}

View File

@@ -0,0 +1,152 @@
import { Counter, Gauge, MetricObjectWithValues, MetricValue, Registry } from "prom-client";
import { IMetricsService, LineaPostmanMetrics } from "../../../../core/metrics/IMetricsService";
/**
* MetricsService class that implements the IMetricsService interface.
* This class provides methods to create and manage Prometheus metrics.
*/
export abstract class MetricsService implements IMetricsService {
private readonly registry: Registry;
private readonly counters: Map<LineaPostmanMetrics, Counter<string>>;
private readonly gauges: Map<LineaPostmanMetrics, Gauge<string>>;
constructor() {
this.registry = new Registry();
this.registry.setDefaultLabels({ app: "postman" });
this.counters = new Map();
this.gauges = new Map();
}
/**
* Returns the registry
* @returns {Registry} The registry instance
*/
public getRegistry(): Registry {
return this.registry;
}
/**
* Creates counter metric
*/
public createCounter(name: LineaPostmanMetrics, help: string, labelNames: string[] = []): Counter<string> {
if (!this.counters.has(name)) {
this.counters.set(
name,
new Counter({
name,
help,
labelNames,
registers: [this.registry],
}),
);
}
return this.counters.get(name) as Counter<string>;
}
/**
* Get counter metric value
* @param name - Name of the metric
* @param labels - Labels for the metric
* @returns Value of the counter metric
*/
public async getCounterValue(name: LineaPostmanMetrics, labels: Record<string, string>): Promise<number | undefined> {
const counter = this.counters.get(name);
if (counter === undefined) {
return undefined;
}
const metricData = await counter.get();
const metricValueWithMatchingLabels = this.findMetricValueWithExactMatchingLabels(metricData, labels);
return metricValueWithMatchingLabels?.value;
}
/**
* Creates gauge metric
* @param name - Name of the metric
* @param help - Help text for the metric
* @param labelNames - Array of label names for the metric
* @returns Gauge metric
*/
public createGauge(name: LineaPostmanMetrics, help: string, labelNames: string[] = []): Gauge<string> {
if (!this.gauges.has(name)) {
this.gauges.set(
name,
new Gauge({
name,
help,
labelNames,
registers: [this.registry],
}),
);
}
return this.gauges.get(name) as Gauge<string>;
}
/**
* Get gauge metric value
* @param name - Name of the metric
* @param labels - Labels for the metric
* @returns Value of the gauge metric
*/
public async getGaugeValue(name: LineaPostmanMetrics, labels: Record<string, string>): Promise<number | undefined> {
const gauge = this.gauges.get(name);
if (gauge === undefined) {
return undefined;
}
const metricData = await gauge.get();
const metricValueWithMatchingLabels = this.findMetricValueWithExactMatchingLabels(metricData, labels);
return metricValueWithMatchingLabels?.value;
}
/**
* Increments a counter metric
* @param name - Name of the metric
* @param labels - Labels for the metric
* @param value - Value to increment by (default is 1)
* @returns void
*/
public incrementCounter(name: LineaPostmanMetrics, labels: Record<string, string> = {}, value?: number): void {
const counter = this.counters.get(name);
if (counter !== undefined) {
counter.inc(labels, value);
}
}
/**
* Increment a gauge metric value
* @param name - Name of the metric
* @param labels - Labels for the metric
* @param value - Value to increment by (default is 1)
* @returns void
*/
public incrementGauge(name: LineaPostmanMetrics, labels: Record<string, string> = {}, value?: number): void {
const gauge = this.gauges.get(name);
if (gauge !== undefined) {
gauge.inc(labels, value);
}
}
/**
* Decrement a gauge metric value
* @param name - Name of the metric
* @param value - Value to decrement by (default is 1)
* @param labels - Labels for the metric
* @returns void
*/
public decrementGauge(name: LineaPostmanMetrics, labels: Record<string, string> = {}, value?: number): void {
const gauge = this.gauges.get(name);
if (gauge !== undefined) {
gauge.dec(labels, value);
}
}
private findMetricValueWithExactMatchingLabels(
metricData: MetricObjectWithValues<MetricValue<string>>,
labels: Record<string, string>,
): MetricValue<string> | undefined {
return metricData.values.find((value) => Object.entries(labels).every(([key, val]) => value.labels[key] === val));
}
}

View File

@@ -0,0 +1,64 @@
import { EntityManager, SelectQueryBuilder } from "typeorm";
import { Direction } from "@consensys/linea-sdk";
import { MessageMetricsService } from "../MessageMetricsService";
import { mock, MockProxy } from "jest-mock-extended";
import { MessageStatus } from "../../../../../core/enums";
import { LineaPostmanMetrics } from "../../../../../core/metrics/IMetricsService";
describe("MessageMetricsService", () => {
let messageMetricsService: MessageMetricsService;
let mockEntityManager: MockProxy<EntityManager>;
beforeEach(() => {
mockEntityManager = mock<EntityManager>();
messageMetricsService = new MessageMetricsService(mockEntityManager);
});
it("should update gauges based on message status", async () => {
jest.spyOn(mockEntityManager, "maximum").mockResolvedValue(10);
jest.spyOn(mockEntityManager, "createQueryBuilder").mockReturnValue({
select: jest.fn().mockReturnThis(),
addSelect: jest.fn().mockReturnThis(),
groupBy: jest.fn().mockReturnThis(),
addGroupBy: jest.fn().mockReturnThis(),
getRawMany: jest.fn().mockResolvedValue([
{ status: MessageStatus.SENT, direction: Direction.L1_TO_L2, count: 5 },
{ status: MessageStatus.CLAIMED_SUCCESS, direction: Direction.L1_TO_L2, count: 10 },
]),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} as unknown as SelectQueryBuilder<any>);
await messageMetricsService.initialize();
// Check if the gauge was updated
expect(
await messageMetricsService.getGaugeValue(LineaPostmanMetrics.Messages, {
status: MessageStatus.SENT,
direction: Direction.L1_TO_L2,
}),
).toBe(5);
expect(
await messageMetricsService.getGaugeValue(LineaPostmanMetrics.Messages, {
status: MessageStatus.CLAIMED_SUCCESS,
direction: Direction.L1_TO_L2,
}),
).toBe(10);
});
it("should return the correct gauge value", async () => {
messageMetricsService.incrementGauge(
LineaPostmanMetrics.Messages,
{
status: "processed",
direction: Direction.L1_TO_L2,
},
10,
);
const gaugeValue = await messageMetricsService.getGaugeValue(LineaPostmanMetrics.Messages, {
status: "processed",
direction: Direction.L1_TO_L2,
});
expect(gaugeValue).toBe(10);
});
});

View File

@@ -0,0 +1,60 @@
import { Counter, Gauge } from "prom-client";
import { LineaPostmanMetrics } from "../../../../../core/metrics/IMetricsService";
import { MetricsService } from "../MetricsService";
class TestMetricService extends MetricsService {
constructor() {
super();
}
}
describe("MetricsService", () => {
let metricService: TestMetricService;
beforeEach(() => {
metricService = new TestMetricService();
});
it("should create a counter", () => {
const counter = metricService.createCounter(LineaPostmanMetrics.Messages, "A test counter");
expect(counter).toBeInstanceOf(Counter);
});
it("should increment a counter", async () => {
const counter = metricService.createCounter(LineaPostmanMetrics.Messages, "A test counter");
metricService.incrementCounter(LineaPostmanMetrics.Messages, {}, 1);
expect((await counter.get()).values[0].value).toBe(1);
});
it("should create a gauge", () => {
const gauge = metricService.createGauge(LineaPostmanMetrics.Messages, "A test gauge");
expect(gauge).toBeInstanceOf(Gauge);
});
it("should increment a gauge", async () => {
const gauge = metricService.createGauge(LineaPostmanMetrics.Messages, "A test gauge");
metricService.incrementGauge(LineaPostmanMetrics.Messages, {}, 5);
expect((await gauge.get()).values[0].value).toBe(5);
});
it("should decrement a gauge", async () => {
metricService.createGauge(LineaPostmanMetrics.Messages, "A test gauge");
metricService.incrementGauge(LineaPostmanMetrics.Messages, {}, 5);
metricService.decrementGauge(LineaPostmanMetrics.Messages, {}, 2);
expect(await metricService.getGaugeValue(LineaPostmanMetrics.Messages, {})).toBe(3);
});
it("should return the correct counter value", async () => {
metricService.createCounter(LineaPostmanMetrics.Messages, "A test counter");
metricService.incrementCounter(LineaPostmanMetrics.Messages, {}, 5);
const counterValue = await metricService.getCounterValue(LineaPostmanMetrics.Messages, {});
expect(counterValue).toBe(5);
});
it("should return the correct gauge value", async () => {
metricService.createGauge(LineaPostmanMetrics.Messages, "A test gauge");
metricService.incrementGauge(LineaPostmanMetrics.Messages, {}, 10);
const gaugeValue = await metricService.getGaugeValue(LineaPostmanMetrics.Messages, {});
expect(gaugeValue).toBe(10);
});
});

View File

@@ -11,7 +11,7 @@ import {
MessageSentEventProcessor,
L2ClaimMessageTransactionSizeProcessor,
} from "../../../services/processors";
import { PostmanOptions } from "./config/config";
import { PostmanConfig, PostmanOptions } from "./config/config";
import { DB } from "../persistence/dataSource";
import {
MessageSentEventPoller,
@@ -26,6 +26,9 @@ import { L2ClaimTransactionSizeCalculator } from "../../../services/L2ClaimTrans
import { LineaTransactionValidationService } from "../../../services/LineaTransactionValidationService";
import { EthereumTransactionValidationService } from "../../../services/EthereumTransactionValidationService";
import { getConfig } from "./config/utils";
import { Api } from "../api/Api";
import { MessageStatusSubscriber } from "../persistence/subscribers/MessageStatusSubscriber";
import { MessageMetricsService } from "../api/metrics/MessageMetricsService";
export class PostmanServiceClient {
// L1 -> L2 flow
@@ -49,6 +52,8 @@ export class PostmanServiceClient {
private l1L2AutoClaimEnabled: boolean;
private l2L1AutoClaimEnabled: boolean;
private api: Api;
private config: PostmanConfig;
/**
* Initializes a new instance of the PostmanServiceClient.
@@ -57,6 +62,7 @@ export class PostmanServiceClient {
*/
constructor(options: PostmanOptions) {
const config = getConfig(options);
this.config = config;
this.logger = new WinstonLogger(PostmanServiceClient.name, config.loggerOptions);
this.l1L2AutoClaimEnabled = config.l1L2AutoClaimEnabled;
@@ -359,10 +365,50 @@ export class PostmanServiceClient {
}
/**
* Initializes the database connection using the configuration provided.
* Initializes the database connection.
*/
public async connectDatabase() {
await this.db.initialize();
public async initializeDatabase(): Promise<void> {
try {
await this.db.initialize();
this.logger.info("Database connection established successfully.");
} catch (error) {
this.logger.error("Failed to connect to the database.", error);
throw error;
}
}
/**
* Initializes metrics, registers subscribers, and configures the API.
* This method expects the database to be connected.
*/
public async initializeMetricsAndApi(): Promise<void> {
try {
const metricService = new MessageMetricsService(this.db.manager);
await metricService.initialize();
const messageStatusSubscriber = new MessageStatusSubscriber(
metricService,
new WinstonLogger(MessageStatusSubscriber.name),
);
this.db.subscribers.push(messageStatusSubscriber);
// Initialize or reinitialize the API using the metrics service.
this.api = new Api({ port: this.config.apiConfig.port }, metricService, new WinstonLogger(Api.name));
this.logger.info("Metrics and API have been initialized successfully.");
} catch (error) {
this.logger.error("Failed to initialize metrics or API.", error);
throw error;
}
}
/**
* Connects services by first initializing the database and then setting up metrics and the API.
*/
public async connectServices(): Promise<void> {
// Database initialization must happen before metrics initialization
await this.initializeDatabase();
await this.initializeMetricsAndApi();
}
/**
@@ -389,6 +435,8 @@ export class PostmanServiceClient {
// Database Cleaner
this.databaseCleaningPoller.start();
this.api.start();
this.logger.info("All listeners and message deliverers have been started.");
}
@@ -416,6 +464,8 @@ export class PostmanServiceClient {
// Database Cleaner
this.databaseCleaningPoller.stop();
this.api.stop();
this.logger.info("All listeners and message deliverers have been stopped.");
}
}

View File

@@ -26,6 +26,9 @@ import { DatabaseCleaningPoller } from "../../../../services/pollers/DatabaseCle
import { TypeOrmMessageRepository } from "../../persistence/repositories/TypeOrmMessageRepository";
import { L2ClaimMessageTransactionSizePoller } from "../../../../services/pollers/L2ClaimMessageTransactionSizePoller";
import { DEFAULT_MAX_CLAIM_GAS_LIMIT } from "../../../../core/constants";
import { MessageStatusSubscriber } from "../../persistence/subscribers/MessageStatusSubscriber";
import { MessageMetricsService } from "../../api/metrics/MessageMetricsService";
import { Api } from "../../api/Api";
jest.mock("ethers", () => {
const allAutoMocked = jest.createMockFromModule("ethers");
@@ -171,8 +174,9 @@ describe("PostmanServiceClient", () => {
});
});
describe("connectDatabase", () => {
it("should initialize the db", async () => {
describe("connectServices", () => {
it("should initialize API and database", async () => {
jest.spyOn(MessageMetricsService.prototype, "initialize").mockResolvedValueOnce();
const initializeSpy = jest.spyOn(DataSource.prototype, "initialize").mockResolvedValue(
new DataSource({
type: "postgres",
@@ -190,19 +194,20 @@ describe("PostmanServiceClient", () => {
RemoveUniqueConstraint1689084924789,
AddNewIndexes1701265652528,
],
subscribers: [MessageStatusSubscriber],
migrationsTableName: "migrations",
logging: ["error"],
migrationsRun: true,
}),
);
await postmanServiceClient.connectDatabase();
await postmanServiceClient.connectServices();
expect(initializeSpy).toHaveBeenCalledTimes(1);
});
});
describe("startAllServices", () => {
it("should start all postman services", () => {
it("should start all postman services", async () => {
jest.spyOn(MessageSentEventPoller.prototype, "start").mockImplementationOnce(jest.fn());
jest.spyOn(MessageAnchoringPoller.prototype, "start").mockImplementationOnce(jest.fn());
jest.spyOn(MessageClaimingPoller.prototype, "start").mockImplementationOnce(jest.fn());
@@ -210,27 +215,35 @@ describe("PostmanServiceClient", () => {
jest.spyOn(MessagePersistingPoller.prototype, "start").mockImplementationOnce(jest.fn());
jest.spyOn(DatabaseCleaningPoller.prototype, "start").mockImplementationOnce(jest.fn());
jest.spyOn(TypeOrmMessageRepository.prototype, "getLatestMessageSent").mockImplementationOnce(jest.fn());
jest.spyOn(Api.prototype, "start").mockImplementationOnce(jest.fn());
jest.spyOn(MessageMetricsService.prototype, "initialize").mockResolvedValueOnce();
await postmanServiceClient.initializeMetricsAndApi();
postmanServiceClient.startAllServices();
expect(loggerSpy).toHaveBeenCalledTimes(5);
expect(loggerSpy).toHaveBeenCalledWith("All listeners and message deliverers have been started.");
expect(loggerSpy).toHaveBeenCalledTimes(6);
expect(loggerSpy).toHaveBeenLastCalledWith("All listeners and message deliverers have been started.");
postmanServiceClient.stopAllServices();
});
it("should stop all postman services", () => {
it("should stop all postman services", async () => {
jest.spyOn(MessageSentEventPoller.prototype, "stop").mockImplementationOnce(jest.fn());
jest.spyOn(MessageAnchoringPoller.prototype, "stop").mockImplementationOnce(jest.fn());
jest.spyOn(MessageClaimingPoller.prototype, "stop").mockImplementationOnce(jest.fn());
jest.spyOn(L2ClaimMessageTransactionSizePoller.prototype, "stop").mockImplementationOnce(jest.fn());
jest.spyOn(MessagePersistingPoller.prototype, "stop").mockImplementationOnce(jest.fn());
jest.spyOn(DatabaseCleaningPoller.prototype, "stop").mockImplementationOnce(jest.fn());
jest.spyOn(Api.prototype, "stop").mockImplementationOnce(jest.fn());
jest.spyOn(MessageMetricsService.prototype, "initialize").mockResolvedValueOnce();
await postmanServiceClient.initializeMetricsAndApi();
postmanServiceClient.stopAllServices();
expect(loggerSpy).toHaveBeenCalledTimes(9);
expect(loggerSpy).toHaveBeenCalledWith("All listeners and message deliverers have been stopped.");
expect(loggerSpy).toHaveBeenCalledTimes(10);
expect(loggerSpy).toHaveBeenLastCalledWith("All listeners and message deliverers have been stopped.");
});
});
});

View File

@@ -133,6 +133,9 @@ describe("Config utils", () => {
},
l2L1AutoClaimEnabled: false,
loggerOptions: undefined,
apiConfig: {
port: 3000,
},
});
});
@@ -169,6 +172,9 @@ describe("Config utils", () => {
databaseCleanerOptions: {
enabled: true,
},
apiOptions: {
port: 9090,
},
});
expect(config).toStrictEqual({
databaseCleanerConfig: {
@@ -242,6 +248,9 @@ describe("Config utils", () => {
},
l2L1AutoClaimEnabled: true,
loggerOptions: undefined,
apiConfig: {
port: 9090,
},
});
});
});

View File

@@ -16,6 +16,7 @@ export type PostmanOptions = {
databaseOptions: DBOptions;
databaseCleanerOptions?: DBCleanerOptions;
loggerOptions?: LoggerOptions;
apiOptions?: ApiOptions;
};
/**
@@ -29,6 +30,7 @@ export type PostmanConfig = {
databaseOptions: DBOptions;
databaseCleanerConfig: DBCleanerConfig;
loggerOptions?: LoggerOptions;
apiConfig: ApiConfig;
};
/**
@@ -113,3 +115,9 @@ export type ListenerOptions = {
export type ListenerConfig = Required<Omit<ListenerOptions, "eventFilters">> &
Partial<Pick<ListenerOptions, "eventFilters">>;
export type ApiOptions = {
port?: number;
};
export type ApiConfig = Required<ApiOptions>;

View File

@@ -38,6 +38,7 @@ export function getConfig(postmanOptions: PostmanOptions): PostmanConfig {
databaseOptions,
databaseCleanerOptions,
loggerOptions,
apiOptions,
} = postmanOptions;
if (l1Options.listener.eventFilters) {
@@ -124,6 +125,9 @@ export function getConfig(postmanOptions: PostmanOptions): PostmanConfig {
daysBeforeNowToDelete: databaseCleanerOptions?.daysBeforeNowToDelete ?? 14,
},
loggerOptions,
apiConfig: {
port: apiOptions?.port ?? 3000,
},
};
}

View File

@@ -302,6 +302,13 @@ export class TypeOrmMessageRepository<TransactionResponse extends ContractTransa
claimTxHash: tx.hash,
},
);
// Store updated entity in the queryRunner to access it in the afterTransactionCommit hook
entityManager.queryRunner!.data.updatedEntity = {
previousStatus: message.status,
newStatus: MessageStatus.PENDING,
direction: message.direction,
};
});
}
}

View File

@@ -0,0 +1,124 @@
import {
EventSubscriber,
EntitySubscriberInterface,
InsertEvent,
UpdateEvent,
RemoveEvent,
TransactionCommitEvent,
} from "typeorm";
import { Direction } from "@consensys/linea-sdk";
import { MessageEntity } from "../entities/Message.entity";
import { IMetricsService, LineaPostmanMetrics } from "../../../../core/metrics/IMetricsService";
import { ILogger } from "../../../../core/utils/logging/ILogger";
import { MessageStatus } from "../../../../core/enums";
@EventSubscriber()
export class MessageStatusSubscriber implements EntitySubscriberInterface<MessageEntity> {
constructor(
private readonly metricsService: IMetricsService,
private readonly logger: ILogger,
) {}
listenTo() {
return MessageEntity;
}
async afterInsert(event: InsertEvent<MessageEntity>): Promise<void> {
const { status, direction } = event.entity;
this.updateMessageMetricsOnInsert(status, direction);
}
async afterUpdate(event: UpdateEvent<MessageEntity>): Promise<void> {
if (!event.entity || !event.databaseEntity) return;
const prevStatus = event.databaseEntity.status;
const newStatus = event.entity.status;
const direction = event.databaseEntity.direction;
if (prevStatus !== newStatus) {
await this.swapStatus(
LineaPostmanMetrics.Messages,
{ status: prevStatus, direction },
{ status: newStatus, direction },
);
}
}
async afterRemove(event: RemoveEvent<MessageEntity>): Promise<void> {
if (event.entity) {
await this.updateMessageMetricsOnRemove(event.databaseEntity.status, event.databaseEntity.direction);
}
}
async afterTransactionCommit(event: TransactionCommitEvent): Promise<void> {
const updatedEntity = event.queryRunner?.data?.updatedEntity;
if (updatedEntity) {
await this.swapStatus(
LineaPostmanMetrics.Messages,
{ status: updatedEntity.previousStatus, direction: updatedEntity.direction },
{ status: updatedEntity.newStatus, direction: updatedEntity.direction },
);
}
}
private async updateMessageMetricsOnInsert(messageStatus: MessageStatus, messageDirection: Direction): Promise<void> {
try {
const prevGaugeValue = await this.metricsService.getGaugeValue(LineaPostmanMetrics.Messages, {
status: messageStatus,
direction: messageDirection,
});
if (prevGaugeValue === undefined) {
return;
}
this.metricsService.incrementGauge(LineaPostmanMetrics.Messages, {
status: messageStatus,
direction: messageDirection,
});
} catch (error) {
this.logger.error("Failed to update metrics:", error);
}
}
private async updateMessageMetricsOnRemove(messageStatus: MessageStatus, messageDirection: Direction): Promise<void> {
try {
const prevGaugeValue = await this.metricsService.getGaugeValue(LineaPostmanMetrics.Messages, {
status: messageStatus,
direction: messageDirection,
});
if (prevGaugeValue && prevGaugeValue > 0) {
this.metricsService.decrementGauge(LineaPostmanMetrics.Messages, {
status: messageStatus,
direction: messageDirection,
});
}
} catch (error) {
this.logger.error("Failed to update metrics:", error);
}
}
private async swapStatus(
name: LineaPostmanMetrics,
previous: Record<string, string>,
next: Record<string, string>,
): Promise<void> {
try {
const [prevVal, newVal] = await Promise.all([
this.metricsService.getGaugeValue(name, previous),
this.metricsService.getGaugeValue(name, next),
]);
if (prevVal && prevVal > 0) {
this.metricsService.decrementGauge(name, previous);
}
if (newVal !== undefined) {
this.metricsService.incrementGauge(name, next);
}
} catch (error) {
this.logger.error("Metrics swap failed:", error);
}
}
}

View File

@@ -0,0 +1,16 @@
import { Counter, Gauge, Registry } from "prom-client";
export enum LineaPostmanMetrics {
Messages = "linea_postman_messages",
}
export interface IMetricsService {
getRegistry(): Registry;
createCounter(name: LineaPostmanMetrics, help: string, labelNames?: string[]): Counter<string>;
createGauge(name: LineaPostmanMetrics, help: string, labelNames?: string[]): Gauge<string>;
incrementCounter(name: LineaPostmanMetrics, labels?: Record<string, string>, value?: number): void;
incrementGauge(name: LineaPostmanMetrics, labels?: Record<string, string>, value?: number): void;
decrementGauge(name: LineaPostmanMetrics, labels?: Record<string, string>, value?: number): void;
getGaugeValue(name: LineaPostmanMetrics, labels: Record<string, string>): Promise<number | undefined>;
getCounterValue(name: LineaPostmanMetrics, labels: Record<string, string>): Promise<number | undefined>;
}

View File

@@ -236,15 +236,17 @@ export class MessageClaimingProcessor implements IMessageClaimingProcessor {
maxFeePerGas: bigint,
): Promise<boolean> {
if (isUnderPriced) {
this.logger.warn(
"Fee underpriced found in this message: messageHash=%s messageInfo=%s transactionGasLimit=%s maxFeePerGas=%s",
message.messageHash,
message.toString(),
estimatedGasLimit?.toString(),
maxFeePerGas.toString(),
);
message.edit({ status: MessageStatus.FEE_UNDERPRICED });
await this.databaseService.updateMessage(message);
if (message.status !== MessageStatus.FEE_UNDERPRICED) {
this.logger.warn(
"Fee underpriced found in this message: messageHash=%s messageInfo=%s transactionGasLimit=%s maxFeePerGas=%s",
message.messageHash,
message.toString(),
estimatedGasLimit?.toString(),
maxFeePerGas.toString(),
);
message.edit({ status: MessageStatus.FEE_UNDERPRICED });
await this.databaseService.updateMessage(message);
}
return true;
}
return false;