chore: add localAddr to Connection (#1651)

This commit is contained in:
Gabriel Cruz
2025-09-01 15:39:08 -03:00
committed by GitHub
parent 6059ee8332
commit 7ed62461d7
8 changed files with 75 additions and 30 deletions

View File

@@ -62,8 +62,15 @@ proc init*(
dir: Direction,
timeout = DefaultChronosStreamTimeout,
observedAddr: Opt[MultiAddress],
localAddr: Opt[MultiAddress],
): ChronosStream =
result = C(client: client, timeout: timeout, dir: dir, observedAddr: observedAddr)
result = C(
client: client,
timeout: timeout,
dir: dir,
observedAddr: observedAddr,
localAddr: localAddr,
)
result.initStream()
template withExceptions(body: untyped) =

View File

@@ -33,6 +33,7 @@ type
timeoutHandler*: TimeoutHandler # timeout handler
peerId*: PeerId
observedAddr*: Opt[MultiAddress]
localAddr*: Opt[MultiAddress]
protocol*: string # protocol used by the connection, used as metrics tag
transportDir*: Direction # underlying transport (usually socket) direction
when defined(libp2p_agents_metrics):
@@ -148,7 +149,8 @@ proc new*(
C: type Connection,
peerId: PeerId,
dir: Direction,
observedAddr: Opt[MultiAddress],
observedAddr: Opt[MultiAddress] = Opt.none(MultiAddress),
localAddr: Opt[MultiAddress] = Opt.none(MultiAddress),
timeout: Duration = DefaultConnectionTimeout,
timeoutHandler: TimeoutHandler = nil,
): Connection =
@@ -158,6 +160,7 @@ proc new*(
timeout: timeout,
timeoutHandler: timeoutHandler,
observedAddr: observedAddr,
localAddr: localAddr,
)
result.initStream()

View File

@@ -36,9 +36,14 @@ type QuicStream* = ref object of P2PConnection
cached: seq[byte]
proc new(
_: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId
_: type QuicStream,
stream: Stream,
oaddr: Opt[MultiAddress],
laddr: Opt[MultiAddress],
peerId: PeerId,
): QuicStream =
let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId)
let quicstream =
QuicStream(stream: stream, observedAddr: oaddr, localAddr: laddr, peerId: peerId)
procCall P2PConnection(quicstream).initStream()
quicstream
@@ -120,7 +125,8 @@ proc getStream*(
stream = await session.connection.openStream()
await stream.write(@[]) # QUIC streams do not exist until data is sent
let qs = QuicStream.new(stream, session.observedAddr, session.peerId)
let qs =
QuicStream.new(stream, session.observedAddr, session.localAddr, session.peerId)
when defined(libp2p_agents_metrics):
qs.shortAgent = session.shortAgent
@@ -300,11 +306,17 @@ proc wrapConnection(
transport: QuicTransport, connection: QuicConnection
): QuicSession {.raises: [TransportOsError, MaError].} =
let
remoteAddr = connection.remoteAddress()
observedAddr =
MultiAddress.init(remoteAddr, IPPROTO_UDP).get() &
MultiAddress.init(connection.remoteAddress(), IPPROTO_UDP).get() &
MultiAddress.init("/quic-v1").get()
session = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr))
localAddr =
MultiAddress.init(connection.localAddress(), IPPROTO_UDP).get() &
MultiAddress.init("/quic-v1").get()
session = QuicSession(
connection: connection,
observedAddr: Opt.some(observedAddr),
localAddr: Opt.some(localAddr),
)
session.initStream()

View File

@@ -47,6 +47,7 @@ proc connHandler*(
self: TcpTransport,
client: StreamTransport,
observedAddr: Opt[MultiAddress],
localAddr: Opt[MultiAddress],
dir: Direction,
): Connection =
trace "Handling tcp connection",
@@ -59,6 +60,7 @@ proc connHandler*(
client = client,
dir = dir,
observedAddr = observedAddr,
localAddr = localAddr,
timeout = self.connectionsTimeout,
)
)
@@ -267,18 +269,22 @@ method accept*(
safeCloseWait(transp)
raise newTransportClosedError()
let remote =
let (localAddr, observedAddr) =
try:
transp.remoteAddress
(
MultiAddress.init(transp.localAddress).expect(
"Can initialize from local address"
),
MultiAddress.init(transp.remoteAddress).expect(
"Can initialize from remote address"
),
)
except TransportOsError as exc:
# The connection had errors / was closed before `await` returned control
safeCloseWait(transp)
debug "Cannot read remote address", description = exc.msg
debug "Cannot read address", description = exc.msg
return nil
let observedAddr =
MultiAddress.init(remote).expect("Can initialize from remote address")
self.connHandler(transp, Opt.some(observedAddr), Direction.In)
self.connHandler(transp, Opt.some(observedAddr), Opt.some(localAddr), Direction.In)
method dial*(
self: TcpTransport,
@@ -320,14 +326,17 @@ method dial*(
safeCloseWait(transp)
raise newTransportClosedError()
let observedAddr =
let (observedAddr, localAddr) =
try:
MultiAddress.init(transp.remoteAddress).expect("remote address is valid")
(
MultiAddress.init(transp.remoteAddress).expect("remote address is valid"),
MultiAddress.init(transp.localAddress).expect("local address is valid"),
)
except TransportOsError as exc:
safeCloseWait(transp)
raise (ref TcpTransportError)(msg: "MultiAddress.init error in dial: " & exc.msg)
self.connHandler(transp, Opt.some(observedAddr), Direction.Out)
self.connHandler(transp, Opt.some(observedAddr), Opt.some(localAddr), Direction.Out)
method handles*(t: TcpTransport, address: MultiAddress): bool {.raises: [].} =
if procCall Transport(t).handles(address):

View File

@@ -237,7 +237,9 @@ method dial*(
try:
transp = await connectToTorServer(self.transportAddress)
await dialPeer(transp, address)
return self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out)
return self.tcpTransport.connHandler(
transp, Opt.none(MultiAddress), Opt.none(MultiAddress), Direction.Out
)
except CancelledError as e:
safeCloseWait(transp)
raise e

View File

@@ -55,10 +55,16 @@ proc new*(
session: WSSession,
dir: Direction,
observedAddr: Opt[MultiAddress],
localAddr: Opt[MultiAddress],
timeout = 10.minutes,
): T =
let stream =
T(session: session, timeout: timeout, dir: dir, observedAddr: observedAddr)
let stream = T(
session: session,
timeout: timeout,
dir: dir,
observedAddr: observedAddr,
localAddr: localAddr,
)
stream.initStream()
return stream
@@ -239,9 +245,8 @@ proc connHandler(
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
): Future[Connection] {.async: (raises: [CatchableError]).} =
## Returning CatchableError is fine because we later handle different exceptions.
##
let observedAddr =
let (observedAddr, localAddr) =
try:
let
codec =
@@ -250,15 +255,19 @@ proc connHandler(
else:
MultiAddress.init("/ws")
remoteAddr = stream.stream.reader.tsource.remoteAddress
localAddr = stream.stream.reader.tsource.localAddress
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet()
(
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(),
MultiAddress.init(localAddr).tryGet() & codec.tryGet(),
)
except CatchableError as exc:
trace "Failed to create observedAddr", description = exc.msg
trace "Failed to create observedAddr or listenAddr", description = exc.msg
if not (isNil(stream) and stream.stream.reader.closed):
safeClose(stream)
raise exc
let conn = WsStream.new(stream, dir, Opt.some(observedAddr))
let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr))
self.connections[dir].add(conn)
proc onClose() {.async: (raises: []).} =

View File

@@ -32,6 +32,9 @@ template commonTransportTest*(prov: TransportBuilder, ma1: string, ma2: string =
let conn = await transport1.accept()
if conn.observedAddr.isSome():
check transport1.handles(conn.observedAddr.get())
# skip IP check, only check transport and port
check conn.localAddr.get()[3] == transport1.addrs[0][3]
check conn.localAddr.get()[4] == transport1.addrs[0][4]
await conn.close()
let handlerWait = acceptHandler()

View File

@@ -17,7 +17,7 @@ import ../libp2p/[connmanager, stream/connection, crypto/crypto, muxers/muxer, p
import helpers
proc getMuxer(peerId: PeerId, dir: Direction = Direction.In): Muxer =
return Muxer(connection: Connection.new(peerId, dir, Opt.none(MultiAddress)))
return Muxer(connection: Connection.new(peerId, dir))
type TestMuxer = ref object of Muxer
peerId: PeerId
@@ -25,7 +25,7 @@ type TestMuxer = ref object of Muxer
method newStream*(
m: TestMuxer, name: string = "", lazy: bool = false
): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} =
Connection.new(m.peerId, Direction.Out, Opt.none(MultiAddress))
Connection.new(m.peerId, Direction.Out)
suite "Connection Manager":
teardown:
@@ -124,7 +124,7 @@ suite "Connection Manager":
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let muxer = new TestMuxer
let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress))
let connection = Connection.new(peerId, Direction.In)
muxer.peerId = peerId
muxer.connection = connection
@@ -144,7 +144,7 @@ suite "Connection Manager":
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let muxer = new TestMuxer
let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress))
let connection = Connection.new(peerId, Direction.In)
muxer.peerId = peerId
muxer.connection = connection