Compare commits

...

10 Commits

Author SHA1 Message Date
Giovanni Petrantoni
92c5225c23 macro and template fixing 2020-08-12 10:17:32 +09:00
Giovanni Petrantoni
537e3bc27d make a easy macro to wrap procs, add more keys 2020-08-11 02:06:48 +09:00
Giovanni Petrantoni
4ead4bd219 add instrumentation via optick 2020-08-11 00:06:02 +09:00
Giovanni Petrantoni
c73447cd38 rework sending, remove helpers from pubsubpeer, unify in broadcast 2020-08-09 12:18:53 +09:00
Dmitriy Ryajov
2eb5347ad2 fix tests 2020-08-08 16:57:07 -06:00
Dmitriy Ryajov
a21bfce32e fix peertable hasPeerId 2020-08-08 16:13:31 -06:00
Dmitriy Ryajov
c7f27bd361 fix tests 2020-08-08 15:44:30 -06:00
Dmitriy Ryajov
25f41dff6c properly cleanup up failed peers 2020-08-08 14:51:14 -06:00
Dmitriy Ryajov
5ab8fe8ced use join on lpstreams 2020-08-08 01:31:29 -06:00
Dmitriy Ryajov
90ad63bf1e move pubsub of off switch, pass switch into pubsub 2020-08-08 01:31:11 -06:00
22 changed files with 907 additions and 922 deletions

View File

@@ -121,7 +121,7 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} =
## triggers the connections resource cleanup
##
await conn.closeEvent.wait()
await conn.join()
trace "triggering connection cleanup"
await c.cleanupConn(conn)

View File

@@ -65,3 +65,107 @@ template tryAndWarn*(message: static[string]; body: untyped): untyped =
except CatchableError as exc:
warn "An exception has ocurred, enable trace logging for details", name = exc.name, msg = message
trace "Exception details", exc = exc.msg
when defined(profiler_optick):
import dynlib
type
OptickEvent* = distinct uint64
OptickEventCtx* = distinct uint64
OptickCreateEvent* = proc(inFunctionName: cstring, inFunctionLength: uint16, inFileName: cstring, inFileNameLenght: uint16, inFileLine: uint32): OptickEvent {.cdecl.}
OptickPushEvent* = proc(event: OptickEvent): OptickEventCtx {.cdecl.}
OptickPopEvent* = proc(ctx: OptickEventCtx) {.cdecl.}
OptickStartCapture* = proc() {.cdecl.}
OptickStopCapture* = proc(filename: cstring, nameLen: uint16) {.cdecl.}
OptickNextFrame* = proc() {.cdecl.}
var
createEvent*: OptickCreateEvent
pushEvent*: OptickPushEvent
popEvent*: OptickPopEvent
startCapture*: OptickStartCapture
stopCapture*: OptickStopCapture
nextFrame*: OptickNextFrame
template profile*(name: string): untyped =
var ev {.inject.}: OptickEventCtx
defer:
{.gcsafe.}:
popEvent(ev)
{.gcsafe.}:
const pos = instantiationInfo()
let event_desc {.global.} = createEvent(name.cstring, name.len.uint16, pos.filename.cstring, pos.filename.len.uint16, pos.line.uint32)
ev = pushEvent(event_desc)
proc getName(node: NimNode): string {.compileTime.} =
case node.kind
of nnkSym:
return $node
of nnkPostfix:
return node[1].strVal
of nnkIdent:
return node.strVal
of nnkEmpty:
return "anonymous"
else:
error("Unknown name.")
macro profiled*(p: untyped): untyped =
let name = p.name.getName()
var code = newStmtList()
var keySym = genSym(nskLet)
let body = p.body
let newBody = quote do:
{.gcsafe.}:
const pos = instantiationInfo()
let event_desc {.global.} = createEvent(`name`.cstring, `name`.len.uint16, pos.filename.cstring, pos.filename.len.uint16, pos.line.uint32)
let `keySym` = pushEvent(event_desc)
defer:
popEvent(`keySym`)
`body`
p.body = newBody
p
proc load() =
var candidates: seq[string]
libCandidates("OptickCore", candidates)
for c in candidates:
let lib = loadLib("OptickCore")
if lib != nil:
createEvent = cast[OptickCreateEvent](lib.symAddr("OptickAPI_CreateEventDescription"))
pushEvent = cast[OptickPushEvent](lib.symAddr("OptickAPI_PushEvent"))
popEvent = cast[OptickPopEvent](lib.symAddr("OptickAPI_PopEvent"))
startCapture = cast[OptickStartCapture](lib.symAddr("OptickAPI_StartCapture"))
stopCapture = cast[OptickStopCapture](lib.symAddr("OptickAPI_StopCapture"))
nextFrame = cast[OptickNextFrame](lib.symAddr("OptickAPI_NextFrame"))
return
doAssert(false, "OptickCore failed to load")
load()
startCapture()
addQuitProc(proc () {.noconv.} =
stopCapture("profiled.opt", "profiled.opt".len))
# proc frameTicker() {.async.} =
# while true:
# {.gcsafe.}:
# nextFrame()
# await sleepAsync(100.millis)
# let poll_event = createEvent("poll", "poll".len, "", 0, 0)
# proc pollHook() {.async.} =
# while true:
# {.gcsafe.}:
# let ev = pushEvent(poll_event)
# await sleepAsync(0)
# defer:
# popEvent(ev)
# asyncCheck frameTicker()
# asyncCheck pollHook()
else:
macro profiled*(p: untyped): untyped =
p

View File

@@ -11,7 +11,8 @@ import strutils
import chronos, chronicles, stew/byteutils
import stream/connection,
vbuffer,
protocols/protocol
protocols/protocol,
errors
logScope:
topics = "multistream"
@@ -49,7 +50,7 @@ template validateSuffix(str: string): untyped =
proc select*(m: MultistreamSelect,
conn: Connection,
proto: seq[string]):
Future[string] {.async.} =
Future[string] {.async, profiled.} =
trace "initiating handshake", codec = m.codec
## select a remote protocol
await conn.write(m.codec) # write handshake

View File

@@ -96,7 +96,7 @@ proc newStreamInternal*(m: Mplex,
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables
##
await chann.closeEvent.wait()
await chann.join()
if not isNil(chann):
m.getChannelList(chann.initiator).del(chann.id)
trace "cleaned up channel", id = chann.id

View File

@@ -31,14 +31,9 @@ type
method subscribeTopic*(f: FloodSub,
topic: string,
subscribe: bool,
peerId: string) {.gcsafe, async.} =
peerId: PeerID) {.gcsafe, async.} =
await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
let peer = f.peers.getOrDefault(peerId)
if peer == nil:
debug "subscribeTopic on a nil peer!", peer = peerId
return
if topic notin f.floodsub:
f.floodsub[topic] = initHashSet[PubSubPeer]()
@@ -51,16 +46,20 @@ method subscribeTopic*(f: FloodSub,
# unsubscribe the peer from the topic
f.floodsub[topic].excl(peer)
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
method unsubscribePeer*(f: FloodSub, peer: PeerID) =
## handle peer disconnects
##
procCall PubSub(f).handleDisconnect(peer)
if not(isNil(peer)) and peer.peerInfo notin f.conns:
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(peer)
trace "unsubscribing floodsub peer", peer = $peer
let pubSubPeer = f.peers.getOrDefault(peer)
if pubSubPeer.isNil:
return
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(pubSubPeer)
procCall PubSub(f).unsubscribePeer(peer)
method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
@@ -77,7 +76,7 @@ method rpcHandler*(f: FloodSub,
if msgId notin f.seen:
f.seen.put(msgId) # add the message to the seen cache
if f.verifySignature and not msg.verify(peer.peerInfo):
if f.verifySignature and not msg.verify(peer.peerId):
trace "dropping message due to failed signature verification"
continue
@@ -102,7 +101,7 @@ method rpcHandler*(f: FloodSub,
trace "exception in message handler", exc = exc.msg
# forward the message to all peers interested in it
let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)
let published = await f.broadcast(toSendPeers, RPCMsg(messages: m.messages), DefaultSendTimeout)
trace "forwared message to peers", peers = published
@@ -118,11 +117,6 @@ method init*(f: FloodSub) =
f.handler = handler
f.codec = FloodSubCodec
method subscribePeer*(p: FloodSub,
conn: Connection) =
procCall PubSub(p).subscribePeer(conn)
asyncCheck p.handleConn(conn, FloodSubCodec)
method publish*(f: FloodSub,
topic: string,
data: seq[byte],
@@ -143,7 +137,7 @@ method publish*(f: FloodSub,
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
# start the future but do not wait yet
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout)
let published = await f.broadcast(f.floodsub.getOrDefault(topic), RPCMsg(messages: @[msg]), timeout)
when defined(libp2p_expensive_metrics):
libp2p_pubsub_messages_published.inc(labelValues = [topic])
@@ -167,8 +161,6 @@ method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub()
f.peers = initTable[string, PubSubPeer]()
f.topics = initTable[string, Topic]()
f.floodsub = initTable[string, HashSet[PubSubPeer]]()
f.seen = newTimedCache[string](2.minutes)
f.init()

View File

@@ -82,7 +82,7 @@ method init*(g: GossipSub) =
g.handler = handler
g.codec = GossipSubCodec
proc replenishFanout(g: GossipSub, topic: string) =
proc replenishFanout(g: GossipSub, topic: string) {.profiled.} =
## get fanout peers for a topic
trace "about to replenish fanout"
@@ -100,7 +100,7 @@ proc replenishFanout(g: GossipSub, topic: string) =
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
proc rebalanceMesh(g: GossipSub, topic: string) {.async, profiled.} =
logScope:
topic
@@ -155,14 +155,14 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
.set(g.mesh.peers(topic).int64, labelValues = [topic])
# Send changes to peers after table updates to avoid stale state
for p in grafts:
await p.sendGraft(@[topic])
for p in prunes:
await p.sendPrune(@[topic])
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
discard g.broadcast(grafts, graft, DefaultSendTimeout)
discard g.broadcast(prunes, prune, DefaultSendTimeout)
trace "mesh balanced, got peers", peers = g.mesh.peers(topic)
proc dropFanoutPeers(g: GossipSub) =
proc dropFanoutPeers(g: GossipSub) {.profiled.} =
# drop peers that we haven't published to in
# GossipSubFanoutTTL seconds
let now = Moment.now()
@@ -177,7 +177,7 @@ proc dropFanoutPeers(g: GossipSub) =
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe, profiled.} =
## gossip iHave messages to peers
##
@@ -209,14 +209,16 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
if peer in gossipPeers:
continue
if peer.id notin result:
result[peer.id] = controlMsg
if peer notin result:
result[peer] = controlMsg
result[peer.id].ihave.add(ihave)
result[peer].ihave.add(ihave)
proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning:
try:
profile "heartbeat"
trace "running heartbeat"
for t in toSeq(g.topics.keys):
@@ -229,10 +231,10 @@ proc heartbeat(g: GossipSub) {.async.} =
g.replenishFanout(t)
let peers = g.getGossipPeers()
var sent: seq[Future[void]]
var sent: seq[Future[int]]
for peer, control in peers:
g.peers.withValue(peer, pubsubPeer) do:
sent &= pubsubPeer[].send(RPCMsg(control: some(control)))
g.peers.withValue(peer.peerId, pubsubPeer) do:
sent &= g.broadcast([pubsubPeer[]], RPCMsg(control: some(control)), DefaultSendTimeout)
checkFutures(await allFinished(sent))
g.mcache.shift() # shift the cache
@@ -243,47 +245,46 @@ proc heartbeat(g: GossipSub) {.async.} =
await sleepAsync(GossipSubHeartbeatInterval)
method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
method unsubscribePeer*(g: GossipSub, peer: PeerID) {.profiled.} =
## handle peer disconnects
##
procCall FloodSub(g).handleDisconnect(peer)
##
if not(isNil(peer)) and peer.peerInfo notin g.conns:
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, peer)
trace "unsubscribing gossipsub peer", peer = $peer
let pubSubPeer = g.peers.getOrDefault(peer)
if pubSubPeer.isNil:
return
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t])
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, pubSubPeer)
for t in toSeq(g.mesh.keys):
g.mesh.removePeer(t, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t])
for t in toSeq(g.mesh.keys):
g.mesh.removePeer(t, pubSubPeer)
for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, pubSubPeer)
method subscribePeer*(p: GossipSub,
conn: Connection) =
procCall PubSub(p).subscribePeer(conn)
asyncCheck p.handleConn(conn, GossipSubCodec)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
procCall FloodSub(g).unsubscribePeer(peer)
method subscribeTopic*(g: GossipSub,
topic: string,
subscribe: bool,
peerId: string) {.gcsafe, async.} =
peerId: PeerID) {.gcsafe, async, profiled.} =
await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)
logScope:
peer = peerId
peer = $peerId
topic
let peer = g.peers.getOrDefault(peerId)
@@ -320,7 +321,7 @@ method subscribeTopic*(g: GossipSub,
proc handleGraft(g: GossipSub,
peer: PubSubPeer,
grafts: seq[ControlGraft]): seq[ControlPrune] =
grafts: seq[ControlGraft]): seq[ControlPrune] {.profiled.} =
for graft in grafts:
let topic = graft.topicID
logScope:
@@ -353,7 +354,7 @@ proc handleGraft(g: GossipSub,
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.profiled.} =
for prune in prunes:
trace "peer pruned topic", peer = peer.id, topic = prune.topicID
@@ -364,7 +365,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
proc handleIHave(g: GossipSub,
peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant =
ihaves: seq[ControlIHave]): ControlIWant {.profiled.} =
for ihave in ihaves:
trace "peer sent ihave",
peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs
@@ -376,7 +377,7 @@ proc handleIHave(g: GossipSub,
proc handleIWant(g: GossipSub,
peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] =
iwants: seq[ControlIWant]): seq[Message] {.profiled.} =
for iwant in iwants:
for mid in iwant.messageIDs:
trace "peer sent iwant", peer = peer.id, messageID = mid
@@ -386,7 +387,7 @@ proc handleIWant(g: GossipSub,
method rpcHandler*(g: GossipSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} =
rpcMsgs: seq[RPCMsg]) {.async, profiled.} =
await procCall PubSub(g).rpcHandler(peer, rpcMsgs)
for m in rpcMsgs: # for all RPC messages
@@ -404,7 +405,7 @@ method rpcHandler*(g: GossipSub,
g.seen.put(msgId) # add the message to the seen cache
if g.verifySignature and not msg.verify(peer.peerInfo):
if g.verifySignature and not msg.verify(peer.peerId):
trace "dropping message due to failed signature verification"
continue
@@ -437,7 +438,7 @@ method rpcHandler*(g: GossipSub,
trace "exception in message handler", exc = exc.msg
# forward the message to all peers interested in it
let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)
let published = await g.broadcast(toSendPeers, RPCMsg(messages: m.messages), DefaultSendTimeout)
trace "forwared message to peers", peers = published
@@ -454,8 +455,7 @@ method rpcHandler*(g: GossipSub,
respControl.ihave.len > 0:
try:
info "sending control message", msg = respControl
await peer.send(
RPCMsg(control: some(respControl), messages: messages))
let _ = await g.broadcast([peer], RPCMsg(control: some(respControl), messages: messages), DefaultSendTimeout)
except CancelledError as exc:
raise exc
except CatchableError as exc:
@@ -463,12 +463,12 @@ method rpcHandler*(g: GossipSub,
method subscribe*(g: GossipSub,
topic: string,
handler: TopicHandler) {.async.} =
handler: TopicHandler) {.async, profiled.} =
await procCall PubSub(g).subscribe(topic, handler)
await g.rebalanceMesh(topic)
method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) {.async.} =
topics: seq[TopicPair]) {.async, profiled.} =
await procCall PubSub(g).unsubscribe(topics)
for (topic, handler) in topics:
@@ -478,27 +478,23 @@ method unsubscribe*(g: GossipSub,
let peers = g.mesh.getOrDefault(topic)
g.mesh.del(topic)
var pending = newSeq[Future[void]]()
for peer in peers:
pending.add(peer.sendPrune(@[topic]))
checkFutures(await allFinished(pending))
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
discard g.broadcast(peers, prune, DefaultSendTimeout)
method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
method unsubscribeAll*(g: GossipSub, topic: string) {.async, profiled.} =
await procCall PubSub(g).unsubscribeAll(topic)
if topic in g.mesh:
let peers = g.mesh.getOrDefault(topic)
g.mesh.del(topic)
var pending = newSeq[Future[void]]()
for peer in peers:
pending.add(peer.sendPrune(@[topic]))
checkFutures(await allFinished(pending))
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
discard g.broadcast(peers, prune, DefaultSendTimeout)
method publish*(g: GossipSub,
topic: string,
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
timeout: Duration = InfiniteDuration): Future[int] {.async, profiled.} =
# base returns always 0
discard await procCall PubSub(g).publish(topic, data, timeout)
trace "publishing message on topic", topic, data = data.shortLog
@@ -534,7 +530,7 @@ method publish*(g: GossipSub,
if msgId notin g.mcache:
g.mcache.put(msgId, msg)
let published = await g.publishHelper(peers, @[msg], timeout)
let published = await g.broadcast(peers, RPCMsg(messages: @[msg]), timeout)
when defined(libp2p_expensive_metrics):
if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])

View File

@@ -13,10 +13,10 @@ import pubsubpeer, ../../peerid
type
PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map
proc hasPeerID*(t: PeerTable, topic, peerId: string): bool =
proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool =
let peers = toSeq(t.getOrDefault(topic))
peers.any do (peer: PubSubPeer) -> bool:
peer.id == peerId
peer.peerId == peerId
func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
# returns true if the peer was added,

View File

@@ -11,6 +11,7 @@ import std/[tables, sequtils, sets]
import chronos, chronicles, metrics
import pubsubpeer,
rpc/[message, messages],
../../switch,
../protocol,
../../stream/connection,
../../peerid,
@@ -47,61 +48,76 @@ type
handler*: seq[TopicHandler]
PubSub* = ref object of LPProtocol
switch*: Switch # the switch used to dial/connect to peers
peerInfo*: PeerInfo # this peer's info
topics*: Table[string, Topic] # local topics
peers*: Table[string, PubSubPeer] # peerid to peer map
conns*: Table[PeerInfo, HashSet[Connection]] # peers connections
peers*: Table[PeerID, PubSubPeer] # peerid to peer map
triggerSelf*: bool # trigger own local handler on publish
verifySignature*: bool # enable signature verification
sign*: bool # enable message signing
cleanupLock: AsyncLock
validators*: Table[string, HashSet[ValidatorHandler]]
observers: ref seq[PubSubObserver] # ref as in smart_ptr
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
observers: ref seq[PubSubObserver] # ref as in smart_ptr
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
msgSeqno*: uint64
lifetimeFut*: Future[void] # pubsub liftime future
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
## handle peer disconnects
##
if not(isNil(peer)) and peer.peerInfo notin p.conns:
trace "deleting peer", peer = peer.id
peer.onConnect.fire() # Make sure all pending sends are unblocked
p.peers.del(peer.id)
trace "peer disconnected", peer = peer.id
# metrics
libp2p_pubsub_peers.set(p.peers.len.int64)
trace "unsubscribing pubsub peer", peer = $peerId
if peerId in p.peers:
p.peers.del(peerId)
proc onConnClose(p: PubSub, conn: Connection) {.async.} =
try:
let peer = conn.peerInfo
await conn.closeEvent.wait()
libp2p_pubsub_peers.set(p.peers.len.int64)
if peer in p.conns:
p.conns[peer].excl(conn)
if p.conns[peer].len <= 0:
p.conns.del(peer)
proc broadcast*(p: PubSub,
sendPeers: HashSet[PubSubPeer] | seq[PubSubPeer] | array[1, PubSubPeer],
msg: RPCMsg,
timeout: Duration): Future[int] {.async.} =
profile "broadcast"
if peer.id in p.peers:
p.handleDisconnect(p.peers[peer.id])
# send messages and cleanup failed peers
var sent: seq[tuple[id: PeerID, fut: Future[void]]]
for sendPeer in sendPeers:
if sendPeer.isNil:
continue
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in onConnClose handler", exc = exc.msg
# avoid sending to self
if sendPeer.peerId == p.peerInfo.peerId:
continue
trace "broadcast to peer", peer = sendPeer.id, message = msg
sent.add((id: sendPeer.peerId, fut: sendPeer.send(msg, timeout)))
var broadcasted: seq[PeerID]
var failed: seq[PeerID]
let futs = await allFinished(sent.mapIt(it.fut))
for s in futs:
let f = sent.filterIt(it.fut == s)
if f.len > 0:
if s.failed:
trace "broadcast to peer failed", peer = f[0].id
p.unsubscribePeer(f[0].id)
failed.add(f[0].id)
else:
trace "broadcast to peer succeeded", peer = f[0].id
broadcasted.add(f[0].id)
return broadcasted.len
proc sendSubs*(p: PubSub,
peer: PubSubPeer,
topics: seq[string],
subscribe: bool) {.async.} =
subscribe: bool) {.async, profiled.} =
## send subscriptions to remote peer
asyncCheck peer.sendSubOpts(topics, subscribe)
discard await p.broadcast([peer], RPCMsg(subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))), DefaultSendTimeout)
method subscribeTopic*(p: PubSub,
topic: string,
subscribe: bool,
peerId: string) {.base, async.} =
peerId: PeerID) {.base, async.} =
# called when remote peer subscribes to a topic
discard
@@ -116,24 +132,24 @@ method rpcHandler*(p: PubSub,
if m.subscriptions.len > 0: # if there are any subscriptions
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
trace "about to subscribe to topic", topicId = s.topic
await p.subscribeTopic(s.topic, s.subscribe, peer.id)
await p.subscribeTopic(s.topic, s.subscribe, peer.peerId)
proc getOrCreatePeer(p: PubSub,
peerInfo: PeerInfo,
proto: string): PubSubPeer =
if peerInfo.id in p.peers:
return p.peers[peerInfo.id]
proc getOrCreatePeer*(
p: PubSub,
peer: PeerID,
proto: string): PubSubPeer {.profiled.} =
if peer in p.peers:
return p.peers[peer]
# create new pubsub peer
let peer = newPubSubPeer(peerInfo, proto)
trace "created new pubsub peer", peerId = peer.id
let pubSubPeer = newPubSubPeer(peer, p.switch, proto)
trace "created new pubsub peer", peerId = $peer
p.peers[peer.id] = peer
peer.observers = p.observers
p.peers[peer] = pubSubPeer
pubSubPeer.observers = p.observers
libp2p_pubsub_peers.set(p.peers.len.int64)
return peer
return pubSubPeer
method handleConn*(p: PubSub,
conn: Connection,
@@ -154,19 +170,11 @@ method handleConn*(p: PubSub,
await conn.close()
return
# track connection
p.conns.mgetOrPut(conn.peerInfo,
initHashSet[Connection]())
.incl(conn)
asyncCheck p.onConnClose(conn)
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
# call pubsub rpc handler
await p.rpcHandler(peer, msgs)
let peer = p.getOrCreatePeer(conn.peerInfo, proto)
let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
if p.topics.len > 0:
await p.sendSubs(peer, toSeq(p.topics.keys), true)
@@ -181,32 +189,16 @@ method handleConn*(p: PubSub,
finally:
await conn.close()
method subscribePeer*(p: PubSub, conn: Connection) {.base.} =
if not(isNil(conn)):
trace "subscribing to peer", peerId = conn.peerInfo.id
method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
## subscribe to remote peer to receive/send pubsub
## messages
##
# track connection
p.conns.mgetOrPut(conn.peerInfo,
initHashSet[Connection]())
.incl(conn)
let pubsubPeer = p.getOrCreatePeer(peer, p.codec)
if p.topics.len > 0:
asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true)
asyncCheck p.onConnClose(conn)
let peer = p.getOrCreatePeer(conn.peerInfo, p.codec)
if not peer.connected:
peer.conn = conn
method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
if peerInfo.id in p.peers:
let peer = p.peers[peerInfo.id]
trace "unsubscribing from peer", peerId = $peerInfo
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
proc connected*(p: PubSub, peerId: PeerID): bool =
p.peers.withValue($peerId, peer):
return peer[] != nil and peer[].connected
pubsubPeer.subscribed = true
method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} =
@@ -261,40 +253,6 @@ method subscribe*(p: PubSub,
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)
proc publishHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message],
timeout: Duration): Future[int] {.async.} =
# send messages and cleanup failed peers
var sent: seq[tuple[id: string, fut: Future[void]]]
for sendPeer in sendPeers:
# avoid sending to self
if sendPeer.peerInfo == p.peerInfo:
continue
trace "sending messages to peer", peer = sendPeer.id, msgs
sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs), timeout)))
var published: seq[string]
var failed: seq[string]
let futs = await allFinished(sent.mapIt(it.fut))
for s in futs:
let f = sent.filterIt(it.fut == s)
if f.len > 0:
if s.failed:
trace "sending messages to peer failed", peer = f[0].id
failed.add(f[0].id)
else:
trace "sending messages to peer succeeded", peer = f[0].id
published.add(f[0].id)
for f in failed:
let peer = p.peers.getOrDefault(f)
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
return published.len
method publish*(p: PubSub,
topic: string,
data: seq[byte],
@@ -331,7 +289,7 @@ method stop*(p: PubSub) {.async, base.} =
method addValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
hook: ValidatorHandler) {.base, profiled.} =
for t in topic:
if t notin p.validators:
p.validators[t] = initHashSet[ValidatorHandler]()
@@ -341,12 +299,12 @@ method addValidator*(p: PubSub,
method removeValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
hook: ValidatorHandler) {.base, profiled.} =
for t in topic:
if t in p.validators:
p.validators[t].excl(hook)
method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
method validate*(p: PubSub, message: Message): Future[bool] {.async, base, profiled.} =
var pending: seq[Future[bool]]
trace "about to validate message"
for topic in message.topicIDs:
@@ -364,16 +322,20 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
else:
libp2p_pubsub_validation_failure.inc()
proc newPubSub*(P: typedesc[PubSub],
peerInfo: PeerInfo,
triggerSelf: bool = false,
verifySignature: bool = true,
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider): P =
result = P(peerInfo: peerInfo,
proc init*(
P: typedesc[PubSub],
switch: Switch,
triggerSelf: bool = false,
verifySignature: bool = true,
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider): P =
result = P(switch: switch,
peerInfo: switch.peerInfo,
triggerSelf: triggerSelf,
verifySignature: verifySignature,
sign: sign,
peers: initTable[PeerID, PubSubPeer](),
topics: initTable[string, Topic](),
cleanupLock: newAsyncLock(),
msgIdProvider: msgIdProvider)
result.initPubSub()
@@ -385,6 +347,3 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) =
let idx = p.observers[].find(observer)
if idx != -1:
p.observers[].del(idx)
proc connected*(p: PubSub, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} =
peerInfo != nil and connected(p, peerInfo.peerId)

View File

@@ -11,6 +11,7 @@ import std/[hashes, options, sequtils, strutils, tables]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
timedcache,
../../switch,
../../peerid,
../../peerinfo,
../../stream/connection,
@@ -28,7 +29,6 @@ when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
const
DefaultReadTimeout* = 1.minutes
DefaultSendTimeout* = 10.seconds
type
@@ -37,14 +37,16 @@ type
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
PubSubPeer* = ref object of RootObj
proto*: string # the protocol that this peer joined from
switch*: Switch # switch instance to dial peers
codec*: string # the protocol that this peer joined from
sendConn: Connection
peerInfo*: PeerInfo
peerId*: PeerID
handler*: RPCHandler
sentRpcCache: TimedCache[string] # cache for already sent messages
recvdRpcCache: TimedCache[string] # cache for already received messages
onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
subscribed*: bool # are we subscribed to this peer
sendLock*: AsyncLock # send connection lock
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@@ -52,19 +54,13 @@ func hash*(p: PubSubPeer): Hash =
# int is either 32/64, so intptr basically, pubsubpeer is a ref
cast[pointer](p).hash
proc id*(p: PubSubPeer): string = p.peerInfo.id
proc id*(p: PubSubPeer): string =
doAssert(not p.isNil, "nil pubsubpeer")
p.peerId.pretty
proc connected*(p: PubSubPeer): bool =
not(isNil(p.sendConn))
proc `conn=`*(p: PubSubPeer, conn: Connection) =
if not(isNil(conn)):
trace "attaching send connection for peer", peer = p.id
p.sendConn = conn
p.onConnect.fire()
proc conn*(p: PubSubPeer): Connection =
p.sendConn
not p.sendConn.isNil and not
(p.sendConn.closed or p.sendConn.atEof)
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
@@ -83,12 +79,13 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
logScope:
peer = p.id
debug "starting pubsub read loop for peer", closed = conn.closed
try:
try:
while not conn.atEof:
trace "waiting for data", closed = conn.closed
let data = await conn.readLp(64 * 1024).wait(DefaultReadTimeout)
let data = await conn.readLp(64 * 1024)
let digest = $(sha256.digest(data))
trace "read data from peer", data = data.shortLog
if digest in p.recvdRpcCache:
@@ -124,12 +121,12 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
raise exc
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
raise exc
proc send*(
p: PubSubPeer,
msg: RPCMsg,
timeout: Duration = DefaultSendTimeout) {.async.} =
logScope:
peer = p.id
rpcMsg = shortLog(msg)
@@ -155,90 +152,54 @@ proc send*(
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
return
proc sendToRemote() {.async.} =
logScope:
peer = p.id
rpcMsg = shortLog(msg)
trace "about to send message"
if not p.onConnect.isSet:
await p.onConnect.wait()
if p.connected: # this can happen if the remote disconnected
trace "sending encoded msgs to peer"
await p.sendConn.writeLp(encoded)
p.sentRpcCache.put(digest)
trace "sent pubsub message to remote"
when defined(libp2p_expensive_metrics):
for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
let sendFut = sendToRemote()
try:
await sendFut.wait(timeout)
trace "about to send message"
if not p.connected:
try:
await p.sendLock.acquire()
trace "no send connection, dialing peer"
# get a send connection if there is none
p.sendConn = await p.switch.dial(
p.peerId, p.codec)
if not p.connected:
raise newException(CatchableError, "unable to get send pubsub stream")
# install a reader on the send connection
asyncCheck p.handle(p.sendConn)
finally:
if p.sendLock.locked:
p.sendLock.release()
trace "sending encoded msgs to peer"
await p.sendConn.writeLp(encoded).wait(timeout)
p.sentRpcCache.put(digest)
trace "sent pubsub message to remote"
when defined(libp2p_expensive_metrics):
for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
if not sendFut.finished:
sendFut.cancel()
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()
raise exc
proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} =
trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics
try:
await p.send(RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
# the long timeout is mostly for cases where
# the connection is flaky at the beggingin
timeout = 3.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending subscriptions", exc = exc.msg
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
trace "sending graft to peer", peer = p.id, topicIDs = topics
try:
await p.send(RPCMsg(control: some(
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))),
timeout = 1.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending grafts", exc = exc.msg
proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
trace "sending prune to peer", peer = p.id, topicIDs = topics
try:
await p.send(RPCMsg(control: some(
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))),
timeout = 1.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending prunes", exc = exc.msg
proc `$`*(p: PubSubPeer): string =
p.id
proc newPubSubPeer*(peerInfo: PeerInfo,
proto: string): PubSubPeer =
proc newPubSubPeer*(peerId: PeerID,
switch: Switch,
codec: string): PubSubPeer =
new result
result.proto = proto
result.peerInfo = peerInfo
result.switch = switch
result.codec = codec
result.peerId = peerId
result.sentRpcCache = newTimedCache[string](2.minutes)
result.recvdRpcCache = newTimedCache[string](2.minutes)
result.onConnect = newAsyncEvent()
result.sendLock = newAsyncLock()

View File

@@ -10,7 +10,8 @@
{.push raises: [Defect].}
import chronicles, metrics, stew/[byteutils, endians2]
import ./messages, ./protobuf,
import ./messages,
./protobuf,
../../../peerid,
../../../peerinfo,
../../../crypto/crypto,
@@ -32,7 +33,7 @@ func defaultMsgIdProvider*(m: Message): string =
proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] =
ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes())
proc verify*(m: Message, p: PeerInfo): bool =
proc verify*(m: Message, p: PeerID): bool =
if m.signature.len > 0 and m.key.len > 0:
var msg = m
msg.signature = @[]
@@ -51,17 +52,17 @@ proc verify*(m: Message, p: PeerInfo): bool =
proc init*(
T: type Message,
p: PeerInfo,
peer: PeerInfo,
data: seq[byte],
topic: string,
seqno: uint64,
sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} =
result = Message(
fromPeer: p.peerId,
fromPeer: peer.peerId,
data: data,
seqno: @(seqno.toBytesBE), # unefficient, fine for now
topicIDs: @[topic])
if sign and p.publicKey.isSome:
result.signature = sign(result, p).tryGet()
result.key = p.publicKey.get().getBytes().tryGet()
if sign and peer.publicKey.isSome:
result.signature = sign(result, peer).tryGet()
result.key = peer.publicKey.get().getBytes().tryGet()

View File

@@ -60,7 +60,7 @@ proc handleConn*(s: Secure,
initiator: bool): Future[Connection] {.async, gcsafe.} =
var sconn = await s.handshake(conn, initiator)
if not isNil(sconn):
conn.closeEvent.wait()
conn.join()
.addCallback do(udata: pointer = nil):
asyncCheck sconn.close()

View File

@@ -1,16 +1,9 @@
# compile time options here
const
libp2p_pubsub_sign {.booldefine.} = true
libp2p_pubsub_verify {.booldefine.} = true
import
options, tables, chronos, bearssl,
switch, peerid, peerinfo, stream/connection, multiaddress,
crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex, mplex/types],
protocols/[identify, secure/secure],
protocols/pubsub/[pubsub, gossipsub, floodsub],
protocols/pubsub/rpc/message
protocols/[identify, secure/secure]
import
protocols/secure/noise,
@@ -26,17 +19,12 @@ type
proc newStandardSwitch*(privKey = none(PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
triggerSelf = false,
gossip = false,
secureManagers: openarray[SecureProtocol] = [
# array cos order matters
SecureProtocol.Secio,
SecureProtocol.Noise,
],
verifySignature = libp2p_pubsub_verify,
sign = libp2p_pubsub_sign,
transportFlags: set[ServerFlags] = {},
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
rng = newRng(),
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): Switch =
@@ -66,25 +54,11 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
of SecureProtocol.Secio:
secureManagerInstances &= newSecio(rng, seckey).Secure
let pubSub = if gossip:
newPubSub(GossipSub,
peerInfo = peerInfo,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider).PubSub
else:
newPubSub(FloodSub,
peerInfo = peerInfo,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider).PubSub
newSwitch(
let switch = newSwitch(
peerInfo,
transports,
identify,
muxers,
secureManagers = secureManagerInstances,
pubSub = some(pubSub))
secureManagers = secureManagerInstances)
return switch

View File

@@ -45,7 +45,7 @@ template withExceptions(body: untyped) =
raise exc
except TransportIncompleteError:
# for all intents and purposes this is an EOF
raise newLPStreamEOFError()
raise newLPStreamIncompleteError()
except TransportLimitError:
raise newLPStreamLimitError()
except TransportUseClosedError:

View File

@@ -12,7 +12,8 @@ import chronicles, chronos, metrics
import ../varint,
../vbuffer,
../peerinfo,
../multiaddress
../multiaddress,
../errors
declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"])
@@ -98,7 +99,7 @@ method readOnce*(s: LPStream,
proc readExactly*(s: LPStream,
pbytes: pointer,
nbytes: int):
Future[void] {.async.} =
Future[void] {.async, profiled.} =
if s.atEof:
raise newLPStreamEOFError()
@@ -146,7 +147,7 @@ proc readLine*(s: LPStream,
if len(result) == lim:
break
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe, profiled.} =
var
varint: uint64
length: int
@@ -162,7 +163,7 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
if true: # can't end with a raise apparently
raise (ref InvalidVarintError)(msg: "Cannot parse varint")
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe, profiled.} =
## read length prefixed msg, with the length encoded as a varint
let
length = await s.readVarint()
@@ -191,7 +192,7 @@ method write*(s: LPStream, msg: seq[byte]) {.base, async.} =
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1)))
proc write*(s: LPStream, msg: string): Future[void] =
proc write*(s: LPStream, msg: string): Future[void] {.profiled.} =
s.write(@(toOpenArrayByte(msg, 0, msg.high)))
# TODO: split `close` into `close` and `dispose/destroy`

View File

@@ -25,7 +25,6 @@ import stream/connection,
protocols/secure/secure,
peerinfo,
protocols/identify,
protocols/pubsub/pubsub,
muxers/muxer,
connmanager,
peerid,
@@ -44,9 +43,6 @@ declareCounter(libp2p_dialed_peers, "dialed peers")
declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrade, "peers failed upgrade")
const
MaxPubsubReconnectAttempts* = 10
type
NoPubSubException* = object of CatchableError
@@ -77,13 +73,8 @@ type
identity*: Identify
streamHandler*: StreamHandler
secureManagers*: seq[Secure]
pubSub*: Option[PubSub]
dialLock: Table[PeerID, AsyncLock]
ConnEvents: Table[ConnEventKind, HashSet[ConnEventHandler]]
pubsubMonitors: Table[PeerId, Future[void]]
proc newNoPubSubException(): ref NoPubSubException {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!")
proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler, kind: ConnEventKind) =
@@ -110,23 +101,6 @@ proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsa
warn "exception in trigger ConnEvents", exc = exc.msg
proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc subscribePeer*(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
try:
await conn.closeEvent.wait()
trace "about to cleanup pubsub peer"
if s.pubSub.isSome:
let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId)
if not(isNil(fut)) and not(fut.finished):
fut.cancel()
await s.pubSub.get().unsubscribePeer(conn.peerInfo)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception cleaning pubsub peer", exc = exc.msg
proc isConnected*(s: Switch, peerId: PeerID): bool =
## returns true if the peer has one or more
@@ -294,7 +268,8 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
proc internalConnect(s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress]): Future[Connection] {.async.} =
logScope: peer = peerId
logScope:
peer = peerId
if s.peerInfo.peerId == peerId:
raise newException(CatchableError, "can't dial self!")
@@ -352,11 +327,11 @@ proc internalConnect(s: Switch,
libp2p_failed_upgrade.inc()
raise exc
doAssert not isNil(upgraded), "checked in upgradeOutgoing"
doAssert not isNil(upgraded), "connection died after upgradeOutgoing"
s.connManager.storeOutgoing(upgraded)
trace "dial successful",
oid = $conn.oid,
oid = $upgraded.oid,
peerInfo = shortLog(upgraded.peerInfo)
conn = upgraded
@@ -381,14 +356,31 @@ proc internalConnect(s: Switch,
# unworthy and disconnects it
raise newException(CatchableError, "Connection closed during handshake")
asyncCheck s.cleanupPubSubPeer(conn)
asyncCheck s.subscribePeer(peerId)
return conn
proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
discard await s.internalConnect(peerId, addrs)
proc negotiateStream(s: Switch, stream: Connection, proto: string): Future[Connection] {.async.} =
trace "Attempting to select remote", proto = proto,
streamOid = $stream.oid,
oid = $stream.oid
if not await s.ms.select(stream, proto):
await stream.close()
raise newException(CatchableError, "Unable to select sub-protocol" & proto)
return stream
proc dial*(s: Switch,
peerId: PeerID,
proto: string): Future[Connection] {.async.} =
let stream = await s.connmanager.getMuxedStream(peerId)
if stream.isNil:
raise newException(CatchableError, "Couldn't get muxed stream")
return await s.negotiateStream(stream, proto)
proc dial*(s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress],
@@ -409,14 +401,7 @@ proc dial*(s: Switch,
await conn.close()
raise newException(CatchableError, "Couldn't get muxed stream")
trace "Attempting to select remote", proto = proto,
streamOid = $stream.oid,
oid = $conn.oid
if not await s.ms.select(stream, proto):
await stream.close()
raise newException(CatchableError, "Unable to select sub-protocol" & proto)
return stream
return await s.negotiateStream(stream, proto)
except CancelledError as exc:
trace "dial canceled"
await cleanup()
@@ -458,21 +443,12 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
s.peerInfo.addrs[i] = t.ma # update peer's address
startFuts.add(server)
if s.pubSub.isSome:
await s.pubSub.get().start()
debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs
result = startFuts # listen for incoming connections
proc stop*(s: Switch) {.async.} =
trace "stopping switch"
# we want to report errors but we do not want to fail
# or crash here, cos we need to clean possibly MANY items
# and any following conn/transport won't be cleaned up
if s.pubSub.isSome:
await s.pubSub.get().stop()
# close and cleanup all connections
await s.connManager.close()
@@ -486,139 +462,6 @@ proc stop*(s: Switch) {.async.} =
trace "switch stopped"
proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} =
## Subscribe to pub sub peer
##
if s.pubSub.isSome and not s.pubSub.get().connected(peerId):
trace "about to subscribe to pubsub peer", peer = peerId
var stream: Connection
try:
stream = await s.connManager.getMuxedStream(peerId)
if isNil(stream):
trace "unable to subscribe to peer", peer = peerId
return
if not await s.ms.select(stream, s.pubSub.get().codec):
if not(isNil(stream)):
trace "couldn't select pubsub", codec = s.pubSub.get().codec
await stream.close()
return
s.pubSub.get().subscribePeer(stream)
await stream.closeEvent.wait()
except CancelledError as exc:
if not(isNil(stream)):
await stream.close()
raise exc
except CatchableError as exc:
trace "exception in subscribe to peer", peer = peerId,
exc = exc.msg
if not(isNil(stream)):
await stream.close()
proc pubsubMonitor(s: Switch, peerId: PeerID) {.async.} =
## while peer connected maintain a
## pubsub connection as well
##
while s.isConnected(peerId):
try:
trace "subscribing to pubsub peer", peer = peerId
await s.subscribePeerInternal(peerId)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in pubsub monitor", peer = peerId, exc = exc.msg
finally:
trace "sleeping before trying pubsub peer", peer = peerId
await sleepAsync(1.seconds) # allow the peer to cooldown
trace "exiting pubsub monitor", peer = peerId
proc subscribePeer*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
## Waits until ``server`` is not closed.
##
var retFuture = newFuture[void]("stream.transport.server.join")
let pubsubFut = s.pubsubMonitors.mgetOrPut(
peerId, s.pubsubMonitor(peerId))
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
proc cancel(udata: pointer) {.gcsafe.} =
pubsubFut.removeCallback(continuation, cast[pointer](retFuture))
if not(pubsubFut.finished()):
pubsubFut.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancel
else:
retFuture.complete()
return retFuture
proc subscribe*(s: Switch, topic: string,
handler: TopicHandler) {.async.} =
## subscribe to a pubsub topic
##
if s.pubSub.isNone:
raise newNoPubSubException()
await s.pubSub.get().subscribe(topic, handler)
proc unsubscribe*(s: Switch, topics: seq[TopicPair]) {.async.} =
## unsubscribe from topics
##
if s.pubSub.isNone:
raise newNoPubSubException()
await s.pubSub.get().unsubscribe(topics)
proc unsubscribeAll*(s: Switch, topic: string) {.async.} =
## unsubscribe from topics
if s.pubSub.isNone:
raise newNoPubSubException()
await s.pubSub.get().unsubscribeAll(topic)
proc publish*(s: Switch,
topic: string,
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
## pubslish to pubsub topic
##
if s.pubSub.isNone:
raise newNoPubSubException()
return await s.pubSub.get().publish(topic, data, timeout)
proc addValidator*(s: Switch,
topics: varargs[string],
hook: ValidatorHandler) =
## add validator
##
if s.pubSub.isNone:
raise newNoPubSubException()
s.pubSub.get().addValidator(topics, hook)
proc removeValidator*(s: Switch,
topics: varargs[string],
hook: ValidatorHandler) =
## pubslish to pubsub topic
##
if s.pubSub.isNone:
raise newNoPubSubException()
s.pubSub.get().removeValidator(topics, hook)
proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
var stream = await muxer.newStream()
defer:
@@ -654,10 +497,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true))
# try establishing a pubsub connection
asyncCheck s.cleanupPubSubPeer(muxer.connection)
asyncCheck s.subscribePeer(peerId)
except CancelledError as exc:
await muxer.close()
raise exc
@@ -670,8 +509,7 @@ proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport],
identity: Identify,
muxers: Table[string, MuxerProvider],
secureManagers: openarray[Secure] = [],
pubSub: Option[PubSub] = none(PubSub)): Switch =
secureManagers: openarray[Secure] = []): Switch =
if secureManagers.len == 0:
raise (ref CatchableError)(msg: "Provide at least one secure manager")
@@ -704,24 +542,21 @@ proc newSwitch*(peerInfo: PeerInfo,
val.muxerHandler = proc(muxer: Muxer): Future[void] =
s.muxerHandler(muxer)
if pubSub.isSome:
result.pubSub = pubSub
result.mount(pubSub.get())
proc isConnected*(s: Switch, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} =
proc isConnected*(s: Switch, peerInfo: PeerInfo): bool
{.deprecated: "Use PeerID version".} =
not isNil(peerInfo) and isConnected(s, peerInfo.peerId)
proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} =
proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void]
{.deprecated: "Use PeerID version", gcsafe.} =
disconnect(s, peerInfo.peerId)
proc connect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version".} =
proc connect*(s: Switch, peerInfo: PeerInfo): Future[void]
{.deprecated: "Use PeerID version".} =
connect(s, peerInfo.peerId, peerInfo.addrs)
proc dial*(s: Switch,
peerInfo: PeerInfo,
proto: string):
Future[Connection] {.deprecated: "Use PeerID version".} =
Future[Connection]
{.deprecated: "Use PeerID version".} =
dial(s, peerInfo.peerId, peerInfo.addrs, proto)
proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} =
subscribePeer(s, peerInfo.peerId)

View File

@@ -29,9 +29,9 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
# turn things deterministic
# this is for testing purposes only
var ceil = 15
let fsub = cast[FloodSub](sender.pubSub.get())
let fsub = cast[FloodSub](sender)
while not fsub.floodsub.hasKey(key) or
not fsub.floodsub.hasPeerID(key, receiver.peerInfo.id):
not fsub.floodsub.hasPeerID(key, receiver.peerInfo.peerId):
await sleepAsync(100.millis)
dec ceil
doAssert(ceil > 0, "waitSub timeout!")
@@ -43,7 +43,7 @@ suite "FloodSub":
check tracker.isLeaked() == false
test "FloodSub basic publish/subscribe A -> B":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
@@ -51,19 +51,32 @@ suite "FloodSub":
let
nodes = generateNodes(2)
# start switches
nodesFut = await allFinished(
nodes[0].start(),
nodes[1].start()
nodes[0].switch.start(),
nodes[1].switch.start(),
)
let subscribes = await subscribeNodes(nodes)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
check (await completionFut.wait(5.seconds)) == true
result = await completionFut.wait(5.seconds)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
@@ -71,53 +84,80 @@ suite "FloodSub":
)
await allFuturesThrowing(nodesFut.concat())
await allFuturesThrowing(subscribes)
check:
waitFor(runTests()) == true
waitFor(runTests())
test "FloodSub basic publish/subscribe B -> A":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
completionFut.complete(true)
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let
nodes = generateNodes(2)
let subscribes = await subscribeNodes(nodes)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await waitSub(nodes[1], nodes[0], "foobar")
check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0
result = await completionFut.wait(5.seconds)
check (await completionFut.wait(5.seconds)) == true
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
check:
waitFor(runTests()) == true
await allFuturesThrowing(nodesFut)
waitFor(runTests())
test "FloodSub validation should succeed":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let
nodes = generateNodes(2)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@@ -131,30 +171,44 @@ suite "FloodSub":
nodes[1].addValidator("foobar", validator)
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
check (await handlerFut) == true
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
nodes[1].stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
await allFuturesThrowing(nodesFut)
check:
waitFor(runTests()) == true
waitFor(runTests())
test "FloodSub validation should fail":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let
nodes = generateNodes(2)
let subscribes = await subscribeNodes(nodes)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@@ -168,30 +222,44 @@ suite "FloodSub":
discard await nodes[0].publish("foobar", "Hello!".toBytes())
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
nodes[1].stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
await allFuturesThrowing(nodesFut)
check:
waitFor(runTests()) == true
waitFor(runTests())
test "FloodSub validation one fails and one succeeds":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foo"
handlerFut.complete(true)
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let
nodes = generateNodes(2)
let subscribes = await subscribeNodes(nodes)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler)
await waitSub(nodes[0], nodes[1], "foo")
await nodes[1].subscribe("bar", handler)
@@ -210,57 +278,21 @@ suite "FloodSub":
check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
check:
waitFor(runTests()) == true
test "FloodSub publish should fail on timeout":
proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
let pubsub = nodes[0].pubSub.get()
let peer = pubsub.peers[nodes[1].peerInfo.id]
peer.conn = Connection(newBufferStream(
proc (data: seq[byte]) {.async, gcsafe.} =
await sleepAsync(10.seconds)
,size = 0))
let in10millis = Moment.fromNow(10.millis)
let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis)
check Moment.now() >= in10millis
check sent == 0
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
nodes[1].stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
await allFuturesThrowing(nodesFut)
check:
waitFor(runTests()) == true
waitFor(runTests())
test "FloodSub multiple peers, no self trigger":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var runs = 10
var futs = newSeq[(Future[void], TopicHandler, ref int)](runs)
@@ -279,15 +311,12 @@ suite "FloodSub":
counter
)
var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<runs:
nodes.add newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let
nodes = generateNodes(runs, triggerSelf = false)
nodesFut = nodes.mapIt(it.switch.start())
var awaitters: seq[Future[void]]
for i in 0..<runs:
awaitters.add(await nodes[i].start())
let subscribes = await subscribeNodes(nodes)
await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
for i in 0..<runs:
await nodes[i].subscribe("foobar", futs[i][1])
@@ -305,17 +334,18 @@ suite "FloodSub":
await allFuturesThrowing(pubs)
await allFuturesThrowing(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
await allFuturesThrowing(nodesFut)
result = true
check:
waitFor(runTests()) == true
waitFor(runTests())
test "FloodSub multiple peers, with self trigger":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var runs = 10
var futs = newSeq[(Future[void], TopicHandler, ref int)](runs)
@@ -329,21 +359,17 @@ suite "FloodSub":
(proc(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
inc counter[]
if counter[] == runs:
if counter[] == runs - 1:
fut.complete()),
counter
)
var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true, secureManagers = [SecureProtocol.Secio])
let
nodes = generateNodes(runs, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
var awaitters: seq[Future[void]]
for i in 0..<runs:
awaitters.add(await nodes[i].start())
let subscribes = await subscribeNodes(nodes)
await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
for i in 0..<runs:
await nodes[i].subscribe("foobar", futs[i][1])
@@ -361,12 +387,12 @@ suite "FloodSub":
await allFuturesThrowing(pubs)
await allFuturesThrowing(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
await allFuturesThrowing(nodesFut)
result = true
check:
waitFor(runTests()) == true
waitFor(runTests())

View File

@@ -4,6 +4,7 @@ include ../../libp2p/protocols/pubsub/gossipsub
import unittest, bearssl
import stew/byteutils
import ../../libp2p/standard_setup
import ../../libp2p/errors
import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream
@@ -26,7 +27,7 @@ suite "GossipSub internal":
test "`rebalanceMesh` Degree Lo":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
@@ -38,9 +39,8 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
peer.conn = conn
gossipSub.peers[peerInfo.id] = peer
let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.mesh[topic].incl(peer)
check gossipSub.peers.len == 15
@@ -48,7 +48,7 @@ suite "GossipSub internal":
check gossipSub.mesh[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
@@ -56,7 +56,7 @@ suite "GossipSub internal":
test "`rebalanceMesh` Degree Hi":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
@@ -69,9 +69,8 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
peer.conn = conn
gossipSub.peers[peerInfo.id] = peer
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.mesh[topic].incl(peer)
check gossipSub.mesh[topic].len == 15
@@ -79,6 +78,7 @@ suite "GossipSub internal":
check gossipSub.mesh[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
@@ -87,7 +87,7 @@ suite "GossipSub internal":
test "`replenishFanout` Degree Lo":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
@@ -101,7 +101,7 @@ suite "GossipSub internal":
conns &= conn
var peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler
gossipSub.gossipsub[topic].incl(peer)
@@ -110,6 +110,7 @@ suite "GossipSub internal":
check gossipSub.fanout[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
@@ -118,7 +119,7 @@ suite "GossipSub internal":
test "`dropFanoutPeers` drop expired fanout topics":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
@@ -134,7 +135,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler
gossipSub.fanout[topic].incl(peer)
@@ -144,6 +145,7 @@ suite "GossipSub internal":
check topic notin gossipSub.fanout
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
@@ -152,7 +154,7 @@ suite "GossipSub internal":
test "`dropFanoutPeers` leave unexpired fanout topics":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
@@ -171,7 +173,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler
gossipSub.fanout[topic1].incl(peer)
gossipSub.fanout[topic2].incl(peer)
@@ -184,6 +186,7 @@ suite "GossipSub internal":
check topic2 in gossipSub.fanout
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
@@ -192,7 +195,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should gather up to degree D non intersecting peers":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
@@ -209,7 +212,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer)
@@ -222,7 +225,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler
gossipSub.gossipsub[topic].incl(peer)
@@ -244,10 +247,11 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD
for p in peers.keys:
check not gossipSub.fanout.hasPeerID(topic, p)
check not gossipSub.mesh.hasPeerID(topic, p)
check not gossipSub.fanout.hasPeerID(topic, p.peerId)
check not gossipSub.mesh.hasPeerID(topic, p.peerId)
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
@@ -256,7 +260,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should not crash on missing topics in mesh":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
@@ -270,7 +274,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
peer.handler = handler
if i mod 2 == 0:
gossipSub.fanout[topic].incl(peer)
@@ -292,6 +296,7 @@ suite "GossipSub internal":
check peers.len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
@@ -300,7 +305,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should not crash on missing topics in fanout":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
@@ -314,7 +319,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
peer.handler = handler
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer)
@@ -336,6 +341,7 @@ suite "GossipSub internal":
check peers.len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
@@ -344,7 +350,7 @@ suite "GossipSub internal":
test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} =
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
discard
@@ -358,7 +364,7 @@ suite "GossipSub internal":
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = newPubSubPeer(peerInfo, GossipSubCodec)
let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
peer.handler = handler
if i mod 2 == 0:
gossipSub.mesh[topic].incl(peer)
@@ -380,6 +386,7 @@ suite "GossipSub internal":
check peers.len == 0
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true

View File

@@ -33,13 +33,13 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
# this is for testing purposes only
# peers can be inside `mesh` and `fanout`, not just `gossipsub`
var ceil = 15
let fsub = GossipSub(sender.pubSub.get())
let fsub = GossipSub(sender)
while (not fsub.gossipsub.hasKey(key) or
not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.id)) and
not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.peerId)) and
(not fsub.mesh.hasKey(key) or
not fsub.mesh.hasPeerID(key, receiver.peerInfo.id)) and
not fsub.mesh.hasPeerID(key, receiver.peerInfo.peerId)) and
(not fsub.fanout.hasKey(key) or
not fsub.fanout.hasPeerID(key , receiver.peerInfo.id)):
not fsub.fanout.hasPeerID(key , receiver.peerInfo.peerId)):
trace "waitSub sleeping..."
await sleepAsync(1.seconds)
dec ceil
@@ -63,18 +63,29 @@ suite "GossipSub":
check tracker.isLeaked() == false
test "GossipSub validation should succeed":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)
var nodes = generateNodes(2, true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let
nodes = generateNodes(2, gossip = true)
let subscribes = await subscribeNodes(nodes)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
@@ -90,35 +101,44 @@ suite "GossipSub":
nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = (await validatorFut) and (await handlerFut)
check (await validatorFut) and (await handlerFut)
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check:
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
nodes[1].stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
await allFuturesThrowing(nodesFut.concat())
check:
waitFor(runTests()) == true
waitFor(runTests())
test "GossipSub validation should fail":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
var nodes = generateNodes(2, true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let
nodes = generateNodes(2, gossip = true)
let subscribes = await subscribeNodes(nodes)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
@@ -133,37 +153,54 @@ suite "GossipSub":
nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = await validatorFut
check (await validatorFut) == true
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check:
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
nodes[1].stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
await allFuturesThrowing(nodesFut.concat())
check:
waitFor(runTests()) == true
waitFor(runTests())
test "GossipSub validation one fails and one succeeds":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foo"
handlerFut.complete(true)
var nodes = generateNodes(2, true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foo", handler)
await nodes[1].subscribe("bar", handler)
@@ -182,10 +219,11 @@ suite "GossipSub":
tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1
tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1
result = ((await passed) and (await failed) and (await handlerFut))
check ((await passed) and (await failed) and (await handlerFut))
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check:
"foo" notin gossip1.mesh and gossip1.fanout["foo"].len == 1
"foo" notin gossip2.mesh and "foo" notin gossip2.fanout
@@ -193,104 +231,95 @@ suite "GossipSub":
"bar" notin gossip2.mesh and "bar" notin gossip2.fanout
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
check:
waitFor(runTests()) == true
test "GossipSub publish should fail on timeout":
proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
var nodes = generateNodes(2, gossip = true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
let pubsub = nodes[0].pubSub.get()
let peer = pubsub.peers[nodes[1].peerInfo.id]
peer.conn = Connection(newBufferStream(
proc (data: seq[byte]) {.async, gcsafe.} =
await sleepAsync(10.seconds)
, size = 0))
let in10millis = Moment.fromNow(10.millis)
let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis)
check Moment.now() >= in10millis
check sent == 0
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
nodes[1].stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
await allFuturesThrowing(nodesFut.concat())
check:
waitFor(runTests()) == true
waitFor(runTests())
test "e2e - GossipSub should add remote peer topic subscriptions":
proc testBasicGossipSub(): Future[bool] {.async.} =
proc testBasicGossipSub() {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<2:
nodes.add newStandardSwitch(gossip = true,
secureManagers = [SecureProtocol.Noise])
let
nodes = generateNodes(
2,
gossip = true,
secureManagers = [SecureProtocol.Noise])
var awaitters: seq[Future[void]]
for node in nodes:
awaitters.add(await node.start())
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await sleepAsync(10.seconds)
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
check:
"foobar" in gossip2.topics
"foobar" in gossip1.gossipsub
gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id)
gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.peerId)
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
result = true
await allFuturesThrowing(nodesFut.concat())
check:
waitFor(testBasicGossipSub()) == true
waitFor(testBasicGossipSub())
test "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed":
proc testBasicGossipSub(): Future[bool] {.async.} =
proc testBasicGossipSub() {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
var nodes: seq[Switch] = newSeq[Switch]()
for i in 0..<2:
nodes.add newStandardSwitch(gossip = true, secureManagers = [SecureProtocol.Secio])
let
nodes = generateNodes(
2,
gossip = true,
secureManagers = [SecureProtocol.Secio])
var awaitters: seq[Future[void]]
for node in nodes:
awaitters.add(await node.start())
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
let subscribes = await subscribeNodes(nodes)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
@@ -302,8 +331,8 @@ suite "GossipSub":
await allFuturesThrowing(subs)
let
gossip1 = GossipSub(nodes[0].pubSub.get())
gossip2 = GossipSub(nodes[1].pubSub.get())
gossip1 = GossipSub(nodes[0])
gossip2 = GossipSub(nodes[1])
check:
"foobar" in gossip1.topics
@@ -312,35 +341,53 @@ suite "GossipSub":
"foobar" in gossip1.gossipsub
"foobar" in gossip2.gossipsub
gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.id) or
gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id)
gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.peerId) or
gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId)
gossip2.gossipsub.hasPeerID("foobar", gossip1.peerInfo.id) or
gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id)
gossip2.gossipsub.hasPeerID("foobar", gossip1.peerInfo.peerId) or
gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.peerId)
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
result = true
await allFuturesThrowing(nodesFut.concat())
check:
waitFor(testBasicGossipSub()) == true
waitFor(testBasicGossipSub())
test "e2e - GossipSub send over fanout A -> B":
proc runTests(): Future[bool] {.async.} =
proc runTests() {.async.} =
var passed = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
passed.complete()
var nodes = generateNodes(2, true)
var wait = newSeq[Future[void]]()
wait.add(await nodes[0].start())
wait.add(await nodes[1].start())
let
nodes = generateNodes(
2,
gossip = true,
secureManagers = [SecureProtocol.Secio])
let subscribes = await subscribeNodes(nodes)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@@ -353,18 +400,19 @@ suite "GossipSub":
obs2 = PubSubObserver(onSend: proc(peer: PubSubPeer; msgs: var RPCMsg) =
inc observed
)
nodes[1].pubsub.get().addObserver(obs1)
nodes[0].pubsub.get().addObserver(obs2)
# nodes[1].addObserver(obs1)
# nodes[0].addObserver(obs2)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
var gossip1: GossipSub = GossipSub(nodes[0].pubSub.get())
var gossip2: GossipSub = GossipSub(nodes[1].pubSub.get())
var gossip1: GossipSub = GossipSub(nodes[0])
var gossip2: GossipSub = GossipSub(nodes[1])
check:
"foobar" in gossip1.gossipsub
gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.id)
not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id)
gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId)
not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId)
await passed.wait(2.seconds)
@@ -373,14 +421,20 @@ suite "GossipSub":
await nodes[0].stop()
await nodes[1].stop()
await allFuturesThrowing(subscribes)
await allFuturesThrowing(wait)
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
check observed == 2
result = true
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
check:
waitFor(runTests()) == true
await allFuturesThrowing(nodesFut.concat())
# check observed == 2
waitFor(runTests())
test "e2e - GossipSub send over mesh A -> B":
proc runTests(): Future[bool] {.async.} =
@@ -389,12 +443,26 @@ suite "GossipSub":
check topic == "foobar"
passed.complete(true)
var nodes = generateNodes(2, true)
var wait: seq[Future[void]]
wait.add(await nodes[0].start())
wait.add(await nodes[1].start())
let
nodes = generateNodes(
2,
gossip = true,
secureManagers = [SecureProtocol.Secio])
let subscribes = await subscribeNodes(nodes)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
@@ -404,39 +472,42 @@ suite "GossipSub":
result = await passed
var gossip1: GossipSub = GossipSub(nodes[0].pubSub.get())
var gossip2: GossipSub = GossipSub(nodes[1].pubSub.get())
var gossip1: GossipSub = GossipSub(nodes[0])
var gossip2: GossipSub = GossipSub(nodes[1])
check:
"foobar" in gossip1.gossipsub
"foobar" in gossip2.gossipsub
gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id)
not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.id)
gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id)
not gossip2.fanout.hasPeerID("foobar", gossip1.peerInfo.id)
gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId)
not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId)
gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.peerId)
not gossip2.fanout.hasPeerID("foobar", gossip1.peerInfo.peerId)
await nodes[0].stop()
await nodes[1].stop()
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(subscribes)
await allFuturesThrowing(wait)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
check:
waitFor(runTests()) == true
test "e2e - GossipSub with multiple peers":
proc runTests(): Future[bool] {.async.} =
var nodes: seq[Switch] = newSeq[Switch]()
var awaitters: seq[Future[void]]
proc runTests() {.async.} =
var runs = 10
for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true,
gossip = true,
secureManagers = [SecureProtocol.Noise])
awaitters.add((await nodes[i].start()))
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
let subscribes = await subscribeRandom(nodes)
await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
var seen: Table[string, int]
var subs: seq[Future[void]]
@@ -468,34 +539,33 @@ suite "GossipSub":
check: v >= 1
for node in nodes:
var gossip: GossipSub = GossipSub(node.pubSub.get())
var gossip = GossipSub(node)
check:
"foobar" in gossip.gossipsub
gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
result = true
await allFuturesThrowing(nodesFut)
check:
waitFor(runTests()) == true
waitFor(runTests())
test "e2e - GossipSub with multiple peers (sparse)":
proc runTests(): Future[bool] {.async.} =
var nodes: seq[Switch] = newSeq[Switch]()
var awaitters: seq[Future[void]]
proc runTests() {.async.} =
var runs = 10
for i in 0..<runs:
nodes.add newStandardSwitch(triggerSelf = true,
gossip = true,
secureManagers = [SecureProtocol.Secio])
awaitters.add((await nodes[i].start()))
let
nodes = generateNodes(runs, gossip = true, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())
let subscribes = await subscribeSparseNodes(nodes, 1)
await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)
var seen: Table[string, int]
var subs: seq[Future[void]]
@@ -528,17 +598,18 @@ suite "GossipSub":
check: v >= 1
for node in nodes:
var gossip: GossipSub = GossipSub(node.pubSub.get())
var gossip = GossipSub(node)
check:
"foobar" in gossip.gossipsub
gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaitters)
result = true
await allFuturesThrowing(nodesFut)
check:
waitFor(runTests()) == true
waitFor(runTests())

View File

@@ -16,4 +16,4 @@ suite "Message":
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(peer, @[], "topic", seqno, sign = true)
check verify(msg, peer)
check verify(msg, peer.peerId)

View File

@@ -1,22 +1,61 @@
# compile time options here
const
libp2p_pubsub_sign {.booldefine.} = true
libp2p_pubsub_verify {.booldefine.} = true
import random
import chronos
import ../../libp2p/standard_setup
import ../../libp2p/[standard_setup,
protocols/pubsub/pubsub,
protocols/pubsub/floodsub,
protocols/pubsub/gossipsub,
protocols/secure/secure]
export standard_setup
randomize()
proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
for i in 0..<num:
result.add(newStandardSwitch(gossip = gossip))
proc generateNodes*(
num: Natural,
secureManagers: openarray[SecureProtocol] = [
# array cos order matters
SecureProtocol.Secio,
SecureProtocol.Noise,
],
msgIdProvider: MsgIdProvider = nil,
gossip: bool = false,
triggerSelf: bool = false,
verifySignature: bool = libp2p_pubsub_verify,
sign: bool = libp2p_pubsub_sign): seq[PubSub] =
proc subscribeNodes*(nodes: seq[Switch]): Future[seq[Future[void]]] {.async.} =
for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers)
let pubsub = if gossip:
GossipSub.init(
switch = switch,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider).PubSub
else:
FloodSub.init(
switch = switch,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
msgIdProvider = msgIdProvider).PubSub
switch.mount(pubsub)
result.add(pubsub)
proc subscribeNodes*(nodes: seq[PubSub]) {.async.} =
for dialer in nodes:
for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.connect(node.peerInfo)
result.add(dialer.subscribePeer(node.peerInfo))
if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
dialer.subscribePeer(node.peerInfo.peerId)
proc subscribeSparseNodes*(nodes: seq[Switch], degree: int = 2): Future[seq[Future[void]]] {.async.} =
proc subscribeSparseNodes*(nodes: seq[PubSub], degree: int = 2) {.async.} =
if nodes.len < degree:
raise (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
@@ -25,17 +64,17 @@ proc subscribeSparseNodes*(nodes: seq[Switch], degree: int = 2): Future[seq[Futu
continue
for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.connect(node.peerInfo)
result.add(dialer.subscribePeer(node.peerInfo))
if dialer.switch.peerInfo.peerId != node.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
dialer.subscribePeer(node.peerInfo.peerId)
proc subscribeRandom*(nodes: seq[Switch]): Future[seq[Future[void]]] {.async.} =
proc subscribeRandom*(nodes: seq[PubSub]) {.async.} =
for dialer in nodes:
var dialed: seq[string]
var dialed: seq[PeerID]
while dialed.len < nodes.len - 1:
let node = sample(nodes)
if node.peerInfo.id notin dialed:
if dialer.peerInfo.id != node.peerInfo.id:
await dialer.connect(node.peerInfo)
result.add(dialer.subscribePeer(node.peerInfo))
dialed.add(node.peerInfo.id)
if node.peerInfo.peerId notin dialed:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
dialer.subscribePeer(node.peerInfo.peerId)
dialed.add(node.peerInfo.peerId)

View File

@@ -72,11 +72,20 @@ proc testPubSubDaemonPublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity()
let nativeNode = newStandardSwitch(
gossip = gossip,
secureManagers = [SecureProtocol.Noise],
outTimeout = 5.minutes)
let pubsub = if gossip:
GossipSub.init(
switch = nativeNode).PubSub
else:
FloodSub.init(
switch = nativeNode).PubSub
nativeNode.mount(pubsub)
let awaiters = nativeNode.start()
await pubsub.start()
let nativePeer = nativeNode.peerInfo
var finished = false
@@ -91,8 +100,8 @@ proc testPubSubDaemonPublish(gossip: bool = false,
let peer = NativePeerInfo.init(
daemonPeer.peer,
daemonPeer.addresses)
await nativeNode.connect(peer)
let subscribeHanle = nativeNode.subscribePeer(peer)
await nativeNode.connect(peer.peerId, peer.addrs)
pubsub.subscribePeer(peer.peerId)
await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@@ -103,7 +112,7 @@ proc testPubSubDaemonPublish(gossip: bool = false,
result = true # don't cancel subscription
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
await nativeNode.subscribe(testTopic, nativeHandler)
await pubsub.subscribe(testTopic, nativeHandler)
await sleepAsync(5.seconds)
proc publisher() {.async.} =
@@ -115,9 +124,9 @@ proc testPubSubDaemonPublish(gossip: bool = false,
result = true
await nativeNode.stop()
await pubsub.stop()
await allFutures(awaiters)
await daemonNode.close()
await subscribeHanle
proc testPubSubNodePublish(gossip: bool = false,
count: int = 1): Future[bool] {.async.} =
@@ -132,18 +141,27 @@ proc testPubSubNodePublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity()
let nativeNode = newStandardSwitch(
gossip = gossip,
secureManagers = [SecureProtocol.Secio],
outTimeout = 5.minutes)
let pubsub = if gossip:
GossipSub.init(
switch = nativeNode).PubSub
else:
FloodSub.init(
switch = nativeNode).PubSub
nativeNode.mount(pubsub)
let awaiters = nativeNode.start()
await pubsub.start()
let nativePeer = nativeNode.peerInfo
let peer = NativePeerInfo.init(
daemonPeer.peer,
daemonPeer.addresses)
await nativeNode.connect(peer)
let subscribeHandle = nativeNode.subscribePeer(peer)
pubsub.subscribePeer(peer.peerId)
await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@@ -162,21 +180,21 @@ proc testPubSubNodePublish(gossip: bool = false,
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard
await nativeNode.subscribe(testTopic, nativeHandler)
await pubsub.subscribe(testTopic, nativeHandler)
await sleepAsync(5.seconds)
proc publisher() {.async.} =
while not finished:
discard await nativeNode.publish(testTopic, msgData)
discard await pubsub.publish(testTopic, msgData)
await sleepAsync(500.millis)
await wait(publisher(), 5.minutes) # should be plenty of time
result = finished
await nativeNode.stop()
await pubsub.stop()
await allFutures(awaiters)
await daemonNode.close()
await subscribeHandle
suite "Interop":
# TODO: chronos transports are leaking,

View File

@@ -118,7 +118,7 @@ suite "Switch":
# plus 4 for the pubsub streams
check (BufferStreamTracker(bufferTracker).opened ==
(BufferStreamTracker(bufferTracker).closed + 4.uint64))
(BufferStreamTracker(bufferTracker).closed))
var connTracker = getTracker(ConnectionTrackerName)
# echo connTracker.dump()
@@ -127,7 +127,7 @@ suite "Switch":
# and the pubsub streams that won't clean up until
# `disconnect()` or `stop()`
check (ConnectionTracker(connTracker).opened ==
(ConnectionTracker(connTracker).closed + 8.uint64))
(ConnectionTracker(connTracker).closed + 4.uint64))
await allFuturesThrowing(
done.wait(5.seconds),