Compare commits

...

3 Commits

Author SHA1 Message Date
Etan Kissling
23acaece69 X 2024-03-12 00:20:04 +01:00
Etan Kissling
417922cd7a example also needs a change, not sure if acceptable 2024-03-11 16:08:58 +01:00
Etan Kissling
14fee3b754 add support for setting protocol handlers with {.raises.} annotation
All of the internal protocol handlers are restricted to `{.raises.}` of
`[CancelledError]`. However, the `LPProtoHandler` type is part of the
public interface, and example / test code raises more errors than just
`[CancelledError]`. The caller is aware of that and `CatchableError` is
caught.

To allow annotating the internal handlers with the proper `{.raises`.}
annotation, support for an extra `LPProtoHandler2` is added. This is
backward compatible as the old `LPProtoHandler` is still allowed.
Examples still compile fine. There is one test that needs a slight
adjustment as it accesses the internal `handler` field directly. That
field needs to be renamed so that the `template` is used instead.

Eventually, `LPProtoHandler` may be phased out, with appropriate notices
to users who define custom handlers and the migration requiring errors
to be handled inside the handler instead of raising them. At this time,
such a deprecation is not yet applied, especially while the internal
logic is still relying on the previous handler flow.
2024-03-11 15:21:50 +01:00
23 changed files with 332 additions and 265 deletions

View File

@@ -20,7 +20,7 @@ proc new(T: typedesc[TestProto]): T =
# We must close the connections ourselves when we're done with it
await conn.close()
return T(codecs: @[TestCodec], handler: handle)
return T.new(codecs = @[TestCodec], handler = handle)
##
# Helper to create a switch/node

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -21,56 +21,60 @@ type
Dial* = ref object of RootObj
method connect*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async, base.} =
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out
) {.async: (raises: [CancelledError, LPError], raw: true), base.} =
## connect remote peer without negotiating
## a protocol
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method connect*(
self: Dial,
address: MultiAddress,
allowUnknownPeerId = false): Future[PeerId] {.async, base.} =
self: Dial,
address: MultiAddress,
allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## Connects to a peer and retrieve its PeerId
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method dial*(
self: Dial,
peerId: PeerId,
protos: seq[string],
): Future[Connection] {.async, base.} =
self: Dial,
peerId: PeerId,
protos: seq[string],
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## create a protocol stream over an
## existing connection
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method dial*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false): Future[Connection] {.async, base.} =
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method addTransport*(
self: Dial,
transport: Transport) {.base.} =
doAssert(false, "Not implemented!")
self: Dial,
transport: Transport) {.base.} =
raiseAssert("Not implemented!")
method tryDial*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress]): Future[Opt[MultiAddress]] {.async, base.} =
doAssert(false, "Not implemented!")
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress]
): Future[Opt[MultiAddress]] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
raiseAssert("Not implemented!")

View File

@@ -163,15 +163,15 @@ proc tryReusingConnection(self: Dialer, peerId: PeerId): Opt[Muxer] =
return Opt.some(muxer)
proc internalConnect(
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true,
dir = Direction.Out):
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true,
dir = Direction.Out):
Future[Muxer] {.async.} =
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")
raise newException(DialFailedError, "can't dial self!")
# Ensure there's only one in-flight attempt per peer
let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock())

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -230,7 +230,7 @@ proc addHandler*(m: MultistreamSelect,
proc addHandler*(m: MultistreamSelect,
codec: string,
handler: LPProtoHandler,
handler: LPProtoHandler|LPProtoHandler2,
matcher: Matcher = nil) =
## helper to allow registering pure handlers
trace "registering proto handler", proto = codec

View File

@@ -140,7 +140,8 @@ proc handleDial(autonat: Autonat, conn: Connection, msg: AutonatMsg): Future[voi
proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = 15.seconds): T =
let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout)
proc handleStream(conn: Connection, proto: string) {.async.} =
proc handleStream(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
let msg = AutonatMsg.decode(await conn.readLp(1024)).valueOr:
raise newException(AutonatError, "Received malformed message")

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -19,7 +19,6 @@ import ../../protocol,
../../../switch,
../../../utils/future
export DcutrError
export chronicles
type Dcutr* = ref object of LPProtocol
@@ -29,7 +28,8 @@ logScope:
proc new*(T: typedesc[Dcutr], switch: Switch, connectTimeout = 15.seconds, maxDialableAddrs = 8): T =
proc handleStream(stream: Connection, proto: string) {.async.} =
proc handleStream(
stream: Connection, proto: string) {.async: (raises: [CancelledError]).} =
var peerDialableAddrs: seq[MultiAddress]
try:
let connectMsg = DcutrMsg.decode(await stream.readLp(1024))
@@ -65,14 +65,14 @@ proc new*(T: typedesc[Dcutr], switch: Switch, connectTimeout = 15.seconds, maxDi
except CancelledError as err:
raise err
except AllFuturesFailedError as err:
debug "Dcutr receiver could not connect to the remote peer, all connect attempts failed", peerDialableAddrs, msg = err.msg
raise newException(DcutrError, "Dcutr receiver could not connect to the remote peer, all connect attempts failed", err)
debug "Dcutr receiver could not connect to the remote peer, " &
"all connect attempts failed", peerDialableAddrs, msg = err.msg
except AsyncTimeoutError as err:
debug "Dcutr receiver could not connect to the remote peer, all connect attempts timed out", peerDialableAddrs, msg = err.msg
raise newException(DcutrError, "Dcutr receiver could not connect to the remote peer, all connect attempts timed out", err)
debug "Dcutr receiver could not connect to the remote peer, " &
"all connect attempts timed out", peerDialableAddrs, msg = err.msg
except CatchableError as err:
warn "Unexpected error when Dcutr receiver tried to connect to the remote peer", msg = err.msg
raise newException(DcutrError, "Unexpected error when Dcutr receiver tried to connect to the remote peer", err)
warn "Unexpected error when Dcutr receiver tried to connect " &
"to the remote peer", msg = err.msg
let self = T()
self.handler = handleStream

View File

@@ -266,7 +266,8 @@ proc new*(T: typedesc[RelayClient], canHop: bool = false,
maxCircuitPerPeer: maxCircuitPerPeer,
msgSize: msgSize,
isCircuitRelayV1: circuitRelayV1)
proc handleStream(conn: Connection, proto: string) {.async.} =
proc handleStream(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
case proto:
of RelayV1Codec: await cl.handleStreamV1(conn)

View File

@@ -336,7 +336,8 @@ proc new*(T: typedesc[Relay],
msgSize: msgSize,
isCircuitRelayV1: circuitRelayV1)
proc handleStream(conn: Connection, proto: string) {.async.} =
proc handleStream(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
case proto:
of RelayV2HopCodec: await r.handleHopStreamV2(conn)

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -45,14 +45,14 @@ type
IdentifyNoPubKeyError* = object of IdentifyError
IdentifyInfo* {.public.} = object
pubkey*: Option[PublicKey]
pubkey*: Opt[PublicKey]
peerId*: PeerId
addrs*: seq[MultiAddress]
observedAddr*: Option[MultiAddress]
protoVersion*: Option[string]
agentVersion*: Option[string]
observedAddr*: Opt[MultiAddress]
protoVersion*: Opt[string]
agentVersion*: Opt[string]
protos*: seq[string]
signedPeerRecord*: Option[Envelope]
signedPeerRecord*: Opt[Envelope]
Identify* = ref object of LPProtocol
peerInfo*: PeerInfo
@@ -60,10 +60,9 @@ type
observedAddrManager*: ObservedAddrManager
IdentifyPushHandler* = proc (
peer: PeerId,
newInfo: IdentifyInfo):
Future[void]
{.gcsafe, raises: [], public.}
peer: PeerId,
newInfo: IdentifyInfo
): Future[void] {.async: (raises: [CancelledError]), public.}
IdentifyPush* = ref object of LPProtocol
identifyHandler: IdentifyPushHandler
@@ -81,8 +80,10 @@ chronicles.expandIt(IdentifyInfo):
if it.signedPeerRecord.isSome(): "Some"
else: "None"
proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: bool): ProtoBuffer
{.raises: [].} =
proc encodeMsg(
peerInfo: PeerInfo,
observedAddr: Opt[MultiAddress],
sendSpr: bool): ProtoBuffer =
result = initProtoBuffer()
let pkey = peerInfo.publicKey
@@ -121,27 +122,26 @@ proc decodeMsg*(buf: seq[byte]): Opt[IdentifyInfo] =
var pb = initProtoBuffer(buf)
if ? pb.getField(1, pubkey).toOpt():
iinfo.pubkey = some(pubkey)
iinfo.pubkey = Opt.some(pubkey)
if ? pb.getField(8, signedPeerRecord).toOpt() and
pubkey == signedPeerRecord.envelope.publicKey:
iinfo.signedPeerRecord = some(signedPeerRecord.envelope)
pubkey == signedPeerRecord.envelope.publicKey:
iinfo.signedPeerRecord = Opt.some(signedPeerRecord.envelope)
discard ? pb.getRepeatedField(2, iinfo.addrs).toOpt()
discard ? pb.getRepeatedField(3, iinfo.protos).toOpt()
if ? pb.getField(4, oaddr).toOpt():
iinfo.observedAddr = some(oaddr)
iinfo.observedAddr = Opt.some(oaddr)
if ? pb.getField(5, protoVersion).toOpt():
iinfo.protoVersion = some(protoVersion)
iinfo.protoVersion = Opt.some(protoVersion)
if ? pb.getField(6, agentVersion).toOpt():
iinfo.agentVersion = some(agentVersion)
iinfo.agentVersion = Opt.some(agentVersion)
Opt.some(iinfo)
proc new*(
T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false,
observedAddrManager = ObservedAddrManager.new(),
): T =
T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false,
observedAddrManager = ObservedAddrManager.new()): T =
let identify = T(
peerInfo: peerInfo,
sendSignedPeerRecord: sendSignedPeerRecord,
@@ -151,14 +151,15 @@ proc new*(
identify
method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
trace "handling identify request", conn
var pb = encodeMsg(p.peerInfo, conn.observedAddr, p.sendSignedPeerRecord)
await conn.writeLp(pb.buffer)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in identify handler", exc = exc.msg, conn
finally:
trace "exiting identify handler", conn
@@ -167,36 +168,46 @@ method init*(p: Identify) =
p.handler = handle
p.codec = IdentifyCodec
proc identify*(self: Identify,
conn: Connection,
remotePeerId: PeerId): Future[IdentifyInfo] {.async.} =
proc identify*(
self: Identify,
conn: Connection,
remotePeerId: PeerId
): Future[IdentifyInfo] {.async: (raises: [
CancelledError, IdentifyError, LPStreamError]).} =
trace "initiating identify", conn
var message = await conn.readLp(64*1024)
if len(message) == 0:
trace "identify: Empty message received!", conn
raise newException(IdentityInvalidMsgError, "Empty message received!")
raise (ref IdentityInvalidMsgError)(msg: "Empty message received!")
var info = decodeMsg(message).valueOr: raise newException(IdentityInvalidMsgError, "Incorrect message received!")
var info = decodeMsg(message).valueOr:
raise (ref IdentityInvalidMsgError)(msg: "Incorrect message received!")
debug "identify: decoded message", conn, info
let
pubkey = info.pubkey.valueOr: raise newException(IdentityInvalidMsgError, "No pubkey in identify")
peer = PeerId.init(pubkey).valueOr: raise newException(IdentityInvalidMsgError, $error)
pubkey = info.pubkey.valueOr:
raise (ref IdentityInvalidMsgError)(msg: "No pubkey in identify")
peer = PeerId.init(pubkey).valueOr:
raise (ref IdentityInvalidMsgError)(msg: $error)
if peer != remotePeerId:
trace "Peer ids don't match", remote = peer, local = remotePeerId
raise newException(IdentityNoMatchError, "Peer ids don't match")
raise (ref IdentityNoMatchError)(msg: "Peer ids don't match")
info.peerId = peer
info.observedAddr.withValue(observed):
# Currently, we use the ObservedAddrManager only to find our dialable external NAT address. Therefore, addresses
# Currently, we use the ObservedAddrManager only to find our
# dialable external NAT address. Therefore, addresses
# like "...\p2p-circuit\p2p\..." and "\p2p\..." are not useful to us.
if observed.contains(multiCodec("p2p-circuit")).get(false) or P2PPattern.matchPartial(observed):
if observed.contains(multiCodec("p2p-circuit")).get(false) or
P2PPattern.matchPartial(observed):
trace "Not adding address to ObservedAddrManager.", observed
elif not self.observedAddrManager.addObservation(observed):
trace "Observed address is not valid.", observedAddr = observed
return info
proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} =
proc new*(
T: typedesc[IdentifyPush],
handler: IdentifyPushHandler = nil): T {.public.} =
## Create a IdentifyPush protocol. `handler` will be called every time
## a peer sends us new `PeerInfo`
let identifypush = T(identifyHandler: handler)
@@ -204,27 +215,28 @@ proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.pu
identifypush
proc init*(p: IdentifyPush) =
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
trace "handling identify push", conn
try:
var message = await conn.readLp(64*1024)
var identInfo = decodeMsg(message).valueOr:
raise newException(IdentityInvalidMsgError, "Incorrect message received!")
raise (ref IdentityInvalidMsgError)(msg: "Incorrect message received!")
debug "identify push: decoded message", conn, identInfo
identInfo.pubkey.withValue(pubkey):
let receivedPeerId = PeerId.init(pubkey).tryGet()
if receivedPeerId != conn.peerId:
raise newException(IdentityNoMatchError, "Peer ids don't match")
raise (ref IdentityNoMatchError)(msg: "Peer ids don't match")
identInfo.peerId = receivedPeerId
trace "triggering peer event", peerInfo = conn.peerId
if not isNil(p.identifyHandler):
if p.identifyHandler != nil:
await p.identifyHandler(conn.peerId, identInfo)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPError as exc:
info "exception in identify push handler", exc = exc.msg, conn
finally:
trace "exiting identify push handler", conn
@@ -233,7 +245,11 @@ proc init*(p: IdentifyPush) =
p.handler = handle
p.codec = IdentifyPushCodec
proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async, public.} =
proc push*(
p: IdentifyPush,
peerInfo: PeerInfo,
conn: Connection
) {.async: (raises: [CancelledError, LPStreamError]), public.} =
## Send new `peerInfo`s to a connection
var pb = encodeMsg(peerInfo, conn.observedAddr, true)
await conn.writeLp(pb.buffer)

View File

@@ -27,7 +27,8 @@ type Perf* = ref object of LPProtocol
proc new*(T: typedesc[Perf]): T {.public.} =
var p = T()
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
var bytesRead = 0
try:
trace "Received benchmark performance check", conn

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -36,44 +36,45 @@ type
PingError* = object of LPError
WrongPingAckError* = object of PingError
PingHandler* {.public.} = proc (
peer: PeerId):
Future[void]
{.gcsafe, raises: [].}
PingHandler* {.public.} =
proc (peer: PeerId): Future[void] {.async: (raises: [CancelledError]).}
Ping* = ref object of LPProtocol
pingHandler*: PingHandler
rng: ref HmacDrbgContext
proc new*(T: typedesc[Ping], handler: PingHandler = nil, rng: ref HmacDrbgContext = newRng()): T {.public.} =
proc new*(
T: typedesc[Ping],
handler: PingHandler = nil,
rng: ref HmacDrbgContext = newRng()): T {.public.} =
let ping = Ping(pinghandler: handler, rng: rng)
ping.init()
ping
method init*(p: Ping) =
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
trace "handling ping", conn
var buf: array[PingSize, byte]
await conn.readExactly(addr buf[0], PingSize)
trace "echoing ping", conn
await conn.write(@buf)
if not isNil(p.pingHandler):
if p.pingHandler != nil:
await p.pingHandler(conn.peerId)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in ping handler", exc = exc.msg, conn
p.handler = handle
p.codec = PingCodec
proc ping*(
p: Ping,
conn: Connection,
): Future[Duration] {.async, public.} =
p: Ping,
conn: Connection
): Future[Duration] {.async: (raises: [CancelledError, LPError]), public.} =
## Sends ping to `conn`, returns the delay
trace "initiating ping", conn
var

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -19,14 +19,16 @@ const
type
LPProtoHandler* = proc (
conn: Connection,
proto: string):
Future[void]
{.gcsafe, raises: [].}
conn: Connection,
proto: string): Future[void] {.async.}
LPProtoHandler2* = proc (
conn: Connection,
proto: string): Future[void] {.async: (raises: [CancelledError]).}
LPProtocol* = ref object of RootObj
codecs*: seq[string]
handler*: LPProtoHandler ## this handler gets invoked by the protocol negotiator
handlerImpl: LPProtoHandler ## invoked by the protocol negotiator
started*: bool
maxIncomingStreams: Opt[int]
@@ -41,7 +43,7 @@ proc `maxIncomingStreams=`*(p: LPProtocol, val: int) =
p.maxIncomingStreams = Opt.some(val)
func codec*(p: LPProtocol): string =
assert(p.codecs.len > 0, "Codecs sequence was empty!")
doAssert(p.codecs.len > 0, "Codecs sequence was empty!")
p.codecs[0]
func `codec=`*(p: LPProtocol, codec: string) =
@@ -49,15 +51,39 @@ func `codec=`*(p: LPProtocol, codec: string) =
# if we use this abstraction
p.codecs.insert(codec, 0)
template `handler`*(p: LPProtocol): LPProtoHandler =
p.handlerImpl
template `handler`*(
p: LPProtocol, conn: Connection, proto: string): Future[void] =
p.handlerImpl(conn, proto)
func `handler=`*(p: LPProtocol, handler: LPProtoHandler) =
p.handlerImpl = handler
func `handler=`*(p: LPProtocol, handler: LPProtoHandler2) =
proc wrap(conn: Connection, proto: string): Future[void] {.async.} =
await handler(conn, proto)
p.handlerImpl = wrap
proc new*(
T: type LPProtocol,
codecs: seq[string],
handler: LPProtoHandler,
maxIncomingStreams: Opt[int] | int = Opt.none(int)): T =
T: type LPProtocol,
codecs: seq[string],
handler: LPProtoHandler,
maxIncomingStreams: Opt[int] | int = Opt.none(int)): T =
T(
codecs: codecs,
handler: handler,
handlerImpl: handler,
maxIncomingStreams:
when maxIncomingStreams is int: Opt.some(maxIncomingStreams)
else: maxIncomingStreams
)
proc new*(
T: type LPProtocol,
codecs: seq[string],
handler: LPProtoHandler2,
maxIncomingStreams: Opt[int] | int = Opt.none(int)): T =
proc wrap(conn: Connection, proto: string): Future[void] {.async.} =
await handler(conn, proto)
T.new(codec, wrap, maxIncomingStreams)

View File

@@ -163,7 +163,8 @@ method rpcHandler*(f: FloodSub,
f.updateMetrics(rpcMsg)
method init*(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
proc handler(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
## main protocol handler that gets triggered on every
## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc...

View File

@@ -138,7 +138,8 @@ proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
ok()
method init*(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async.} =
proc handler(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
## main protocol handler that gets triggered on every
## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc...

View File

@@ -360,7 +360,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
method handleConn*(p: PubSub,
conn: Connection,
proto: string) {.base, async.} =
proto: string) {.base, async: (raises: [CancelledError]).} =
## handle incoming connections
##
## this proc will:

View File

@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -162,49 +162,40 @@ proc encode(msg: Message): ProtoBuffer =
proc decode(_: typedesc[Cookie], buf: seq[byte]): Opt[Cookie] =
var c: Cookie
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, c.offset)
r2 = pb.getRequiredField(2, c.ns)
if r1.isErr() or r2.isErr(): return Opt.none(Cookie)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, c.offset).toOpt()
? pb.getRequiredField(2, c.ns).toOpt()
Opt.some(c)
proc decode(_: typedesc[Register], buf: seq[byte]): Opt[Register] =
var
r: Register
ttl: uint64
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, r.ns)
r2 = pb.getRequiredField(2, r.signedPeerRecord)
r3 = pb.getField(3, ttl)
if r1.isErr() or r2.isErr() or r3.isErr(): return Opt.none(Register)
if r3.get(false): r.ttl = Opt.some(ttl)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, r.ns).toOpt()
? pb.getRequiredField(2, r.signedPeerRecord).toOpt()
if ? pb.getField(3, ttl).toOpt(): r.ttl = Opt.some(ttl)
Opt.some(r)
proc decode(_: typedesc[RegisterResponse], buf: seq[byte]): Opt[RegisterResponse] =
proc decode(
_: typedesc[RegisterResponse], buf: seq[byte]): Opt[RegisterResponse] =
var
rr: RegisterResponse
statusOrd: uint
text: string
ttl: uint64
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, statusOrd)
r2 = pb.getField(2, text)
r3 = pb.getField(3, ttl)
if r1.isErr() or r2.isErr() or r3.isErr() or
not checkedEnumAssign(rr.status, statusOrd): return Opt.none(RegisterResponse)
if r2.get(false): rr.text = Opt.some(text)
if r3.get(false): rr.ttl = Opt.some(ttl)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, statusOrd).toOpt()
if not checkedEnumAssign(rr.status, statusOrd):
return Opt.none(RegisterResponse)
if ? pb.getField(2, text).toOpt(): rr.text = Opt.some(text)
if ? pb.getField(3, ttl).toOpt(): rr.ttl = Opt.some(ttl)
Opt.some(rr)
proc decode(_: typedesc[Unregister], buf: seq[byte]): Opt[Unregister] =
var u: Unregister
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, u.ns)
if r1.isErr(): return Opt.none(Unregister)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, u.ns).toOpt()
Opt.some(u)
proc decode(_: typedesc[Discover], buf: seq[byte]): Opt[Discover] =
@@ -212,38 +203,29 @@ proc decode(_: typedesc[Discover], buf: seq[byte]): Opt[Discover] =
d: Discover
limit: uint64
cookie: seq[byte]
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, d.ns)
r2 = pb.getField(2, limit)
r3 = pb.getField(3, cookie)
if r1.isErr() or r2.isErr() or r3.isErr: return Opt.none(Discover)
if r2.get(false): d.limit = Opt.some(limit)
if r3.get(false): d.cookie = Opt.some(cookie)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, d.ns).toOpt()
if ? pb.getField(2, limit).toOpt(): d.limit = Opt.some(limit)
if ? pb.getField(3, cookie).toOpt(): d.cookie = Opt.some(cookie)
Opt.some(d)
proc decode(_: typedesc[DiscoverResponse], buf: seq[byte]): Opt[DiscoverResponse] =
proc decode(
_: typedesc[DiscoverResponse], buf: seq[byte]): Opt[DiscoverResponse] =
var
dr: DiscoverResponse
registrations: seq[seq[byte]]
cookie: seq[byte]
statusOrd: uint
text: string
let
pb = initProtoBuffer(buf)
r1 = pb.getRepeatedField(1, registrations)
r2 = pb.getField(2, cookie)
r3 = pb.getRequiredField(3, statusOrd)
r4 = pb.getField(4, text)
if r1.isErr() or r2.isErr() or r3.isErr or r4.isErr() or
not checkedEnumAssign(dr.status, statusOrd): return Opt.none(DiscoverResponse)
let pb = initProtoBuffer(buf)
discard ? pb.getRepeatedField(1, registrations).toOpt()
for reg in registrations:
var r: Register
let regOpt = Register.decode(reg).valueOr:
return
dr.registrations.add(regOpt)
if r2.get(false): dr.cookie = Opt.some(cookie)
if r4.get(false): dr.text = Opt.some(text)
dr.registrations.add(? Register.decode(reg))
if ? pb.getField(2, cookie).toOpt(): dr.cookie = Opt.some(cookie)
? pb.getRequiredField(3, statusOrd).toOpt()
if not checkedEnumAssign(dr.status, statusOrd):
return Opt.none(DiscoverResponse)
if ? pb.getField(4, text).toOpt(): dr.text = Opt.some(text)
Opt.some(dr)
proc decode(_: typedesc[Message], buf: seq[byte]): Opt[Message] =
@@ -252,33 +234,31 @@ proc decode(_: typedesc[Message], buf: seq[byte]): Opt[Message] =
statusOrd: uint
pbr, pbrr, pbu, pbd, pbdr: ProtoBuffer
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, statusOrd).toOpt
? pb.getRequiredField(1, statusOrd).toOpt()
if not checkedEnumAssign(msg.msgType, statusOrd): return Opt.none(Message)
if ? pb.getField(2, pbr).optValue:
if ? pb.getField(2, pbr).toOpt():
msg.register = Register.decode(pbr.buffer)
if msg.register.isNone(): return Opt.none(Message)
if ? pb.getField(3, pbrr).optValue:
if ? pb.getField(3, pbrr).toOpt():
msg.registerResponse = RegisterResponse.decode(pbrr.buffer)
if msg.registerResponse.isNone(): return Opt.none(Message)
if ? pb.getField(4, pbu).optValue:
if ? pb.getField(4, pbu).toOpt():
msg.unregister = Unregister.decode(pbu.buffer)
if msg.unregister.isNone(): return Opt.none(Message)
if ? pb.getField(5, pbd).optValue:
if ? pb.getField(5, pbd).toOpt():
msg.discover = Discover.decode(pbd.buffer)
if msg.discover.isNone(): return Opt.none(Message)
if ? pb.getField(6, pbdr).optValue:
if ? pb.getField(6, pbdr).toOpt():
msg.discoverResponse = DiscoverResponse.decode(pbdr.buffer)
if msg.discoverResponse.isNone(): return Opt.none(Message)
Opt.some(msg)
type
RendezVousError* = object of LPError
RegisteredData = object
@@ -297,7 +277,7 @@ type
rng: ref HmacDrbgContext
salt: string
defaultDT: Moment
registerDeletionLoop: Future[void]
registerDeletionLoop: Future[void].Raising([])
#registerEvent: AsyncEvent # TODO: to raise during the heartbeat
# + make the heartbeat sleep duration "smarter"
sema: AsyncSemaphore
@@ -312,41 +292,61 @@ proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] =
return err("Bad Peer ID")
return ok()
proc sendRegisterResponse(conn: Connection,
ttl: uint64) {.async.} =
proc sendRegisterResponse(
conn: Connection,
ttl: uint64
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(status: Ok, ttl: Opt.some(ttl)))))
await conn.writeLp(msg.buffer)
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(
status: Ok,
ttl: Opt.some(ttl)
))
))
conn.writeLp(msg.buffer)
proc sendRegisterResponseError(conn: Connection,
status: ResponseStatus,
text: string = "") {.async.} =
proc sendRegisterResponseError(
conn: Connection,
status: ResponseStatus,
text: string = ""
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(status: status, text: Opt.some(text)))))
await conn.writeLp(msg.buffer)
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(
status: status,
text: Opt.some(text)
))
))
conn.writeLp(msg.buffer)
proc sendDiscoverResponse(conn: Connection,
s: seq[Register],
cookie: Cookie) {.async.} =
proc sendDiscoverResponse(
conn: Connection,
s: seq[Register],
cookie: Cookie
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: Ok,
registrations: s,
cookie: Opt.some(cookie.encode().buffer)
))
))
await conn.writeLp(msg.buffer)
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: Ok,
registrations: s,
cookie: Opt.some(cookie.encode().buffer)
))
))
conn.writeLp(msg.buffer)
proc sendDiscoverResponseError(conn: Connection,
status: ResponseStatus,
text: string = "") {.async.} =
proc sendDiscoverResponseError(
conn: Connection,
status: ResponseStatus,
text: string = ""
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(status: status, text: Opt.some(text)))))
await conn.writeLp(msg.buffer)
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: status,
text: Opt.some(text)
))
))
conn.writeLp(msg.buffer)
proc countRegister(rdv: RendezVous, peerId: PeerId): int =
let n = Moment.now()
@@ -360,9 +360,8 @@ proc save(rdv: RendezVous,
r: Register,
update: bool = true) =
let nsSalted = ns & rdv.salt
discard rdv.namespaces.hasKeyOrPut(nsSalted, newSeq[int]())
try:
for index in rdv.namespaces[nsSalted]:
for index in rdv.namespaces.mgetOrPut(nsSalted, newSeq[int]()):
if rdv.registered[index].peerId == peerId:
if update == false: return
rdv.registered[index].expiration = rdv.defaultDT
@@ -376,9 +375,14 @@ proc save(rdv: RendezVous,
rdv.namespaces[nsSalted].add(rdv.registered.high)
# rdv.registerEvent.fire()
except KeyError:
doAssert false, "Should have key"
raiseAssert("Should have key")
proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
proc register(
rdv: RendezVous,
conn: Connection,
r: Register
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
trace "Received Register", peerId = conn.peerId, ns = r.ns
libp2p_rendezvous_register.inc()
if r.ns.len notin 1..255:
@@ -390,7 +394,8 @@ proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
if pr.isErr():
return conn.sendRegisterResponseError(InvalidSignedPeerRecord, pr.error())
if rdv.countRegister(conn.peerId) >= RegistrationLimitPerPeer:
return conn.sendRegisterResponseError(NotAuthorized, "Registration limit reached")
return conn.sendRegisterResponseError(
NotAuthorized, "Registration limit reached")
rdv.save(r.ns, conn.peerId, r)
libp2p_rendezvous_registered.inc()
libp2p_rendezvous_namespaces.set(int64(rdv.namespaces.len))
@@ -407,12 +412,15 @@ proc unregister(rdv: RendezVous, conn: Connection, u: Unregister) =
except KeyError:
return
proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
proc discover(
rdv: RendezVous,
conn: Connection,
d: Discover
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "Received Discover", peerId = conn.peerId, ns = d.ns
libp2p_rendezvous_discover.inc()
if d.ns.len notin 0..255:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
return conn.sendDiscoverResponseError(InvalidNamespace)
var limit = min(DiscoverLimit, d.limit.get(DiscoverLimit))
var
cookie =
@@ -420,11 +428,10 @@ proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
try:
Cookie.decode(d.cookie.tryGet()).tryGet()
except CatchableError:
await conn.sendDiscoverResponseError(InvalidCookie)
return
return conn.sendDiscoverResponseError(InvalidCookie)
else: Cookie(offset: rdv.registered.low().uint64 - 1)
if cookie.ns != d.ns or
cookie.offset notin rdv.registered.low().uint64..rdv.registered.high().uint64:
if cookie.ns != d.ns or cookie.offset notin
rdv.registered.low().uint64 .. rdv.registered.high().uint64:
cookie = Cookie(offset: rdv.registered.low().uint64 - 1)
let
nsSalted = d.ns & rdv.salt
@@ -433,31 +440,30 @@ proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
try:
rdv.namespaces[nsSalted]
except KeyError:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
return conn.sendDiscoverResponseError(InvalidNamespace)
else: toSeq(cookie.offset.int..rdv.registered.high())
if namespaces.len() == 0:
await conn.sendDiscoverResponse(@[], Cookie())
return
return conn.sendDiscoverResponse(@[], Cookie())
var offset = namespaces[^1]
let n = Moment.now()
var s = collect(newSeq()):
for index in namespaces:
var reg = rdv.registered[index]
if limit == 0:
offset = index
break
if reg.expiration < n or index.uint64 <= cookie.offset: continue
limit.dec()
reg.data.ttl = Opt.some((reg.expiration - Moment.now()).seconds.uint64)
reg.data
for index in namespaces:
var reg = rdv.registered[index]
if limit == 0:
offset = index
break
if reg.expiration < n or index.uint64 <= cookie.offset: continue
limit.dec()
reg.data.ttl = Opt.some((reg.expiration - Moment.now()).seconds.uint64)
reg.data
rdv.rng.shuffle(s)
await conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns))
conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns))
proc advertisePeer(rdv: RendezVous,
peer: PeerId,
msg: seq[byte]) {.async.} =
proc advertiseWrap() {.async.} =
proc advertisePeer(
rdv: RendezVous,
peer: PeerId,
msg: seq[byte]) {.async: (raises: [CancelledError]).} =
proc advertiseWrap() {.async: (raises: []).} =
try:
let conn = await rdv.switch.dial(peer, RendezVousCodec)
defer: await conn.close()
@@ -471,12 +477,14 @@ proc advertisePeer(rdv: RendezVous,
trace "Refuse to register", peer, response = msgRecv.registerResponse
else:
trace "Successfully registered", peer, response = msgRecv.registerResponse
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in the advertise", error = exc.msg
finally:
rdv.sema.release()
await rdv.sema.acquire()
discard await advertiseWrap().withTimeout(5.seconds)
defer: rdv.sema.release()
try:
await advertiseWrap().wait(5.seconds)
except AsyncTimeoutError:
discard
method advertise*(rdv: RendezVous,
ns: string,
@@ -636,7 +644,8 @@ proc new*(T: typedesc[RendezVous],
sema: newAsyncSemaphore(SemaphoreDefaultSize)
)
logScope: topics = "libp2p discovery rendezvous"
proc handleStream(conn: Connection, proto: string) {.async.} =
proc handleStream(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
let
buf = await conn.readLp(4096)
@@ -651,7 +660,7 @@ proc new*(T: typedesc[RendezVous],
trace "Got an unexpected Discover Response", response = msg.discoverResponse
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in rendezvous handler", error = exc.msg
finally:
await conn.close()

View File

@@ -18,9 +18,10 @@ type
PlainText* = ref object of Secure
method init(p: PlainText) {.gcsafe.} =
proc handle(conn: Connection, proto: string)
{.async.} = discard
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
## plain text doesn't do anything
discard
p.codec = PlainTextCodec
p.handler = handle

View File

@@ -134,7 +134,8 @@ proc handleConn(
method init*(s: Secure) =
procCall LPProtocol(s).init()
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
trace "handling connection upgrade", proto, conn
try:
# We don't need the result but we

View File

@@ -1,5 +1,5 @@
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -32,19 +32,19 @@ proc tryAcquire*(s: AsyncSemaphore): bool =
## Attempts to acquire a resource, if successful
## returns true, otherwise false
##
if s.count > 0 and s.queue.len == 0:
s.count.dec
trace "Acquired slot", available = s.count, queue = s.queue.len
return true
proc acquire*(s: AsyncSemaphore): Future[void] =
proc acquire*(
s: AsyncSemaphore
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
## Acquire a resource and decrement the resource
## counter. If no more resources are available,
## the returned future will not complete until
## the resource count goes above 0.
##
let fut = newFuture[void]("AsyncSemaphore.acquire")
if s.tryAcquire():
fut.complete()
@@ -75,7 +75,6 @@ proc release*(s: AsyncSemaphore) =
## and completing it and incrementing the
## internal resource count
##
doAssert(s.count <= s.size)
if s.count < s.size:

BIN
tests/testdaemon Executable file

Binary file not shown.

View File

@@ -1,7 +1,7 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -164,9 +164,13 @@ suite "Identify":
switch1 = newStandardSwitch(sendSignedPeerRecord=true)
switch2 = newStandardSwitch(sendSignedPeerRecord=true)
proc updateStore1(peerId: PeerId, info: IdentifyInfo) {.async.} =
proc updateStore1(
peerId: PeerId,
info: IdentifyInfo) {.async: (raises: [CancelledError]).} =
switch1.peerStore.updatePeerInfo(info)
proc updateStore2(peerId: PeerId, info: IdentifyInfo) {.async.} =
proc updateStore2(
peerId: PeerId,
info: IdentifyInfo) {.async: (raises: [CancelledError]).} =
switch2.peerStore.updatePeerInfo(info)
identifyPush1 = IdentifyPush.new(updateStore1)

View File

@@ -42,7 +42,8 @@ suite "Ping":
transport1 = TcpTransport.new(upgrade = Upgrade())
transport2 = TcpTransport.new(upgrade = Upgrade())
proc handlePing(peer: PeerId) {.async, closure.} =
proc handlePing(
peer: PeerId) {.async: (raises: [CancelledError]), closure.} =
inc pingReceivedCount
pingProto1 = Ping.new()
pingProto2 = Ping.new(handlePing)

View File

@@ -1,7 +1,7 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -88,7 +88,6 @@ suite "Tor transport":
# every incoming connections will be in handled in this closure
proc handle(conn: Connection, proto: string) {.async.} =
var resp: array[6, byte]
await conn.readExactly(addr resp, 6)
check string.fromBytes(resp) == "client"
@@ -97,7 +96,7 @@ suite "Tor transport":
# We must close the connections ourselves when we're done with it
await conn.close()
return T(codecs: @[TestCodec], handler: handle)
return T.new(codecs = @[TestCodec], handler = handle)
let rng = newRng()