diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 1a213705e..b2283ccf3 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -62,8 +62,15 @@ proc init*( dir: Direction, timeout = DefaultChronosStreamTimeout, observedAddr: Opt[MultiAddress], + localAddr: Opt[MultiAddress], ): ChronosStream = - result = C(client: client, timeout: timeout, dir: dir, observedAddr: observedAddr) + result = C( + client: client, + timeout: timeout, + dir: dir, + observedAddr: observedAddr, + localAddr: localAddr, + ) result.initStream() template withExceptions(body: untyped) = diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 0d05a8416..3fb5bab77 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -33,6 +33,7 @@ type timeoutHandler*: TimeoutHandler # timeout handler peerId*: PeerId observedAddr*: Opt[MultiAddress] + localAddr*: Opt[MultiAddress] protocol*: string # protocol used by the connection, used as metrics tag transportDir*: Direction # underlying transport (usually socket) direction when defined(libp2p_agents_metrics): @@ -148,7 +149,8 @@ proc new*( C: type Connection, peerId: PeerId, dir: Direction, - observedAddr: Opt[MultiAddress], + observedAddr: Opt[MultiAddress] = Opt.none(MultiAddress), + localAddr: Opt[MultiAddress] = Opt.none(MultiAddress), timeout: Duration = DefaultConnectionTimeout, timeoutHandler: TimeoutHandler = nil, ): Connection = @@ -158,6 +160,7 @@ proc new*( timeout: timeout, timeoutHandler: timeoutHandler, observedAddr: observedAddr, + localAddr: localAddr, ) result.initStream() diff --git a/libp2p/transports/quictransport.nim b/libp2p/transports/quictransport.nim index de30b49b4..57730917f 100644 --- a/libp2p/transports/quictransport.nim +++ b/libp2p/transports/quictransport.nim @@ -36,9 +36,14 @@ type QuicStream* = ref object of P2PConnection cached: seq[byte] proc new( - _: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId + _: type QuicStream, + stream: Stream, + oaddr: Opt[MultiAddress], + laddr: Opt[MultiAddress], + peerId: PeerId, ): QuicStream = - let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId) + let quicstream = + QuicStream(stream: stream, observedAddr: oaddr, localAddr: laddr, peerId: peerId) procCall P2PConnection(quicstream).initStream() quicstream @@ -120,7 +125,8 @@ proc getStream*( stream = await session.connection.openStream() await stream.write(@[]) # QUIC streams do not exist until data is sent - let qs = QuicStream.new(stream, session.observedAddr, session.peerId) + let qs = + QuicStream.new(stream, session.observedAddr, session.localAddr, session.peerId) when defined(libp2p_agents_metrics): qs.shortAgent = session.shortAgent @@ -300,11 +306,17 @@ proc wrapConnection( transport: QuicTransport, connection: QuicConnection ): QuicSession {.raises: [TransportOsError, MaError].} = let - remoteAddr = connection.remoteAddress() observedAddr = - MultiAddress.init(remoteAddr, IPPROTO_UDP).get() & + MultiAddress.init(connection.remoteAddress(), IPPROTO_UDP).get() & MultiAddress.init("/quic-v1").get() - session = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr)) + localAddr = + MultiAddress.init(connection.localAddress(), IPPROTO_UDP).get() & + MultiAddress.init("/quic-v1").get() + session = QuicSession( + connection: connection, + observedAddr: Opt.some(observedAddr), + localAddr: Opt.some(localAddr), + ) session.initStream() diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 8bd96d34a..f22af075c 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -47,6 +47,7 @@ proc connHandler*( self: TcpTransport, client: StreamTransport, observedAddr: Opt[MultiAddress], + localAddr: Opt[MultiAddress], dir: Direction, ): Connection = trace "Handling tcp connection", @@ -59,6 +60,7 @@ proc connHandler*( client = client, dir = dir, observedAddr = observedAddr, + localAddr = localAddr, timeout = self.connectionsTimeout, ) ) @@ -267,18 +269,22 @@ method accept*( safeCloseWait(transp) raise newTransportClosedError() - let remote = + let (localAddr, observedAddr) = try: - transp.remoteAddress + ( + MultiAddress.init(transp.localAddress).expect( + "Can initialize from local address" + ), + MultiAddress.init(transp.remoteAddress).expect( + "Can initialize from remote address" + ), + ) except TransportOsError as exc: # The connection had errors / was closed before `await` returned control safeCloseWait(transp) - debug "Cannot read remote address", description = exc.msg + debug "Cannot read address", description = exc.msg return nil - - let observedAddr = - MultiAddress.init(remote).expect("Can initialize from remote address") - self.connHandler(transp, Opt.some(observedAddr), Direction.In) + self.connHandler(transp, Opt.some(observedAddr), Opt.some(localAddr), Direction.In) method dial*( self: TcpTransport, @@ -320,14 +326,17 @@ method dial*( safeCloseWait(transp) raise newTransportClosedError() - let observedAddr = + let (observedAddr, localAddr) = try: - MultiAddress.init(transp.remoteAddress).expect("remote address is valid") + ( + MultiAddress.init(transp.remoteAddress).expect("remote address is valid"), + MultiAddress.init(transp.localAddress).expect("local address is valid"), + ) except TransportOsError as exc: safeCloseWait(transp) raise (ref TcpTransportError)(msg: "MultiAddress.init error in dial: " & exc.msg) - self.connHandler(transp, Opt.some(observedAddr), Direction.Out) + self.connHandler(transp, Opt.some(observedAddr), Opt.some(localAddr), Direction.Out) method handles*(t: TcpTransport, address: MultiAddress): bool {.raises: [].} = if procCall Transport(t).handles(address): diff --git a/libp2p/transports/tortransport.nim b/libp2p/transports/tortransport.nim index c586129fe..cd2b0e5dc 100644 --- a/libp2p/transports/tortransport.nim +++ b/libp2p/transports/tortransport.nim @@ -237,7 +237,9 @@ method dial*( try: transp = await connectToTorServer(self.transportAddress) await dialPeer(transp, address) - return self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out) + return self.tcpTransport.connHandler( + transp, Opt.none(MultiAddress), Opt.none(MultiAddress), Direction.Out + ) except CancelledError as e: safeCloseWait(transp) raise e diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim index f57ef8aa3..9e49520ee 100644 --- a/libp2p/transports/wstransport.nim +++ b/libp2p/transports/wstransport.nim @@ -55,10 +55,16 @@ proc new*( session: WSSession, dir: Direction, observedAddr: Opt[MultiAddress], + localAddr: Opt[MultiAddress], timeout = 10.minutes, ): T = - let stream = - T(session: session, timeout: timeout, dir: dir, observedAddr: observedAddr) + let stream = T( + session: session, + timeout: timeout, + dir: dir, + observedAddr: observedAddr, + localAddr: localAddr, + ) stream.initStream() return stream @@ -239,9 +245,8 @@ proc connHandler( self: WsTransport, stream: WSSession, secure: bool, dir: Direction ): Future[Connection] {.async: (raises: [CatchableError]).} = ## Returning CatchableError is fine because we later handle different exceptions. - ## - let observedAddr = + let (observedAddr, localAddr) = try: let codec = @@ -250,15 +255,19 @@ proc connHandler( else: MultiAddress.init("/ws") remoteAddr = stream.stream.reader.tsource.remoteAddress + localAddr = stream.stream.reader.tsource.localAddress - MultiAddress.init(remoteAddr).tryGet() & codec.tryGet() + ( + MultiAddress.init(remoteAddr).tryGet() & codec.tryGet(), + MultiAddress.init(localAddr).tryGet() & codec.tryGet(), + ) except CatchableError as exc: - trace "Failed to create observedAddr", description = exc.msg + trace "Failed to create observedAddr or listenAddr", description = exc.msg if not (isNil(stream) and stream.stream.reader.closed): safeClose(stream) raise exc - let conn = WsStream.new(stream, dir, Opt.some(observedAddr)) + let conn = WsStream.new(stream, dir, Opt.some(observedAddr), Opt.some(localAddr)) self.connections[dir].add(conn) proc onClose() {.async: (raises: []).} = diff --git a/tests/commontransport.nim b/tests/commontransport.nim index 3aa1c0815..43eef8f78 100644 --- a/tests/commontransport.nim +++ b/tests/commontransport.nim @@ -32,6 +32,9 @@ template commonTransportTest*(prov: TransportBuilder, ma1: string, ma2: string = let conn = await transport1.accept() if conn.observedAddr.isSome(): check transport1.handles(conn.observedAddr.get()) + # skip IP check, only check transport and port + check conn.localAddr.get()[3] == transport1.addrs[0][3] + check conn.localAddr.get()[4] == transport1.addrs[0][4] await conn.close() let handlerWait = acceptHandler() diff --git a/tests/testconnmngr.nim b/tests/testconnmngr.nim index 55046f2a0..98f3ed0b7 100644 --- a/tests/testconnmngr.nim +++ b/tests/testconnmngr.nim @@ -17,7 +17,7 @@ import ../libp2p/[connmanager, stream/connection, crypto/crypto, muxers/muxer, p import helpers proc getMuxer(peerId: PeerId, dir: Direction = Direction.In): Muxer = - return Muxer(connection: Connection.new(peerId, dir, Opt.none(MultiAddress))) + return Muxer(connection: Connection.new(peerId, dir)) type TestMuxer = ref object of Muxer peerId: PeerId @@ -25,7 +25,7 @@ type TestMuxer = ref object of Muxer method newStream*( m: TestMuxer, name: string = "", lazy: bool = false ): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} = - Connection.new(m.peerId, Direction.Out, Opt.none(MultiAddress)) + Connection.new(m.peerId, Direction.Out) suite "Connection Manager": teardown: @@ -124,7 +124,7 @@ suite "Connection Manager": let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let muxer = new TestMuxer - let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress)) + let connection = Connection.new(peerId, Direction.In) muxer.peerId = peerId muxer.connection = connection @@ -144,7 +144,7 @@ suite "Connection Manager": let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let muxer = new TestMuxer - let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress)) + let connection = Connection.new(peerId, Direction.In) muxer.peerId = peerId muxer.connection = connection