Compare commits

...

2 Commits

Author SHA1 Message Date
alrevuelta
5782cf5901 Fix typo + break outer loop 2023-09-01 12:12:37 +02:00
alrevuelta
40648c106f Add validationStrategy Paralel/Seq + ordered validators 2023-09-01 11:35:49 +02:00
3 changed files with 171 additions and 20 deletions

View File

@@ -85,6 +85,9 @@ type
TopicHandler* {.public.} = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe, raises: [].}
ValidationStrategy* {.pure, public.} = enum
Parallel, Sequential
ValidatorHandler* {.public.} = proc(topic: string,
message: Message): Future[ValidationResult] {.gcsafe, raises: [].}
@@ -109,7 +112,8 @@ type
triggerSelf*: bool ## trigger own local handler on publish
verifySignature*: bool ## enable signature verification
sign*: bool ## enable message signing
validators*: Table[string, HashSet[ValidatorHandler]]
validators*: Table[string, OrderedSet[ValidatorHandler]]
validationStrategy*: ValidationStrategy
observers: ref seq[PubSubObserver] # ref as in smart_ptr
msgIdProvider*: MsgIdProvider ## Turn message into message id (not nil)
msgSeqno*: uint64
@@ -500,7 +504,7 @@ method addValidator*(p: PubSub,
## `Ignore` or `Reject` (which can descore the peer)
for t in topic:
trace "adding validator for topic", topicId = t
p.validators.mgetOrPut(t, HashSet[ValidatorHandler]()).incl(hook)
p.validators.mgetOrPut(t, OrderedSet[ValidatorHandler]()).incl(hook)
method removeValidator*(p: PubSub,
topic: varargs[string],
@@ -513,26 +517,37 @@ method removeValidator*(p: PubSub,
method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} =
var pending: seq[Future[ValidationResult]]
trace "about to validate message"
for topic in message.topicIds:
trace "looking for validators on topic", topicId = topic,
registered = toSeq(p.validators.keys)
if topic in p.validators:
trace "running validators for topic", topicId = topic
for validator in p.validators[topic]:
pending.add(validator(topic, message))
result = ValidationResult.Accept
let futs = await allFinished(pending)
for fut in futs:
if fut.failed:
result = ValidationResult.Reject
break
let res = fut.read()
if res != ValidationResult.Accept:
result = res
if res == ValidationResult.Reject:
trace "about to validate message"
block outer:
for topic in message.topicIds:
trace "looking for validators on topic", topicId = topic,
registered = toSeq(p.validators.keys)
if topic in p.validators:
trace "running validators for topic", topicId = topic
for validator in p.validators[topic]:
case p.validationStrategy
of ValidationStrategy.Parallel:
pending.add(validator(topic, message))
of ValidationStrategy.Sequential:
let validatorRes = await validator(topic, message)
# early break on first Reject/Ignore
if validatorRes != ValidationResult.Accept:
result = validatorRes
break outer
if p.validationStrategy == ValidationStrategy.Parallel:
let futs = await allFinished(pending)
for fut in futs:
if fut.failed:
result = ValidationResult.Reject
break
let res = fut.read()
if res != ValidationResult.Accept:
result = res
if res == ValidationResult.Reject:
break
case result
of ValidationResult.Accept:
@@ -549,6 +564,7 @@ proc init*[PubParams: object | bool](
anonymize: bool = false,
verifySignature: bool = true,
sign: bool = true,
validationStrategy: ValidationStrategy = ValidationStrategy.Parallel,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
maxMessageSize: int = 1024 * 1024,
@@ -563,6 +579,7 @@ proc init*[PubParams: object | bool](
anonymize: anonymize,
verifySignature: verifySignature,
sign: sign,
validationStrategy: validationStrategy,
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
maxMessageSize: maxMessageSize,
@@ -575,6 +592,7 @@ proc init*[PubParams: object | bool](
anonymize: anonymize,
verifySignature: verifySignature,
sign: sign,
validationStrategy: validationStrategy,
msgIdProvider: msgIdProvider,
subscriptionValidator: subscriptionValidator,
parameters: parameters,

View File

@@ -135,6 +135,137 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub with multiple validators (Sequential): early return":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
let
nodes = generateNodes(2, gossip = true, validationStrategy = ValidationStrategy.Sequential)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await waitSubGraph(nodes, "foobar")
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
# define 3 validators executed sequentially
# validator1(Rejects)->validator2(Accepts)->validator3(Accepts)
var validatorFut1 = newFuture[bool]() # Reject
var validatorFut2 = newFuture[bool]() # Accept
var validatorFut3 = newFuture[bool]() # Accept
proc validator1(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Reject
validatorFut1.complete(true)
proc validator2(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut2.complete(true)
proc validator3(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut3.complete(true)
nodes[1].addValidator("foobar", validator1)
nodes[1].addValidator("foobar", validator2)
nodes[1].addValidator("foobar", validator3)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
# first failed, so the rest are not executed
check (await validatorFut1.withTimeout(chronos.seconds(1))) == true
check (await validatorFut2.withTimeout(chronos.seconds(1))) == false
check (await validatorFut3.withTimeout(chronos.seconds(1))) == false
# handler above verifies the message was not received
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub with multiple validators (Sequential): all valid":
var messageFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
# if we complete the future, means the message arrives
messageFut.complete(true)
let
nodes = generateNodes(2, gossip = true, validationStrategy = ValidationStrategy.Sequential)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await waitSubGraph(nodes, "foobar")
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
# define 3 validators that are executed sequentially
# validator1(Accept)->validator2(Accept)->validator3(Accept)
var validatorFut1 = newFuture[bool]() # Accept
var validatorFut2 = newFuture[bool]() # Accept
var validatorFut3 = newFuture[bool]() # Accept
proc validator1(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut1.complete(true)
proc validator2(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut2.complete(true)
proc validator3(topic: string,
message: Message): Future[ValidationResult] {.async.} =
result = ValidationResult.Accept
validatorFut3.complete(true)
nodes[1].addValidator("foobar", validator1)
nodes[1].addValidator("foobar", validator2)
nodes[1].addValidator("foobar", validator3)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
# all validators were executed
check (await validatorFut1.withTimeout(chronos.seconds(1))) == true
check (await validatorFut2.withTimeout(chronos.seconds(1))) == true
check (await validatorFut3.withTimeout(chronos.seconds(1))) == true
# message was indeed received by the subscriber
check (await messageFut.withTimeout(chronos.seconds(1))) == true
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub validation should fail (ignore)":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail

View File

@@ -41,6 +41,7 @@ proc generateNodes*(
verifySignature: bool = libp2p_pubsub_verify,
anonymize: bool = libp2p_pubsub_anonymize,
sign: bool = libp2p_pubsub_sign,
validationStrategy: ValidationStrategy = ValidationStrategy.Parallel,
sendSignedPeerRecord = false,
unsubscribeBackoff = 1.seconds,
maxMessageSize: int = 1024 * 1024,
@@ -54,6 +55,7 @@ proc generateNodes*(
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
validationStrategy = validationStrategy,
msgIdProvider = msgIdProvider,
anonymize = anonymize,
maxMessageSize = maxMessageSize,