mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 08:08:03 -05:00
Compare commits
2 Commits
v1.7.0
...
order-vali
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5782cf5901 | ||
|
|
40648c106f |
@@ -85,6 +85,9 @@ type
|
||||
TopicHandler* {.public.} = proc(topic: string,
|
||||
data: seq[byte]): Future[void] {.gcsafe, raises: [].}
|
||||
|
||||
ValidationStrategy* {.pure, public.} = enum
|
||||
Parallel, Sequential
|
||||
|
||||
ValidatorHandler* {.public.} = proc(topic: string,
|
||||
message: Message): Future[ValidationResult] {.gcsafe, raises: [].}
|
||||
|
||||
@@ -109,7 +112,8 @@ type
|
||||
triggerSelf*: bool ## trigger own local handler on publish
|
||||
verifySignature*: bool ## enable signature verification
|
||||
sign*: bool ## enable message signing
|
||||
validators*: Table[string, HashSet[ValidatorHandler]]
|
||||
validators*: Table[string, OrderedSet[ValidatorHandler]]
|
||||
validationStrategy*: ValidationStrategy
|
||||
observers: ref seq[PubSubObserver] # ref as in smart_ptr
|
||||
msgIdProvider*: MsgIdProvider ## Turn message into message id (not nil)
|
||||
msgSeqno*: uint64
|
||||
@@ -500,7 +504,7 @@ method addValidator*(p: PubSub,
|
||||
## `Ignore` or `Reject` (which can descore the peer)
|
||||
for t in topic:
|
||||
trace "adding validator for topic", topicId = t
|
||||
p.validators.mgetOrPut(t, HashSet[ValidatorHandler]()).incl(hook)
|
||||
p.validators.mgetOrPut(t, OrderedSet[ValidatorHandler]()).incl(hook)
|
||||
|
||||
method removeValidator*(p: PubSub,
|
||||
topic: varargs[string],
|
||||
@@ -513,26 +517,37 @@ method removeValidator*(p: PubSub,
|
||||
|
||||
method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} =
|
||||
var pending: seq[Future[ValidationResult]]
|
||||
trace "about to validate message"
|
||||
for topic in message.topicIds:
|
||||
trace "looking for validators on topic", topicId = topic,
|
||||
registered = toSeq(p.validators.keys)
|
||||
if topic in p.validators:
|
||||
trace "running validators for topic", topicId = topic
|
||||
for validator in p.validators[topic]:
|
||||
pending.add(validator(topic, message))
|
||||
|
||||
result = ValidationResult.Accept
|
||||
let futs = await allFinished(pending)
|
||||
for fut in futs:
|
||||
if fut.failed:
|
||||
result = ValidationResult.Reject
|
||||
break
|
||||
let res = fut.read()
|
||||
if res != ValidationResult.Accept:
|
||||
result = res
|
||||
if res == ValidationResult.Reject:
|
||||
|
||||
trace "about to validate message"
|
||||
block outer:
|
||||
for topic in message.topicIds:
|
||||
trace "looking for validators on topic", topicId = topic,
|
||||
registered = toSeq(p.validators.keys)
|
||||
if topic in p.validators:
|
||||
trace "running validators for topic", topicId = topic
|
||||
for validator in p.validators[topic]:
|
||||
case p.validationStrategy
|
||||
of ValidationStrategy.Parallel:
|
||||
pending.add(validator(topic, message))
|
||||
of ValidationStrategy.Sequential:
|
||||
let validatorRes = await validator(topic, message)
|
||||
# early break on first Reject/Ignore
|
||||
if validatorRes != ValidationResult.Accept:
|
||||
result = validatorRes
|
||||
break outer
|
||||
|
||||
if p.validationStrategy == ValidationStrategy.Parallel:
|
||||
let futs = await allFinished(pending)
|
||||
for fut in futs:
|
||||
if fut.failed:
|
||||
result = ValidationResult.Reject
|
||||
break
|
||||
let res = fut.read()
|
||||
if res != ValidationResult.Accept:
|
||||
result = res
|
||||
if res == ValidationResult.Reject:
|
||||
break
|
||||
|
||||
case result
|
||||
of ValidationResult.Accept:
|
||||
@@ -549,6 +564,7 @@ proc init*[PubParams: object | bool](
|
||||
anonymize: bool = false,
|
||||
verifySignature: bool = true,
|
||||
sign: bool = true,
|
||||
validationStrategy: ValidationStrategy = ValidationStrategy.Parallel,
|
||||
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
|
||||
subscriptionValidator: SubscriptionValidator = nil,
|
||||
maxMessageSize: int = 1024 * 1024,
|
||||
@@ -563,6 +579,7 @@ proc init*[PubParams: object | bool](
|
||||
anonymize: anonymize,
|
||||
verifySignature: verifySignature,
|
||||
sign: sign,
|
||||
validationStrategy: validationStrategy,
|
||||
msgIdProvider: msgIdProvider,
|
||||
subscriptionValidator: subscriptionValidator,
|
||||
maxMessageSize: maxMessageSize,
|
||||
@@ -575,6 +592,7 @@ proc init*[PubParams: object | bool](
|
||||
anonymize: anonymize,
|
||||
verifySignature: verifySignature,
|
||||
sign: sign,
|
||||
validationStrategy: validationStrategy,
|
||||
msgIdProvider: msgIdProvider,
|
||||
subscriptionValidator: subscriptionValidator,
|
||||
parameters: parameters,
|
||||
|
||||
@@ -135,6 +135,137 @@ suite "GossipSub":
|
||||
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
|
||||
asyncTest "GossipSub with multiple validators (Sequential): early return":
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
check false # if we get here, it should fail
|
||||
|
||||
let
|
||||
nodes = generateNodes(2, gossip = true, validationStrategy = ValidationStrategy.Sequential)
|
||||
|
||||
# start switches
|
||||
nodesFut = await allFinished(
|
||||
nodes[0].switch.start(),
|
||||
nodes[1].switch.start(),
|
||||
)
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
nodes[0].subscribe("foobar", handler)
|
||||
nodes[1].subscribe("foobar", handler)
|
||||
|
||||
await waitSubGraph(nodes, "foobar")
|
||||
|
||||
let gossip1 = GossipSub(nodes[0])
|
||||
let gossip2 = GossipSub(nodes[1])
|
||||
|
||||
# define 3 validators executed sequentially
|
||||
# validator1(Rejects)->validator2(Accepts)->validator3(Accepts)
|
||||
var validatorFut1 = newFuture[bool]() # Reject
|
||||
var validatorFut2 = newFuture[bool]() # Accept
|
||||
var validatorFut3 = newFuture[bool]() # Accept
|
||||
|
||||
proc validator1(topic: string,
|
||||
message: Message): Future[ValidationResult] {.async.} =
|
||||
result = ValidationResult.Reject
|
||||
validatorFut1.complete(true)
|
||||
|
||||
proc validator2(topic: string,
|
||||
message: Message): Future[ValidationResult] {.async.} =
|
||||
result = ValidationResult.Accept
|
||||
validatorFut2.complete(true)
|
||||
|
||||
proc validator3(topic: string,
|
||||
message: Message): Future[ValidationResult] {.async.} =
|
||||
result = ValidationResult.Accept
|
||||
validatorFut3.complete(true)
|
||||
|
||||
nodes[1].addValidator("foobar", validator1)
|
||||
nodes[1].addValidator("foobar", validator2)
|
||||
nodes[1].addValidator("foobar", validator3)
|
||||
|
||||
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
|
||||
|
||||
# first failed, so the rest are not executed
|
||||
check (await validatorFut1.withTimeout(chronos.seconds(1))) == true
|
||||
check (await validatorFut2.withTimeout(chronos.seconds(1))) == false
|
||||
check (await validatorFut3.withTimeout(chronos.seconds(1))) == false
|
||||
|
||||
# handler above verifies the message was not received
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].switch.stop(),
|
||||
nodes[1].switch.stop()
|
||||
)
|
||||
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
|
||||
asyncTest "GossipSub with multiple validators (Sequential): all valid":
|
||||
var messageFut = newFuture[bool]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
# if we complete the future, means the message arrives
|
||||
messageFut.complete(true)
|
||||
|
||||
let
|
||||
nodes = generateNodes(2, gossip = true, validationStrategy = ValidationStrategy.Sequential)
|
||||
|
||||
# start switches
|
||||
nodesFut = await allFinished(
|
||||
nodes[0].switch.start(),
|
||||
nodes[1].switch.start(),
|
||||
)
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
nodes[0].subscribe("foobar", handler)
|
||||
nodes[1].subscribe("foobar", handler)
|
||||
|
||||
await waitSubGraph(nodes, "foobar")
|
||||
|
||||
let gossip1 = GossipSub(nodes[0])
|
||||
let gossip2 = GossipSub(nodes[1])
|
||||
|
||||
# define 3 validators that are executed sequentially
|
||||
# validator1(Accept)->validator2(Accept)->validator3(Accept)
|
||||
var validatorFut1 = newFuture[bool]() # Accept
|
||||
var validatorFut2 = newFuture[bool]() # Accept
|
||||
var validatorFut3 = newFuture[bool]() # Accept
|
||||
|
||||
proc validator1(topic: string,
|
||||
message: Message): Future[ValidationResult] {.async.} =
|
||||
result = ValidationResult.Accept
|
||||
validatorFut1.complete(true)
|
||||
|
||||
proc validator2(topic: string,
|
||||
message: Message): Future[ValidationResult] {.async.} =
|
||||
result = ValidationResult.Accept
|
||||
validatorFut2.complete(true)
|
||||
|
||||
proc validator3(topic: string,
|
||||
message: Message): Future[ValidationResult] {.async.} =
|
||||
result = ValidationResult.Accept
|
||||
validatorFut3.complete(true)
|
||||
|
||||
nodes[1].addValidator("foobar", validator1)
|
||||
nodes[1].addValidator("foobar", validator2)
|
||||
nodes[1].addValidator("foobar", validator3)
|
||||
|
||||
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
|
||||
|
||||
# all validators were executed
|
||||
check (await validatorFut1.withTimeout(chronos.seconds(1))) == true
|
||||
check (await validatorFut2.withTimeout(chronos.seconds(1))) == true
|
||||
check (await validatorFut3.withTimeout(chronos.seconds(1))) == true
|
||||
|
||||
# message was indeed received by the subscriber
|
||||
check (await messageFut.withTimeout(chronos.seconds(1))) == true
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].switch.stop(),
|
||||
nodes[1].switch.stop()
|
||||
)
|
||||
|
||||
await allFuturesThrowing(nodesFut.concat())
|
||||
|
||||
asyncTest "GossipSub validation should fail (ignore)":
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
check false # if we get here, it should fail
|
||||
|
||||
@@ -41,6 +41,7 @@ proc generateNodes*(
|
||||
verifySignature: bool = libp2p_pubsub_verify,
|
||||
anonymize: bool = libp2p_pubsub_anonymize,
|
||||
sign: bool = libp2p_pubsub_sign,
|
||||
validationStrategy: ValidationStrategy = ValidationStrategy.Parallel,
|
||||
sendSignedPeerRecord = false,
|
||||
unsubscribeBackoff = 1.seconds,
|
||||
maxMessageSize: int = 1024 * 1024,
|
||||
@@ -54,6 +55,7 @@ proc generateNodes*(
|
||||
triggerSelf = triggerSelf,
|
||||
verifySignature = verifySignature,
|
||||
sign = sign,
|
||||
validationStrategy = validationStrategy,
|
||||
msgIdProvider = msgIdProvider,
|
||||
anonymize = anonymize,
|
||||
maxMessageSize = maxMessageSize,
|
||||
|
||||
Reference in New Issue
Block a user