Cache uncompressed data in Eth2InMessage (#3839)

This commit is contained in:
tuyennhv
2022-03-09 23:44:59 +07:00
committed by GitHub
parent f597feab2f
commit 1378bb990b
4 changed files with 38 additions and 76 deletions

View File

@@ -1,48 +1,28 @@
import {compress, uncompress} from "snappyjs";
import {intToBytes} from "@chainsafe/lodestar-utils";
import {hash} from "@chainsafe/ssz";
import {ForkName} from "@chainsafe/lodestar-params";
import {
DEFAULT_ENCODING,
GOSSIP_MSGID_LENGTH,
MESSAGE_DOMAIN_INVALID_SNAPPY,
MESSAGE_DOMAIN_VALID_SNAPPY,
} from "./constants";
import {GossipEncoding, GossipTopic} from "./interface";
import {ForkName} from "@chainsafe/lodestar-params";
export interface IUncompressCache {
uncompress(input: Uint8Array): Uint8Array;
}
export class UncompressCache implements IUncompressCache {
private cache = new WeakMap<Uint8Array, Uint8Array>();
uncompress(input: Uint8Array): Uint8Array {
let uncompressed = this.cache.get(input);
if (!uncompressed) {
uncompressed = uncompress(input);
this.cache.set(input, uncompressed);
}
return uncompressed;
}
}
import {Eth2InMessage, GossipEncoding, GossipTopic} from "./interface";
/**
* Decode message using `IUncompressCache`. Message will have been uncompressed before to compute the msgId.
* We must re-use that result to prevent uncompressing the object again here.
* Uncompressed data is used to
* - compute message id
* - if message is not seen then we use it to deserialize to gossip object
*
* We cache uncompressed data in InMessage to prevent uncompressing multiple times.
*/
export function decodeMessageData(
encoding: GossipEncoding,
msgData: Uint8Array,
uncompressCache: IUncompressCache
): Uint8Array {
switch (encoding) {
case GossipEncoding.ssz_snappy:
return uncompressCache.uncompress(msgData);
default:
throw new Error(`Unsupported encoding ${encoding}`);
export function getUncompressedData(msg: Eth2InMessage): Uint8Array {
if (!msg.uncompressedData) {
msg.uncompressedData = uncompress(msg.data);
}
return msg.uncompressedData;
}
export function encodeMessageData(encoding: GossipEncoding, msgData: Uint8Array): Uint8Array {
@@ -58,18 +38,13 @@ export function encodeMessageData(encoding: GossipEncoding, msgData: Uint8Array)
/**
* Function to compute message id for all forks.
*/
export function computeMsgId(
topic: GossipTopic,
topicStr: string,
msgData: Uint8Array,
uncompressCache: IUncompressCache
): Uint8Array {
export function computeMsgId(topic: GossipTopic, topicStr: string, msg: Eth2InMessage): Uint8Array {
switch (topic.fork) {
case ForkName.phase0:
return computeMsgIdPhase0(topic, msgData, uncompressCache);
return computeMsgIdPhase0(topic, msg);
case ForkName.altair:
case ForkName.bellatrix:
return computeMsgIdAltair(topic, topicStr, msgData, uncompressCache);
return computeMsgIdAltair(topic, topicStr, msg);
}
}
@@ -79,18 +54,14 @@ export function computeMsgId(
* SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + snappy_decompress(message.data))[:20]
* ```
*/
export function computeMsgIdPhase0(
topic: GossipTopic,
msgData: Uint8Array,
uncompressCache: IUncompressCache
): Uint8Array {
export function computeMsgIdPhase0(topic: GossipTopic, msg: Eth2InMessage): Uint8Array {
switch (topic.encoding ?? DEFAULT_ENCODING) {
case GossipEncoding.ssz_snappy:
try {
const uncompressed = uncompressCache.uncompress(msgData);
const uncompressed = getUncompressedData(msg);
return hashGossipMsgData(MESSAGE_DOMAIN_VALID_SNAPPY, uncompressed);
} catch (e) {
return hashGossipMsgData(MESSAGE_DOMAIN_INVALID_SNAPPY, msgData);
return hashGossipMsgData(MESSAGE_DOMAIN_INVALID_SNAPPY, msg.data);
}
}
}
@@ -108,16 +79,11 @@ export function computeMsgIdPhase0(
* ```
* https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.7/specs/altair/p2p-interface.md#topics-and-messages
*/
export function computeMsgIdAltair(
topic: GossipTopic,
topicStr: string,
msgData: Uint8Array,
uncompressCache: IUncompressCache
): Uint8Array {
export function computeMsgIdAltair(topic: GossipTopic, topicStr: string, msg: Eth2InMessage): Uint8Array {
switch (topic.encoding ?? DEFAULT_ENCODING) {
case GossipEncoding.ssz_snappy:
try {
const uncompressed = uncompressCache.uncompress(msgData);
const uncompressed = getUncompressedData(msg);
return hashGossipMsgData(
MESSAGE_DOMAIN_VALID_SNAPPY,
intToBytes(topicStr.length, 8),
@@ -129,7 +95,7 @@ export function computeMsgIdAltair(
MESSAGE_DOMAIN_INVALID_SNAPPY,
intToBytes(topicStr.length, 8),
Buffer.from(topicStr),
msgData
msg.data
);
}
}

View File

@@ -1,7 +1,6 @@
/* eslint-disable @typescript-eslint/naming-convention */
import Gossipsub from "libp2p-gossipsub";
import {ERR_TOPIC_VALIDATOR_IGNORE, ERR_TOPIC_VALIDATOR_REJECT} from "libp2p-gossipsub/src/constants";
import {InMessage} from "libp2p-interfaces/src/pubsub";
import Libp2p from "libp2p";
import {AbortSignal} from "@chainsafe/abort-controller";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
@@ -19,9 +18,10 @@ import {
GossipTypeMap,
ValidatorFnsByType,
GossipHandlers,
Eth2InMessage,
} from "./interface";
import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic} from "./topic";
import {computeMsgId, encodeMessageData, UncompressCache} from "./encoding";
import {computeMsgId, encodeMessageData} from "./encoding";
import {DEFAULT_ENCODING} from "./constants";
import {GossipValidationError} from "./errors";
import {GOSSIP_MAX_SIZE} from "../../constants";
@@ -30,7 +30,6 @@ import {Map2d, Map2dArr} from "../../util/map";
import pipe from "it-pipe";
import PeerStreams from "libp2p-interfaces/src/pubsub/peer-streams";
import BufferList from "bl";
// import {RPC} from "libp2p-interfaces/src/pubsub/message/rpc";
import {RPC} from "libp2p-gossipsub/src/message/rpc";
import {normalizeInRpcMessage} from "libp2p-interfaces/src/pubsub/utils";
@@ -54,12 +53,6 @@ export interface IGossipsubModules {
gossipHandlers: GossipHandlers;
}
/**
* Cache message id right in message so that we don't have to compute it twice.
* When we send messages to other peers, protobuf will just ignore `msgId` field.
*/
type Eth2InMessage = InMessage & {msgId?: Uint8Array};
/**
* Wrapper around js-libp2p-gossipsub with the following extensions:
* - Eth2 message id
@@ -80,7 +73,6 @@ export class Eth2Gossipsub extends Gossipsub {
// Internal caches
private readonly gossipTopicCache: GossipTopicCache;
private readonly uncompressCache = new UncompressCache();
private readonly validatorFnsByType: ValidatorFnsByType;
@@ -108,7 +100,6 @@ export class Eth2Gossipsub extends Gossipsub {
const {validatorFnsByType, jobQueues} = createValidatorFnsByType(gossipHandlers, {
config,
logger,
uncompressCache: this.uncompressCache,
metrics,
signal,
});
@@ -144,7 +135,7 @@ export class Eth2Gossipsub extends Gossipsub {
if (!msgId) {
const topicStr = msg.topicIDs[0];
const topic = this.gossipTopicCache.getTopic(topicStr);
msgId = computeMsgId(topic, topicStr, msg.data, this.uncompressCache);
msgId = computeMsgId(topic, topicStr, msg);
msg.msgId = msgId;
}
return msgId;
@@ -233,7 +224,7 @@ export class Eth2Gossipsub extends Gossipsub {
* @override https://github.com/libp2p/js-libp2p-interfaces/blob/ff3bd10704a4c166ce63135747e3736915b0be8d/src/pubsub/index.js#L513
* Note: this does not call super. All logic is re-implemented below
*/
async validate(message: InMessage): Promise<void> {
async validate(message: Eth2InMessage): Promise<void> {
try {
// messages must have a single topicID
const topicStr = Array.isArray(message.topicIDs) ? message.topicIDs[0] : undefined;

View File

@@ -103,6 +103,15 @@ export interface IGossipModules {
chain: IBeaconChain;
}
/**
* Extend the standard InMessage with additional fields so that we don't have to compute them twice.
* When we send messages to other peers, protobuf will just ignore these fields.
*/
export type Eth2InMessage = InMessage & {
msgId?: Uint8Array;
uncompressedData?: Uint8Array;
};
/**
* Contains various methods for validation of incoming gossip topic data.
* The conditions for valid gossip topics and how they are handled are specified here:
@@ -114,7 +123,7 @@ export interface IGossipModules {
*
* js-libp2p-gossipsub expects validation functions that look like this
*/
export type GossipValidatorFn = (topic: GossipTopic, message: InMessage, seenTimestampSec: number) => Promise<void>;
export type GossipValidatorFn = (topic: GossipTopic, message: Eth2InMessage, seenTimestampSec: number) => Promise<void>;
export type ValidatorFnsByType = {[K in GossipType]: GossipValidatorFn};

View File

@@ -14,16 +14,14 @@ import {
} from "../interface";
import {GossipValidationError} from "../errors";
import {GossipActionError, GossipAction} from "../../../chain/errors";
import {decodeMessageData, UncompressCache} from "../encoding";
import {createValidationQueues} from "./queue";
import {DEFAULT_ENCODING} from "../constants";
import {getGossipAcceptMetadataByType, GetGossipAcceptMetadataFn} from "./onAccept";
import {getUncompressedData} from "../encoding";
type ValidatorFnModules = {
config: IChainForkConfig;
logger: ILogger;
metrics: IMetrics | null;
uncompressCache: UncompressCache;
};
/**
@@ -72,19 +70,17 @@ function getGossipValidatorFn<K extends GossipType>(
type: K,
modules: ValidatorFnModules
): GossipValidatorFn {
const {config, logger, metrics, uncompressCache} = modules;
const {config, logger, metrics} = modules;
const getGossipObjectAcceptMetadata = getGossipAcceptMetadataByType[type] as GetGossipAcceptMetadataFn;
return async function gossipValidatorFn(topic, gossipMsg, seenTimestampSec) {
// Define in scope above try {} to be used in catch {} if object was parsed
let gossipObject;
try {
const encoding = topic.encoding ?? DEFAULT_ENCODING;
// Deserialize object from bytes ONLY after being picked up from the validation queue
try {
const sszType = getGossipSSZType(topic);
const messageData = decodeMessageData(encoding, gossipMsg.data, uncompressCache);
const messageData = getUncompressedData(gossipMsg);
gossipObject =
// TODO: Review if it's really necessary to deserialize this as TreeBacked
topic.type === GossipType.beacon_block || topic.type === GossipType.beacon_aggregate_and_proof