Mirror of https://github.com/vacp2p/nim-libp2p.git

Compare commits: staggertcp...webrtc (21 commits)
| SHA1 |
|---|
| 9f90721d12 |
| 8b9f34959b |
| c327762f47 |
| abd3653d56 |
| afe2b08129 |
| 03ff023e94 |
| 60d48e644b |
| 58294ce156 |
| 359a448c1b |
| 7945cc754e |
| 284188a74f |
| dab487eeb3 |
| ad43f41ad7 |
| f350479824 |
| c6460ea7ce |
| 30e93e7c0a |
| e0f2b00f9a |
| 6ab779d30a |
| 75871817ee |
| 61929aed6c |
| 56599f5b9d |
9
.pinned
@@ -1,17 +1,20 @@
|
||||
bearssl;https://github.com/status-im/nim-bearssl@#e4157639db180e52727712a47deaefcbbac6ec86
|
||||
binary_serialization;https://github.com/status-im/nim-binary-serialization.git@#38a73a70fd43f3835ca01a877353858b19e39d70
|
||||
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
|
||||
chronos;https://github.com/status-im/nim-chronos@#ba143e029f35fd9b4cd3d89d007cc834d0d5ba3c
|
||||
dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8
|
||||
faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309
|
||||
httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18
|
||||
httputils;https://github.com/status-im/nim-http-utils@#87b7cbf032c90b9e6b446081f4a647e950362cec
|
||||
json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df
|
||||
mbedtls;https://github.com/status-im/nim-mbedtls.git@#308f3edaa0edcc880b54ce22156fb2f4e2a2bcc7
|
||||
metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff
|
||||
nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288
|
||||
nimcrypto;https://github.com/cheatfate/nimcrypto@#a079df92424968d46a6ac258299ce9380aa153f2
|
||||
results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad
|
||||
secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be
|
||||
serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35
|
||||
stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e
|
||||
testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
|
||||
unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c
|
||||
webrtc;https://github.com/status-im/nim-webrtc.git@#d525da3d62ed65e989d782e4cbb7edf221128568
|
||||
websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982
|
||||
zlib;https://github.com/status-im/nim-zlib@#38b72eda9d70067df4a953f56b5ed59630f2a17b
|
||||
zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5
|
||||
|
||||
@@ -17,6 +17,7 @@ requires "nim >= 1.6.0",
|
||||
"secp256k1",
|
||||
"stew#head",
|
||||
"websock",
|
||||
"https://github.com/status-im/nim-webrtc.git",
|
||||
"unittest2 >= 0.0.5 & <= 0.1.0"
|
||||
|
||||
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
|
||||
|
||||
@@ -122,20 +122,15 @@ proc request*[T](dm: DiscoveryManager, value: T): DiscoveryQuery =
|
||||
pa.add(value)
|
||||
return dm.request(pa)
|
||||
|
||||
proc advertise*(dm: DiscoveryManager, pa: PeerAttributes) =
|
||||
proc advertise*[T](dm: DiscoveryManager, value: T) =
|
||||
for i in dm.interfaces:
|
||||
i.toAdvertise = pa
|
||||
i.toAdvertise.add(value)
|
||||
if i.advertiseLoop.isNil:
|
||||
i.advertisementUpdated = newAsyncEvent()
|
||||
i.advertiseLoop = i.advertise()
|
||||
else:
|
||||
i.advertisementUpdated.fire()
|
||||
|
||||
proc advertise*[T](dm: DiscoveryManager, value: T) =
|
||||
var pa: PeerAttributes
|
||||
pa.add(value)
|
||||
dm.advertise(pa)
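
Illustrative usage sketch (not part of the diff): the new generic `advertise` overload wraps a single value into `PeerAttributes` before handing it to every registered interface. The names below are hypothetical and assume a `DiscoveryManager` with a `RendezVousInterface` attached and a started switch.

```nim
# Sketch only: `dm` is an existing DiscoveryManager with an attached
# RendezVousInterface; RdvNamespace is the distinct string type from the
# rendezvous interface (next hunk).
let ns = RdvNamespace("my-app/v1")

# Generic overload: builds PeerAttributes internally, updates every
# interface's toAdvertise set and (re)starts or wakes its advertise loop.
dm.advertise(ns)

# Requests are unchanged: build a query for the same attribute.
let query = dm.request(ns)
```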
|
||||
|
||||
template forEach*(query: DiscoveryQuery, code: untyped) =
|
||||
## Will execute `code` for each discovered peer. The
|
||||
## peer attributes are available through the variable
|
||||
|
||||
@@ -19,6 +19,7 @@ type
|
||||
rdv*: RendezVous
|
||||
timeToRequest: Duration
|
||||
timeToAdvertise: Duration
|
||||
ttl: Duration
|
||||
|
||||
RdvNamespace* = distinct string
|
||||
|
||||
@@ -62,12 +63,16 @@ method advertise*(self: RendezVousInterface) {.async.} =
|
||||
|
||||
self.advertisementUpdated.clear()
|
||||
for toAdv in toAdvertise:
|
||||
await self.rdv.advertise(toAdv, self.timeToAdvertise)
|
||||
try:
|
||||
await self.rdv.advertise(toAdv, self.ttl)
|
||||
except CatchableError as error:
|
||||
debug "RendezVous advertise error: ", msg = error.msg
|
||||
|
||||
await sleepAsync(self.timeToAdvertise) or self.advertisementUpdated.wait()
|
||||
|
||||
proc new*(T: typedesc[RendezVousInterface],
|
||||
rdv: RendezVous,
|
||||
ttr: Duration = 1.minutes,
|
||||
tta: Duration = MinimumDuration): RendezVousInterface =
|
||||
T(rdv: rdv, timeToRequest: ttr, timeToAdvertise: tta)
|
||||
tta: Duration = 1.minutes,
|
||||
ttl: Duration = MinimumDuration): RendezVousInterface =
|
||||
T(rdv: rdv, timeToRequest: ttr, timeToAdvertise: tta, ttl: ttl)
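
A hedged sketch of the new constructor, which now separates the advertise refresh period (`tta`) from the registration TTL (`ttl`) forwarded to `rdv.advertise`; the durations below are illustrative only.

```nim
# Sketch: re-register every 5 minutes while asking the rendezvous point to
# keep each registration for 2 hours. `rdv` is an existing RendezVous.
let rdvInterface = RendezVousInterface.new(
  rdv,
  ttr = 1.minutes,  # how often discovery requests are re-sent
  tta = 5.minutes,  # sleep between advertise-loop iterations
  ttl = 2.hours)    # TTL forwarded to rdv.advertise for each namespace
```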
|
||||
|
||||
@@ -297,6 +297,33 @@ proc dnsVB(vb: var VBuffer): bool =
|
||||
if s.find('/') == -1:
|
||||
result = true
|
||||
|
||||
proc certHashStB(s: string, vb: var VBuffer): bool =
|
||||
## CertHash address stringToBuffer() implementation.
|
||||
var data = MultiBase.decode(s).valueOr:
|
||||
return false
|
||||
var mh: MultiHash
|
||||
if MultiHash.decode(data, mh).isOk:
|
||||
vb.writeSeq(data)
|
||||
result = true
|
||||
|
||||
proc certHashBtS(vb: var VBuffer, s: var string): bool =
|
||||
## CertHash address bufferToString() implementation.
|
||||
var address = newSeq[byte]()
|
||||
if vb.readSeq(address) > 0:
|
||||
var mh: MultiHash
|
||||
if MultiHash.decode(address, mh).isOk:
|
||||
s = MultiBase.encode("base64url", address).valueOr:
|
||||
return false
|
||||
result = true
|
||||
|
||||
proc certHashVB(vb: var VBuffer): bool =
|
||||
## CertHash address validateBuffer() implementation.
|
||||
var address = newSeq[byte]()
|
||||
if vb.readSeq(address) > 0:
|
||||
var mh: MultiHash
|
||||
if MultiHash.decode(address, mh).isOk:
|
||||
result = true
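
Illustrative sketch of how these transcoders are used: a webrtc-direct listening address carries the multihash of the DTLS certificate under `certhash`, exactly as the new transport does in its `start` method further down. The `cert` bytes here are an assumption (any DER-encoded certificate).

```nim
# Sketch: build a webrtc-direct address from a certificate's sha2-256 digest.
let
  certDigest = MultiHash.digest("sha2-256", cert).get().data.buffer
  ma = MultiAddress.init("/ip4/127.0.0.1/udp/4242").tryGet() &
       MultiAddress.init(multiCodec("webrtc-direct")).tryGet() &
       MultiAddress.init(multiCodec("certhash"), certDigest).tryGet()

# The new WebRtcDirect2 pattern (defined below) matches udp + webrtc-direct
# + certhash, so the transport's `handles` check accepts this address.
assert WebRtcDirect2.match(ma)
```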
|
||||
|
||||
proc mapEq*(codec: string): MaPattern =
|
||||
## ``Equal`` operator for pattern
|
||||
result.operator = Eq
|
||||
@@ -358,6 +385,11 @@ const
|
||||
bufferToString: dnsBtS,
|
||||
validateBuffer: dnsVB
|
||||
)
|
||||
TranscoderCertHash* = Transcoder(
|
||||
stringToBuffer: certHashStB,
|
||||
bufferToString: certHashBtS,
|
||||
validateBuffer: certHashVB
|
||||
)
|
||||
ProtocolsList = [
|
||||
MAProtocol(
|
||||
mcodec: multiCodec("ip4"), kind: Fixed, size: 4,
|
||||
@@ -458,7 +490,17 @@ const
|
||||
),
|
||||
MAProtocol(
|
||||
mcodec: multiCodec("p2p-webrtc-direct"), kind: Marker, size: 0
|
||||
)
|
||||
),
|
||||
MAProtocol(
|
||||
mcodec: multiCodec("webrtc"), kind: Marker, size: 0
|
||||
),
|
||||
MAProtocol(
|
||||
mcodec: multiCodec("webrtc-direct"), kind: Marker, size: 0
|
||||
),
|
||||
MAProtocol(
|
||||
mcodec: multiCodec("certhash"), kind: Length, size: 0,
|
||||
coder: TranscoderCertHash
|
||||
),
|
||||
]
|
||||
|
||||
DNSANY* = mapEq("dns")
|
||||
@@ -489,6 +531,7 @@ const
|
||||
WebSockets_DNS* = mapOr(WS_DNS, WSS_DNS)
|
||||
WebSockets_IP* = mapOr(WS_IP, WSS_IP)
|
||||
WebSockets* = mapOr(WS, WSS)
|
||||
WebRtcDirect2* = mapAnd(UDP, mapEq("webrtc-direct"), mapEq("certhash"))
|
||||
Onion3* = mapEq("onion3")
|
||||
TcpOnion3* = mapAnd(TCP, Onion3)
|
||||
|
||||
@@ -512,7 +555,7 @@ const
|
||||
mapAnd(DNS, mapEq("https"))
|
||||
)
|
||||
|
||||
WebRTCDirect* = mapOr(
|
||||
WebRTCDirect* {.deprecated.} = mapOr(
|
||||
mapAnd(HTTP, mapEq("p2p-webrtc-direct")),
|
||||
mapAnd(HTTPS, mapEq("p2p-webrtc-direct"))
|
||||
)
|
||||
|
||||
@@ -193,11 +193,14 @@ const MultiCodecList = [
|
||||
("https", 0x01BB),
|
||||
("tls", 0x01C0),
|
||||
("quic", 0x01CC),
|
||||
("certhash", 0x01D2),
|
||||
("ws", 0x01DD),
|
||||
("wss", 0x01DE),
|
||||
("p2p-websocket-star", 0x01DF), # not in multicodec list
|
||||
("p2p-webrtc-star", 0x0113), # not in multicodec list
|
||||
("p2p-webrtc-direct", 0x0114), # not in multicodec list
|
||||
("webrtc-direct", 0x0118),
|
||||
("webrtc", 0x0119),
|
||||
("onion", 0x01BC),
|
||||
("onion3", 0x01BD),
|
||||
("p2p-circuit", 0x0122),
|
||||
|
||||
@@ -15,7 +15,7 @@ import ./pubsub,
|
||||
./pubsubpeer,
|
||||
./timedcache,
|
||||
./peertable,
|
||||
./rpc/[message, messages],
|
||||
./rpc/[message, messages, protobuf],
|
||||
../../crypto/crypto,
|
||||
../../stream/connection,
|
||||
../../peerid,
|
||||
@@ -95,7 +95,16 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =
|
||||
|
||||
method rpcHandler*(f: FloodSub,
|
||||
peer: PubSubPeer,
|
||||
rpcMsg: RPCMsg) {.async.} =
|
||||
data: seq[byte]) {.async.} =
|
||||
|
||||
var rpcMsg = decodeRpcMsg(data).valueOr:
|
||||
debug "failed to decode msg from peer", peer, err = error
|
||||
raise newException(CatchableError, "")
|
||||
|
||||
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
|
||||
# trigger hooks
|
||||
peer.recvObservers(rpcMsg)
|
||||
|
||||
for i in 0..<min(f.topicsHigh, rpcMsg.subscriptions.len):
|
||||
template sub: untyped = rpcMsg.subscriptions[i]
|
||||
f.handleSubscribe(peer, sub.topic, sub.subscribe)
|
||||
|
||||
@@ -13,13 +13,14 @@
|
||||
|
||||
import std/[sets, sequtils]
|
||||
import chronos, chronicles, metrics
|
||||
import chronos/ratelimit
|
||||
import ./pubsub,
|
||||
./floodsub,
|
||||
./pubsubpeer,
|
||||
./peertable,
|
||||
./mcache,
|
||||
./timedcache,
|
||||
./rpc/[messages, message],
|
||||
./rpc/[messages, message, protobuf],
|
||||
../protocol,
|
||||
../../stream/connection,
|
||||
../../peerinfo,
|
||||
@@ -78,7 +79,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
|
||||
disconnectBadPeers: false,
|
||||
enablePX: false,
|
||||
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
|
||||
iwantTimeout: 3 * GossipSubHeartbeatInterval
|
||||
iwantTimeout: 3 * GossipSubHeartbeatInterval,
|
||||
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration])
|
||||
)
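
The new `overheadRateLimit` parameter is an `Opt[tuple[bytes: int, interval: Duration]]`; when set, `getOrCreatePeer` (further down) gives each peer a `TokenBucket` of that size. A hedged configuration sketch with illustrative numbers:

```nim
# Sketch: tolerate up to 128 KiB of overhead/invalid bytes per peer and per
# 10-minute window; exceeding peers are currently only counted in the
# libp2p_gossipsub_peers_rate_limit_disconnections metric.
var params = GossipSubParams.init()
params.overheadRateLimit = Opt.some((bytes: 128 * 1024, interval: 10.minutes))
```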
|
||||
|
||||
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
|
||||
@@ -151,7 +153,7 @@ method init*(g: GossipSub) =
|
||||
g.codecs &= GossipSubCodec
|
||||
g.codecs &= GossipSubCodec_10
|
||||
|
||||
method onNewPeer(g: GossipSub, peer: PubSubPeer) =
|
||||
method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
|
||||
g.withPeerStats(peer.peerId) do (stats: var PeerStats):
|
||||
# Make sure stats and peer information match, even when reloading peer stats
|
||||
# from a previous connection
|
||||
@@ -160,7 +162,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
|
||||
peer.behaviourPenalty = stats.behaviourPenalty
|
||||
|
||||
# Check if the score is below the threshold and disconnect the peer if necessary
|
||||
g.disconnectBadPeerCheck(peer, stats.score)
|
||||
g.disconnectIfBadScorePeer(peer, stats.score)
|
||||
|
||||
peer.iHaveBudget = IHavePeerBudget
|
||||
peer.pingBudget = PingsPeerBudget
|
||||
@@ -316,7 +318,7 @@ proc validateAndRelay(g: GossipSub,
|
||||
of ValidationResult.Reject:
|
||||
debug "Dropping message after validation, reason: reject",
|
||||
msgId = shortLog(msgId), peer
|
||||
g.punishInvalidMessage(peer, msg.topicIds)
|
||||
g.punishInvalidMessage(peer, msg)
|
||||
return
|
||||
of ValidationResult.Ignore:
|
||||
debug "Dropping message after validation, reason: ignore",
|
||||
@@ -379,9 +381,57 @@ proc validateAndRelay(g: GossipSub,
|
||||
except CatchableError as exc:
|
||||
info "validateAndRelay failed", msg=exc.msg
|
||||
|
||||
proc dataAndTopicsIdSize(msgs: seq[Message]): int =
|
||||
msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0)
|
||||
|
||||
proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.raises:[PeerRateLimitError, CatchableError].} =
|
||||
# In this way we count even ignored fields by protobuf
|
||||
|
||||
var rmsg = rpcMsgOpt.valueOr:
|
||||
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
|
||||
if not overheadRateLimit.tryConsume(msgSize):
|
||||
libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
|
||||
debug "Peer sent a msg that couldn't be decoded and it's above rate limit", peer, uselessAppBytesNum = msgSize
|
||||
# discard g.disconnectPeer(peer)
|
||||
# debug "Peer disconnected", peer, uselessAppBytesNum = msgSize
|
||||
# raise newException(PeerRateLimitError, "Peer sent a msg that couldn't be decoded and it's above rate limit")
|
||||
|
||||
raise newException(CatchableError, "Peer msg couldn't be decoded")
|
||||
|
||||
let usefulMsgBytesNum =
|
||||
if g.verifySignature:
|
||||
byteSize(rmsg.messages)
|
||||
else:
|
||||
dataAndTopicsIdSize(rmsg.messages)
|
||||
|
||||
var uselessAppBytesNum = msgSize - usefulMsgBytesNum
|
||||
rmsg.control.withValue(control):
|
||||
uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant))
|
||||
|
||||
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
|
||||
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
|
||||
libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
|
||||
debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum, rmsg
|
||||
# discard g.disconnectPeer(peer)
|
||||
# debug "Peer disconnected", peer, msgSize, uselessAppBytesNum
|
||||
# raise newException(PeerRateLimitError, "Peer sent too much useless application data and it's above rate limit.")
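
A worked example of the accounting above, with made-up numbers, to show what actually gets charged against the peer's bucket:

```nim
# Sketch: a 1200-byte wire message decodes into messages whose data and
# topic IDs sum to 1000 bytes, plus ihave/iwant control entries worth 150
# bytes. With verifySignature disabled:
#   usefulMsgBytesNum  = dataAndTopicsIdSize(rmsg.messages) = 1000
#   uselessAppBytesNum = 1200 - 1000 = 200
#                        200 - (byteSize(ihave) + byteSize(iwant)) = 50
# Only those 50 bytes are consumed from the peer's overhead TokenBucket.
```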
|
||||
|
||||
method rpcHandler*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
rpcMsg: RPCMsg) {.async.} =
|
||||
data: seq[byte]) {.async.} =
|
||||
|
||||
let msgSize = data.len
|
||||
var rpcMsg = decodeRpcMsg(data).valueOr:
|
||||
debug "failed to decode msg from peer", peer, err = error
|
||||
rateLimit(g, peer, Opt.none(RPCMsg), msgSize)
|
||||
return
|
||||
|
||||
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
|
||||
rateLimit(g, peer, Opt.some(rpcMsg), msgSize)
|
||||
|
||||
# trigger hooks
|
||||
peer.recvObservers(rpcMsg)
|
||||
|
||||
if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
|
||||
g.send(peer, RPCMsg(pong: rpcMsg.ping))
|
||||
peer.pingBudget.dec
|
||||
@@ -445,14 +495,14 @@ method rpcHandler*(g: GossipSub,
|
||||
# always validate if signature is present or required
|
||||
debug "Dropping message due to failed signature verification",
|
||||
msgId = shortLog(msgId), peer
|
||||
g.punishInvalidMessage(peer, msg.topicIds)
|
||||
g.punishInvalidMessage(peer, msg)
|
||||
continue
|
||||
|
||||
if msg.seqno.len > 0 and msg.seqno.len != 8:
|
||||
# if we have seqno should be 8 bytes long
|
||||
debug "Dropping message due to invalid seqno length",
|
||||
msgId = shortLog(msgId), peer
|
||||
g.punishInvalidMessage(peer, msg.topicIds)
|
||||
g.punishInvalidMessage(peer, msg)
|
||||
continue
|
||||
|
||||
# g.anonymize needs no evaluation when receiving messages
|
||||
@@ -676,3 +726,13 @@ method initPubSub*(g: GossipSub)
|
||||
|
||||
# init gossip stuff
|
||||
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
|
||||
|
||||
method getOrCreatePeer*(
|
||||
g: GossipSub,
|
||||
peerId: PeerId,
|
||||
protos: seq[string]): PubSubPeer =
|
||||
|
||||
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
|
||||
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
|
||||
peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
|
||||
return peer
|
||||
|
||||
@@ -11,9 +11,12 @@
|
||||
|
||||
import std/[tables, sets]
|
||||
import chronos, chronicles, metrics
|
||||
import chronos/ratelimit
|
||||
import "."/[types]
|
||||
import ".."/[pubsubpeer]
|
||||
import ../rpc/messages
|
||||
import "../../.."/[peerid, multiaddress, switch, utils/heartbeat]
|
||||
import ../pubsub
|
||||
|
||||
logScope:
|
||||
topics = "libp2p gossipsub"
|
||||
@@ -27,6 +30,7 @@ declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed go
|
||||
declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||
declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||
declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||
declarePublicCounter(libp2p_gossipsub_peers_rate_limit_disconnections, "The number of peer disconnections by gossipsub because of rate limit", labels = ["agent"])
|
||||
|
||||
proc init*(_: type[TopicParams]): TopicParams =
|
||||
TopicParams(
|
||||
@@ -85,27 +89,18 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
|
||||
|
||||
{.pop.}
|
||||
|
||||
proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
|
||||
let agent =
|
||||
when defined(libp2p_agents_metrics):
|
||||
if peer.shortAgent.len > 0:
|
||||
peer.shortAgent
|
||||
else:
|
||||
"unknown"
|
||||
else:
|
||||
"unknown"
|
||||
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])
|
||||
|
||||
proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
|
||||
try:
|
||||
await g.switch.disconnect(peer.peerId)
|
||||
except CatchableError as exc: # Never cancelled
|
||||
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
|
||||
|
||||
proc disconnectBadPeerCheck*(g: GossipSub, peer: PubSubPeer, score: float64) =
|
||||
proc disconnectIfBadScorePeer*(g: GossipSub, peer: PubSubPeer, score: float64) =
|
||||
if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and
|
||||
peer.peerId notin g.parameters.directPeers:
|
||||
debug "disconnecting bad score peer", peer, score = peer.score
|
||||
asyncSpawn(g.disconnectPeer(peer))
|
||||
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [peer.getAgent()])
|
||||
|
||||
proc updateScores*(g: GossipSub) = # avoid async
|
||||
## https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#the-score-function
|
||||
@@ -175,14 +170,7 @@ proc updateScores*(g: GossipSub) = # avoid async
|
||||
score += topicScore * topicParams.topicWeight
|
||||
|
||||
# Score metrics
|
||||
let agent =
|
||||
when defined(libp2p_agents_metrics):
|
||||
if peer.shortAgent.len > 0:
|
||||
peer.shortAgent
|
||||
else:
|
||||
"unknown"
|
||||
else:
|
||||
"unknown"
|
||||
let agent = peer.getAgent()
|
||||
libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
|
||||
libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
|
||||
libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
|
||||
@@ -219,14 +207,7 @@ proc updateScores*(g: GossipSub) = # avoid async
|
||||
score += colocationFactor * g.parameters.ipColocationFactorWeight
|
||||
|
||||
# Score metrics
|
||||
let agent =
|
||||
when defined(libp2p_agents_metrics):
|
||||
if peer.shortAgent.len > 0:
|
||||
peer.shortAgent
|
||||
else:
|
||||
"unknown"
|
||||
else:
|
||||
"unknown"
|
||||
let agent = peer.getAgent()
|
||||
libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
|
||||
libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
|
||||
libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
|
||||
@@ -246,8 +227,7 @@ proc updateScores*(g: GossipSub) = # avoid async
|
||||
|
||||
trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
|
||||
|
||||
g.disconnectBadPeerCheck(peer, stats.score)
|
||||
|
||||
g.disconnectIfBadScorePeer(peer, stats.score)
|
||||
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
|
||||
|
||||
for peer in evicting:
|
||||
@@ -260,8 +240,18 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} =
|
||||
trace "running scoring heartbeat", instance = cast[int](g)
|
||||
g.updateScores()
|
||||
|
||||
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
|
||||
for tt in topics:
|
||||
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) =
|
||||
let uselessAppBytesNum = msg.data.len
|
||||
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
|
||||
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
|
||||
debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum
|
||||
libp2p_gossipsub_peers_rate_limit_disconnections.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
|
||||
# discard g.disconnectPeer(peer)
|
||||
# debug "Peer disconnected", peer, uselessAppBytesNum
|
||||
# raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit")
|
||||
|
||||
|
||||
for tt in msg.topicIds:
|
||||
let t = tt
|
||||
if t notin g.topics:
|
||||
continue
|
||||
|
||||
@@ -145,6 +145,8 @@ type
|
||||
bandwidthEstimatebps*: int # This is currently used only for limiting flood publishing. 0 disables flood-limiting completely
|
||||
iwantTimeout*: Duration
|
||||
|
||||
overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
|
||||
|
||||
BackoffTable* = Table[string, Table[PeerId, Moment]]
|
||||
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
import std/[tables, sequtils, sets, strutils]
|
||||
import chronos, chronicles, metrics
|
||||
import chronos/ratelimit
|
||||
import ./errors as pubsub_errors,
|
||||
./pubsubpeer,
|
||||
./rpc/[message, messages, protobuf],
|
||||
@@ -263,7 +264,7 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) =
|
||||
|
||||
method rpcHandler*(p: PubSub,
|
||||
peer: PubSubPeer,
|
||||
rpcMsg: RPCMsg): Future[void] {.base, async.} =
|
||||
data: seq[byte]): Future[void] {.base, async.} =
|
||||
## Handler that must be overridden by concrete implementation
|
||||
raiseAssert "Unimplemented"
|
||||
|
||||
@@ -278,10 +279,11 @@ method onPubSubPeerEvent*(p: PubSub, peer: PubSubPeer, event: PubSubPeerEvent) {
|
||||
of PubSubPeerEventKind.Disconnected:
|
||||
discard
|
||||
|
||||
proc getOrCreatePeer*(
|
||||
method getOrCreatePeer*(
|
||||
p: PubSub,
|
||||
peerId: PeerId,
|
||||
protos: seq[string]): PubSubPeer =
|
||||
protos: seq[string]): PubSubPeer {.base, gcsafe.} =
|
||||
|
||||
p.peers.withValue(peerId, peer):
|
||||
return peer[]
|
||||
|
||||
@@ -354,9 +356,9 @@ method handleConn*(p: PubSub,
|
||||
## that we're interested in
|
||||
##
|
||||
|
||||
proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] =
|
||||
proc handler(peer: PubSubPeer, data: seq[byte]): Future[void] =
|
||||
# call pubsub rpc handler
|
||||
p.rpcHandler(peer, msg)
|
||||
p.rpcHandler(peer, data)
|
||||
|
||||
let peer = p.getOrCreatePeer(conn.peerId, @[proto])
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
import std/[sequtils, strutils, tables, hashes, options, sets, deques]
|
||||
import stew/results
|
||||
import chronos, chronicles, nimcrypto/sha2, metrics
|
||||
import chronos/ratelimit
|
||||
import rpc/[messages, message, protobuf],
|
||||
../../peerid,
|
||||
../../peerinfo,
|
||||
@@ -32,6 +33,8 @@ when defined(libp2p_expensive_metrics):
|
||||
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
|
||||
|
||||
type
|
||||
PeerRateLimitError* = object of CatchableError
|
||||
|
||||
PubSubObserver* = ref object
|
||||
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}
|
||||
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}
|
||||
@@ -66,8 +69,9 @@ type
|
||||
maxMessageSize: int
|
||||
appScore*: float64 # application specific score
|
||||
behaviourPenalty*: float64 # the eventual penalty score
|
||||
overheadRateLimitOpt*: Opt[TokenBucket]
|
||||
|
||||
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
|
||||
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
|
||||
{.gcsafe, raises: [].}
|
||||
|
||||
when defined(libp2p_agents_metrics):
|
||||
@@ -107,7 +111,7 @@ func outbound*(p: PubSubPeer): bool =
|
||||
else:
|
||||
false
|
||||
|
||||
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
|
||||
proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) =
|
||||
# trigger hooks
|
||||
if not(isNil(p.observers)) and p.observers[].len > 0:
|
||||
for obs in p.observers[]:
|
||||
@@ -134,26 +138,19 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||
conn, peer = p, closed = conn.closed,
|
||||
data = data.shortLog
|
||||
|
||||
var rmsg = decodeRpcMsg(data).valueOr:
|
||||
debug "failed to decode msg from peer",
|
||||
conn, peer = p, closed = conn.closed,
|
||||
err = error
|
||||
break
|
||||
data = newSeq[byte]() # Release memory
|
||||
|
||||
trace "decoded msg from peer",
|
||||
conn, peer = p, closed = conn.closed,
|
||||
msg = rmsg.shortLog
|
||||
# trigger hooks
|
||||
p.recvObservers(rmsg)
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
for m in rmsg.messages:
|
||||
for t in m.topicIDs:
|
||||
# metrics
|
||||
libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])
|
||||
|
||||
await p.handler(p, rmsg)
|
||||
await p.handler(p, data)
|
||||
data = newSeq[byte]() # Release memory
|
||||
except PeerRateLimitError as exc:
|
||||
debug "Peer rate limit exceeded, exiting read while", conn, peer = p, error = exc.msg
|
||||
except CatchableError as exc:
|
||||
debug "Exception occurred in PubSubPeer.handle",
|
||||
conn, peer = p, closed = conn.closed, exc = exc.msg
|
||||
finally:
|
||||
await conn.close()
|
||||
except CancelledError:
|
||||
@@ -245,7 +242,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [], async.} =
|
||||
return
|
||||
|
||||
if msg.len > p.maxMessageSize:
|
||||
info "trying to send a too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
|
||||
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
|
||||
return
|
||||
|
||||
if p.sendConn == nil:
|
||||
@@ -272,9 +269,42 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [], async.} =
|
||||
|
||||
await conn.close() # This will clean up the send connection
|
||||
|
||||
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
|
||||
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
|
||||
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
|
||||
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
|
||||
## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
|
||||
## exceeds the `maxSize` when trying to fit into an empty `RPCMsg`, the latter is skipped as too large to send.
|
||||
## Every constructed `RPCMsg` is then encoded, optionally anonymized, and yielded as a sequence of bytes.
|
||||
|
||||
var currentRPCMsg = rpcMsg
|
||||
currentRPCMsg.messages = newSeq[Message]()
|
||||
|
||||
var currentSize = byteSize(currentRPCMsg)
|
||||
|
||||
for msg in rpcMsg.messages:
|
||||
let msgSize = byteSize(msg)
|
||||
|
||||
# Check if adding the next message will exceed maxSize
|
||||
if float(currentSize + msgSize) * 1.1 > float(maxSize): # Guessing 10% protobuf overhead
|
||||
if currentRPCMsg.messages.len == 0:
|
||||
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
|
||||
continue # Skip this message
|
||||
|
||||
trace "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
|
||||
yield encodeRpcMsg(currentRPCMsg, anonymize)
|
||||
currentRPCMsg = RPCMsg()
|
||||
currentSize = 0
|
||||
|
||||
currentRPCMsg.messages.add(msg)
|
||||
currentSize += msgSize
|
||||
|
||||
# Check if there is a non-empty currentRPCMsg left to be added
|
||||
if currentSize > 0 and currentRPCMsg.messages.len > 0:
|
||||
trace "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
|
||||
yield encodeRpcMsg(currentRPCMsg, anonymize)
|
||||
else:
|
||||
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
|
||||
|
||||
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
|
||||
# When sending messages, we take care to re-encode them with the right
|
||||
# anonymization flag to ensure that we're not penalized for sending invalid
|
||||
# or malicious data on the wire - in particular, re-encoding protects against
|
||||
@@ -292,7 +322,13 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
|
||||
sendMetrics(msg)
|
||||
encodeRpcMsg(msg, anonymize)
|
||||
|
||||
asyncSpawn p.sendEncoded(encoded)
|
||||
if encoded.len > p.maxMessageSize and msg.messages.len > 1:
|
||||
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
|
||||
asyncSpawn p.sendEncoded(encodedSplitMsg)
|
||||
else:
|
||||
# If the message size is within limits, send it as is
|
||||
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
|
||||
asyncSpawn p.sendEncoded(encoded)
|
||||
|
||||
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
|
||||
for sentIHave in p.sentIHaves.mitems():
|
||||
@@ -307,7 +343,8 @@ proc new*(
|
||||
getConn: GetConn,
|
||||
onEvent: OnEvent,
|
||||
codec: string,
|
||||
maxMessageSize: int): T =
|
||||
maxMessageSize: int,
|
||||
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
|
||||
|
||||
result = T(
|
||||
getConn: getConn,
|
||||
@@ -315,7 +352,18 @@ proc new*(
|
||||
codec: codec,
|
||||
peerId: peerId,
|
||||
connectedFut: newFuture[void](),
|
||||
maxMessageSize: maxMessageSize
|
||||
maxMessageSize: maxMessageSize,
|
||||
overheadRateLimitOpt: overheadRateLimitOpt
|
||||
)
|
||||
result.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||
result.heDontWants.addFirst(default(HashSet[MessageId]))
|
||||
|
||||
proc getAgent*(peer: PubSubPeer): string =
|
||||
return
|
||||
when defined(libp2p_agents_metrics):
|
||||
if peer.shortAgent.len > 0:
|
||||
peer.shortAgent
|
||||
else:
|
||||
"unknown"
|
||||
else:
|
||||
"unknown"
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import options, sequtils
|
||||
import options, sequtils, sugar
|
||||
import "../../.."/[
|
||||
peerid,
|
||||
routing_record,
|
||||
@@ -18,6 +18,14 @@ import "../../.."/[
|
||||
|
||||
export options
|
||||
|
||||
proc expectedFields[T](t: typedesc[T], existingFieldNames: seq[string]) {.raises: [CatchableError].} =
|
||||
var fieldNames: seq[string]
|
||||
for name, _ in fieldPairs(T()):
|
||||
fieldNames &= name
|
||||
if fieldNames != existingFieldNames:
|
||||
fieldNames.keepIf(proc(it: string): bool = it notin existingFieldNames)
|
||||
raise newException(CatchableError, $T & " fields changed, please search for and revise all relevant procs. New fields: " & $fieldNames)
|
||||
|
||||
type
|
||||
PeerInfoMsg* = object
|
||||
peerId*: PeerId
|
||||
@@ -116,3 +124,54 @@ func shortLog*(m: RPCMsg): auto =
|
||||
messages: mapIt(m.messages, it.shortLog),
|
||||
control: m.control.get(ControlMessage()).shortLog
|
||||
)
|
||||
|
||||
static: expectedFields(PeerInfoMsg, @["peerId", "signedPeerRecord"])
|
||||
proc byteSize(peerInfo: PeerInfoMsg): int =
|
||||
peerInfo.peerId.len + peerInfo.signedPeerRecord.len
|
||||
|
||||
static: expectedFields(SubOpts, @["subscribe", "topic"])
|
||||
proc byteSize(subOpts: SubOpts): int =
|
||||
1 + subOpts.topic.len # 1 byte for the bool
|
||||
|
||||
static: expectedFields(Message, @["fromPeer", "data", "seqno", "topicIds", "signature", "key"])
|
||||
proc byteSize*(msg: Message): int =
|
||||
msg.fromPeer.len + msg.data.len + msg.seqno.len +
|
||||
msg.signature.len + msg.key.len + msg.topicIds.foldl(a + b.len, 0)
|
||||
|
||||
proc byteSize*(msgs: seq[Message]): int =
|
||||
msgs.foldl(a + b.byteSize, 0)
|
||||
|
||||
static: expectedFields(ControlIHave, @["topicId", "messageIds"])
|
||||
proc byteSize(controlIHave: ControlIHave): int =
|
||||
controlIHave.topicId.len + controlIHave.messageIds.foldl(a + b.len, 0)
|
||||
|
||||
proc byteSize*(ihaves: seq[ControlIHave]): int =
|
||||
ihaves.foldl(a + b.byteSize, 0)
|
||||
|
||||
static: expectedFields(ControlIWant, @["messageIds"])
|
||||
proc byteSize(controlIWant: ControlIWant): int =
|
||||
controlIWant.messageIds.foldl(a + b.len, 0)
|
||||
|
||||
proc byteSize*(iwants: seq[ControlIWant]): int =
|
||||
iwants.foldl(a + b.byteSize, 0)
|
||||
|
||||
static: expectedFields(ControlGraft, @["topicId"])
|
||||
proc byteSize(controlGraft: ControlGraft): int =
|
||||
controlGraft.topicId.len
|
||||
|
||||
static: expectedFields(ControlPrune, @["topicId", "peers", "backoff"])
|
||||
proc byteSize(controlPrune: ControlPrune): int =
|
||||
controlPrune.topicId.len + controlPrune.peers.foldl(a + b.byteSize, 0) + 8 # 8 bytes for uint64
|
||||
|
||||
static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
|
||||
proc byteSize(control: ControlMessage): int =
|
||||
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
|
||||
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
|
||||
control.idontwant.foldl(a + b.byteSize, 0)
|
||||
|
||||
static: expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])
|
||||
proc byteSize*(rpc: RPCMsg): int =
|
||||
result = rpc.subscriptions.foldl(a + b.byteSize, 0) + byteSize(rpc.messages) +
|
||||
rpc.ping.len + rpc.pong.len
|
||||
rpc.control.withValue(ctrl):
|
||||
result += ctrl.byteSize
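
A small hedged sketch of the size helpers (values chosen for easy arithmetic): `byteSize` counts only field payload lengths, not protobuf framing, which is why the rate limiter treats the remainder of the wire size as overhead.

```nim
# Sketch: one message with 1000 bytes of data and a single 6-byte topic.
let msg = Message(data: newSeq[byte](1000), topicIds: @["foobar"])
assert byteSize(msg) == 1006          # 1000 + len("foobar")
assert byteSize(@[msg, msg]) == 2012  # seq overload folds over all messages
```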
|
||||
|
||||
@@ -469,6 +469,8 @@ proc advertisePeer(rdv: RendezVous,
|
||||
trace "Unexpected register response", peer, msgType = msgRecv.msgType
|
||||
elif msgRecv.registerResponse.tryGet().status != ResponseStatus.Ok:
|
||||
trace "Refuse to register", peer, response = msgRecv.registerResponse
|
||||
else:
|
||||
trace "Successfully registered", peer, response = msgRecv.registerResponse
|
||||
except CatchableError as exc:
|
||||
trace "exception in the advertise", error = exc.msg
|
||||
finally:
|
||||
@@ -476,9 +478,9 @@ proc advertisePeer(rdv: RendezVous,
|
||||
await rdv.sema.acquire()
|
||||
discard await advertiseWrap().withTimeout(5.seconds)
|
||||
|
||||
proc advertise*(rdv: RendezVous,
|
||||
method advertise*(rdv: RendezVous,
|
||||
ns: string,
|
||||
ttl: Duration = MinimumDuration) {.async.} =
|
||||
ttl: Duration = MinimumDuration) {.async, base.} =
|
||||
let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr:
|
||||
raise newException(RendezVousError, "Wrong Signed Peer Record")
|
||||
if ns.len notin 1..255:
|
||||
|
||||
@@ -82,7 +82,7 @@ type
|
||||
localPrivateKey: PrivateKey
|
||||
localPublicKey: seq[byte]
|
||||
noiseKeys: KeyPair
|
||||
commonPrologue: seq[byte]
|
||||
commonPrologue*: seq[byte]
|
||||
outgoing: bool
|
||||
|
||||
NoiseConnection* = ref object of SecureConn
|
||||
|
||||
@@ -172,7 +172,7 @@ method start*(
|
||||
|
||||
self.servers &= server
|
||||
|
||||
trace "Listening on", address = ma
|
||||
trace "Listening on", address = self.addrs[i]
|
||||
|
||||
method stop*(self: TcpTransport) {.async, gcsafe.} =
|
||||
## stop the transport
|
||||
|
||||
532
libp2p/transports/webrtctransport.nim
Normal file
@@ -0,0 +1,532 @@
|
||||
# Nim-LibP2P
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
## WebRtc transport implementation
|
||||
## For now, only WebRTC Direct is supported (i.e. browser to server)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[sequtils]
|
||||
import stew/[endians2, byteutils, objects, results]
|
||||
import chronos, chronicles
|
||||
import transport,
|
||||
../errors,
|
||||
../wire,
|
||||
../multicodec,
|
||||
../multihash,
|
||||
../multibase,
|
||||
../protobuf/minprotobuf,
|
||||
../connmanager,
|
||||
../muxers/muxer,
|
||||
../multiaddress,
|
||||
../stream/connection,
|
||||
../upgrademngrs/upgrade,
|
||||
../protocols/secure/noise,
|
||||
../utility
|
||||
|
||||
import webrtc/webrtc, webrtc/datachannel, webrtc/dtls/dtls
|
||||
|
||||
logScope:
|
||||
topics = "libp2p webrtctransport"
|
||||
|
||||
export transport, results
|
||||
|
||||
const charset = toSeq("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789/+".items)
|
||||
proc genUfrag*(rng: ref HmacDrbgContext, size: int): seq[byte] =
|
||||
# https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md?plain=1#L73-L77
|
||||
result = newSeq[byte](size)
|
||||
for resultIndex in 0..<size:
|
||||
let charsetIndex = rng[].generate(uint) mod charset.len()
|
||||
result[resultIndex] = charset[charsetIndex].ord().uint8
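
A hedged usage sketch of `genUfrag` (obtaining the rng via libp2p's `newRng` is an assumption about the caller's setup):

```nim
# Sketch: a 32-character ICE username fragment drawn from the spec charset.
let ufrag = genUfrag(newRng(), 32)
doAssert ufrag.len == 32
echo string.fromBytes(ufrag)   # random string over A-Z, a-z, 0-9, '/', '+'
```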
|
||||
|
||||
const
|
||||
WebRtcTransportTrackerName* = "libp2p.webrtctransport"
|
||||
MaxMessageSize = 16384 # 16KiB; from the WebRtc-direct spec
|
||||
|
||||
# -- Message --
|
||||
# Implementation of the libp2p's WebRTC message defined here:
|
||||
# https://github.com/libp2p/specs/blob/master/webrtc/README.md?plain=1#L60-L79
|
||||
|
||||
type
|
||||
MessageFlag = enum
|
||||
## Flags to support half-closing and reset of streams.
|
||||
## - Fin: Sender will no longer send messages
|
||||
## - StopSending: Sender will no longer read messages.
|
||||
## Received messages are discarded
|
||||
## - ResetStream: Sender abruptly terminates the sending part of the stream.
|
||||
## Receiver MAY discard any data that it already received on that stream
|
||||
## - FinAck: Acknowledges the previous receipt of a message with the Fin flag set.
|
||||
Fin = 0
|
||||
StopSending = 1
|
||||
ResetStream = 2
|
||||
FinAck = 3
|
||||
|
||||
WebRtcMessage = object
|
||||
flag: Opt[MessageFlag]
|
||||
data: seq[byte]
|
||||
|
||||
proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] =
|
||||
## Decoding WebRTC Message from raw data
|
||||
var
|
||||
pb = initProtoBuffer(bytes)
|
||||
flagOrd: uint32
|
||||
res: WebRtcMessage
|
||||
if ? pb.getField(1, flagOrd).toOpt():
|
||||
var flag: MessageFlag
|
||||
if flag.checkedEnumAssign(flagOrd):
|
||||
res.flag = Opt.some(flag)
|
||||
|
||||
discard ? pb.getField(2, res.data).toOpt()
|
||||
Opt.some(res)
|
||||
|
||||
proc encode(msg: WebRtcMessage): seq[byte] =
|
||||
## Encoding WebRTC Message to raw data
|
||||
var pb = initProtoBuffer()
|
||||
|
||||
msg.flag.withValue(val):
|
||||
pb.write(1, uint32(val))
|
||||
|
||||
if msg.data.len > 0:
|
||||
pb.write(2, msg.data)
|
||||
|
||||
pb.finish()
|
||||
pb.buffer
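
A minimal round-trip sketch for this framing (illustrative only; these procs are module-internal):

```nim
# Sketch: a Fin message carrying a last chunk of data survives a round trip.
let
  original = WebRtcMessage(flag: Opt.some(MessageFlag.Fin), data: "bye".toBytes())
  wire = original.encode()
  decoded = WebRtcMessage.decode(wire).get()
doAssert decoded.flag.get() == MessageFlag.Fin
doAssert decoded.data == original.data
```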
|
||||
|
||||
# -- Raw WebRTC Stream --
|
||||
# All the data written to or read from a WebRtcStream should be length-prefixed
|
||||
# so `readOnce`/`write` WebRtcStream implementation must either recode
|
||||
# `readLP`/`writeLP`, or implement a `RawWebRtcStream` on which we can
|
||||
# directly use `readLP` and `writeLP`. The second solution is the less redundant,
|
||||
# so it's the one we've chosen.
|
||||
|
||||
type
|
||||
RawWebRtcStream = ref object of Connection
|
||||
dataChannel: DataChannelStream
|
||||
readData: seq[byte]
|
||||
|
||||
proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream =
|
||||
RawWebRtcStream(dataChannel: dataChannel)
|
||||
|
||||
method closeImpl*(s: RawWebRtcStream): Future[void] =
|
||||
# TODO: close datachannel
|
||||
discard
|
||||
|
||||
method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] =
|
||||
trace "RawWebrtcStream write", msg, len=msg.len()
|
||||
s.dataChannel.write(msg)
|
||||
|
||||
method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
|
||||
# TODO:
|
||||
# if s.isClosed:
|
||||
# raise newLPStreamEOFError()
|
||||
|
||||
if s.readData.len() == 0:
|
||||
let rawData = await s.dataChannel.read()
|
||||
s.readData = rawData
|
||||
trace "readOnce RawWebRtcStream", data = s.readData, nbytes
|
||||
|
||||
result = min(nbytes, s.readData.len)
|
||||
copyMem(pbytes, addr s.readData[0], result)
|
||||
s.readData = s.readData[result..^1]
|
||||
|
||||
# -- Stream --
|
||||
|
||||
type
|
||||
WebRtcState = enum
|
||||
Sending, Closing, Closed
|
||||
|
||||
WebRtcStream = ref object of Connection
|
||||
rawStream: RawWebRtcStream
|
||||
sendQueue: seq[(seq[byte], Future[void])]
|
||||
sendLoop: Future[void]
|
||||
readData: seq[byte]
|
||||
txState: WebRtcState # Transmission
|
||||
rxState: WebRtcState # Reception
|
||||
|
||||
proc new(
|
||||
_: type WebRtcStream,
|
||||
dataChannel: DataChannelStream,
|
||||
oaddr: Opt[MultiAddress],
|
||||
peerId: PeerId): WebRtcStream =
|
||||
let stream = WebRtcStream(rawStream: RawWebRtcStream.new(dataChannel),
|
||||
observedAddr: oaddr, peerId: peerId)
|
||||
procCall Connection(stream).initStream()
|
||||
stream
|
||||
|
||||
proc sender(s: WebRtcStream) {.async.} =
|
||||
while s.sendQueue.len > 0:
|
||||
let (message, fut) = s.sendQueue.pop()
|
||||
#TODO handle exceptions
|
||||
await s.rawStream.writeLp(message)
|
||||
if not fut.isNil: fut.complete()
|
||||
|
||||
proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) =
|
||||
let wrappedMessage = msg.encode()
|
||||
s.sendQueue.insert((wrappedMessage, fut))
|
||||
|
||||
if s.sendLoop == nil or s.sendLoop.finished:
|
||||
s.sendLoop = s.sender()
|
||||
|
||||
method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] =
|
||||
# We need to make sure we send all of our data before another write
|
||||
# Otherwise, two concurrent writes could get intertwined
|
||||
# We avoid this by filling the s.sendQueue synchronously
|
||||
var msg = msg2
|
||||
trace "WebrtcStream write", msg, len=msg.len()
|
||||
let retFuture = newFuture[void]("WebRtcStream.write")
|
||||
if s.txState != Sending:
|
||||
retFuture.fail(newLPStreamClosedError())
|
||||
return retFuture
|
||||
|
||||
var messages: seq[seq[byte]]
|
||||
while msg.len > MaxMessageSize - 16:
|
||||
let
|
||||
endOfMessage = MaxMessageSize - 16
|
||||
wrappedMessage = WebRtcMessage(data: msg[0 ..< endOfMessage])
|
||||
s.send(wrappedMessage)
|
||||
msg = msg[endOfMessage .. ^1]
|
||||
|
||||
let
|
||||
wrappedMessage = WebRtcMessage(data: msg)
|
||||
s.send(wrappedMessage, retFuture)
|
||||
|
||||
return retFuture
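
For intuition, the chunking above with made-up sizes (MaxMessageSize is 16384, with 16 bytes reserved for the message envelope):

```nim
# Sketch: writing a 40_000-byte payload produces three WebRtcMessages:
#   chunk 1: 16_368 bytes   (MaxMessageSize - 16)
#   chunk 2: 16_368 bytes
#   chunk 3:  7_264 bytes   (the remainder, sent with the completion future)
```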
|
||||
|
||||
proc actuallyClose(s: WebRtcStream) {.async.} =
|
||||
debug "stream closed", rxState=s.rxState, txState=s.txState
|
||||
if s.rxState == Closed and s.txState == Closed and s.readData.len == 0:
|
||||
#TODO add support to DataChannel
|
||||
#await s.dataChannel.close()
|
||||
await procCall Connection(s).closeImpl()
|
||||
|
||||
method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
|
||||
if s.rxState == Closed:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
while s.readData.len == 0 or nbytes == 0:
|
||||
# Check if there's no data left in readData or if nbytes is equal to 0
|
||||
# in order to read an eventual Fin or FinAck
|
||||
if s.rxState == Closed:
|
||||
await s.actuallyClose()
|
||||
return 0
|
||||
|
||||
let
|
||||
#TODO handle exceptions
|
||||
message = await s.rawStream.readLp(MaxMessageSize)
|
||||
decoded = WebRtcMessage.decode(message).tryGet()
|
||||
|
||||
s.readData = s.readData.concat(decoded.data)
|
||||
|
||||
decoded.flag.withValue(flag):
|
||||
case flag:
|
||||
of Fin:
|
||||
# Peer won't send any more data
|
||||
s.rxState = Closed
|
||||
s.send(WebRtcMessage(flag: Opt.some(FinAck)))
|
||||
of FinAck:
|
||||
s.txState = Closed
|
||||
await s.actuallyClose()
|
||||
if nbytes == 0:
|
||||
return 0
|
||||
else: discard
|
||||
|
||||
result = min(nbytes, s.readData.len)
|
||||
copyMem(pbytes, addr s.readData[0], result)
|
||||
s.readData = s.readData[result..^1]
|
||||
|
||||
method closeImpl*(s: WebRtcStream) {.async.} =
|
||||
s.send(WebRtcMessage(flag: Opt.some(Fin)))
|
||||
s.txState = Closing
|
||||
while s.txState != Closed:
|
||||
discard await s.readOnce(nil, 0)
|
||||
|
||||
# -- Connection --
|
||||
|
||||
type WebRtcConnection = ref object of Connection
|
||||
connection: DataChannelConnection
|
||||
remoteAddress: MultiAddress
|
||||
|
||||
method close*(conn: WebRtcConnection) {.async.} =
|
||||
#TODO
|
||||
discard
|
||||
|
||||
proc new(
|
||||
_: type WebRtcConnection,
|
||||
conn: DataChannelConnection,
|
||||
observedAddr: Opt[MultiAddress]
|
||||
): WebRtcConnection =
|
||||
let co = WebRtcConnection(connection: conn, observedAddr: observedAddr)
|
||||
procCall Connection(co).initStream()
|
||||
co
|
||||
|
||||
proc getStream*(conn: WebRtcConnection,
|
||||
direction: Direction,
|
||||
noiseHandshake: bool = false): Future[WebRtcStream] {.async.} =
|
||||
var datachannel =
|
||||
case direction:
|
||||
of Direction.In:
|
||||
await conn.connection.accept()
|
||||
of Direction.Out:
|
||||
await conn.connection.openStream(noiseHandshake)
|
||||
return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId)
|
||||
|
||||
# -- Muxer --
|
||||
|
||||
type WebRtcMuxer = ref object of Muxer
|
||||
webRtcConn: WebRtcConnection
|
||||
handleFut: Future[void]
|
||||
|
||||
method newStream*(m: WebRtcMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} =
|
||||
return await m.webRtcConn.getStream(Direction.Out)
|
||||
|
||||
proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} =
|
||||
try:
|
||||
await m.streamHandler(chann)
|
||||
trace "finished handling stream"
|
||||
doAssert(chann.closed, "connection not closed by handler!")
|
||||
except CatchableError as exc:
|
||||
trace "Exception in mplex stream handler", msg = exc.msg
|
||||
await chann.close()
|
||||
|
||||
#TODO add atEof
|
||||
|
||||
method handle*(m: WebRtcMuxer): Future[void] {.async, gcsafe.} =
|
||||
try:
|
||||
#while not m.webRtcConn.atEof:
|
||||
while true:
|
||||
let incomingStream = await m.webRtcConn.getStream(Direction.In)
|
||||
asyncSpawn m.handleStream(incomingStream)
|
||||
finally:
|
||||
await m.webRtcConn.close()
|
||||
|
||||
method close*(m: WebRtcMuxer) {.async, gcsafe.} =
|
||||
m.handleFut.cancel()
|
||||
await m.webRtcConn.close()
|
||||
|
||||
# -- Upgrader --
|
||||
|
||||
type
|
||||
WebRtcStreamHandler = proc(conn: Connection): Future[void] {.gcsafe, raises: [].}
|
||||
WebRtcUpgrade = ref object of Upgrade
|
||||
streamHandler: WebRtcStreamHandler
|
||||
|
||||
method upgrade*(
|
||||
self: WebRtcUpgrade,
|
||||
conn: Connection,
|
||||
direction: Direction,
|
||||
peerId: Opt[PeerId]): Future[Muxer] {.async.} =
|
||||
|
||||
let webRtcConn = WebRtcConnection(conn)
|
||||
result = WebRtcMuxer(connection: conn, webRtcConn: webRtcConn)
|
||||
|
||||
# Noise handshake
|
||||
let noiseHandler = self.secureManagers.filterIt(it of Noise)
|
||||
assert noiseHandler.len > 0
|
||||
|
||||
let xx = "libp2p-webrtc-noise:".toBytes()
|
||||
let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.localCertificate()).get().data.buffer
|
||||
let remoteCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.remoteCertificate()).get().data.buffer
|
||||
((Noise)noiseHandler[0]).commonPrologue = xx & remoteCert & localCert
|
||||
echo "=> ", ((Noise)noiseHandler[0]).commonPrologue
|
||||
|
||||
let
|
||||
stream = await webRtcConn.getStream(Out, true)
|
||||
secureStream = await noiseHandler[0].handshake(
|
||||
stream,
|
||||
initiator = true, # we are always the initiator in webrtc-direct
|
||||
peerId = peerId
|
||||
)
|
||||
|
||||
# Peer proved its identity, we can close this
|
||||
await secureStream.close()
|
||||
await stream.close()
|
||||
|
||||
result.streamHandler = self.streamHandler
|
||||
result.handler = result.handle()
|
||||
|
||||
# -- Transport --
|
||||
|
||||
type
|
||||
WebRtcTransport* = ref object of Transport
|
||||
connectionsTimeout: Duration
|
||||
servers: seq[WebRtc]
|
||||
acceptFuts: seq[Future[DataChannelConnection]]
|
||||
clients: array[Direction, seq[DataChannelConnection]]
|
||||
|
||||
WebRtcTransportTracker* = ref object of TrackerBase
|
||||
opened*: uint64
|
||||
closed*: uint64
|
||||
|
||||
WebRtcTransportError* = object of transport.TransportError
|
||||
|
||||
proc setupWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe, raises: [].}
|
||||
|
||||
proc getWebRtcTransportTracker(): WebRtcTransportTracker {.gcsafe.} =
|
||||
result = cast[WebRtcTransportTracker](getTracker(WebRtcTransportTrackerName))
|
||||
if isNil(result):
|
||||
result = setupWebRtcTransportTracker()
|
||||
|
||||
proc dumpTracking(): string {.gcsafe.} =
|
||||
var tracker = getWebRtcTransportTracker()
|
||||
result = "Opened tcp transports: " & $tracker.opened & "\n" &
|
||||
"Closed tcp transports: " & $tracker.closed
|
||||
|
||||
proc leakTransport(): bool {.gcsafe.} =
|
||||
var tracker = getWebRtcTransportTracker()
|
||||
result = (tracker.opened != tracker.closed)
|
||||
|
||||
proc setupWebRtcTransportTracker(): WebRtcTransportTracker =
|
||||
result = new WebRtcTransportTracker
|
||||
result.opened = 0
|
||||
result.closed = 0
|
||||
result.dump = dumpTracking
|
||||
result.isLeaked = leakTransport
|
||||
addTracker(WebRtcTransportTrackerName, result)
|
||||
|
||||
proc new*(
|
||||
T: typedesc[WebRtcTransport],
|
||||
upgrade: Upgrade,
|
||||
connectionsTimeout = 10.minutes): T {.public.} =
|
||||
|
||||
let upgrader = WebRtcUpgrade(ms: upgrade.ms, secureManagers: upgrade.secureManagers)
|
||||
upgrader.streamHandler = proc(conn: Connection)
|
||||
{.async, gcsafe, raises: [].} =
|
||||
# TODO: replace echo by trace and find why it fails compiling
|
||||
echo "Starting stream handler"#, conn
|
||||
try:
|
||||
await upgrader.ms.handle(conn) # handle incoming connection
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
echo "exception in stream handler", exc.msg#, conn, msg = exc.msg
|
||||
finally:
|
||||
await conn.closeWithEOF()
|
||||
echo "Stream handler done"#, conn
|
||||
|
||||
let
|
||||
transport = T(
|
||||
upgrader: upgrader,
|
||||
connectionsTimeout: connectionsTimeout)
|
||||
|
||||
return transport
|
||||
|
||||
method start*(
|
||||
self: WebRtcTransport,
|
||||
addrs: seq[MultiAddress]) {.async.} =
|
||||
## listen on the transport
|
||||
##
|
||||
|
||||
if self.running:
|
||||
warn "WebRtc transport already running"
|
||||
return
|
||||
|
||||
await procCall Transport(self).start(addrs)
|
||||
trace "Starting WebRtc transport"
|
||||
inc getWebRtcTransportTracker().opened
|
||||
|
||||
for i, ma in addrs:
|
||||
if not self.handles(ma):
|
||||
trace "Invalid address detected, skipping!", address = ma
|
||||
continue
|
||||
|
||||
let
|
||||
transportAddress = initTAddress(ma[0..1].tryGet()).tryGet()
|
||||
server = WebRtc.new(transportAddress)
|
||||
server.listen()
|
||||
|
||||
self.servers &= server
|
||||
|
||||
let
|
||||
cert = server.dtls.localCertificate()
|
||||
certHash = MultiHash.digest("sha2-256", cert).get().data.buffer
|
||||
encodedCertHash = MultiBase.encode("base64", certHash).get()
|
||||
self.addrs[i] = MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() &
|
||||
MultiAddress.init(multiCodec("webrtc-direct")).tryGet() &
|
||||
MultiAddress.init(multiCodec("certhash"), certHash).tryGet()
|
||||
|
||||
trace "Listening on", address = self.addrs[i]
|
||||
|
||||
proc connHandler(
|
||||
self: WebRtcTransport,
|
||||
client: DataChannelConnection,
|
||||
observedAddr: Opt[MultiAddress],
|
||||
dir: Direction
|
||||
): Future[Connection] {.async.} =
|
||||
trace "Handling webrtc connection", address = $observedAddr, dir = $dir,
|
||||
clients = self.clients[Direction.In].len + self.clients[Direction.Out].len
|
||||
|
||||
let conn: Connection =
|
||||
WebRtcConnection.new(
|
||||
conn = client,
|
||||
# dir = dir,
|
||||
observedAddr = observedAddr
|
||||
# timeout = self.connectionsTimeout
|
||||
)
|
||||
|
||||
proc onClose() {.async.} =
|
||||
try:
|
||||
let futs = @[conn.join(), conn.join()] #TODO that's stupid
|
||||
await futs[0] or futs[1]
|
||||
for f in futs:
|
||||
if not f.finished: await f.cancelAndWait() # cancel outstanding join()
|
||||
|
||||
trace "Cleaning up client"# TODO ?: , addrs = $client.remoteAddress,
|
||||
# conn
|
||||
|
||||
self.clients[dir].keepItIf( it != client )
|
||||
#TODO
|
||||
#await allFuturesThrowing(
|
||||
# conn.close(), client.closeWait())
|
||||
|
||||
except CatchableError as exc:
|
||||
let useExc {.used.} = exc
|
||||
debug "Error cleaning up client", errMsg = exc.msg, conn
|
||||
|
||||
self.clients[dir].add(client)
|
||||
asyncSpawn onClose()
|
||||
|
||||
return conn
|
||||
|
||||
method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} =
|
||||
if not self.running:
|
||||
raise newTransportClosedError()
|
||||
|
||||
#TODO handle errors
|
||||
if self.acceptFuts.len <= 0:
|
||||
self.acceptFuts = self.servers.mapIt(it.accept())
|
||||
|
||||
if self.acceptFuts.len <= 0:
|
||||
return
|
||||
|
||||
let
|
||||
finished = await one(self.acceptFuts)
|
||||
index = self.acceptFuts.find(finished)
|
||||
|
||||
self.acceptFuts[index] = self.servers[index].accept()
|
||||
trace "Accept WebRTC Transport"
|
||||
|
||||
let transp = await finished
|
||||
try:
|
||||
#TODO add remoteAddress to DataChannelConnection
|
||||
#let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() #TODO add /webrtc-direct
|
||||
let observedAddr = MultiAddress.init("/ip4/127.0.0.1").tryGet()
|
||||
return await self.connHandler(transp, Opt.some(observedAddr), Direction.In)
|
||||
except CancelledError as exc:
|
||||
#TODO
|
||||
#transp.close()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Failed to handle connection", exc = exc.msg
|
||||
#TODO
|
||||
#transp.close()
|
||||
|
||||
method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||
if procCall Transport(t).handles(address):
|
||||
if address.protocols.isOk:
|
||||
return WebRtcDirect2.match(address)
|
||||
@@ -171,8 +171,6 @@ method start*(
|
||||
|
||||
trace "Listening on", addresses = self.addrs
|
||||
|
||||
self.running = true
|
||||
|
||||
method stop*(self: WsTransport) {.async, gcsafe.} =
|
||||
## stop the transport
|
||||
##
|
||||
|
||||
@@ -1,41 +1,31 @@
include ../../libp2p/protocols/pubsub/gossipsub
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.used.}

import std/[options, deques]
import std/[options, deques, sequtils, enumerate, algorithm]
import stew/byteutils
import ../../libp2p/builders
import ../../libp2p/errors
import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream
import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable]
import ../../libp2p/protocols/pubsub/rpc/[message, messages]
import ../../libp2p/switch
import ../../libp2p/muxers/muxer
import ../../libp2p/protocols/pubsub/rpc/protobuf
import utils

import ../helpers

type
  TestGossipSub = ref object of GossipSub

proc noop(data: seq[byte]) {.async, gcsafe.} = discard

proc getPubSubPeer(p: TestGossipSub, peerId: PeerId): PubSubPeer =
  proc getConn(): Future[Connection] =
    p.switch.dial(peerId, GossipSubCodec)

  let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024)
  debug "created new pubsub peer", peerId

  p.peers[peerId] = pubSubPeer

  onNewPeer(p, pubSubPeer)
  pubSubPeer

proc randomPeerId(): PeerId =
  try:
    PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
  except CatchableError as exc:
    raise newException(Defect, exc.msg)

const MsgIdSuccess = "msg id gen success"

suite "GossipSub internal":
@@ -170,7 +160,7 @@ suite "GossipSub internal":
  asyncTest "`replenishFanout` Degree Lo":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      discard

    let topic = "foobar"
@@ -197,7 +187,7 @@ suite "GossipSub internal":
  asyncTest "`dropFanoutPeers` drop expired fanout topics":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      discard

    let topic = "foobar"
@@ -227,7 +217,7 @@ suite "GossipSub internal":
  asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      discard

    let topic1 = "foobar1"
@@ -264,7 +254,7 @@ suite "GossipSub internal":
  asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      discard

    let topic = "foobar"
@@ -325,7 +315,7 @@ suite "GossipSub internal":
  asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      discard

    let topic = "foobar"
@@ -365,7 +355,7 @@ suite "GossipSub internal":
  asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      discard

    let topic = "foobar"
@@ -406,7 +396,7 @@ suite "GossipSub internal":
  asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      discard

    let topic = "foobar"
@@ -447,7 +437,7 @@ suite "GossipSub internal":
  asyncTest "Drop messages of topics without subscription":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      check false

    let topic = "foobar"
@@ -470,7 +460,7 @@ suite "GossipSub internal":
      let peer = gossipSub.getPubSubPeer(peerId)
      inc seqno
      let msg = Message.init(peerId, ("bar" & $i).toBytes(), topic, some(seqno))
      await gossipSub.rpcHandler(peer, RPCMsg(messages: @[msg]))
      await gossipSub.rpcHandler(peer, encodeRpcMsg(RPCMsg(messages: @[msg]), false))

    check gossipSub.mcache.msgs.len == 0

@@ -481,7 +471,7 @@ suite "GossipSub internal":
    let gossipSub = TestGossipSub.init(newStandardSwitch())
    gossipSub.parameters.disconnectBadPeers = true
    gossipSub.parameters.appSpecificWeight = 1.0
    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      check false

    let topic = "foobar"
@@ -525,7 +515,7 @@ suite "GossipSub internal":
    conn.peerId = peerId
    let peer = gossipSub.getPubSubPeer(peerId)

    await gossipSub.rpcHandler(peer, lotOfSubs)
    await gossipSub.rpcHandler(peer, encodeRpcMsg(lotOfSubs, false))

    check:
      gossipSub.gossipsub.len == gossipSub.topicsHigh
@@ -656,7 +646,7 @@ suite "GossipSub internal":
  asyncTest "handleIHave/Iwant tests":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      check false
    proc handler2(topic: string, data: seq[byte]) {.async.} = discard

@@ -733,7 +723,7 @@ suite "GossipSub internal":

    var iwantCount = 0

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
      check false

    proc handler2(topic: string, data: seq[byte]) {.async.} = discard
@@ -779,7 +769,7 @@ suite "GossipSub internal":
        data: actualMessageData
      )]
    )
    await gossipSub.rpcHandler(firstPeer, rpcMsg)
    await gossipSub.rpcHandler(firstPeer, encodeRpcMsg(rpcMsg, false))

    check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())

@@ -792,7 +782,7 @@ suite "GossipSub internal":
    gossipSub.parameters.iwantTimeout = 10.milliseconds
    await gossipSub.start()

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard
    proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard
    proc handler2(topic: string, data: seq[byte]) {.async.} = discard

    let topic = "foobar"
@@ -825,3 +815,130 @@ suite "GossipSub internal":

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
    let
      nodes = generateNodes(2, gossip = true, verifySignature = false)
    discard await allFinished(
      nodes[0].switch.start(),
      nodes[1].switch.start()
    )

    await nodes[1].switch.connect(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)

    var receivedMessages = new(HashSet[seq[byte]])

    proc handlerA(topic: string, data: seq[byte]) {.async, gcsafe.} =
      receivedMessages[].incl(data)

    proc handlerB(topic: string, data: seq[byte]) {.async, gcsafe.} =
      discard

    nodes[0].subscribe("foobar", handlerA)
    nodes[1].subscribe("foobar", handlerB)
    await waitSubGraph(nodes, "foobar")

    var gossip0: GossipSub = GossipSub(nodes[0])
    var gossip1: GossipSub = GossipSub(nodes[1])

    return (gossip0, gossip1, receivedMessages)

  proc teardownTest(gossip0: GossipSub, gossip1: GossipSub) {.async.} =
    await allFuturesThrowing(
      gossip0.switch.stop(),
      gossip1.switch.stop()
    )

  proc createMessages(gossip0: GossipSub, gossip1: GossipSub, size1: int, size2: int): tuple[iwantMessageIds: seq[MessageId], sentMessages: HashSet[seq[byte]]] =
    var iwantMessageIds = newSeq[MessageId]()
    var sentMessages = initHashSet[seq[byte]]()

    for i, size in enumerate([size1, size2]):
      let data = newSeqWith[byte](size, i.byte)
      sentMessages.incl(data)

      let msg = Message.init(gossip1.peerInfo.peerId, data, "foobar", some(uint64(i + 1)))
      let iwantMessageId = gossip1.msgIdProvider(msg).expect(MsgIdSuccess)
      iwantMessageIds.add(iwantMessageId)
      gossip1.mcache.put(iwantMessageId, msg)

      let peer = gossip1.peers[(gossip0.peerInfo.peerId)]
      peer.sentIHaves[^1].incl(iwantMessageId)

    return (iwantMessageIds, sentMessages)

  asyncTest "e2e - Split IWANT replies when individual messages are below maxSize but combined exceed maxSize":
    # This test checks if two messages, each below the maxSize, are correctly split when their combined size exceeds maxSize.
    # Expected: Both messages should be received.
    let (gossip0, gossip1, receivedMessages) = await setupTest()

    let messageSize = gossip1.maxMessageSize div 2 + 1
    let (iwantMessageIds, sentMessages) = createMessages(gossip0, gossip1, messageSize, messageSize)

    gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
      ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
    ))))

    checkExpiring: receivedMessages[] == sentMessages
    check receivedMessages[].len == 2

    await teardownTest(gossip0, gossip1)

  asyncTest "e2e - Discard IWANT replies when both messages individually exceed maxSize":
    # This test checks if two messages, each exceeding the maxSize, are discarded and not sent.
    # Expected: No messages should be received.
    let (gossip0, gossip1, receivedMessages) = await setupTest()

    let messageSize = gossip1.maxMessageSize + 10
    let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, messageSize, messageSize)

    gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
      ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
    ))))

    await sleepAsync(300.milliseconds)
    checkExpiring: receivedMessages[].len == 0

    await teardownTest(gossip0, gossip1)

  asyncTest "e2e - Process IWANT replies when both messages are below maxSize":
    # This test checks if two messages, both below the maxSize, are correctly processed and sent.
    # Expected: Both messages should be received.
    let (gossip0, gossip1, receivedMessages) = await setupTest()
    let size1 = gossip1.maxMessageSize div 2
    let size2 = gossip1.maxMessageSize div 3
    let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, size1, size2)

    gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
      ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
    ))))

    checkExpiring: receivedMessages[] == sentMessages
    check receivedMessages[].len == 2

    await teardownTest(gossip0, gossip1)

  asyncTest "e2e - Split IWANT replies when one message is below maxSize and the other exceeds maxSize":
    # This test checks if, when given two messages where one is below maxSize and the other exceeds it, only the smaller message is processed and sent.
    # Expected: Only the smaller message should be received.
    let (gossip0, gossip1, receivedMessages) = await setupTest()
    let maxSize = gossip1.maxMessageSize
    let size1 = maxSize div 2
    let size2 = maxSize + 10
    let (bigIWantMessageIds, sentMessages) = createMessages(gossip0, gossip1, size1, size2)

    gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
      ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
    ))))

    var smallestSet: HashSet[seq[byte]]
    let seqs = toSeq(sentMessages)
    if seqs[0] < seqs[1]:
      smallestSet.incl(seqs[0])
    else:
      smallestSet.incl(seqs[1])

    checkExpiring: receivedMessages[] == smallestSet
    check receivedMessages[].len == 1

    await teardownTest(gossip0, gossip1)
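For the split-reply case above, the arithmetic is straightforward: assuming the default 1 MiB maxMessageSize (the same 1024 * 1024 figure passed to PubSubPeer.new in these helpers), each message is 1_048_576 div 2 + 1 = 524_289 bytes, so a single IWANT reply carrying both would need 1_048_578 bytes and exceed the limit, while either message alone still fits; the reply therefore has to be split in two, and both messages arrive.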
@@ -2,10 +2,10 @@ import unittest2

{.used.}

import options
import options, strutils
import stew/byteutils
import ../../libp2p/[peerid, peerinfo,
                     crypto/crypto,
                     crypto/crypto as crypto,
                     protocols/pubsub/errors,
                     protocols/pubsub/rpc/message,
                     protocols/pubsub/rpc/messages]
@@ -28,7 +28,7 @@ suite "Message":
        """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
           E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
           E731065A"""
      seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
      seckey = PrivateKey.init(crypto.fromHex(stripSpaces(pkHex)))
        .expect("valid private key bytes")
      peer = PeerInfo.new(seckey)
      msg = Message.init(some(peer), @[], "topic", some(seqno), sign = true)
@@ -46,7 +46,7 @@ suite "Message":
        """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
           E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
           E731065A"""
      seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
      seckey = PrivateKey.init(crypto.fromHex(stripSpaces(pkHex)))
        .expect("valid private key bytes")
      peer = PeerInfo.new(seckey)

@@ -64,7 +64,7 @@ suite "Message":
        """08011240B9EA7F0357B5C1247E4FCB5AD09C46818ECB07318CA84711875F4C6C
           E6B946186A4EB44E0D714B2A2D48263D75CF52D30BEF9D9AE2A9FEB7DAF1775F
           E731065A"""
      seckey = PrivateKey.init(fromHex(stripSpaces(pkHex)))
      seckey = PrivateKey.init(crypto.fromHex(stripSpaces(pkHex)))
        .expect("valid private key bytes")
      peer = PeerInfo.new(seckey)
      msg = Message.init(some(peer), @[], "topic", uint64.none, sign = true)
@@ -73,3 +73,55 @@ suite "Message":
    check:
      msgIdResult.isErr
      msgIdResult.error == ValidationResult.Reject

  test "byteSize for RPCMsg":
    var msg = Message(
      fromPeer: PeerId(data: @['a'.byte, 'b'.byte]), # 2 bytes
      data: @[1'u8, 2, 3], # 3 bytes
      seqno: @[4'u8, 5], # 2 bytes
      signature: @['c'.byte, 'd'.byte], # 2 bytes
      key: @[6'u8, 7], # 2 bytes
      topicIds: @["abc", "defgh"] # 3 + 5 = 8 bytes
    )

    var peerInfo = PeerInfoMsg(
      peerId: PeerId(data: @['e'.byte]), # 1 byte
      signedPeerRecord: @['f'.byte, 'g'.byte] # 2 bytes
    )

    var controlIHave = ControlIHave(
      topicId: "ijk", # 3 bytes
      messageIds: @[ @['l'.byte], @['m'.byte, 'n'.byte] ] # 1 + 2 = 3 bytes
    )

    var controlIWant = ControlIWant(
      messageIds: @[ @['o'.byte, 'p'.byte], @['q'.byte] ] # 2 + 1 = 3 bytes
    )

    var controlGraft = ControlGraft(
      topicId: "rst" # 3 bytes
    )

    var controlPrune = ControlPrune(
      topicId: "uvw", # 3 bytes
      peers: @[peerInfo, peerInfo], # (1 + 2) * 2 = 6 bytes
      backoff: 12345678 # 8 bytes for uint64
    )

    var control = ControlMessage(
      ihave: @[controlIHave, controlIHave], # (3 + 3) * 2 = 12 bytes
      iwant: @[controlIWant], # 3 bytes
      graft: @[controlGraft], # 3 bytes
      prune: @[controlPrune], # 3 + 6 + 8 = 17 bytes
      idontwant: @[controlIWant] # 3 bytes
    )

    var rpcMsg = RPCMsg(
      subscriptions: @[SubOpts(subscribe: true, topic: "a".repeat(12)), SubOpts(subscribe: false, topic: "b".repeat(14))], # 1 + 12 + 1 + 14 = 28 bytes
      messages: @[msg, msg], # 19 * 2 = 38 bytes
      ping: @[1'u8, 2], # 2 bytes
      pong: @[3'u8, 4], # 2 bytes
      control: some(control) # 12 + 3 + 3 + 17 + 3 = 38 bytes
    )

    check byteSize(rpcMsg) == 28 + 38 + 2 + 2 + 38 # Total: 108 bytes
@@ -9,16 +9,39 @@ import chronos, stew/[byteutils, results]
import ../../libp2p/[builders,
                     protocols/pubsub/errors,
                     protocols/pubsub/pubsub,
                     protocols/pubsub/pubsubpeer,
                     protocols/pubsub/gossipsub,
                     protocols/pubsub/floodsub,
                     protocols/pubsub/rpc/messages,
                     protocols/secure/secure]
import ../helpers
import chronicles

export builders

randomize()

type
  TestGossipSub* = ref object of GossipSub

proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
  proc getConn(): Future[Connection] =
    p.switch.dial(peerId, GossipSubCodec)

  let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024)
  debug "created new pubsub peer", peerId

  p.peers[peerId] = pubSubPeer

  onNewPeer(p, pubSubPeer)
  pubSubPeer

proc randomPeerId*(): PeerId =
  try:
    PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
  except CatchableError as exc:
    raise newException(Defect, exc.msg)

func defaultMsgIdProvider*(m: Message): Result[MessageId, ValidationResult] =
  let mid =
    if m.seqno.len > 0 and m.fromPeer.data.len > 0:
@@ -147,7 +147,9 @@ const
    "/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234",
    "/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095/tcp/8000/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC",
    "/p2p-webrtc-star/ip4/127.0.0.1/tcp/9090/ws/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC",
    "/ip4/127.0.0.1/tcp/9090/p2p-circuit/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC"
    "/ip4/127.0.0.1/tcp/9090/p2p-circuit/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC",
    "/ip4/127.0.0.1/udp/1234/webrtc-direct",
    "/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g",
  ]

  RustSuccessExpects = [
@@ -177,7 +179,9 @@ const
    "047F000001A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B0604D2",
    "29200108A07AC542013AC986FFFE317095061F40DD03A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B",
    "9302047F000001062382DD03A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B",
    "047F000001062382A202A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B"
    "047F000001062382A202A503221220D52EBB89D85B02A284948203A62FF28389C57C9F42BEEC4EC20DB76A68911C0B",
    "047F000001910204D29802",
    "047F000001910204D29802D203221220C3AB8FF13720E8AD9047DD39466B3C8974E592C2FA383D4A3960714CAEF0C4F2"
  ]

  RustFailureVectors = [
@@ -211,7 +215,9 @@ const
    "/ip4/127.0.0.1/tcp",
    "/ip4/127.0.0.1/ipfs",
    "/ip4/127.0.0.1/ipfs/tcp",
    "/p2p-circuit/50"
    "/p2p-circuit/50",
    "/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash/b2uaraocy6yrdblb4sfptaddgimjmmp",
    "/ip4/127.0.0.1/udp/1234/webrtc-direct/certhash"
  ]

  PathVectors = [
@@ -14,6 +14,7 @@ import chronos
import ../libp2p/[protocols/rendezvous,
                  switch,
                  builders,]
import ../libp2p/discovery/[rendezvousinterface, discoverymngr]
import ./helpers

proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
73
tests/testrendezvousinterface.nim
Normal file
@@ -0,0 +1,73 @@
{.used.}

# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

import sequtils, strutils
import chronos
import ../libp2p/[protocols/rendezvous,
                  switch,
                  builders,]
import ../libp2p/discovery/[rendezvousinterface, discoverymngr]
import ./helpers

proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
  SwitchBuilder.new()
    .withRng(newRng())
    .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
    .withTcpTransport()
    .withMplex()
    .withNoise()
    .withRendezVous(rdv)
    .build()

type
  MockRendezVous = ref object of RendezVous
    numAdvertiseNs1: int
    numAdvertiseNs2: int

  MockErrorRendezVous = ref object of MockRendezVous

method advertise*(self: MockRendezVous, namespace: string, ttl: Duration) {.async.} =
  if namespace == "ns1":
    self.numAdvertiseNs1 += 1
  elif namespace == "ns2":
    self.numAdvertiseNs2 += 1
  # Forward the call to the actual implementation
  await procCall RendezVous(self).advertise(namespace, ttl)

method advertise*(self: MockErrorRendezVous, namespace: string, ttl: Duration) {.async.} =
  await procCall MockRendezVous(self).advertise(namespace, ttl)
  raise newException(CatchableError, "MockErrorRendezVous.advertise")

suite "RendezVous Interface":
  teardown:
    checkTrackers()

  proc baseTimeToAdvertiseTest(rdv: MockRendezVous) {.async.} =
    let
      tta = 100.milliseconds
      ttl = 2.hours
      client = createSwitch(rdv)
      dm = DiscoveryManager()

    await client.start()
    dm.add(RendezVousInterface.new(rdv = rdv, tta = tta, ttl = ttl))
    dm.advertise(RdvNamespace("ns1"))
    dm.advertise(RdvNamespace("ns2"))

    checkExpiring: rdv.numAdvertiseNs1 >= 5
    checkExpiring: rdv.numAdvertiseNs2 >= 5
    await client.stop()

  asyncTest "Check timeToAdvertise interval":
    await baseTimeToAdvertiseTest(MockRendezVous.new(newRng()))

  asyncTest "Check timeToAdvertise interval when there is an error":
    await baseTimeToAdvertiseTest(MockErrorRendezVous.new(newRng()))
42
testwebrtc.nim
Normal file
@@ -0,0 +1,42 @@
import chronos, libp2p, libp2p/transports/webrtctransport
import stew/byteutils

proc echoHandler(conn: Connection, proto: string) {.async.} =
  defer: await conn.close()
  while true:
    try:
      echo "\e[35;1m => Echo Handler <=\e[0m"
      var xx = newSeq[byte](1024)
      let aa = await conn.readOnce(addr xx[0], 1024)
      xx = xx[0..<aa]
      let msg = string.fromBytes(xx)
      echo " => Echo Handler Receive: ", msg, " <="
      echo " => Echo Handler Try Send: ", msg & "1", " <="
      await conn.write(msg & "1")
    except CatchableError as e:
      echo " => Echo Handler Error: ", e.msg, " <="
      break

proc main {.async.} =
  let switch =
    SwitchBuilder.new()
      .withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g").tryGet()) #TODO the certhash shouldn't be necessary
      .withRng(crypto.newRng())
      .withMplex()
      .withYamux()
      .withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr))
      .withNoise()
      .build()

  let
    codec = "/echo/1.0.0"
    proto = new LPProtocol
  proto.handler = echoHandler
  proto.codec = codec

  switch.mount(proto)
  await switch.start()
  echo "\e[31;1m", $(switch.peerInfo.addrs[0]), "/p2p/", $(switch.peerInfo.peerId), "\e[0m"
  await sleepAsync(1.hours)

waitFor main()
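testwebrtc.nim above only brings up the listening side and prints its full address. A hypothetical dialing counterpart, sketched here for illustration only (not part of this change set; the address argument, the local webrtc-direct listen address, and its placeholder certhash are assumptions):

import chronos, libp2p, libp2p/transports/webrtctransport
import std/strutils
import stew/byteutils

proc dialEcho(fullAddr: string) {.async.} =
  # Split "<multiaddr>/p2p/<peerid>" as printed by the listener above.
  let
    parts = fullAddr.split("/p2p/")
    maddr = MultiAddress.init(parts[0]).tryGet()
    peerId = PeerId.init(parts[1]).tryGet()

  let switch = SwitchBuilder.new()
    .withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/4243/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g").tryGet()) # placeholder, mirrors the listener's TODO
    .withRng(crypto.newRng())
    .withMplex()
    .withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr))
    .withNoise()
    .build()
  await switch.start()

  let conn = await switch.dial(peerId, @[maddr], "/echo/1.0.0")
  await conn.write("hello")
  var buf = newSeq[byte](1024)
  let n = await conn.readOnce(addr buf[0], buf.len)
  echo "echoed back: ", string.fromBytes(buf[0..<n])
  await conn.close()
  await switch.stop()

# waitFor dialEcho("<address printed by testwebrtc.nim>")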