refactor(kad): Refine, and reduce, exception scope (#1627)

This commit is contained in:
Ben
2025-08-25 13:33:26 +02:00
committed by GitHub
parent 955e28ff70
commit 25a8ed4d07

View File

@@ -165,41 +165,35 @@ proc putValue*(
if not kad.entryValidator.isValid(entKey, value):
return err("invalid key/value pair")
let others: seq[EntryRecord] =
if entKey in kad.dataTable.entries:
@[kad.dataTable.entries.getOrDefault(entKey)]
else:
@[]
let candAsRec = EntryRecord.init(value, none(TimeStamp))
let confirmedRec = kad.entrySelector.select(candAsRec, others).valueOr:
error "application provided selector error (local)", msg = error
return err(error)
trace "local putval", candidate = candAsRec, others = others, selected = confirmedRec
let validEnt = ValidatedEntry.init(entKey, confirmedRec.value)
let peers = await kad.findNode(entKey.data.toKey())
# We first prime the sends so the data is ready to go
let rpcBatch = peers.mapIt(kad.dispatchPutVal(it, validEnt))
# then we do the `move`, as insert takes the data as `sink`
kad.dataTable.insert(validEnt, confirmedRec.time)
try:
let others: seq[EntryRecord] =
if entKey in kad.dataTable.entries:
@[kad.dataTable.entries[entKey]]
else:
@[]
let candAsRec = EntryRecord.init(value, none(TimeStamp))
let confirmedRec = kad.entrySelector.select(candAsRec, others).valueOr:
error "application provided selector error (local)", msg = error
return err(error)
trace "local putval",
candidate = candAsRec, others = others, selected = confirmedRec
let validEnt = ValidatedEntry.init(entKey, confirmedRec.value)
let peers = await kad.findNode(entKey.data.toKey())
# We first prime the sends so the data is ready to go
let rpcBatch = peers.mapIt(kad.dispatchPutVal(it, validEnt))
# then we do the `move`, as insert takes the data as `sink`
kad.dataTable.insert(validEnt, confirmedRec.time)
try:
# now that the all the data is where it needs to be in memory, we can dispatch the
# RPCs
await rpcBatch.allFutures().wait(chronos.seconds(timeout.get(5)))
# now that the all the data is where it needs to be in memory, we can dispatch the
# RPCs
await rpcBatch.allFutures().wait(chronos.seconds(timeout.get(5)))
# It's quite normal for the dispatch to timeout, as it would require all calls to get
# their response. Downstream users may desire some sort of functionality in the
# future to get rpc telemetry, but in the meantime, we just move on...
except AsyncTimeoutError:
discard
return results.ok()
except CatchableError as e:
return err("todo: refine exceptions - " & e.msg)
except AsyncTimeoutError:
discard
return results.ok()
# Helper function forward declaration
proc checkConvergence(state: LookupState, me: PeerId): bool {.raises: [], gcsafe.}
@@ -297,29 +291,38 @@ proc bootstrap*(
try:
await kad.switch.connect(b.peerId, b.addrs)
debug "connected to bootstrap peer", peerId = b.peerId
except CatchableError as e:
error "failed to connect to bootstrap peer", peerId = b.peerId, error = e.msg
except DialFailedError as e:
# at some point will want to bubble up a Result[void, SomeErrorEnum]
error "failed to dial to bootstrap peer", peerId = b.peerId, error = e.msg
continue
try:
let msg = await kad.sendFindNode(b.peerId, b.addrs, kad.rtable.selfId).wait(
chronos.seconds(5)
)
for peer in msg.closerPeers:
let p = PeerId.init(peer.id).tryGet()
discard kad.rtable.insert(p)
let msg =
try:
await kad.sendFindNode(b.peerId, b.addrs, kad.rtable.selfId).wait(
chronos.seconds(5)
)
except CatchableError as e:
debug "send find node exception during bootstrap",
target = b.peerId, addrs = b.addrs, err = e.msg
continue
for peer in msg.closerPeers:
let p = PeerId.init(peer.id).valueOr:
debug "invalid peer id received", error = error
continue
discard kad.rtable.insert(p)
try:
kad.switch.peerStore[AddressBook][p] = peer.addrs
except:
error "this is here because an ergonomic means of keying into a table without exceptions is unknown"
# bootstrap node replied succesfully. Adding to routing table
discard kad.rtable.insert(b.peerId)
except CatchableError as e:
error "bootstrap failed for peer", peerId = b.peerId, exc = e.msg
# bootstrap node replied succesfully. Adding to routing table
discard kad.rtable.insert(b.peerId)
try:
# Adding some random node to prepopulate the table
discard await kad.findNode(PeerId.random(kad.rng).tryGet().toKey())
info "bootstrap lookup complete"
except CatchableError as e:
error "bootstrap lookup failed", error = e.msg
let key = PeerId.random(kad.rng).valueOr:
doAssert(false, "this should never happen")
return
discard await kad.findNode(key.toKey())
info "bootstrap lookup complete"
proc refreshBuckets(kad: KadDHT) {.async: (raises: [CancelledError]).} =
for i in 0 ..< kad.rtable.buckets.len:
@@ -352,77 +355,87 @@ proc new*(
kad.handler = proc(
conn: Connection, proto: string
) {.async: (raises: [CancelledError]).} =
try:
defer:
await conn.close()
while not conn.atEof:
let
buf = await conn.readLp(MaxMsgSize)
msg = Message.decode(buf).tryGet()
defer:
await conn.close()
while not conn.atEof:
let buf =
try:
await conn.readLp(MaxMsgSize)
except LPStreamError as e:
debug "Read error when handling kademlia RPC", conn = conn, err = e.msg
return
let msg = Message.decode(buf).valueOr:
debug "msg decode error handling kademlia RPC", err = error
return
case msg.msgType
of MessageType.findNode:
let targetIdBytes = msg.key.valueOr:
error "findNode message without key data present", msg = msg, conn = conn
return
let targetId = PeerId.init(targetIdBytes).valueOr:
error "findNode message without valid key data", msg = msg, conn = conn
return
let closerPeers = kad.rtable
.findClosest(targetId.toKey(), DefaultReplic)
# exclude the node requester because telling a peer about itself does not reduce the distance,
.filterIt(it != conn.peerId.toKey())
case msg.msgType
of MessageType.findNode:
let targetIdBytes = msg.key.valueOr:
error "findNode message without key data present", msg = msg, conn = conn
return
let targetId = PeerId.init(targetIdBytes).valueOr:
error "findNode message without valid key data", msg = msg, conn = conn
return
let closerPeers = kad.rtable
.findClosest(targetId.toKey(), DefaultReplic)
# exclude the node requester because telling a peer about itself does not reduce the distance,
.filterIt(it != conn.peerId.toKey())
let responsePb = encodeFindNodeReply(closerPeers, switch)
let responsePb = encodeFindNodeReply(closerPeers, switch)
try:
await conn.writeLp(responsePb.buffer)
except LPStreamError as e:
debug "write error when writing kad find-node RPC reply",
conn = conn, err = e.msg
return
# Peer is useful. adding to rtable
discard kad.rtable.insert(conn.peerId)
of MessageType.putValue:
let record = msg.record.valueOr:
error "no record in message buffer", msg = msg, conn = conn
# Peer is useful. adding to rtable
discard kad.rtable.insert(conn.peerId)
of MessageType.putValue:
let record = msg.record.valueOr:
error "no record in message buffer", msg = msg, conn = conn
return
let (skey, svalue) =
if record.key.isSome() and record.value.isSome():
(record.key.unsafeGet(), record.value.unsafeGet())
else:
error "no key or no value in rpc buffer", msg = msg, conn = conn
return
let (skey, svalue) =
if record.key.isSome() and record.value.isSome():
(record.key.unsafeGet(), record.value.unsafeGet())
else:
error "no key or no value in rpc buffer", msg = msg, conn = conn
return
let key = EntryKey.init(skey)
let value = EntryValue.init(svalue)
let key = EntryKey.init(skey)
let value = EntryValue.init(svalue)
# Value sanatisation done. Start insertion process
if not kad.entryValidator.isValid(key, value):
return
# Value sanitisation done. Start insertion process
if not kad.entryValidator.isValid(key, value):
return
let others =
if kad.dataTable.entries.contains(key):
@[kad.dataTable.entries[key]]
else:
@[]
let candRec = EntryRecord.init(value, none(TimeStamp))
let selectedRec = kad.entrySelector.select(candRec, others).valueOr:
error "application provided selector error", msg = error, conn = conn
return
trace "putval handler selection",
cand = candRec, others = others, selected = selectedRec
let others =
if kad.dataTable.entries.contains(key):
# need to do this shenans in order to avoid exceptions.
@[kad.dataTable.entries.getOrDefault(key)]
else:
@[]
let candRec = EntryRecord.init(value, none(TimeStamp))
let selectedRec = kad.entrySelector.select(candRec, others).valueOr:
error "application provided selector error", msg = error, conn = conn
return
trace "putval handler selection",
cand = candRec, others = others, selected = selectedRec
# Assume that if selection goes with another value, that it is valid
let validated = ValidatedEntry(key: key, value: selectedRec.value)
# Assume that if selection goes with another value, that it is valid
let validated = ValidatedEntry(key: key, value: selectedRec.value)
kad.dataTable.insert(validated, selectedRec.time)
# consistent with following link, echo message without change
# https://github.com/libp2p/js-libp2p/blob/cf9aab5c841ec08bc023b9f49083c95ad78a7a07/packages/kad-dht/src/rpc/handlers/put-value.ts#L22
kad.dataTable.insert(validated, selectedRec.time)
# consistent with following link, echo message without change
# https://github.com/libp2p/js-libp2p/blob/cf9aab5c841ec08bc023b9f49083c95ad78a7a07/packages/kad-dht/src/rpc/handlers/put-value.ts#L22
try:
await conn.writeLp(buf)
else:
raise newException(LPError, "unhandled kad-dht message type")
except CancelledError as exc:
raise exc
except CatchableError:
discard
# TODO: figure out why this fails:
# error "could not handle request",
# peerId = conn.PeerId, err = getCurrentExceptionMsg()
except LPStreamError as e:
debug "write error when writing kad find-node RPC reply",
conn = conn, err = e.msg
return
else:
error "unhandled kad-dht message type", msg = msg
return
return kad
proc setSelector*(kad: KadDHT, selector: EntrySelector) =