From 25a8ed4d07fc7e9a80a2794d6e78ced9642a7acc Mon Sep 17 00:00:00 2001 From: Ben Date: Mon, 25 Aug 2025 13:33:26 +0200 Subject: [PATCH] refactor(kad): Refine, and reduce, exception scope (#1627) --- libp2p/protocols/kademlia/kademlia.nim | 233 +++++++++++++------------ 1 file changed, 123 insertions(+), 110 deletions(-) diff --git a/libp2p/protocols/kademlia/kademlia.nim b/libp2p/protocols/kademlia/kademlia.nim index 7a233a3b3..e95f32b73 100644 --- a/libp2p/protocols/kademlia/kademlia.nim +++ b/libp2p/protocols/kademlia/kademlia.nim @@ -165,41 +165,35 @@ proc putValue*( if not kad.entryValidator.isValid(entKey, value): return err("invalid key/value pair") + let others: seq[EntryRecord] = + if entKey in kad.dataTable.entries: + @[kad.dataTable.entries.getOrDefault(entKey)] + else: + @[] + + let candAsRec = EntryRecord.init(value, none(TimeStamp)) + let confirmedRec = kad.entrySelector.select(candAsRec, others).valueOr: + error "application provided selector error (local)", msg = error + return err(error) + trace "local putval", candidate = candAsRec, others = others, selected = confirmedRec + let validEnt = ValidatedEntry.init(entKey, confirmedRec.value) + + let peers = await kad.findNode(entKey.data.toKey()) + # We first prime the sends so the data is ready to go + let rpcBatch = peers.mapIt(kad.dispatchPutVal(it, validEnt)) + # then we do the `move`, as insert takes the data as `sink` + kad.dataTable.insert(validEnt, confirmedRec.time) try: - let others: seq[EntryRecord] = - if entKey in kad.dataTable.entries: - @[kad.dataTable.entries[entKey]] - else: - @[] - - let candAsRec = EntryRecord.init(value, none(TimeStamp)) - let confirmedRec = kad.entrySelector.select(candAsRec, others).valueOr: - error "application provided selector error (local)", msg = error - return err(error) - trace "local putval", - candidate = candAsRec, others = others, selected = confirmedRec - - let validEnt = ValidatedEntry.init(entKey, confirmedRec.value) - - let peers = await 
kad.findNode(entKey.data.toKey()) - # We first prime the sends so the data is ready to go - let rpcBatch = peers.mapIt(kad.dispatchPutVal(it, validEnt)) - # then we do the `move`, as insert takes the data as `sink` - kad.dataTable.insert(validEnt, confirmedRec.time) - try: - # now that the all the data is where it needs to be in memory, we can dispatch the - # RPCs - await rpcBatch.allFutures().wait(chronos.seconds(timeout.get(5))) + # now that all the data is where it needs to be in memory, we can dispatch the + # RPCs + await rpcBatch.allFutures().wait(chronos.seconds(timeout.get(5))) # It's quite normal for the dispatch to timeout, as it would require all calls to get # their response. Downstream users may desire some sort of functionality in the # future to get rpc telemetry, but in the meantime, we just move on... - except AsyncTimeoutError: - discard - - return results.ok() - except CatchableError as e: - return err("todo: refine exceptions - " & e.msg) + except AsyncTimeoutError: + discard + return results.ok() # Helper function forward declaration proc checkConvergence(state: LookupState, me: PeerId): bool {.raises: [], gcsafe.} @@ -297,29 +291,38 @@ proc bootstrap*( try: await kad.switch.connect(b.peerId, b.addrs) debug "connected to bootstrap peer", peerId = b.peerId - except CatchableError as e: - error "failed to connect to bootstrap peer", peerId = b.peerId, error = e.msg + except DialFailedError as e: + # at some point will want to bubble up a Result[void, SomeErrorEnum] + error "failed to dial to bootstrap peer", peerId = b.peerId, error = e.msg + continue - try: - let msg = await kad.sendFindNode(b.peerId, b.addrs, kad.rtable.selfId).wait( - chronos.seconds(5) - ) - for peer in msg.closerPeers: - let p = PeerId.init(peer.id).tryGet() - discard kad.rtable.insert(p) + let msg = + try: + await kad.sendFindNode(b.peerId, b.addrs, kad.rtable.selfId).wait( + chronos.seconds(5) + ) + except CatchableError as e: + debug "send find node exception during 
bootstrap", + target = b.peerId, addrs = b.addrs, err = e.msg + continue + for peer in msg.closerPeers: + let p = PeerId.init(peer.id).valueOr: + debug "invalid peer id received", error = error + continue + discard kad.rtable.insert(p) + try: kad.switch.peerStore[AddressBook][p] = peer.addrs + except: + error "this is here because an ergonomic means of keying into a table without exceptions is unknown" - # bootstrap node replied succesfully. Adding to routing table - discard kad.rtable.insert(b.peerId) - except CatchableError as e: - error "bootstrap failed for peer", peerId = b.peerId, exc = e.msg + # bootstrap node replied successfully. Adding to routing table + discard kad.rtable.insert(b.peerId) - try: - # Adding some random node to prepopulate the table - discard await kad.findNode(PeerId.random(kad.rng).tryGet().toKey()) - info "bootstrap lookup complete" - except CatchableError as e: - error "bootstrap lookup failed", error = e.msg + let key = PeerId.random(kad.rng).valueOr: + doAssert(false, "this should never happen") + return + discard await kad.findNode(key.toKey()) + info "bootstrap lookup complete" proc refreshBuckets(kad: KadDHT) {.async: (raises: [CancelledError]).} = for i in 0 ..< kad.rtable.buckets.len: @@ -352,77 +355,87 @@ proc new*( kad.handler = proc( conn: Connection, proto: string ) {.async: (raises: [CancelledError]).} = - try: - defer: - await conn.close() - while not conn.atEof: - let - buf = await conn.readLp(MaxMsgSize) - msg = Message.decode(buf).tryGet() + defer: + await conn.close() + while not conn.atEof: + let buf = + try: + await conn.readLp(MaxMsgSize) + except LPStreamError as e: + debug "Read error when handling kademlia RPC", conn = conn, err = e.msg + return + let msg = Message.decode(buf).valueOr: + debug "msg decode error handling kademlia RPC", err = error + return - case msg.msgType - of MessageType.findNode: - let targetIdBytes = msg.key.valueOr: - error "findNode message without key data present", msg = msg, conn = conn 
- return - let targetId = PeerId.init(targetIdBytes).valueOr: - error "findNode message without valid key data", msg = msg, conn = conn - return - let closerPeers = kad.rtable - .findClosest(targetId.toKey(), DefaultReplic) - # exclude the node requester because telling a peer about itself does not reduce the distance, - .filterIt(it != conn.peerId.toKey()) + case msg.msgType + of MessageType.findNode: + let targetIdBytes = msg.key.valueOr: + error "findNode message without key data present", msg = msg, conn = conn + return + let targetId = PeerId.init(targetIdBytes).valueOr: + error "findNode message without valid key data", msg = msg, conn = conn + return + let closerPeers = kad.rtable + .findClosest(targetId.toKey(), DefaultReplic) + # exclude the node requester because telling a peer about itself does not reduce the distance, + .filterIt(it != conn.peerId.toKey()) - let responsePb = encodeFindNodeReply(closerPeers, switch) + let responsePb = encodeFindNodeReply(closerPeers, switch) + try: await conn.writeLp(responsePb.buffer) + except LPStreamError as e: + debug "write error when writing kad find-node RPC reply", + conn = conn, err = e.msg + return - # Peer is useful. adding to rtable - discard kad.rtable.insert(conn.peerId) - of MessageType.putValue: - let record = msg.record.valueOr: - error "no record in message buffer", msg = msg, conn = conn + # Peer is useful. 
adding to rtable + discard kad.rtable.insert(conn.peerId) + of MessageType.putValue: + let record = msg.record.valueOr: + error "no record in message buffer", msg = msg, conn = conn + return + let (skey, svalue) = + if record.key.isSome() and record.value.isSome(): + (record.key.unsafeGet(), record.value.unsafeGet()) + else: + error "no key or no value in rpc buffer", msg = msg, conn = conn return - let (skey, svalue) = - if record.key.isSome() and record.value.isSome(): - (record.key.unsafeGet(), record.value.unsafeGet()) - else: - error "no key or no value in rpc buffer", msg = msg, conn = conn - return - let key = EntryKey.init(skey) - let value = EntryValue.init(svalue) + let key = EntryKey.init(skey) + let value = EntryValue.init(svalue) - # Value sanatisation done. Start insertion process - if not kad.entryValidator.isValid(key, value): - return + # Value sanitisation done. Start insertion process + if not kad.entryValidator.isValid(key, value): + return - let others = - if kad.dataTable.entries.contains(key): - @[kad.dataTable.entries[key]] - else: - @[] - let candRec = EntryRecord.init(value, none(TimeStamp)) - let selectedRec = kad.entrySelector.select(candRec, others).valueOr: - error "application provided selector error", msg = error, conn = conn - return - trace "putval handler selection", - cand = candRec, others = others, selected = selectedRec + let others = + if kad.dataTable.entries.contains(key): + # need to do this shenans in order to avoid exceptions. 
+ @[kad.dataTable.entries.getOrDefault(key)] + else: + @[] + let candRec = EntryRecord.init(value, none(TimeStamp)) + let selectedRec = kad.entrySelector.select(candRec, others).valueOr: + error "application provided selector error", msg = error, conn = conn + return + trace "putval handler selection", + cand = candRec, others = others, selected = selectedRec - # Assume that if selection goes with another value, that it is valid - let validated = ValidatedEntry(key: key, value: selectedRec.value) + # Assume that if selection goes with another value, that it is valid + let validated = ValidatedEntry(key: key, value: selectedRec.value) - kad.dataTable.insert(validated, selectedRec.time) - # consistent with following link, echo message without change - # https://github.com/libp2p/js-libp2p/blob/cf9aab5c841ec08bc023b9f49083c95ad78a7a07/packages/kad-dht/src/rpc/handlers/put-value.ts#L22 + kad.dataTable.insert(validated, selectedRec.time) + # consistent with following link, echo message without change + # https://github.com/libp2p/js-libp2p/blob/cf9aab5c841ec08bc023b9f49083c95ad78a7a07/packages/kad-dht/src/rpc/handlers/put-value.ts#L22 + try: await conn.writeLp(buf) - else: - raise newException(LPError, "unhandled kad-dht message type") - except CancelledError as exc: - raise exc - except CatchableError: - discard - # TODO: figure out why this fails: - # error "could not handle request", - # peerId = conn.PeerId, err = getCurrentExceptionMsg() + except LPStreamError as e: + debug "write error when writing kad find-node RPC reply", + conn = conn, err = e.msg + return + else: + error "unhandled kad-dht message type", msg = msg + return return kad proc setSelector*(kad: KadDHT, selector: EntrySelector) =