diff --git a/packages/lodestar/src/network/gossip/encoding.ts b/packages/lodestar/src/network/gossip/encoding.ts index b751b7e002..65f9219f1b 100644 --- a/packages/lodestar/src/network/gossip/encoding.ts +++ b/packages/lodestar/src/network/gossip/encoding.ts @@ -1,48 +1,28 @@ import {compress, uncompress} from "snappyjs"; import {intToBytes} from "@chainsafe/lodestar-utils"; import {hash} from "@chainsafe/ssz"; +import {ForkName} from "@chainsafe/lodestar-params"; import { DEFAULT_ENCODING, GOSSIP_MSGID_LENGTH, MESSAGE_DOMAIN_INVALID_SNAPPY, MESSAGE_DOMAIN_VALID_SNAPPY, } from "./constants"; -import {GossipEncoding, GossipTopic} from "./interface"; -import {ForkName} from "@chainsafe/lodestar-params"; - -export interface IUncompressCache { - uncompress(input: Uint8Array): Uint8Array; -} - -export class UncompressCache implements IUncompressCache { - private cache = new WeakMap(); - - uncompress(input: Uint8Array): Uint8Array { - let uncompressed = this.cache.get(input); - if (!uncompressed) { - uncompressed = uncompress(input); - this.cache.set(input, uncompressed); - } - return uncompressed; - } -} +import {Eth2InMessage, GossipEncoding, GossipTopic} from "./interface"; /** - * Decode message using `IUncompressCache`. Message will have been uncompressed before to compute the msgId. - * We must re-use that result to prevent uncompressing the object again here. + * Uncompressed data is used to + * - compute message id + * - if message is not seen then we use it to deserialize to gossip object + * + * We cache uncompressed data in InMessage to prevent uncompressing multiple times. */ -export function decodeMessageData( - encoding: GossipEncoding, - msgData: Uint8Array, - uncompressCache: IUncompressCache -): Uint8Array { - switch (encoding) { - case GossipEncoding.ssz_snappy: - return uncompressCache.uncompress(msgData); - - default: - throw new Error(`Unsupported encoding ${encoding}`); +export function getUncompressedData(msg: Eth2InMessage): Uint8Array { + if (!msg.uncompressedData) { + msg.uncompressedData = uncompress(msg.data); } + + return msg.uncompressedData; } export function encodeMessageData(encoding: GossipEncoding, msgData: Uint8Array): Uint8Array { @@ -58,18 +38,13 @@ export function encodeMessageData(encoding: GossipEncoding, msgData: Uint8Array) /** * Function to compute message id for all forks. */ -export function computeMsgId( - topic: GossipTopic, - topicStr: string, - msgData: Uint8Array, - uncompressCache: IUncompressCache -): Uint8Array { +export function computeMsgId(topic: GossipTopic, topicStr: string, msg: Eth2InMessage): Uint8Array { switch (topic.fork) { case ForkName.phase0: - return computeMsgIdPhase0(topic, msgData, uncompressCache); + return computeMsgIdPhase0(topic, msg); case ForkName.altair: case ForkName.bellatrix: - return computeMsgIdAltair(topic, topicStr, msgData, uncompressCache); + return computeMsgIdAltair(topic, topicStr, msg); } } @@ -79,18 +54,14 @@ export function computeMsgId( * SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + snappy_decompress(message.data))[:20] * ``` */ -export function computeMsgIdPhase0( - topic: GossipTopic, - msgData: Uint8Array, - uncompressCache: IUncompressCache -): Uint8Array { +export function computeMsgIdPhase0(topic: GossipTopic, msg: Eth2InMessage): Uint8Array { switch (topic.encoding ?? DEFAULT_ENCODING) { case GossipEncoding.ssz_snappy: try { - const uncompressed = uncompressCache.uncompress(msgData); + const uncompressed = getUncompressedData(msg); return hashGossipMsgData(MESSAGE_DOMAIN_VALID_SNAPPY, uncompressed); } catch (e) { - return hashGossipMsgData(MESSAGE_DOMAIN_INVALID_SNAPPY, msgData); + return hashGossipMsgData(MESSAGE_DOMAIN_INVALID_SNAPPY, msg.data); } } } @@ -108,16 +79,11 @@ export function computeMsgIdPhase0( * ``` * https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.7/specs/altair/p2p-interface.md#topics-and-messages */ -export function computeMsgIdAltair( - topic: GossipTopic, - topicStr: string, - msgData: Uint8Array, - uncompressCache: IUncompressCache -): Uint8Array { +export function computeMsgIdAltair(topic: GossipTopic, topicStr: string, msg: Eth2InMessage): Uint8Array { switch (topic.encoding ?? DEFAULT_ENCODING) { case GossipEncoding.ssz_snappy: try { - const uncompressed = uncompressCache.uncompress(msgData); + const uncompressed = getUncompressedData(msg); return hashGossipMsgData( MESSAGE_DOMAIN_VALID_SNAPPY, intToBytes(topicStr.length, 8), @@ -129,7 +95,7 @@ export function computeMsgIdAltair( MESSAGE_DOMAIN_INVALID_SNAPPY, intToBytes(topicStr.length, 8), Buffer.from(topicStr), - msgData + msg.data ); } } diff --git a/packages/lodestar/src/network/gossip/gossipsub.ts b/packages/lodestar/src/network/gossip/gossipsub.ts index 81a9cb6b47..a8950ded91 100644 --- a/packages/lodestar/src/network/gossip/gossipsub.ts +++ b/packages/lodestar/src/network/gossip/gossipsub.ts @@ -1,7 +1,6 @@ /* eslint-disable @typescript-eslint/naming-convention */ import Gossipsub from "libp2p-gossipsub"; import {ERR_TOPIC_VALIDATOR_IGNORE, ERR_TOPIC_VALIDATOR_REJECT} from "libp2p-gossipsub/src/constants"; -import {InMessage} from "libp2p-interfaces/src/pubsub"; import Libp2p from "libp2p"; import {AbortSignal} from "@chainsafe/abort-controller"; import {IBeaconConfig} from "@chainsafe/lodestar-config"; @@ -19,9 +18,10 @@ import { GossipTypeMap, ValidatorFnsByType, GossipHandlers, + Eth2InMessage, } from "./interface"; import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic} from "./topic"; -import {computeMsgId, encodeMessageData, UncompressCache} from "./encoding"; +import {computeMsgId, encodeMessageData} from "./encoding"; import {DEFAULT_ENCODING} from "./constants"; import {GossipValidationError} from "./errors"; import {GOSSIP_MAX_SIZE} from "../../constants"; @@ -30,7 +30,6 @@ import {Map2d, Map2dArr} from "../../util/map"; import pipe from "it-pipe"; import PeerStreams from "libp2p-interfaces/src/pubsub/peer-streams"; import BufferList from "bl"; -// import {RPC} from "libp2p-interfaces/src/pubsub/message/rpc"; import {RPC} from "libp2p-gossipsub/src/message/rpc"; import {normalizeInRpcMessage} from "libp2p-interfaces/src/pubsub/utils"; @@ -54,12 +53,6 @@ export interface IGossipsubModules { gossipHandlers: GossipHandlers; } -/** - * Cache message id right in message so that we don't have to compute it twice. - * When we send messages to other peers, protobuf will just ignore `msgId` field. - */ -type Eth2InMessage = InMessage & {msgId?: Uint8Array}; - /** * Wrapper around js-libp2p-gossipsub with the following extensions: * - Eth2 message id @@ -80,7 +73,6 @@ export class Eth2Gossipsub extends Gossipsub { // Internal caches private readonly gossipTopicCache: GossipTopicCache; - private readonly uncompressCache = new UncompressCache(); private readonly validatorFnsByType: ValidatorFnsByType; @@ -108,7 +100,6 @@ export class Eth2Gossipsub extends Gossipsub { const {validatorFnsByType, jobQueues} = createValidatorFnsByType(gossipHandlers, { config, logger, - uncompressCache: this.uncompressCache, metrics, signal, }); @@ -144,7 +135,7 @@ export class Eth2Gossipsub extends Gossipsub { if (!msgId) { const topicStr = msg.topicIDs[0]; const topic = this.gossipTopicCache.getTopic(topicStr); - msgId = computeMsgId(topic, topicStr, msg.data, this.uncompressCache); + msgId = computeMsgId(topic, topicStr, msg); msg.msgId = msgId; } return msgId; @@ -233,7 +224,7 @@ export class Eth2Gossipsub extends Gossipsub { * @override https://github.com/libp2p/js-libp2p-interfaces/blob/ff3bd10704a4c166ce63135747e3736915b0be8d/src/pubsub/index.js#L513 * Note: this does not call super. All logic is re-implemented below */ - async validate(message: InMessage): Promise { + async validate(message: Eth2InMessage): Promise { try { // messages must have a single topicID const topicStr = Array.isArray(message.topicIDs) ? message.topicIDs[0] : undefined; diff --git a/packages/lodestar/src/network/gossip/interface.ts b/packages/lodestar/src/network/gossip/interface.ts index f6670481fd..9e18d6b313 100644 --- a/packages/lodestar/src/network/gossip/interface.ts +++ b/packages/lodestar/src/network/gossip/interface.ts @@ -103,6 +103,15 @@ export interface IGossipModules { chain: IBeaconChain; } +/** + * Extend the standard InMessage with additional fields so that we don't have to compute them twice. + * When we send messages to other peers, protobuf will just ignore these fields. + */ +export type Eth2InMessage = InMessage & { + msgId?: Uint8Array; + uncompressedData?: Uint8Array; +}; + /** * Contains various methods for validation of incoming gossip topic data. * The conditions for valid gossip topics and how they are handled are specified here: @@ -114,7 +123,7 @@ export interface IGossipModules { * * js-libp2p-gossipsub expects validation functions that look like this */ -export type GossipValidatorFn = (topic: GossipTopic, message: InMessage, seenTimestampSec: number) => Promise; +export type GossipValidatorFn = (topic: GossipTopic, message: Eth2InMessage, seenTimestampSec: number) => Promise; export type ValidatorFnsByType = {[K in GossipType]: GossipValidatorFn}; diff --git a/packages/lodestar/src/network/gossip/validation/index.ts b/packages/lodestar/src/network/gossip/validation/index.ts index d235a41875..b15030da7a 100644 --- a/packages/lodestar/src/network/gossip/validation/index.ts +++ b/packages/lodestar/src/network/gossip/validation/index.ts @@ -14,16 +14,14 @@ import { } from "../interface"; import {GossipValidationError} from "../errors"; import {GossipActionError, GossipAction} from "../../../chain/errors"; -import {decodeMessageData, UncompressCache} from "../encoding"; import {createValidationQueues} from "./queue"; -import {DEFAULT_ENCODING} from "../constants"; import {getGossipAcceptMetadataByType, GetGossipAcceptMetadataFn} from "./onAccept"; +import {getUncompressedData} from "../encoding"; type ValidatorFnModules = { config: IChainForkConfig; logger: ILogger; metrics: IMetrics | null; - uncompressCache: UncompressCache; }; /** @@ -72,19 +70,17 @@ function getGossipValidatorFn( type: K, modules: ValidatorFnModules ): GossipValidatorFn { - const {config, logger, metrics, uncompressCache} = modules; + const {config, logger, metrics} = modules; const getGossipObjectAcceptMetadata = getGossipAcceptMetadataByType[type] as GetGossipAcceptMetadataFn; return async function gossipValidatorFn(topic, gossipMsg, seenTimestampSec) { // Define in scope above try {} to be used in catch {} if object was parsed let gossipObject; try { - const encoding = topic.encoding ?? DEFAULT_ENCODING; - // Deserialize object from bytes ONLY after being picked up from the validation queue try { const sszType = getGossipSSZType(topic); - const messageData = decodeMessageData(encoding, gossipMsg.data, uncompressCache); + const messageData = getUncompressedData(gossipMsg); gossipObject = // TODO: Review if it's really necessary to deserialize this as TreeBacked topic.type === GossipType.beacon_block || topic.type === GossipType.beacon_aggregate_and_proof