feat(kad-dht): protobuffers (#1453)

This commit is contained in:
richΛrd
2025-06-11 08:56:02 -04:00
committed by GitHub
parent 1d4c261d2a
commit 888cb78331
3 changed files with 303 additions and 0 deletions

View File

@@ -0,0 +1,159 @@
import ../../protobuf/minprotobuf
import ../../varint
import ../../utility
import results
import ../../multiaddress
import stew/objects
import stew/assign2
import options
type
Record* {.public.} = object
key*: Option[seq[byte]]
value*: Option[seq[byte]]
timeReceived*: Option[string]
MessageType* = enum
putValue = 0
getValue = 1
addProvider = 2
getProviders = 3
findNode = 4
ping = 5 # Deprecated
ConnectionType* = enum
notConnected = 0
connected = 1
canConnect = 2 # Unused
cannotConnect = 3 # Unused
Peer* {.public.} = object
id*: seq[byte]
addrs*: seq[MultiAddress]
connection*: ConnectionType
Message* {.public.} = object
msgType*: MessageType
key*: Option[seq[byte]]
record*: Option[Record]
closerPeers*: seq[Peer]
providerPeers*: seq[Peer]
proc write*(pb: var ProtoBuffer, field: int, value: Record) {.raises: [].}
proc writeOpt*[T](pb: var ProtoBuffer, field: int, opt: Option[T]) {.raises: [].}
proc encode*(record: Record): ProtoBuffer {.raises: [].} =
var pb = initProtoBuffer()
pb.writeOpt(1, record.key)
pb.writeOpt(2, record.value)
pb.writeOpt(5, record.timeReceived)
pb.finish()
return pb
proc encode*(peer: Peer): ProtoBuffer {.raises: [].} =
var pb = initProtoBuffer()
pb.write(1, peer.id)
for address in peer.addrs:
pb.write(2, address.data.buffer)
pb.write(3, uint32(ord(peer.connection)))
pb.finish()
return pb
proc encode*(msg: Message): ProtoBuffer {.raises: [].} =
var pb = initProtoBuffer()
pb.write(1, uint32(ord(msg.msgType)))
pb.writeOpt(2, msg.key)
msg.record.withValue(record):
pb.writeOpt(3, msg.record)
for peer in msg.closerPeers:
pb.write(8, peer.encode())
for peer in msg.providerPeers:
pb.write(9, peer.encode())
pb.finish()
return pb
proc writeOpt*[T](pb: var ProtoBuffer, field: int, opt: Option[T]) {.raises: [].} =
opt.withValue(v):
pb.write(field, v)
proc write*(pb: var ProtoBuffer, field: int, value: Record) {.raises: [].} =
pb.write(field, value.encode())
proc getOptionField[T: ProtoScalar | string | seq[byte]](
pb: ProtoBuffer, field: int, output: var Option[T]
): ProtoResult[void] =
var f: T
if ?pb.getField(field, f):
assign(output, some(f))
ok()
proc decode*(T: type Record, pb: ProtoBuffer): ProtoResult[Option[T]] =
var r: Record
?pb.getOptionField(1, r.key)
?pb.getOptionField(2, r.value)
?pb.getOptionField(5, r.timeReceived)
return ok(some(r))
proc decode*(T: type Peer, pb: ProtoBuffer): ProtoResult[Option[T]] =
var
p: Peer
id: seq[byte]
?pb.getRequiredField(1, p.id)
discard ?pb.getRepeatedField(2, p.addrs)
var connVal: uint32
if ?pb.getField(3, connVal):
var connType: ConnectionType
if not checkedEnumAssign(connType, connVal):
return err(ProtoError.BadWireType)
p.connection = connType
return ok(some(p))
proc decode*(T: type Message, buf: seq[byte]): ProtoResult[Option[T]] =
var
m: Message
key: seq[byte]
recPb: seq[byte]
closerPbs: seq[seq[byte]]
providerPbs: seq[seq[byte]]
var pb = initProtoBuffer(buf)
var msgTypeVal: uint32
?pb.getRequiredField(1, msgTypeVal)
var msgType: MessageType
if not checkedEnumAssign(msgType, msgTypeVal):
return err(ProtoError.BadWireType)
m.msgType = msgType
?pb.getOptionField(2, m.key)
if ?pb.getField(3, recPb):
assign(m.record, ?Record.decode(initProtoBuffer(recPb)))
discard ?pb.getRepeatedField(8, closerPbs)
for ppb in closerPbs:
let peerOpt = ?Peer.decode(initProtoBuffer(ppb))
peerOpt.withValue(peer):
m.closerPeers.add(peer)
discard ?pb.getRepeatedField(9, providerPbs)
for ppb in providerPbs:
let peer = ?Peer.decode(initProtoBuffer(ppb))
peer.withValue(peer):
m.providerPeers.add(peer)
return ok(some(m))

View File

@@ -0,0 +1,142 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import ../../libp2p/protobuf/minprotobuf
import ../../libp2p/protocols/kademlia/protobuf
import ../../libp2p/multiaddress
import options
import results
suite "kademlia protobuffers":
const invalidType = uint32(999)
proc valFromResultOption[T](res: ProtoResult[Option[T]]): T =
assert res.isOk()
assert res.value().isSome()
return res.value().unsafeGet()
test "record encode/decode":
let rec = Record(
key: some(@[1'u8, 2, 3]),
value: some(@[4'u8, 5, 6]),
timeReceived: some("2025-05-12T12:00:00Z"),
)
let encoded = rec.encode()
let decoded = Record.decode(encoded).valFromResultOption
check:
decoded.key.get() == rec.key.get()
decoded.value.get() == rec.value.get()
decoded.timeReceived.get() == rec.timeReceived.get()
test "peer encode/decode":
let maddr = MultiAddress.init("/ip4/127.0.0.1/tcp/9000").tryGet()
let peer =
Peer(id: @[1'u8, 2, 3], addrs: @[maddr], connection: ConnectionType.connected)
let encoded = peer.encode()
var decoded = Peer.decode(initProtoBuffer(encoded.buffer)).valFromResultOption
check:
decoded == peer
test "message encode/decode roundtrip":
let maddr = MultiAddress.init("/ip4/10.0.0.1/tcp/4001").tryGet()
let peer = Peer(id: @[9'u8], addrs: @[maddr], connection: canConnect)
let r = Record(key: some(@[1'u8]), value: some(@[2'u8]), timeReceived: some("t"))
let msg = Message(
msgType: MessageType.findNode,
key: some(@[7'u8]),
record: some(r),
closerPeers: @[peer],
providerPeers: @[peer],
)
let encoded = msg.encode()
let decoded = Message.decode(encoded.buffer).valFromResultOption
check:
decoded == msg
test "decode record with missing fields":
var pb = initProtoBuffer()
# no fields written
let rec = Record.decode(pb).valFromResultOption
check:
rec.key.isNone()
rec.value.isNone()
rec.timeReceived.isNone()
test "decode peer with missing id (invalid)":
var pb = initProtoBuffer()
check:
Peer.decode(pb).isErr()
test "decode peer with invalid connection type":
var pb = initProtoBuffer()
pb.write(1, @[1'u8, 2, 3]) # id field
pb.write(3, invalidType) # bogus connection type
check:
Peer.decode(pb).isErr()
test "decode message with invalid msgType":
var pb = initProtoBuffer()
pb.write(1, invalidType) # invalid MessageType
check:
Message.decode(pb.buffer).isErr()
test "decode message with invalid peer in closerPeers":
let badPeerBuf = @[0'u8, 1, 2] # junk
var pb = initProtoBuffer()
pb.write(8, badPeerBuf) # closerPeers field
check:
Message.decode(pb.buffer).isErr()
test "decode message with invalid embedded record":
# encode junk data into field 3 (record)
var pb = initProtoBuffer()
pb.write(1, uint32(MessageType.putValue)) # valid msgType
pb.write(3, @[0x00'u8, 0xFF, 0xAB]) # broken protobuf for record
check:
Message.decode(pb.buffer).isErr()
test "decode message with empty embedded record":
var recordPb = initProtoBuffer() # no fields
var pb = initProtoBuffer()
pb.write(1, uint32(MessageType.getValue))
pb.write(3, recordPb.buffer)
let decoded = Message.decode(pb.buffer).valFromResultOption
check:
decoded.record.isSome()
decoded.record.get().key.isNone()
test "peer with empty addr list and no connection":
let peer = Peer(id: @[0x42'u8], addrs: @[], connection: ConnectionType.notConnected)
let encoded = peer.encode()
let decoded = Peer.decode(initProtoBuffer(encoded.buffer)).valFromResultOption
check:
decoded == peer
test "message with empty closer/provider peers":
let msg = Message(
msgType: MessageType.ping,
key: none[seq[byte]](),
record: none[Record](),
closerPeers: @[],
providerPeers: @[],
)
let encoded = msg.encode()
let decoded = Message.decode(encoded.buffer).valFromResultOption
check:
decoded == msg
test "peer with addr but missing id":
var pb = initProtoBuffer()
let maddr = MultiAddress.init("/ip4/1.2.3.4/tcp/1234").tryGet()
pb.write(2, maddr.data.buffer)
check:
Peer.decode(pb).isErr()

View File

@@ -33,3 +33,5 @@ import
testpeerstore, testping, testmplex, testrelayv1, testrelayv2, testrendezvous,
testdiscovery, testyamux, testautonat, testautonatservice, testautorelay, testdcutr,
testhpservice, testutility, testhelpers, testwildcardresolverservice
import kademlia/testencoding